新闻中心

PRESS CENTER 纵横智控
你的位置: 首页 新闻 技术应用
纵横智控

EG系列网关网络节点-MQTT订阅-发布

2025-02-24 11:00:47 阅读: 发布人:纵横智控

原文:MQTT 订阅/发布

概述

EG8200系列边缘计算网关支持MQTT通信,根据功能分为两个节点,一个是MQTT发布节点, 用于向指定的Topic发送消息;一个是MQTT订阅节点,用于接收指定Topic的消息。

两个节点都可以创建新的或者使用已创建的mqtt-Broker配置节点。配置节点的功能前面讲过,此处mqtt-Broker配置节点指的是和服务器的MQTT连接,一个配置节点表示一个MQTT连接,网关支持连接多个MQTT服务器。如果网关往一个MQTT服务器上报数据,MQTT发布MQTT订阅使用同一个配置即可。如果网关往多个MQTT服务器上报数据,MQTT发布MQTT订阅需要使用不同的配置。

MQTT协议中文说明.pdf

输入

输入(MQTT发布)
msg.payload

#要发布消息的有效数据,通常为Json字符串

msg.topic

#要发布的MQTT主题

msg.qos

#QOS消息质量等级: 0-最多一次 1-最少一次 2-只有一次

msg.retain

#设置服务器是否保留消息,建议设置为false


MQTT5属性此处不讲解

输出

输出(MQTT订阅)
msg.payload

#消息的有效数据,通常为Json字符串

msg.topic

#消息来自哪个MQTT主题

msg.qos

#QOS消息质量等级: 0-最多一次 1-最少一次 2-只有一次

msg.retain

#值为true时,消息可能是之前的消息


MQTT5属性此处不讲解

功能

1. mqtt-broker 配置项

服务端:MQTT服务器的IP或者域名

端口:MQTT服务器的端口,通常为1883

自动重连:连接断开后自动重连,建议选择

使用TLS:根据服务器要求是否使用加密

协议版本:支持MQTT 3.1.1和MQTT 5

客户端ID:根据服务器要求填写

Keepalive:根据服务器要求填写

Session:是否使用新的会话,强烈建议勾选


用户名:根据服务器要求填写

密码:根据服务器要求填写

#注意:考虑安全因素,用户名和密码在流程导出时不会携带,重新导入流程时需要手动录入


Birth Message:按需配置

Close Message:按需配置

Will Message:按需配置


2. mqtt 订阅 配置项

服务端:选择要使用的MQTT连接

策略:订阅一个主题需手动录入,动态订阅参考动态配置

Qos:消息质量等级


3. mqtt 发布 配置项

服务端:选择要使用的MQTT连接

主题:发布的Topic,如果为空,则可以通过msg.topic设置

Qos:消息质量等级,如果为空,则可以通过msg.qos设置

保留:是否保留消息,如果为空,则可以通过msg.retain设置


4. 动态配置

有些特定场景下,MQTT的连接信息是需要动态配置的。比如:客户端ID使用的是网关的SN,批量生产时,如果采用默认配置的方法,导入的都是模板的SN,导入后还需要依次修改,这种量产效率很低。动态配置支持通过msg传入消息携带参数来修改MQTT的连接信息,从而实现量产效率大大提升。

又比如前面讲到的需要动态订阅Topic,MQTT订阅节点会多一个输入口,支持以下配置项:

msg.action

#要执行的操作:支持connect/disconnect/subscribe/unsubscribe

msg.broker

#对于操作connect,此属性携带的参数将会覆盖任何已配置的参数

msg.broker.url

msg.broker.port

msg.broker.cliendid

msg.broker.username

msg.broker.password

msg.broker.force

#如果连接已经存在时设置msg.broker属性,则会记录一个错误.除非设置了force属性才会断开旧连接创建新连接

msg.topic

#对于操作subscribe/unsubscribe,此属性用来设置Topic.类型可以是字符串或者字符串数字

示例

1.MQTT订阅发布消息

我司提供MQTT测试服务器

MQTT Broker:139.129.229.113:1873

clientid:自定义

username:网关SN

password:网关密码

1.MQTT订阅发布消息

[{"id":"f4a210260d18ff83","type":"mqtt out","z":"6ca1ff8bb99e320a","name":"","topic":"/test","qos":"0","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"31b45d2ad62272a2","x":470,"y":200,"wires":[]},{"id":"b60640e8007e6958","type":"inject","z":"6ca1ff8bb99e320a","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":310,"y":200,"wires":[["f4a210260d18ff83"]]},{"id":"ccda781161e3dd59","type":"mqtt in","z":"6ca1ff8bb99e320a","name":"","topic":"/test","qos":"0","datatype":"auto-detect","broker":"31b45d2ad62272a2","nl":false,"rap":true,"rh":0,"inputs":0,"x":310,"y":280,"wires":[["d145f447d20156a7"]]},{"id":"d145f447d20156a7","type":"debug","z":"6ca1ff8bb99e320a","name":"调试 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":470,"y":280,"wires":[]},{"id":"31b45d2ad62272a2","type":"mqtt-broker","name":"","broker":"139.129.229.113","port":"1873","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]


2. MQTT连接配置

这里以连接我们测试服务器进行一个数据收发的配置操作演示:

2. MQTT连接配置


[{"id":"f4a210260d18ff83","type":"mqtt out","z":"6ca1ff8bb99e320a","name":"","topic":"/test","qos":"0","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"31b45d2ad62272a2","x":470,"y":200,"wires":[]},{"id":"b60640e8007e6958","type":"inject","z":"6ca1ff8bb99e320a","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":310,"y":200,"wires":[["f4a210260d18ff83"]]},{"id":"ccda781161e3dd59","type":"mqtt in","z":"6ca1ff8bb99e320a","name":"","topic":"/test","qos":"0","datatype":"auto-detect","broker":"31b45d2ad62272a2","nl":false,"rap":true,"rh":0,"inputs":0,"x":310,"y":280,"wires":[["d145f447d20156a7"]]},{"id":"d145f447d20156a7","type":"debug","z":"6ca1ff8bb99e320a","name":"调试 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":470,"y":280,"wires":[]},{"id":"31b45d2ad62272a2","type":"mqtt-broker","name":"","broker":"139.129.229.113","port":"1873","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]

3.动态配置

以下示例是一个动态配置的流程,实现的功能为:通过系统信息节点获取到网关的SN,以SN为用户名和客户端ID去连接MQTT服务器,使用状态变化节点来捕获MQTT连接信息:如果连接失败,则会以1秒的频率不断重连;如果连接成功,则动态订阅Topic,示例中动态订阅了/test1 /test2 /test3 ,添加3个注入节点,分别往/test1 /test2 /test33个Topic发送消息,观察是否能正确收到数据:

3.动态配置

热门产品