Skip to main content

5. 话题通信 Topic

上一讲的节点把计算结果打印到日志就结束了,外面没有人能用到这个数据。这一讲让节点通过 Topic 持续广播关节角,再写一个订阅节点接收并验证。


前言:节点还不会"说话"

上一讲我们把 FK/IK 函数接进了 ROS2 节点,节点启动时计算一次结果,打印到日志,然后就什么都不做了。

这有一个根本问题:结果只在节点内部,外面没有人能用到它

真实系统里,运动学节点算出的关节角需要发给控制器,控制器再驱动电机。这两个模块之间怎么传数据?答案就是本讲的主角——Topic

上一讲的节点本讲升级后
计算结果只打印日志把结果发布到 Topic
只能单机单进程使用任何节点都能订阅
启动一次就结束持续运行,持续发布

5.1 Topic 是什么

5.1.1 用广播电台类比

Topic 就像广播电台:

  • 电台(Publisher):持续播出节目,不管有没有人在听
  • 收音机(Subscriber):调到某个频道,有信号就处理
  • 频道名(Topic 名)/joint_states/target_point
  • 节目格式(消息类型)sensor_msgs/msg/JointStategeometry_msgs/msg/Point

电台和收音机互不认识,只要频道和格式对上,就能通信。这就是解耦

5.1.2 和函数调用的区别

函数调用(紧耦合):
节点A ──直接调用──→ 节点B
A 必须知道 B 的存在,B 挂了 A 也挂

Topic(松耦合):
节点A ──发布──→ /topic ──订阅──→ 节点B
A 不知道 B 存在,B 挂了 A 继续运行

5.1.3 Topic 的三个关键属性

属性说明示例
名称全局唯一的字符串,以 / 开头/joint_states
消息类型发布者和订阅者必须一致sensor_msgs/msg/JointState
QoS可靠性、队列深度等策略默认 depth=10

5.2 选择合适的消息类型

5.2.1 不要重复造轮子

ROS2 提供了大量标准消息类型,优先使用它们,而不是自定义。

本讲用到的消息类型:

场景消息类型关键字段
发布目标点geometry_msgs/msg/Pointx, y, z
发布关节角sensor_msgs/msg/JointStatename, position

5.2.2 JointState 消息结构

sensor_msgs/msg/JointState 是 ROS2 里描述关节状态的标准消息,结构如下:

JointState
├── header # 时间戳和坐标系 ID
│ ├── stamp # 时间戳
│ └── frame_id # 参考坐标系(通常填 "base_link")
├── name # 关节名称列表,如 ["joint1", "joint2"]
├── position # 关节角度列表(弧度),如 [0.523, 0.785]
├── velocity # 关节速度(可选,不用时留空)
└── effort # 关节力矩(可选,不用时留空)

注意position 单位是弧度,不是度。我们的 FK/IK 函数用的是度,发布前需要转换。

5.2.3 查看消息结构的方法

# 查看消息字段定义
ros2 interface show sensor_msgs/msg/JointState

# 查看所有可用的消息类型
ros2 interface list | grep sensor_msgs

5.3 发布关节角——升级 kinematics_node

5.3.1 思路

上一讲的 kinematics_node 只在启动时计算一次。现在我们改造它:

  1. 用定时器每秒触发一次
  2. 每次随机生成关节角,计算 FK
  3. 把关节角发布到 /kinematics/joint_states

为什么不用 /joint_states /joint_states 是 ROS2 的标准 topic,robot_state_publisher 等系统节点也会往这里发布消息。如果我们也发布到同一个 topic,订阅者会同时收到来自多个发布者的混合数据,出现大量 θ1=0°, θ2=0° 的默认值。用 /kinematics/joint_states 作为专用 topic,可以完全避免这个问题。

5.3.2 完整代码

在包的模块目录下创建新文件:

vim $ROS_WS/src/my_robot_basics/my_robot_basics/kinematics_publisher.py
# my_robot_basics/kinematics_publisher.py
# 运行方式:ros2 run my_robot_basics kinematics_publisher
#
# ============================================================
# 节点目标:每秒随机生成一组关节角,计算 FK,把关节角发布到
# /kinematics/joint_states Topic,供其他节点(订阅者、RViz2、
# robot_state_publisher 等)使用
# 执行逻辑:
# 1. __init__ 创建发布者和 1 秒定时器
# 2. 每次定时器触发 → timer_callback 执行
# 3. 随机生成 θ1、θ2 → 调用 FK 算末端位置(仅用于日志)
# 4. 构造 JointState 消息 → publish() 发出去
# ============================================================

import math
import random

import rclpy
from rclpy.node import Node
# JointState 是 ROS2 标准消息,描述关节状态
# 包含 header(时间戳)、name(关节名列表)、position(角度,弧度)等字段
from sensor_msgs.msg import JointState

from my_robot_basics.two_link_kinematics import forward_kinematics


class KinematicsPublisher(Node):
def __init__(self):
super().__init__('kinematics_publisher')

# create_publisher(消息类型, Topic名称, 队列深度)
# 队列深度 10:如果消费者处理慢,最多缓存 10 条消息,超出则丢弃最旧的
# Topic 名称以 '/' 开头表示绝对路径,不受命名空间影响
self.publisher_ = self.create_publisher(JointState, '/kinematics/joint_states', 10)

# create_timer(周期秒数, 回调):每 1 秒触发一次 timer_callback
self.timer = self.create_timer(1.0, self.timer_callback)

self.L1 = 1.0
self.L2 = 0.8

self.get_logger().info('kinematics_publisher 启动,每秒发布一次关节角...')

def timer_callback(self):
# 随机生成关节角(度)
theta1_deg = random.uniform(-90.0, 90.0)
theta2_deg = random.uniform(-120.0, 120.0)

# 计算末端位置(仅用于日志,不发布)
x, y = forward_kinematics(theta1_deg, theta2_deg, self.L1, self.L2)

# 构造 JointState 消息
msg = JointState()

# header.stamp:消息的时间戳,tf2 和 RViz2 依赖它做时间同步
# get_clock().now() 获取节点当前时间,to_msg() 转为消息格式
# 不填时间戳会导致后续 tf2 坐标系同步出问题
msg.header.stamp = self.get_clock().now().to_msg()

# header.frame_id:消息所在的参考坐标系,JointState 通常填 'base_link'
msg.header.frame_id = 'base_link'

# name:关节名称列表,必须和 URDF 里的 <joint name="..."> 完全一致
# robot_state_publisher 靠这个名字匹配 URDF 里的关节
msg.name = ['joint1', 'joint2']

# position:关节角度列表,单位是弧度(ROS2 约定)
# 我们的 FK/IK 函数用度,发布前必须转换
msg.position = [math.radians(theta1_deg), math.radians(theta2_deg)]

# publish() 把消息发送到 /kinematics/joint_states Topic
# 所有订阅了这个 Topic 的节点都会收到这条消息
self.publisher_.publish(msg)

self.get_logger().info(
f'发布: θ1={theta1_deg:6.1f}°, θ2={theta2_deg:6.1f}° → 末端=({x:.3f}, {y:.3f})'
)


def main(args=None):
rclpy.init(args=args)
node = KinematicsPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

5.3.3 注册入口点

编辑 $ROS_WS/src/my_robot_basics/setup.py,在 console_scripts 里追加:

'console_scripts': [
'hello_node = my_robot_basics.hello_node:main',
'kinematics_node = my_robot_basics.kinematics_node:main',
'kinematics_publisher = my_robot_basics.kinematics_publisher:main', # 新增
],

5.3.4 构建并运行

cd $ROS_WS
colcon build --packages-select my_robot_basics
source install/setup.bash
ros2 run my_robot_basics kinematics_publisher

预期输出(每秒一行,数值随机):

[INFO] [...] [kinematics_publisher]: kinematics_publisher 启动,每秒发布一次关节角...
[INFO] [...] [kinematics_publisher]: 发布: θ1= 23.4°, θ2= -67.8° → 末端=(1.234, 0.567)
[INFO] [...] [kinematics_publisher]: 发布: θ1= -45.1°, θ2= 112.3° → 末端=(-0.345, 0.901)

5.4 订阅关节角——写 IK 节点

5.4.1 思路

现在写一个订阅节点,接收 /kinematics/joint_states,用 FK 算出末端位置,再用 IK 反算关节角,验证 round-trip 是否正确。

vim $ROS_WS/src/my_robot_basics/my_robot_basics/kinematics_subscriber.py
# my_robot_basics/kinematics_subscriber.py
# 运行方式:ros2 run my_robot_basics kinematics_subscriber
#
# ============================================================
# 节点目标:订阅 /kinematics/joint_states Topic,收到关节角后做 FK + IK
# round-trip 验证,确认发布者的数据是正确的
# 执行逻辑:
# 1. __init__ 注册订阅者,绑定回调函数
# 2. spin() 进入事件循环,等待消息到来
# 3. 每当 /kinematics/joint_states 有新消息 → callback 自动被调用
# 4. callback 里:弧度→度 → FK → IK → 打印对比结果
# ============================================================

import math

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState

from my_robot_basics.two_link_kinematics import forward_kinematics, inverse_kinematics


class KinematicsSubscriber(Node):
def __init__(self):
super().__init__('kinematics_subscriber')

# create_subscription(消息类型, Topic名称, 回调函数, 队列深度)
# 队列深度 10:如果回调处理慢,最多缓存 10 条待处理消息
# 回调函数 self.callback 会在每次收到消息时被 spin() 调度执行
# 注意:回调函数签名必须是 def callback(self, msg),msg 类型与订阅类型一致
self.subscription = self.create_subscription(
JointState,
'/kinematics/joint_states',
self.callback,
10,
)

self.L1 = 1.0
self.L2 = 0.8

self.get_logger().info('kinematics_subscriber 启动,等待关节角消息...')

def callback(self, msg):
# msg 是收到的 JointState 消息对象
# 防御性检查:确保消息里有至少 2 个关节角
if len(msg.position) < 2:
return

# JointState.position 单位是弧度,转换为度供 FK/IK 函数使用
theta1_deg = math.degrees(msg.position[0])
theta2_deg = math.degrees(msg.position[1])

# FK:用收到的关节角算出末端位置
x, y = forward_kinematics(theta1_deg, theta2_deg, self.L1, self.L2)

# IK:把末端位置反算回关节角,和原始值对比(round-trip 验证)
try:
solutions = inverse_kinematics(x, y, self.L1, self.L2)
# 取肘下解做对比(发布者随机生成的角度不一定是肘下解,但末端位置应一致)
ik_t1, ik_t2 = solutions['elbow_down']
self.get_logger().info(
f'收到: θ1={theta1_deg:6.1f}°, θ2={theta2_deg:6.1f}° | '
f'FK→({x:.3f},{y:.3f}) | '
f'IK验证: θ1={ik_t1:.1f}°, θ2={ik_t2:.1f}°'
)
except ValueError as e:
self.get_logger().warn(f'IK 失败: {e}')


def main(args=None):
rclpy.init(args=args)
node = KinematicsSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

5.4.2 注册入口点

继续在 setup.pyconsole_scripts 里追加(当前完整内容):

'console_scripts': [
'hello_node = my_robot_basics.hello_node:main',
'kinematics_node = my_robot_basics.kinematics_node:main',
'kinematics_publisher = my_robot_basics.kinematics_publisher:main',
'kinematics_subscriber = my_robot_basics.kinematics_subscriber:main', # 新增
],

5.4.3 构建并运行

终端 1:启动发布者

cd $ROS_WS && source install/setup.bash
ros2 run my_robot_basics kinematics_publisher

终端 2:启动订阅者

cd $ROS_WS && source install/setup.bash
ros2 run my_robot_basics kinematics_subscriber

预期输出(终端 2):

[INFO] [...] [kinematics_subscriber]: kinematics_subscriber 启动,等待关节角消息...
[INFO] [...] [kinematics_subscriber]: 收到: θ1= 23.4°, θ2= -67.8° | FK→(1.234,0.567) | IK验证: θ1=23.4°, θ2=-67.8°

IK 验证的角度和收到的角度一致,说明 round-trip 正确。


5.5 用 CLI 观察消息流

节点运行时,新开一个终端,source 环境后执行以下命令:

# 查看当前所有 Topic
ros2 topic list

# 查看 /kinematics/joint_states 的消息类型和发布者/订阅者数量
ros2 topic info /kinematics/joint_states

# 实时打印消息内容
ros2 topic echo /kinematics/joint_states

# 测量发布频率
ros2 topic hz /kinematics/joint_states

ros2 topic echo 的输出类似:

header:
stamp:
sec: 1234567890
nanosec: 123456789
frame_id: base_link
name:
- joint1
- joint2
position:
- 0.4084
- -1.1834
velocity: []
effort: []
---

这是验证系统是否正常工作的第一手段,比看日志更直接。


5.6 QoS 基础

5.6.1 为什么需要 QoS

不同场景对消息传递的要求不同:

  • 传感器数据:允许丢失,但要实时(不需要历史消息)
  • 控制指令:必须可靠送达,不能丢
  • 状态广播:只关心最新值,历史没意义

QoS(Quality of Service)就是用来描述这些需求的。

5.6.2 入门只需要知道两个参数

参数选项说明
可靠性RELIABLE确保送达,有重传机制
BEST_EFFORT尽力而为,允许丢失
队列深度整数(如 10)缓存多少条消息

本讲用的 depth=10 是最常见的入门配置,适合大多数场景。

5.6.3 发布者和订阅者的 QoS 必须兼容

如果发布者用 RELIABLE,订阅者用 BEST_EFFORT,消息可能收不到,且不会有任何报错——这是初学者常见的坑。

# 查看 Topic 的 QoS 配置
ros2 topic info /kinematics/joint_states --verbose

5.7 常见错误

5.7.1 错误一:收不到消息,但节点都在运行

排查步骤

# 1. 确认 Topic 存在
ros2 topic list | grep kinematics/joint_states

# 2. 确认消息类型一致
ros2 topic info /kinematics/joint_states

# 3. 直接 echo 看有没有数据
ros2 topic echo /kinematics/joint_states

最常见原因:发布者和订阅者的 Topic 名字拼写不一致(大小写、斜杠)。


5.7.2 错误二:position 单位搞错

# 错误:直接把度数放进 position
msg.position = [theta1_deg, theta2_deg]

# 正确:转换为弧度
msg.position = [math.radians(theta1_deg), math.radians(theta2_deg)]

JointState.position 的单位是弧度,这是 ROS2 的约定。如果填度数,RViz2 里的机械臂会转到完全错误的位置。


5.7.3 错误三:忘记填 header.stamp

# 错误:不填时间戳
msg = JointState()
msg.name = ['joint1', 'joint2']

# 正确:填上时间戳
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = ['joint1', 'joint2']

tf2 和 RViz2 依赖时间戳来同步坐标系,不填会导致后续讲次里出现奇怪的显示问题。


5.8 练习题

5.8.1 练习 1:修改发布频率

kinematics_publisher 的发布频率从 1 Hz 改为 5 Hz,用 ros2 topic hz 验证频率变化。

参考答案

# 把 1.0 改为 0.2(周期 = 1/频率)
self.timer = self.create_timer(0.2, self.timer_callback)
ros2 topic hz /kinematics/joint_states
# 预期输出:average rate: 5.000

5.8.2 练习 2:发布目标点,订阅后调用 IK

创建一个新节点 ik_solver_node

  • 订阅 /target_pointgeometry_msgs/msg/Point
  • 在回调里调用 inverse_kinematics
  • 把结果打印出来,不可达时打印警告

参考答案

# my_robot_basics/ik_solver_node.py
#
# ============================================================
# 节点目标:订阅 /target_point(目标末端位置),收到后调用 IK
# 求解关节角并打印,不可达时打印警告
# 执行逻辑:
# 1. 订阅 /target_point(geometry_msgs/Point 类型)
# 2. 收到消息 → callback → 调用 inverse_kinematics
# 3. 成功则打印两组解,失败则打印警告
# ============================================================

import rclpy
from rclpy.node import Node
# geometry_msgs/Point:只有 x、y、z 三个 float64 字段,适合传递坐标点
from geometry_msgs.msg import Point

from my_robot_basics.two_link_kinematics import inverse_kinematics


class IKSolverNode(Node):
def __init__(self):
super().__init__('ik_solver_node')
self.subscription = self.create_subscription(
Point, '/target_point', self.callback, 10
)
self.L1 = 1.0
self.L2 = 0.8

def callback(self, msg):
# msg.x, msg.y 是目标末端坐标;msg.z 对平面机械臂无意义,忽略
try:
solutions = inverse_kinematics(msg.x, msg.y, self.L1, self.L2)
for name, (t1, t2) in solutions.items():
self.get_logger().info(f'IK {name}: θ1={t1:.2f}°, θ2={t2:.2f}°')
except ValueError as e:
self.get_logger().warn(str(e))


def main(args=None):
rclpy.init(args=args)
node = IKSolverNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

用 CLI 手动发一条消息测试:

ros2 topic pub --once /target_point geometry_msgs/msg/Point "{x: 1.2, y: 0.8, z: 0.0}"

5.8.3 练习 3:用 CLI 找出 Topic 的发布者和订阅者

运行 kinematics_publisherkinematics_subscriber 后,用以下命令找出 /kinematics/joint_states 的发布者和订阅者各是谁:

ros2 topic info /kinematics/joint_states --verbose

观察输出,理解 Publisher count 和 Subscriber count 的含义。


本讲核心总结

概念一句话理解
Topic广播式通信,发布者和订阅者解耦,通过名称和类型匹配
Publishercreate_publisher(类型, 名称, 队列深度),配合定时器持续发布
Subscribercreate_subscription(类型, 名称, 回调, 队列深度),收到消息自动触发回调
JointStateROS2 标准关节状态消息,position 单位是弧度
QoS控制消息可靠性和缓存策略,发布者和订阅者必须兼容
CLI 验证topic list/info/echo/hz 是排查通信问题的第一手段

参考代码

本讲对应的完整参考代码位于:

ros_ws/reference/src/my_robot_basics/my_robot_basics/
├── kinematics_publisher.py ← 5.3 发布关节角到 /kinematics/joint_states
└── kinematics_subscriber.py ← 5.4 订阅并做 round-trip 验证

使用方式

cp $ROS_WS/reference/src/my_robot_basics/my_robot_basics/kinematics_publisher.py \
$ROS_WS/src/my_robot_basics/my_robot_basics/
cp $ROS_WS/reference/src/my_robot_basics/my_robot_basics/kinematics_subscriber.py \
$ROS_WS/src/my_robot_basics/my_robot_basics/

同时参考 reference/src/my_robot_basics/setup.pyconsole_scripts 的完整写法,确认入口点已注册。


下一讲预告

6. 服务通信 Service 与动作 Action

Topic 适合持续数据流,但有些场景需要"请求-响应":

  • 让 IK 节点按需计算,而不是持续广播
  • 执行一个需要时间的运动任务,并在完成时得到通知

下一讲我们会:

  • 用 Service 实现"给我一个目标点,返回关节角"的 IK 服务
  • 用 Action 实现"执行运动,实时反馈进度,支持中途取消"
  • 理解三种通信模式(Topic / Service / Action)各自的适用场景