mqtt_client
mqtt_client包提供了一个ROS节点或ROS 2组件节点,使连接的基于ROS的设备或机器人能够通过MQTT协议使用MQTT代理交换ROS消息。这适用于任意ROS消息类型。mqtt_client还可以与运行在非ROS设备上的MQTT客户端交换原始消息。
[!重要]
本仓库由亚琛工业大学汽车工程研究所(ika)开源维护。
V2X通信是我们车辆智能与自动驾驶领域众多研究主题之一。
如果您想了解更多关于我们如何支持您的自动驾驶或机器人项目,欢迎联系我们!
:email: opensource@ika.rwth-aachen.de
安装
mqtt_client包作为官方ROS / ROS 2包发布,可以通过包管理器轻松安装。
sudo apt update
sudo apt install ros-$ROS_DISTRO-mqtt-client
如果您想从源代码安装mqtt_client,只需将此仓库克隆到您的ROS工作空间中。所有在ROS package.xml
中列出的依赖项都可以使用rosdep安装。
# mqtt_client$
rosdep install -r --ignore-src --from-paths .
# ROS
# workspace$
catkin build -DCMAKE_BUILD_TYPE=Release mqtt_client
# ROS 2
# workspace$
colcon build --packages-up-to mqtt_client --cmake-args -DCMAKE_BUILD_TYPE=Release
docker-ros
mqtt_client也可以作为Docker镜像使用,通过docker-ros容器化。
# ROS
docker run --rm ghcr.io/ika-rwth-aachen/mqtt_client:ros
# ROS 2
docker run --rm ghcr.io/ika-rwth-aachen/mqtt_client:ros2
使用
mqtt_client可以轻松集成到现有的基于ROS的系统中。以下首先提供了在单台机器上测试mqtt_client的快速入门指南。然后,在更复杂的应用中如何启动和配置它的更多细节将被呈现。
快速入门
按照以下步骤快速启动一个工作的mqtt_client,它通过MQTT代理将ROS消息发送给自己。
演示代理
假设MQTT代理(如Mosquitto)正在localhost:1883
上运行。
对于此演示,您可以使用Docker轻松启动带有默认配置的Mosquitto。
docker run --rm --network host --name mosquitto eclipse-mosquitto
有关设置自己的代理的更高级说明,请查看我们在Docker中运行启用了身份验证和加密的MQTT代理的说明这里。
演示配置
mqtt_client最好使用ROS参数yaml文件进行配置。下面显示的配置(另请参见params.yaml
/ params.ros2.yaml
)允许如下交换消息:
- 在本地ROS主题
/ping/ros
上接收的ROS消息被发送到代理的MQTT主题pingpong/ros
上; - 从代理的MQTT主题
pingpong/ros
接收的MQTT消息在本地ROS主题/pong/ros
上发布; - 在本地ROS主题
/ping/primitive
上接收的原始ROS消息作为原始(字符串)消息发送到代理的MQTT主题pingpong/primitive
上; - 从代理的MQTT主题
pingpong/primitive
接收的MQTT消息作为原始ROS消息在本地ROS主题/pong/primitive
上发布。
broker:
host: localhost
port: 1883
bridge:
ros2mqtt:
- ros_topic: /ping/ros
mqtt_topic: pingpong/ros
- ros_topic: /ping/primitive
mqtt_topic: pingpong/primitive
primitive: true
mqtt2ros:
- mqtt_topic: pingpong/ros
ros_topic: /pong/ros
- mqtt_topic: pingpong/primitive
ros_topic: /pong/primitive
primitive: true
演示客户端启动
构建ROS工作空间后,使用预配置的演示参数通过roslaunch启动mqtt_client节点,应该会产生以下输出。
# ROS
roslaunch mqtt_client standalone.launch
# ROS 2
ros2 launch mqtt_client standalone.launch.ros2.xml
[ WARN] [1665575657.358869079]: 参数'broker/tls/enabled'未设置,默认为'0'
[ WARN] [1665575657.359798329]: 参数'client/id'未设置,默认为''
[ WARN] [1665575657.359810889]: 客户端ID为空时无法启用客户端缓冲
[ WARN] [1665575657.360300703]: 参数'client/clean_session'未设置,默认为'1'
[ WARN] [1665575657.360576344]: 参数'client/keep_alive_interval'未设置,默认为'60.000000'
[ WARN] [1665575657.360847295]: 参数'client/max_inflight'未设置,默认为'65535'
[ INFO] [1665575657.361281461]: 正在将ROS主题'/ping/ros'桥接到MQTT主题'pingpong/ros' [ INFO] [1665575657.361303380]: 正在将原始ROS主题'/ping/primitive'桥接到MQTT主题'pingpong/primitive' [ INFO] [1665575657.361352809]: 正在将MQTT主题'pingpong/ros'桥接到ROS主题'/pong/ros' [ INFO] [1665575657.361370558]: 正在将MQTT主题'pingpong/primitive'桥接到原始ROS主题'/pong/primitive' [ INFO] [1665575657.362153083]: 正在连接代理'tcp://localhost:1883'... [ INFO] [1665575657.462622065]: 已连接到代理'tcp://localhost:1883'
请注意,*mqtt_client*成功连接到代理并回显了正在桥接的ROS/MQTT主题。为了测试*mqtt_client*与自身及其他MQTT客户端之间的通信,请打开五个新终端。
为了测试*mqtt_clients*之间的通信,在ROS主题`/ping/ros`上发布任何ROS消息,并等待ROS主题`/pong/ros`上的响应。
```bash
# 第1个终端:向/ping发布ROS消息
# ROS
rostopic pub -r 1 /ping/ros std_msgs/String "Hello MQTT"
# ROS 2
ros2 topic pub /ping/ros std_msgs/msg/String "{data: \"Hello MQTT\"}"
# 第2个终端:监听/pong上的ROS消息
# ROS
rostopic echo /pong/ros
# ROS 2
ros2 topic echo /pong/ros
为了测试mqtt_client与其他MQTT客户端之间的通信,在ROS主题/ping/primitive
上发布原始ROS消息,直接在MQTT主题pingpong/primitive
上发布原始MQTT消息,并等待ROS主题/pong/primitive
上的响应。请注意,您需要使用不同的配置文件重新启动ROS 2的mqtt_client。
# ROS 2
# mqtt_client$
ros2 launch mqtt_client standalone.launch.ros2.xml params_file:=$(ros2 pkg prefix mqtt_client)/share/mqtt_client/config/params.ros2.primitive.yaml
# 第3个终端:向/ping/primitive发布原始ROS消息
# ROS
rostopic pub -r 1 /ping/primitive std_msgs/Int32 42
# ROS2
ros2 topic pub /ping/primitive std_msgs/msg/Int32 "{data: 42}"
# 第4个终端:监听/pong/primitive上的原始ROS消息
# ROS
rostopic echo /pong/primitive
# ROS2
ros2 topic echo /pong/primitive
# 第5个终端:使用mosquitto_pub直接向pingpong/primitive发布原始MQTT消息
docker run --rm --network host eclipse-mosquitto mosquitto_pub -h localhost -t "pingpong/primitive" --repeat 20 --repeat-delay 1 -m 69
如果一切正常,第二个终端应以1Hz的频率打印消息,而第四个终端应以1Hz的频率打印两个不同的消息。
启动
您可以使用以下命令启动mqtt_client节点:
# ROS
roslaunch mqtt_client standalone.launch
# ROS 2
ros2 launch mqtt_client standalone.launch.ros2.xml
这将自动加载提供的演示params.yaml
/ params.ros2.yaml
。如果您想加载自定义配置文件,只需传递params_file
。
# ROS
roslaunch mqtt_client standalone.launch params_file:="</PATH/TO/PARAMS.YAML>"
# ROS 2
ros2 launch mqtt_client standalone.launch.ros2.xml params_file:="</PATH/TO/PARAMS.YAML>"
为了充分利用mqtt_client作为ROS节点/ROS 2组件的优势,将节点/组件加载到您自己的节点管理器/组件容器中。
配置
以下列出了mqtt_client支持的所有可用ROS参数及其默认值(在[]
中)。
代理参数
broker:
host: # [localhost] 运行MQTT代理的机器的IP地址或主机名
port: # [1883] MQTT代理监听的端口
user: # 用于向代理进行身份验证的用户名(如果为空,将尝试匿名连接)
pass: # 用于向代理进行身份验证的密码
tls:
enabled: # [false] 是否通过SSL/TLS连接
ca_certificate: # [/etc/ssl/certs/ca-certificates.crt] 客户端信任的CA证书文件(相对于ROS_HOME)
客户端参数
client:
id: # 用于标识客户端的唯一ID字符串(代理可能允许空ID并自动生成一个)
buffer:
size: # [0] 未连接到代理时桥接缓冲的最大消息数(仅当客户端ID不为空时可用)
directory: # [buffer] 未连接到代理时用于缓冲消息的目录(相对于ROS_HOME)
last_will:
topic: # 用于此客户端遗嘱消息的主题(如果未指定,则无遗嘱)
message: # [offline] 遗嘱消息
qos: # [0] 遗嘱消息的QoS值
retained: # [false] 是否保留遗嘱消息
clean_session: # [true] 是否为此客户端使用清理会话
keep_alive_interval: # [60.0] 保活间隔(秒)
max_inflight: # [65535] 最大同时发送的消息数
tls:
certificate: # 客户端证书文件(仅当代理要求客户端证书时需要;相对于ROS_HOME)
key: # 客户端私钥文件(相对于ROS_HOME)
password: # 客户端私钥密码
version: # TLS版本(https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305)
verify: # 验证客户端是否应进行连接后检查
alpn_protos: # ALPN协议列表(https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_alpn_protos.html)
桥接参数
ROS
bridge:
ros2mqtt: # 指定将哪些ROS主题映射到哪些MQTT主题的数组
- ros_topic: # 其消息被转换为MQTT消息的ROS主题
mqtt_topic: # 相应的ROS消息被发送到代理的MQTT主题
primitive: # [false] 是否作为原始消息发布
inject_timestamp: # [false] 是否在ROS2MQTT负载中附加时间戳(用于接收端的延迟计算)
advanced:
ros:
queue_size: # [1] ROS订阅者队列大小
mqtt:
qos: # [0] MQTT QoS值
retained: # [false] 是否保留MQTT消息
mqtt2ros: # 指定将哪些MQTT主题映射到哪些ROS主题的数组
- mqtt_topic: # 从代理接收消息的MQTT主题
ros_topic: # 相应的MQTT消息被发布的ROS主题
primitive: # [false] 是否作为原始消息发布(如果来自非ROS MQTT客户端)
advanced:
mqtt:
qos: # [0] MQTT QoS值
ros:
queue_size: # [1] ROS发布者队列大小
latched: # [false] 是否锁存ROS消息
##### ROS 2
```yaml
bridge:
ros2mqtt: # 指定将哪些ROS主题映射到哪些MQTT主题的对象
ros_topics: # 指定要桥接的ROS主题的数组
- {{ ros_topic_name }} # 应该被桥接的ROS主题,对应YAML中的子对象
{{ ros_topic_name }}:
mqtt_topic: # 相应的ROS消息发送到代理的MQTT主题
primitive: # [false] 是否作为原始消息发布
ros_type: # [*空*] 如果设置,将使用提供的ROS消息类型。如果为空,则通过发布者自动推断类型
inject_timestamp: # [false] 是否在ROS2MQTT负载中附加时间戳(用于接收端计算延迟)
advanced:
ros:
queue_size: # [1] ROS订阅者队列大小
qos:
reliability: # [auto] "auto"、"system_default"、"reliable"、"best_effort"之一。如果为auto,则通过发布者自动确定QoS
durability: # [auto] "auto"、"system_default"、"volatile"、"transient_local"之一。如果为auto,则通过发布者自动确定QoS
mqtt:
qos: # [0] MQTT QoS值
retained: # [false] 是否保留MQTT消息
mqtt2ros: # 指定将哪些MQTT主题映射到哪些ROS主题的对象
mqtt_topics: # 指定要桥接的ROS主题的数组
- {{ mqtt_topic_name }} # 应该被桥接的MQTT主题,对应YAML中的子对象
{{ mqtt_topic_name }}:
ros_topic: # 相应的MQTT消息发布的ROS主题
ros_type: # [*空*] 如果设置,将使用提供的ROS消息类型。如果为空,则通过MQTT消息自动推断类型
primitive: # [false] 是否作为原始消息发布(如果来自非ROS MQTT客户端)
advanced:
mqtt:
qos: # [0] MQTT QoS值
ros:
queue_size: # [1] ROS发布者队列大小
latched: # [false] 是否锁存ROS消息
qos:
reliability: # [system_default] "system_default"、"reliable"、"best_effort"之一
durability: # [system_default] "system_default"、"volatile"、"transient_local"之一
原始消息
如快速入门所示,mqtt_client不仅可以与其他mqtt_clients交换任意ROS消息,还可以与其他非mqtt_client MQTT客户端交换原始消息数据。这允许基于ROS的设备与不基于ROS的设备交换原始消息。primitive
参数可以为ROS到MQTT(bridge/ros2mqtt
)和MQTT到ROS(bridge/mqtt2ros
)传输设置。
如果将ROS到MQTT传输配置为primitive
,并且ROS消息类型是支持的原始ROS消息类型之一,则原始数据将作为字符串发布。支持的原始ROS消息类型有std_msgs/String
、std_msgs/Bool
、std_msgs/Char
、std_msgs/UInt8
、std_msgs/UInt16
、std_msgs/UInt32
、std_msgs/UInt64
、std_msgs/Int8
、std_msgs/Int16
、std_msgs/Int32
、std_msgs/Int64
、std_msgs/Float32
、std_msgs/Float32
。
如果将MQTT到ROS传输配置为primitive
,则MQTT消息将被解释并发布为原始数据类型(如果可能)。消息按以下顺序探测:bool
(std_msgs/Bool
)、int
(std_msgs/Int32
)、float
(std_msgs/Float32
)、string
(std_msgs/String
)。
延迟计算
mqtt_client提供内置功能,用于测量通过MQTT代理将ROS消息传输回ROS的延迟。注意,此功能仅适用于非原始消息(参见原始消息)。为此,发送客户端将当前时间戳注入MQTT消息中。接收客户端随后可以计算消息接收时间与注入时间戳之间的延迟。请注意,这仅在发送和接收机器上的时钟同步程度内准确。
为了将当前时间戳注入传出的MQTT消息中,必须为相应的bridge/ros2mqtt
条目设置inject_timestamp
参数。接收的mqtt_client随后会自动将测量的延迟(以秒为单位)作为ROS std_msgs/Float64
消息发布在主题/<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>
上。
这些延迟可以使用rostopic echo轻松打印
# ROS
rostopic echo --clear /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data
# ROS 2
ros2 topic echo /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data
或使用rqt_plot绘制:
# ROS
rosrun rqt_plot rqt_plot /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data
# ROS 2
ros2 run rqt_plot rqt_plot /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data
包摘要
这个简短的包摘要按照ROS Wiki风格指南记录了该包。
ROS
Nodelets
mqtt_client/MqttClient
使连接的基于ROS的设备或机器人能够通过MQTT代理使用MQTT协议交换ROS消息。
订阅的主题
<bridge/ros2mqtt[*]/ros_topic>
(topic_tools/ShapeShifter
) ROS主题,其消息被转换为MQTT消息并发送到MQTT代理。可以有任意ROS消息类型。
发布的主题
<bridge/mqtt2ros[*]/ros_topic>
(topic_tools/ShapeShifter
) 从MQTT代理接收到的MQTT消息在此ROS主题上发布。可以有任意ROS消息类型。~/latencies/<bridge/mqtt2ros[*]/ros_topic>
(std_msgs/Float64
) 如果接收到的消息中注入了时间戳(参见延迟计算),则在此处发布传输到<bridge/mqtt2ros[*]/ros_topic>
的消息的延迟测量结果。
服务
~is_connected
(mqtt_client/srv/IsConnected
) 返回客户端是否已连接到MQTT代理。
参数
参见配置。
ROS 2
组件
mqtt_client/MqttClient
使连接的基于ROS的设备或机器人能够通过MQTT代理使用MQTT协议交换ROS消息。
订阅的主题
<bridge/ros2mqtt/ros_topic>
(rclcpp::SerializedMessage
) 将其消息转换为MQTT消息并发送到MQTT代理的ROS主题。可以是任意ROS消息类型。
发布的主题
<bridge/mqtt2ros/ros_topic>
(rclcpp::SerializedMessage
) 在此ROS主题上发布从MQTT代理接收到的MQTT消息。可以是任意ROS消息类型。~/latencies/<bridge/mqtt2ros/ros_topic>
(std_msgs/Float64
) 如果接收到的消息中注入了时间戳(参见延迟计算),则在此处发布传输到<bridge/mqtt2ros/ros_topic>
的消息的延迟测量结果。
服务
-
~/is_connected
(mqtt_client/srv/IsConnected
) 返回客户端是否已连接到MQTT代理。 -
~/new_ros2mqtt_bridge
(mqtt_client/srv/NewRos2MqttBridge
) 返回是否创建了新的ROS -> MQTT桥接。 -
~/new_mqtt2ros_bridge
(mqtt_client/srv/NewMqtt2RosBridge
) 返回是否创建了新的MQTT -> ROS桥接。
参数
参见配置。
工作原理
ROS
mqtt_client 能够将任意消息类型的ROS消息桥接到MQTT代理。为此,它需要使用在运行时才确定形状的通用ROS订阅者和发布者。
这些通用的ROS订阅者和发布者通过topic_tools::ShapeShifter实现。对于在bridge/ros2mqtt/
下指定的每对ros_topic
和mqtt_topic
,都会设置一个具有以下回调签名的ROS订阅者:
void ros2mqtt(topic_tools::ShapeShifter::ConstPtr&, std::string&)
在回调内部,使用ros::serialization对在ros_topic
上接收到的通用消息进行序列化。然后,序列化后的形式就可以在指定的mqtt_topic
上发送到MQTT代理了。
在检索MQTT消息时,它会作为ROS消息在ROS网络上重新发布。为此,使用topic_tools::ShapeShifter::morph使ShapeShifter发布者采用特定ROS消息类型的形状。
但是,有关ROS消息类型的必要元信息只能在发布mqtt_client的ROS订阅者回调中通过调用topic_tools::ShapeShifter::getMD5Sum、topic_tools::ShapeShifter::getDataType和topic_tools::ShapeShifter::getMessageDefinition来提取。这些属性被包装在自定义类型mqtt_client::RosMsgType的ROS消息中,使用ros::serialization进行序列化,并通过MQTT代理在特殊主题上共享。
当mqtt_client接收到这样的ROS消息类型元信息时,它会使用topic_tools::ShapeShifter::morph配置相应的ROS ShapeShifter发布者。
mqtt_client还提供了测量通过MQTT代理将ROS消息传输回ROS的延迟的功能。为此,发送客户端将当前时间戳注入MQTT消息中。接收客户端随后可以计算消息接收时间与注入时间戳之间的延迟。有关是否注入时间戳的信息也包含在之前发送的自定义mqtt_client::RosMsgType消息中。实际的std::vector<uint8>
消息负载采用以下形式之一:
[... 序列化的时间戳 ... | ... 序列化的ROS消息 ...]
[... 序列化的ROS消息 ...]
总结一下,数据流如下:
- 在ROS主题
<ros2mqtt_ros_topic>
上接收到任意类型的ROS消息,并传递给通用回调- 提取ROS消息类型信息并包装为
RosMsgType
- 序列化ROS消息类型信息,并通过MQTT代理在MQTT主题
mqtt_client/ros_msg_type/<ros2mqtt_mqtt_topic>
上发送 - 序列化实际的ROS消息
- 如果
inject_timestamp
为真,则序列化当前时间戳并与消息连接 - 通过MQTT代理在MQTT主题
<ros2mqtt_mqtt_topic>
上发送实际的MQTT消息
- 提取ROS消息类型信息并包装为
- 在MQTT主题
mqtt_client/ros_msg_type/<ros2mqtt_mqtt_topic>
上接收包含ROS消息类型信息的MQTT消息- 提取消息类型信息并配置ShapeShifter ROS发布者
- 存储特定主题是否注入时间戳的信息
- 接收包含实际ROS消息的MQTT消息
- 根据是否注入时间戳,将其解码为序列化的ROS消息和序列化的时间戳
- 如果消息包含时间戳,则计算延迟并在ROS主题
~/latencies/<mqtt2ros_ros_topic>
上发布 - 使用ShapeShifter在ROS主题
<mqtt2ros_ros_topic>
上发布序列化的ROS消息
致谢
本研究在6GEM(项目编号:FKZ 16KISK036K)和UNICARagil(项目编号:FKZ 16EMO0284K)项目框架内完成。我们感谢德国联邦教育与研究部(BMBF)对这些项目的财政支持。