新闻中心
PRESS CENTER原文:MQTT 订阅/发布
EG8200系列边缘计算网关支持MQTT通信,根据功能分为两个节点,一个是MQTT发布节点, 用于向指定的Topic发送消息;一个是MQTT订阅节点,用于接收指定Topic的消息。
两个节点都可以创建新的或者使用已创建的mqtt-Broker配置节点。配置节点的功能前面讲过,此处mqtt-Broker配置节点指的是和服务器的MQTT连接,一个配置节点表示一个MQTT连接,网关支持连接多个MQTT服务器。如果网关往一个MQTT服务器上报数据,MQTT发布MQTT订阅使用同一个配置即可。如果网关往多个MQTT服务器上报数据,MQTT发布MQTT订阅需要使用不同的配置。
输入(MQTT发布) |
msg.payload #要发布消息的有效数据,通常为Json字符串 msg.topic #要发布的MQTT主题 msg.qos #QOS消息质量等级: 0-最多一次 1-最少一次 2-只有一次 msg.retain #设置服务器是否保留消息,建议设置为false MQTT5属性此处不讲解 |
输出(MQTT订阅) |
msg.payload #消息的有效数据,通常为Json字符串 msg.topic #消息来自哪个MQTT主题 msg.qos #QOS消息质量等级: 0-最多一次 1-最少一次 2-只有一次 msg.retain #值为true时,消息可能是之前的消息 MQTT5属性此处不讲解 |
1. mqtt-broker 配置项
服务端:MQTT服务器的IP或者域名 端口:MQTT服务器的端口,通常为1883 自动重连:连接断开后自动重连,建议选择 使用TLS:根据服务器要求是否使用加密 协议版本:支持MQTT 3.1.1和MQTT 5 客户端ID:根据服务器要求填写 Keepalive:根据服务器要求填写 Session:是否使用新的会话,强烈建议勾选 用户名:根据服务器要求填写 密码:根据服务器要求填写 #注意:考虑安全因素,用户名和密码在流程导出时不会携带,重新导入流程时需要手动录入 Birth Message:按需配置 Close Message:按需配置 Will Message:按需配置 |
2. mqtt 订阅 配置项
服务端:选择要使用的MQTT连接 策略:订阅一个主题需手动录入,动态订阅参考动态配置 Qos:消息质量等级 |
3. mqtt 发布 配置项
服务端:选择要使用的MQTT连接 主题:发布的Topic,如果为空,则可以通过msg.topic设置 Qos:消息质量等级,如果为空,则可以通过msg.qos设置 保留:是否保留消息,如果为空,则可以通过msg.retain设置 |
4. 动态配置
有些特定场景下,MQTT的连接信息是需要动态配置的。比如:客户端ID使用的是网关的SN,批量生产时,如果采用默认配置的方法,导入的都是模板的SN,导入后还需要依次修改,这种量产效率很低。动态配置支持通过msg传入消息携带参数来修改MQTT的连接信息,从而实现量产效率大大提升。
又比如前面讲到的需要动态订阅Topic,MQTT订阅节点会多一个输入口,支持以下配置项:
msg.action #要执行的操作:支持connect/disconnect/subscribe/unsubscribe msg.broker #对于操作connect,此属性携带的参数将会覆盖任何已配置的参数 msg.broker.url msg.broker.port msg.broker.cliendid msg.broker.username msg.broker.password msg.broker.force #如果连接已经存在时设置msg.broker属性,则会记录一个错误.除非设置了force属性才会断开旧连接创建新连接 msg.topic #对于操作subscribe/unsubscribe,此属性用来设置Topic.类型可以是字符串或者字符串数字 |
1.MQTT订阅发布消息
我司提供MQTT测试服务器 |
MQTT Broker:139.129.229.113:1873 clientid:自定义 username:网关SN password:网关密码 |
[{"id":"f4a210260d18ff83","type":"mqtt out","z":"6ca1ff8bb99e320a","name":"","topic":"/test","qos":"0","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"31b45d2ad62272a2","x":470,"y":200,"wires":[]},{"id":"b60640e8007e6958","type":"inject","z":"6ca1ff8bb99e320a","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":310,"y":200,"wires":[["f4a210260d18ff83"]]},{"id":"ccda781161e3dd59","type":"mqtt in","z":"6ca1ff8bb99e320a","name":"","topic":"/test","qos":"0","datatype":"auto-detect","broker":"31b45d2ad62272a2","nl":false,"rap":true,"rh":0,"inputs":0,"x":310,"y":280,"wires":[["d145f447d20156a7"]]},{"id":"d145f447d20156a7","type":"debug","z":"6ca1ff8bb99e320a","name":"调试 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":470,"y":280,"wires":[]},{"id":"31b45d2ad62272a2","type":"mqtt-broker","name":"","broker":"139.129.229.113","port":"1873","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}] |
2. MQTT连接配置
这里以连接我们测试服务器进行一个数据收发的配置操作演示:
[{"id":"f4a210260d18ff83","type":"mqtt out","z":"6ca1ff8bb99e320a","name":"","topic":"/test","qos":"0","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"31b45d2ad62272a2","x":470,"y":200,"wires":[]},{"id":"b60640e8007e6958","type":"inject","z":"6ca1ff8bb99e320a","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":310,"y":200,"wires":[["f4a210260d18ff83"]]},{"id":"ccda781161e3dd59","type":"mqtt in","z":"6ca1ff8bb99e320a","name":"","topic":"/test","qos":"0","datatype":"auto-detect","broker":"31b45d2ad62272a2","nl":false,"rap":true,"rh":0,"inputs":0,"x":310,"y":280,"wires":[["d145f447d20156a7"]]},{"id":"d145f447d20156a7","type":"debug","z":"6ca1ff8bb99e320a","name":"调试 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":470,"y":280,"wires":[]},{"id":"31b45d2ad62272a2","type":"mqtt-broker","name":"","broker":"139.129.229.113","port":"1873","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}] |
3.动态配置
以下示例是一个动态配置的流程,实现的功能为:通过系统信息节点获取到网关的SN,以SN为用户名和客户端ID去连接MQTT服务器,使用状态变化节点来捕获MQTT连接信息:如果连接失败,则会以1秒的频率不断重连;如果连接成功,则动态订阅Topic,示例中动态订阅了/test1 /test2 /test3 ,添加3个注入节点,分别往/test1 /test2 /test33个Topic发送消息,观察是否能正确收到数据: