Project Icon

mqtt_client

ROS与MQTT的通用消息交换桥接库

mqtt_client是一个开源ROS软件包,为ROS和ROS 2系统提供MQTT协议通信功能。它支持任意ROS消息类型的交换,也可与非ROS设备进行原始消息通信。该库提供ROS节点和组件,支持包管理器安装、源码编译和Docker部署。mqtt_client具有易配置、加密连接和消息缓冲等特性,适用于需要ROS与MQTT互操作的项目。

mqtt_client

mqtt_client包提供了一个ROS节点或ROS 2组件节点,使连接的基于ROS的设备或机器人能够通过MQTT协议使用MQTT代理交换ROS消息。这适用于任意ROS消息类型。mqtt_client还可以与运行在非ROS设备上的MQTT客户端交换原始消息。

[!重要]
本仓库由亚琛工业大学汽车工程研究所(ika)开源维护。
V2X通信是我们车辆智能与自动驾驶领域众多研究主题之一。
如果您想了解更多关于我们如何支持您的自动驾驶或机器人项目,欢迎联系我们!
:email: opensource@ika.rwth-aachen.de

安装

mqtt_client包作为官方ROS / ROS 2包发布,可以通过包管理器轻松安装。

sudo apt update
sudo apt install ros-$ROS_DISTRO-mqtt-client

如果您想从源代码安装mqtt_client,只需将此仓库克隆到您的ROS工作空间中。所有在ROS package.xml中列出的依赖项都可以使用rosdep安装。

# mqtt_client$
rosdep install -r --ignore-src --from-paths .

# ROS
# workspace$
catkin build -DCMAKE_BUILD_TYPE=Release mqtt_client

# ROS 2
# workspace$
colcon build --packages-up-to mqtt_client --cmake-args -DCMAKE_BUILD_TYPE=Release

docker-ros

mqtt_client也可以作为Docker镜像使用,通过docker-ros容器化。

# ROS
docker run --rm ghcr.io/ika-rwth-aachen/mqtt_client:ros

# ROS 2
docker run --rm ghcr.io/ika-rwth-aachen/mqtt_client:ros2

使用

mqtt_client可以轻松集成到现有的基于ROS的系统中。以下首先提供了在单台机器上测试mqtt_client的快速入门指南。然后,在更复杂的应用中如何启动和配置它的更多细节将被呈现。

快速入门

按照以下步骤快速启动一个工作的mqtt_client,它通过MQTT代理将ROS消息发送给自己。

演示代理

假设MQTT代理(如Mosquitto)正在localhost:1883上运行。

对于此演示,您可以使用Docker轻松启动带有默认配置的Mosquitto

docker run --rm --network host --name mosquitto eclipse-mosquitto

有关设置自己的代理的更高级说明,请查看我们在Docker中运行启用了身份验证和加密的MQTT代理的说明这里

演示配置

mqtt_client最好使用ROS参数yaml文件进行配置。下面显示的配置(另请参见params.yaml / params.ros2.yaml)允许如下交换消息:

  • 在本地ROS主题/ping/ros上接收的ROS消息被发送到代理的MQTT主题pingpong/ros上;
  • 从代理的MQTT主题pingpong/ros接收的MQTT消息在本地ROS主题/pong/ros上发布;
  • 在本地ROS主题/ping/primitive上接收的原始ROS消息作为原始(字符串)消息发送到代理的MQTT主题pingpong/primitive上;
  • 从代理的MQTT主题pingpong/primitive接收的MQTT消息作为原始ROS消息在本地ROS主题/pong/primitive上发布。
broker:
  host: localhost
  port: 1883
bridge:
  ros2mqtt:
    - ros_topic: /ping/ros
      mqtt_topic: pingpong/ros
    - ros_topic: /ping/primitive
      mqtt_topic: pingpong/primitive
      primitive: true
  mqtt2ros:
    - mqtt_topic: pingpong/ros
      ros_topic: /pong/ros
    - mqtt_topic: pingpong/primitive
      ros_topic: /pong/primitive
      primitive: true

演示客户端启动

构建ROS工作空间后,使用预配置的演示参数通过roslaunch启动mqtt_client节点,应该会产生以下输出。

# ROS
roslaunch mqtt_client standalone.launch

# ROS 2
ros2 launch mqtt_client standalone.launch.ros2.xml
[ WARN] [1665575657.358869079]: 参数'broker/tls/enabled'未设置,默认为'0'
[ WARN] [1665575657.359798329]: 参数'client/id'未设置,默认为''
[ WARN] [1665575657.359810889]: 客户端ID为空时无法启用客户端缓冲
[ WARN] [1665575657.360300703]: 参数'client/clean_session'未设置,默认为'1'
[ WARN] [1665575657.360576344]: 参数'client/keep_alive_interval'未设置,默认为'60.000000'
[ WARN] [1665575657.360847295]: 参数'client/max_inflight'未设置,默认为'65535'

[ INFO] [1665575657.361281461]: 正在将ROS主题'/ping/ros'桥接到MQTT主题'pingpong/ros' [ INFO] [1665575657.361303380]: 正在将原始ROS主题'/ping/primitive'桥接到MQTT主题'pingpong/primitive' [ INFO] [1665575657.361352809]: 正在将MQTT主题'pingpong/ros'桥接到ROS主题'/pong/ros' [ INFO] [1665575657.361370558]: 正在将MQTT主题'pingpong/primitive'桥接到原始ROS主题'/pong/primitive' [ INFO] [1665575657.362153083]: 正在连接代理'tcp://localhost:1883'... [ INFO] [1665575657.462622065]: 已连接到代理'tcp://localhost:1883'


请注意,*mqtt_client*成功连接到代理并回显了正在桥接的ROS/MQTT主题。为了测试*mqtt_client*与自身及其他MQTT客户端之间的通信,请打开五个新终端。

为了测试*mqtt_clients*之间的通信,在ROS主题`/ping/ros`上发布任何ROS消息,并等待ROS主题`/pong/ros`上的响应。

```bash
# 第1个终端:向/ping发布ROS消息

# ROS
rostopic pub -r 1 /ping/ros std_msgs/String "Hello MQTT"

# ROS 2
ros2 topic pub /ping/ros std_msgs/msg/String "{data: \"Hello MQTT\"}"
# 第2个终端:监听/pong上的ROS消息

# ROS
rostopic echo /pong/ros

# ROS 2
ros2 topic echo /pong/ros

为了测试mqtt_client与其他MQTT客户端之间的通信,在ROS主题/ping/primitive上发布原始ROS消息,直接在MQTT主题pingpong/primitive上发布原始MQTT消息,并等待ROS主题/pong/primitive上的响应。请注意,您需要使用不同的配置文件重新启动ROS 2的mqtt_client

# ROS 2
# mqtt_client$
ros2 launch mqtt_client standalone.launch.ros2.xml params_file:=$(ros2 pkg prefix mqtt_client)/share/mqtt_client/config/params.ros2.primitive.yaml
# 第3个终端:向/ping/primitive发布原始ROS消息

# ROS
rostopic pub -r 1 /ping/primitive std_msgs/Int32 42

# ROS2
ros2 topic pub /ping/primitive std_msgs/msg/Int32 "{data: 42}"
# 第4个终端:监听/pong/primitive上的原始ROS消息

# ROS
rostopic echo /pong/primitive

# ROS2
ros2 topic echo /pong/primitive
# 第5个终端:使用mosquitto_pub直接向pingpong/primitive发布原始MQTT消息
docker run --rm --network host eclipse-mosquitto mosquitto_pub -h localhost -t "pingpong/primitive" --repeat 20 --repeat-delay 1 -m 69

如果一切正常,第二个终端应以1Hz的频率打印消息,而第四个终端应以1Hz的频率打印两个不同的消息。

启动

您可以使用以下命令启动mqtt_client节点:

# ROS
roslaunch mqtt_client standalone.launch

# ROS 2
ros2 launch mqtt_client standalone.launch.ros2.xml

这将自动加载提供的演示params.yaml / params.ros2.yaml。如果您想加载自定义配置文件,只需传递params_file

# ROS
roslaunch mqtt_client standalone.launch params_file:="</PATH/TO/PARAMS.YAML>"

# ROS 2
ros2 launch mqtt_client standalone.launch.ros2.xml params_file:="</PATH/TO/PARAMS.YAML>"

为了充分利用mqtt_client作为ROS节点/ROS 2组件的优势,将节点/组件加载到您自己的节点管理器/组件容器中。

配置

以下列出了mqtt_client支持的所有可用ROS参数及其默认值(在[]中)。

代理参数

broker:
  host:              # [localhost] 运行MQTT代理的机器的IP地址或主机名
  port:              # [1883] MQTT代理监听的端口
  user:              # 用于向代理进行身份验证的用户名(如果为空,将尝试匿名连接)
  pass:              # 用于向代理进行身份验证的密码
  tls:
    enabled:           # [false] 是否通过SSL/TLS连接
    ca_certificate:    # [/etc/ssl/certs/ca-certificates.crt] 客户端信任的CA证书文件(相对于ROS_HOME)

客户端参数

client:
  id:                   # 用于标识客户端的唯一ID字符串(代理可能允许空ID并自动生成一个)
  buffer:
    size:                 # [0] 未连接到代理时桥接缓冲的最大消息数(仅当客户端ID不为空时可用)
    directory:            # [buffer] 未连接到代理时用于缓冲消息的目录(相对于ROS_HOME)
  last_will:
    topic:                # 用于此客户端遗嘱消息的主题(如果未指定,则无遗嘱)
    message:              # [offline] 遗嘱消息
    qos:                  # [0] 遗嘱消息的QoS值
    retained:             # [false] 是否保留遗嘱消息
  clean_session:        # [true] 是否为此客户端使用清理会话
  keep_alive_interval:  # [60.0] 保活间隔(秒)
  max_inflight:         # [65535] 最大同时发送的消息数
  tls:
    certificate:          # 客户端证书文件(仅当代理要求客户端证书时需要;相对于ROS_HOME)
    key:                  # 客户端私钥文件(相对于ROS_HOME)
    password:             # 客户端私钥密码
    version:              # TLS版本(https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305)
    verify:               # 验证客户端是否应进行连接后检查
    alpn_protos:          # ALPN协议列表(https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_alpn_protos.html)

桥接参数

ROS
bridge:
  ros2mqtt:              # 指定将哪些ROS主题映射到哪些MQTT主题的数组
    - ros_topic:         # 其消息被转换为MQTT消息的ROS主题
      mqtt_topic:        # 相应的ROS消息被发送到代理的MQTT主题
      primitive:         # [false] 是否作为原始消息发布
      inject_timestamp:  # [false] 是否在ROS2MQTT负载中附加时间戳(用于接收端的延迟计算)
      advanced:
        ros:
          queue_size:    # [1] ROS订阅者队列大小
        mqtt:
          qos:           # [0] MQTT QoS值
          retained:      # [false] 是否保留MQTT消息
  mqtt2ros:              # 指定将哪些MQTT主题映射到哪些ROS主题的数组
    - mqtt_topic:        # 从代理接收消息的MQTT主题
      ros_topic:         # 相应的MQTT消息被发布的ROS主题
      primitive:         # [false] 是否作为原始消息发布(如果来自非ROS MQTT客户端)
      advanced:
        mqtt:
qos:           # [0] MQTT QoS值
        ros:
          queue_size:    # [1] ROS发布者队列大小
          latched:       # [false] 是否锁存ROS消息

##### ROS 2

```yaml
bridge:
  ros2mqtt:                # 指定将哪些ROS主题映射到哪些MQTT主题的对象
    ros_topics:            # 指定要桥接的ROS主题的数组
      - {{ ros_topic_name }} # 应该被桥接的ROS主题,对应YAML中的子对象
    {{ ros_topic_name }}:
      mqtt_topic:          # 相应的ROS消息发送到代理的MQTT主题
      primitive:           # [false] 是否作为原始消息发布
      ros_type:            # [*空*] 如果设置,将使用提供的ROS消息类型。如果为空,则通过发布者自动推断类型
      inject_timestamp:    # [false] 是否在ROS2MQTT负载中附加时间戳(用于接收端计算延迟)
      advanced:
        ros:
          queue_size:      # [1] ROS订阅者队列大小
          qos:
            reliability:   # [auto] "auto"、"system_default"、"reliable"、"best_effort"之一。如果为auto,则通过发布者自动确定QoS
            durability:    # [auto] "auto"、"system_default"、"volatile"、"transient_local"之一。如果为auto,则通过发布者自动确定QoS
        mqtt:
          qos:             # [0] MQTT QoS值
          retained:        # [false] 是否保留MQTT消息
  mqtt2ros:                # 指定将哪些MQTT主题映射到哪些ROS主题的对象
    mqtt_topics:           # 指定要桥接的ROS主题的数组
      - {{ mqtt_topic_name }} # 应该被桥接的MQTT主题,对应YAML中的子对象
    {{ mqtt_topic_name }}:
      ros_topic:           # 相应的MQTT消息发布的ROS主题
      ros_type:            # [*空*] 如果设置,将使用提供的ROS消息类型。如果为空,则通过MQTT消息自动推断类型
      primitive:           # [false] 是否作为原始消息发布(如果来自非ROS MQTT客户端)
      advanced:
        mqtt:
          qos:             # [0] MQTT QoS值
        ros:
          queue_size:      # [1] ROS发布者队列大小
          latched:         # [false] 是否锁存ROS消息
          qos:
            reliability:   # [system_default] "system_default"、"reliable"、"best_effort"之一
            durability:    # [system_default] "system_default"、"volatile"、"transient_local"之一

原始消息

快速入门所示,mqtt_client不仅可以与其他mqtt_clients交换任意ROS消息,还可以与其他非mqtt_client MQTT客户端交换原始消息数据。这允许基于ROS的设备与不基于ROS的设备交换原始消息。primitive参数可以为ROS到MQTT(bridge/ros2mqtt)和MQTT到ROS(bridge/mqtt2ros)传输设置。

如果将ROS到MQTT传输配置为primitive,并且ROS消息类型是支持的原始ROS消息类型之一,则原始数据将作为字符串发布。支持的原始ROS消息类型有std_msgs/Stringstd_msgs/Boolstd_msgs/Charstd_msgs/UInt8std_msgs/UInt16std_msgs/UInt32std_msgs/UInt64std_msgs/Int8std_msgs/Int16std_msgs/Int32std_msgs/Int64std_msgs/Float32std_msgs/Float32

如果将MQTT到ROS传输配置为primitive,则MQTT消息将被解释并发布为原始数据类型(如果可能)。消息按以下顺序探测:boolstd_msgs/Bool)、intstd_msgs/Int32)、floatstd_msgs/Float32)、stringstd_msgs/String)。

延迟计算

mqtt_client提供内置功能,用于测量通过MQTT代理将ROS消息传输回ROS的延迟。注意,此功能仅适用于非原始消息(参见原始消息)。为此,发送客户端将当前时间戳注入MQTT消息中。接收客户端随后可以计算消息接收时间与注入时间戳之间的延迟。请注意,这仅在发送和接收机器上的时钟同步程度内准确。

为了将当前时间戳注入传出的MQTT消息中,必须为相应的bridge/ros2mqtt条目设置inject_timestamp参数。接收的mqtt_client随后会自动将测量的延迟(以秒为单位)作为ROS std_msgs/Float64消息发布在主题/<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>上。

这些延迟可以使用rostopic echo轻松打印

# ROS
rostopic echo --clear /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data

# ROS 2
ros2 topic echo /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data

或使用rqt_plot绘制:

# ROS
rosrun rqt_plot rqt_plot /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data

# ROS 2
ros2 run rqt_plot rqt_plot /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data

包摘要

这个简短的包摘要按照ROS Wiki风格指南记录了该包。

ROS

Nodelets

mqtt_client/MqttClient

使连接的基于ROS的设备或机器人能够通过MQTT代理使用MQTT协议交换ROS消息。

订阅的主题
  • <bridge/ros2mqtt[*]/ros_topic> (topic_tools/ShapeShifter) ROS主题,其消息被转换为MQTT消息并发送到MQTT代理。可以有任意ROS消息类型。
发布的主题
  • <bridge/mqtt2ros[*]/ros_topic> (topic_tools/ShapeShifter) 从MQTT代理接收到的MQTT消息在此ROS主题上发布。可以有任意ROS消息类型。
  • ~/latencies/<bridge/mqtt2ros[*]/ros_topic> (std_msgs/Float64) 如果接收到的消息中注入了时间戳(参见延迟计算),则在此处发布传输到 <bridge/mqtt2ros[*]/ros_topic> 的消息的延迟测量结果。
服务
参数

参见配置

ROS 2

组件

mqtt_client/MqttClient

使连接的基于ROS的设备或机器人能够通过MQTT代理使用MQTT协议交换ROS消息。

订阅的主题
  • <bridge/ros2mqtt/ros_topic> (rclcpp::SerializedMessage) 将其消息转换为MQTT消息并发送到MQTT代理的ROS主题。可以是任意ROS消息类型。
发布的主题
  • <bridge/mqtt2ros/ros_topic> (rclcpp::SerializedMessage) 在此ROS主题上发布从MQTT代理接收到的MQTT消息。可以是任意ROS消息类型。
  • ~/latencies/<bridge/mqtt2ros/ros_topic> (std_msgs/Float64) 如果接收到的消息中注入了时间戳(参见延迟计算),则在此处发布传输到 <bridge/mqtt2ros/ros_topic> 的消息的延迟测量结果。
服务
参数

参见配置

工作原理

ROS

mqtt_client 能够将任意消息类型的ROS消息桥接到MQTT代理。为此,它需要使用在运行时才确定形状的通用ROS订阅者和发布者。

这些通用的ROS订阅者和发布者通过topic_tools::ShapeShifter实现。对于在bridge/ros2mqtt/下指定的每对ros_topicmqtt_topic,都会设置一个具有以下回调签名的ROS订阅者:

void ros2mqtt(topic_tools::ShapeShifter::ConstPtr&, std::string&)

在回调内部,使用ros::serialization对在ros_topic上接收到的通用消息进行序列化。然后,序列化后的形式就可以在指定的mqtt_topic上发送到MQTT代理了。

在检索MQTT消息时,它会作为ROS消息在ROS网络上重新发布。为此,使用topic_tools::ShapeShifter::morph使ShapeShifter发布者采用特定ROS消息类型的形状。

但是,有关ROS消息类型的必要元信息只能在发布mqtt_client的ROS订阅者回调中通过调用topic_tools::ShapeShifter::getMD5Sumtopic_tools::ShapeShifter::getDataTypetopic_tools::ShapeShifter::getMessageDefinition来提取。这些属性被包装在自定义类型mqtt_client::RosMsgType的ROS消息中,使用ros::serialization进行序列化,并通过MQTT代理在特殊主题上共享。

mqtt_client接收到这样的ROS消息类型元信息时,它会使用topic_tools::ShapeShifter::morph配置相应的ROS ShapeShifter发布者。

mqtt_client还提供了测量通过MQTT代理将ROS消息传输回ROS的延迟的功能。为此,发送客户端将当前时间戳注入MQTT消息中。接收客户端随后可以计算消息接收时间与注入时间戳之间的延迟。有关是否注入时间戳的信息也包含在之前发送的自定义mqtt_client::RosMsgType消息中。实际的std::vector<uint8>消息负载采用以下形式之一:

[... 序列化的时间戳 ... | ... 序列化的ROS消息 ...]
[... 序列化的ROS消息 ...]

总结一下,数据流如下:

  • 在ROS主题<ros2mqtt_ros_topic>上接收到任意类型的ROS消息,并传递给通用回调
    • 提取ROS消息类型信息并包装为RosMsgType
    • 序列化ROS消息类型信息,并通过MQTT代理在MQTT主题mqtt_client/ros_msg_type/<ros2mqtt_mqtt_topic>上发送
    • 序列化实际的ROS消息
    • 如果inject_timestamp为真,则序列化当前时间戳并与消息连接
    • 通过MQTT代理在MQTT主题<ros2mqtt_mqtt_topic>上发送实际的MQTT消息
  • 在MQTT主题mqtt_client/ros_msg_type/<ros2mqtt_mqtt_topic>上接收包含ROS消息类型信息的MQTT消息
    • 提取消息类型信息并配置ShapeShifter ROS发布者
    • 存储特定主题是否注入时间戳的信息
  • 接收包含实际ROS消息的MQTT消息
    • 根据是否注入时间戳,将其解码为序列化的ROS消息和序列化的时间戳
    • 如果消息包含时间戳,则计算延迟并在ROS主题~/latencies/<mqtt2ros_ros_topic>上发布
    • 使用ShapeShifter在ROS主题<mqtt2ros_ros_topic>上发布序列化的ROS消息

致谢

本研究在6GEM(项目编号:FKZ 16KISK036K)和UNICARagil(项目编号:FKZ 16EMO0284K)项目框架内完成。我们感谢德国联邦教育与研究部(BMBF)对这些项目的财政支持。

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号