跳到主要内容

6. 服务通信 Service 与动作 Action

Topic 适合持续广播,但有些场景需要"给我一个目标点,返回关节角"这样的一问一答。这一讲学 Service 和 Action,用 Service 实现 IK 求解,用 Action 模拟带进度反馈的运动执行。


前言:Topic 不够用的场景

上一讲我们让节点通过 Topic 持续广播关节角。但现实中有些需求不是"持续广播",而是"我问你答":

"给我目标点 (1.2, 0.8),算出关节角是多少?"

用 Topic 做这件事很别扭——你得发一条消息,再订阅另一个 Topic 等结果,还不知道结果是不是对应你这次请求的。

这就是 Service 存在的原因。而当任务需要时间、需要中途反馈时,还有 Action

场景适合的通信方式
传感器数据持续推送Topic
给目标点,立即返回 IK 解Service
发送运动目标,等待执行完成Action

6.1 三种通信模式对比

6.1.1 一张图理解差异

Topic(广播):
发布者 ──→ /joint_states ──→ 订阅者A
──→ 订阅者B

Service(一问一答):
客户端 ──请求──→ 服务端
←──响应──

Action(长任务):
客户端 ──Goal──→ 服务端
←──Feedback── (持续)
←──Result── (完成时)
──Cancel──→ (可中途取消)

6.1.2 选择原则

  • 数据需要持续流动 → Topic
  • 立即算完就返回,像函数调用 → Service
  • 任务耗时,需要进度,可取消 → Action

6.2 自定义 Service 接口

6.2.1 为什么需要自定义接口

标准消息库里没有"给目标点返回关节角"这种接口,需要自己定义。

6.2.2 创建接口包

ROS2 中自定义接口需要单独的包(必须用 ament_cmake,不能用 ament_python):

cd $ROS_WS/src
ros2 pkg create --build-type ament_cmake my_robot_interfaces
mkdir my_robot_interfaces/srv

6.2.3 定义 .srv 文件

vim $ROS_WS/src/my_robot_interfaces/srv/SolveIK.srv
# 请求:目标末端位置
float64 x
float64 y
float64 l1
float64 l2
---
# 响应:求解结果
bool success
float64 theta1_deg
float64 theta2_deg
string message

--- 上面是请求字段,下面是响应字段。

6.2.4 配置 CMakeLists.txt

.srv 只是文本描述;编译时需要由 rosidl 生成各语言可用的消息类型(例如 Python 里的 SolveIK)。CMake 负责告诉构建系统:用哪个生成器、要处理哪些接口文件

vim $ROS_WS/src/my_robot_interfaces/CMakeLists.txt

ros2 pkg create 生成的模板里只有 find_package(ament_cmake REQUIRED)。请把文件中「查找依赖」那一段改成与下面一致顺序不能错:先有 rosidl_default_generators,再调用 rosidl_generate_interfaces;只抄后两行会报 Unknown CMake command "rosidl_generate_interfaces"):

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
"srv/SolveIK.srv"
)

ament_export_dependencies(rosidl_default_runtime)

其余模板内容(如 ament_package()BUILD_TESTING 等)保留不动即可。

action/MoveTo.action 在 6.4 创建后再补充进来,现在先只注册 srv 文件。届时在 rosidl_generate_interfaces再加一行 "action/MoveTo.action" 即可。

6.2.5 配置 package.xml

package.xmlCMakeLists.txt 分工不同:前者给 rosdep / colcon 看依赖关系,后者给 CMake 编译步骤 下指令。接口包两边都要写对,否则会出现「编译过了但运行找不到类型」或「构建顺序不对」等问题。

<buildtool_depend>ament_cmake</buildtool_depend> 后面加入:

<!-- 编译期:调用 rosidl 从 .srv/.action 生成代码(与 CMake 里的 generators 对应) -->
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<!-- 运行期:节点使用 SolveIK 等类型时需要(与 ament_export_dependencies 对应) -->
<exec_depend>rosidl_default_runtime</exec_depend>
<!-- 声明本包属于「接口包」分组,便于工具识别与依赖解析 -->
<member_of_group>rosidl_interface_packages</member_of_group>

6.3 实现 IK Service

6.3.1 Service 服务端

vim $ROS_WS/src/my_robot_basics/my_robot_basics/ik_service_node.py
# my_robot_basics/ik_service_node.py
# 运行方式:ros2 run my_robot_basics ik_service_node
#
# ============================================================
# 节点目标:提供 /solve_ik 服务,客户端发来目标点 (x, y) 后
# 立即返回 IK 求解结果(关节角),类似一次函数调用
# 执行逻辑:
# 1. __init__ 注册服务,绑定 handle_request 回调
# 2. spin() 等待客户端请求
# 3. 收到请求 → handle_request 被调用
# 4. 调用 inverse_kinematics → 填写 response → 返回
# 5. rclpy 自动把 response 发回给客户端
# 与 Topic 的区别:Service 是一问一答,客户端会阻塞等待响应
# ============================================================

import rclpy
from rclpy.node import Node

# SolveIK 是我们在 my_robot_interfaces/srv/SolveIK.srv 里定义的接口
# 编译后自动生成 Python 类,包含 SolveIK.Request 和 SolveIK.Response
from my_robot_interfaces.srv import SolveIK
from my_robot_basics.two_link_kinematics import inverse_kinematics


class IKServiceNode(Node):
def __init__(self):
super().__init__('ik_service_node')

# create_service(接口类型, 服务名称, 回调函数)
# 服务名称 '/solve_ik' 是绝对路径,客户端用同样的名称连接
# 回调函数签名必须是 def handle_request(self, request, response)
# request:客户端发来的请求对象(对应 .srv 文件 --- 上方的字段)
# response:要填写并返回的响应对象(对应 .srv 文件 --- 下方的字段)
self.srv = self.create_service(SolveIK, '/solve_ik', self.handle_request)
self.get_logger().info('IK Service 已启动,等待请求...')

def handle_request(self, request, response):
# request 对象的字段对应 SolveIK.srv 里 --- 上方的定义:x, y, l1, l2
self.get_logger().info(
f'收到请求: x={request.x:.3f}, y={request.y:.3f}, '
f'L1={request.l1}, L2={request.l2}'
)

try:
solutions = inverse_kinematics(request.x, request.y, request.l1, request.l2)
# 默认返回肘下解(theta2 > 0 的那组)
t1, t2 = solutions['elbow_down']
# 填写 response 对象的字段(对应 .srv 文件 --- 下方的定义)
response.success = True
response.theta1_deg = t1
response.theta2_deg = t2
response.message = f'elbow_down 解: θ1={t1:.2f}°, θ2={t2:.2f}°'
except ValueError as e:
response.success = False
response.theta1_deg = 0.0
response.theta2_deg = 0.0
response.message = str(e)

self.get_logger().info(f'响应: {response.message}')
# 返回 response,rclpy 会自动把它序列化并发回给客户端
return response


def main(args=None):
rclpy.init(args=args)
node = IKServiceNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

6.3.2 Service 客户端

vim $ROS_WS/src/my_robot_basics/my_robot_basics/ik_client_node.py
# my_robot_basics/ik_client_node.py
# 运行方式:ros2 run my_robot_basics ik_client_node
#
# ============================================================
# 节点目标:向 /solve_ik 服务发送一次 IK 请求,打印结果后退出
# 执行逻辑:
# 1. 创建 Service 客户端,连接 /solve_ik
# 2. 等待服务端上线(最多 5 秒)
# 3. 构造请求 → call_async() 异步发送
# 4. spin_until_future_complete() 阻塞等待响应
# 5. 打印结果,节点退出
# ============================================================

import sys
import rclpy
from rclpy.node import Node

from my_robot_interfaces.srv import SolveIK


class IKClientNode(Node):
def __init__(self):
super().__init__('ik_client_node')
# create_client(接口类型, 服务名称)
# 客户端和服务端的服务名称必须完全一致(包括前缀 '/')
self.client = self.create_client(SolveIK, '/solve_ik')

def send_request(self, x, y, l1=1.0, l2=0.8):
# wait_for_service():阻塞等待服务端上线,超时返回 False
# 如果服务端还没启动就直接 call,会立即失败
if not self.client.wait_for_service(timeout_sec=5.0):
self.get_logger().error('服务不可用,请先启动 ik_service_node')
return

# 构造请求对象,填写 .srv 文件 --- 上方定义的字段
request = SolveIK.Request()
request.x = x
request.y = y
request.l1 = l1
request.l2 = l2

# call_async():异步发送请求,立即返回一个 Future 对象
# Future 是"未来某时刻会有结果"的占位符,现在还没有值
future = self.client.call_async(request)

# spin_until_future_complete():让事件循环运转,直到 Future 有结果
# 这是在单线程节点里"同步等待异步结果"的标准写法
# 不能用 future.result() 直接取值,因为结果还没到
rclpy.spin_until_future_complete(self, future)

# Future 完成后,.result() 才有值,即服务端返回的 response 对象
response = future.result()
if response.success:
self.get_logger().info(
f'IK 成功: θ1={response.theta1_deg:.2f}°, θ2={response.theta2_deg:.2f}°'
)
else:
self.get_logger().warn(f'IK 失败: {response.message}')


def main(args=None):
rclpy.init(args=args)
node = IKClientNode()

# 从命令行读取目标点,默认 (1.2, 0.8)
x = float(sys.argv[1]) if len(sys.argv) > 1 else 1.2
y = float(sys.argv[2]) if len(sys.argv) > 2 else 0.8

node.send_request(x, y)
node.destroy_node()
rclpy.shutdown()

6.3.3 注册入口点

setup.pyconsole_scripts 里追加(当前完整内容):

'console_scripts': [
'hello_node = my_robot_basics.hello_node:main',
'kinematics_node = my_robot_basics.kinematics_node:main',
'kinematics_publisher = my_robot_basics.kinematics_publisher:main',
'kinematics_subscriber = my_robot_basics.kinematics_subscriber:main',
'ik_service_node = my_robot_basics.ik_service_node:main', # 新增
'ik_client_node = my_robot_basics.ik_client_node:main', # 新增
],

6.3.4 构建并运行

先构建接口包,再构建功能包:

cd $ROS_WS
colcon build --packages-select my_robot_interfaces
colcon build --packages-select my_robot_basics
source install/setup.bash

终端 1:启动服务端

ros2 run my_robot_basics ik_service_node

终端 2:调用客户端

ros2 run my_robot_basics ik_client_node 1.2 0.8

预期输出(终端 2):

[INFO] [...] [ik_client_node]: IK 成功: θ1=28.96°, θ2=62.18°

也可以直接用 CLI 调用,不需要启动客户端节点:

ros2 service call /solve_ik my_robot_interfaces/srv/SolveIK \
"{x: 1.2, y: 0.8, l1: 1.0, l2: 0.8}"

6.4 自定义 Action 接口

6.4.1 定义 .action 文件

mkdir -p $ROS_WS/src/my_robot_interfaces/action
vim $ROS_WS/src/my_robot_interfaces/action/MoveTo.action
# Goal:目标末端位置
float64 x
float64 y
---
# Result:最终结果
bool success
float64 final_theta1_deg
float64 final_theta2_deg
string message
---
# Feedback:执行进度(0.0 ~ 1.0)
float32 progress
string status

6.4.2 更新 CMakeLists.txt,加入 action 文件

vim $ROS_WS/src/my_robot_interfaces/CMakeLists.txt

rosidl_generate_interfaces 更新为同时包含 srv 和 action:

rosidl_generate_interfaces(${PROJECT_NAME}
"srv/SolveIK.srv"
"action/MoveTo.action"
)

重新构建接口包:

cd $ROS_WS
colcon build --packages-select my_robot_interfaces
source install/setup.bash

6.5 实现 MoveTo Action

6.5.1 Action 服务端

vim $ROS_WS/src/my_robot_basics/my_robot_basics/move_to_action_server.py
# my_robot_basics/move_to_action_server.py
# 运行方式:ros2 run my_robot_basics move_to_action_server
#
# ============================================================
# 节点目标:提供 /move_to Action 服务,接收目标末端位置后模拟
# 机械臂运动过程,期间持续发送进度反馈,完成后返回结果
# 执行逻辑:
# 1. __init__ 注册 ActionServer,绑定 execute_callback
# 2. 客户端发来 Goal → execute_callback 在独立线程中被调用
# 3. 先做 IK 可达性检查,不可达则 abort()
# 4. 可达则循环模拟运动,每步 publish_feedback() 发送进度
# 5. 完成后调用 succeed(),返回最终 Result
# 与 Service 的区别:Action 支持长时间执行 + 中途反馈 + 取消
# ============================================================

import time
import rclpy
from rclpy.node import Node
# ActionServer:在服务端注册动作,收到 Goal 后执行 execute_callback
from rclpy.action import ActionServer

# MoveTo 由 .action 文件生成:内含 Goal / Result / Feedback 三种消息类型
from my_robot_interfaces.action import MoveTo
from my_robot_basics.two_link_kinematics import inverse_kinematics


class MoveToActionServer(Node):
def __init__(self):
super().__init__('move_to_action_server')

# ActionServer(节点, 动作类型, 动作名, 执行回调)
# 动作名 '/move_to' 需与客户端 ActionClient 里的名称一致
# execute_callback:收到新 Goal 且被接受后,在独立线程中调用
# (与 Service 回调不同,Action 的执行回调可以 sleep/阻塞)
self._action_server = ActionServer(
self,
MoveTo,
'/move_to',
self.execute_callback,
)
self.get_logger().info('MoveTo Action Server 已启动...')

def execute_callback(self, goal_handle):
# goal_handle:本次目标的句柄,提供三个关键能力:
# goal_handle.request → 读取客户端发来的 Goal 字段
# goal_handle.publish_feedback(msg) → 向客户端发送进度
# goal_handle.succeed() / abort() / canceled() → 标记最终状态
x = goal_handle.request.x
y = goal_handle.request.y
self.get_logger().info(f'收到目标: ({x:.3f}, {y:.3f})')

# 先构造 Result 对象,成功和失败分支都要返回同一类型
result = MoveTo.Result()

# 可达性检查:不可达则 abort,客户端会收到"被中止"的状态
try:
solutions = inverse_kinematics(x, y, 1.0, 0.8)
except ValueError as e:
self.get_logger().warn(f'目标不可达: {e}')
# abort() 表示任务无法完成(区别于 canceled(),后者是客户端主动取消)
goal_handle.abort()
result.success = False
result.message = str(e)
return result

t1, t2 = solutions['elbow_down']

# 模拟运动过程:循环发送进度反馈
# 真实机器人里,这里会等待控制器到位信号,而不是 sleep
feedback_msg = MoveTo.Feedback()
steps = 5
for i in range(1, steps + 1):
# progress 是 0.0~1.0 的浮点数,客户端可用来显示进度条
feedback_msg.progress = float(i) / steps
feedback_msg.status = f'执行中 {i}/{steps}...'
# publish_feedback() 把 Feedback 消息推送给所有订阅了反馈的客户端
goal_handle.publish_feedback(feedback_msg)
self.get_logger().info(f'进度: {feedback_msg.progress:.0%}')
time.sleep(0.5)

# succeed() 标记任务成功完成,客户端的 result_callback 会被触发
# 必须调用 succeed()/abort()/canceled() 之一,否则客户端会一直等待
goal_handle.succeed()
result.success = True
result.final_theta1_deg = t1
result.final_theta2_deg = t2
result.message = f'到达目标: θ1={t1:.2f}°, θ2={t2:.2f}°'
self.get_logger().info(result.message)
return result


def main(args=None):
rclpy.init(args=args)
node = MoveToActionServer()
rclpy.spin(node) # 阻塞:处理 Action 请求、定时器、Topic 等回调
node.destroy_node()
rclpy.shutdown()

6.5.2 Action 客户端

vim $ROS_WS/src/my_robot_basics/my_robot_basics/move_to_action_client.py
# my_robot_basics/move_to_action_client.py
# 运行方式:ros2 run my_robot_basics move_to_action_client
#
# ============================================================
# 节点目标:向 /move_to Action 服务发送一个运动目标,实时打印
# 进度反馈,等待执行完成后打印最终结果
# 执行逻辑(全异步,靠回调链驱动):
# 1. send_goal_async() → 服务端响应"接不接受" → goal_response_callback
# 2. Goal 被接受 → get_result_async() → 等待执行完成 → result_callback
# 3. 执行期间服务端每次 publish_feedback → feedback_callback 被调用
# 4. result_callback 里打印结果,调用 rclpy.shutdown() 退出
# 注意:所有回调都由 spin() 调度,没有 spin() 则回调永远不会执行
# ============================================================

import sys
import rclpy
from rclpy.node import Node
# ActionClient:发 Goal、收 Feedback/Result;所有 API 均为异步(Future + 回调)
from rclpy.action import ActionClient

from my_robot_interfaces.action import MoveTo


class MoveToActionClient(Node):
def __init__(self):
super().__init__('move_to_action_client')
# ActionClient(节点, 动作类型, 动作名)
# 动作名必须与 ActionServer 里的名称完全一致
self._client = ActionClient(self, MoveTo, '/move_to')

def send_goal(self, x, y):
# 等待 Action Server 上线,最多 5 秒
if not self._client.wait_for_server(timeout_sec=5.0):
self.get_logger().error('Action Server 不可用')
return

# 构造 Goal 对象,填写 .action 文件第一段(--- 之前)的字段
goal = MoveTo.Goal()
goal.x = x
goal.y = y

self.get_logger().info(f'发送目标: ({x}, {y})')

# send_goal_async() 立即返回 Future,不阻塞主线程
# feedback_callback:服务端每次 publish_feedback 时由 rclpy 自动调用
# Future 完成的含义:服务端已响应"接不接受这个 Goal"(不是执行完成)
send_goal_future = self._client.send_goal_async(
goal,
feedback_callback=self.feedback_callback,
)
# add_done_callback:Future 完成时自动调用 goal_response_callback
# 这是 Action 客户端的标准异步链式写法
send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
# future.result() 返回 ClientGoalHandle,包含 accepted 字段
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warn('Goal 被拒绝')
return
self.get_logger().info('Goal 已接受,等待结果...')

# get_result_async() 返回另一个 Future,在任务执行完成时触发
# 任务执行期间,feedback_callback 仍会持续被调用
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.result_callback)

def feedback_callback(self, feedback_msg):
# feedback_msg 是包装类型,.feedback 才是 MoveTo.Feedback 对象
# 包含 .progress(0.0~1.0)和 .status(字符串)字段
fb = feedback_msg.feedback
self.get_logger().info(f'[反馈] {fb.status} ({fb.progress:.0%})')

def result_callback(self, future):
# future.result() 是 GetResult 响应对象
# .result 才是 MoveTo.Result(success、final_theta1_deg 等字段)
result = future.result().result
if result.success:
self.get_logger().info(
f'执行完成: θ1={result.final_theta1_deg:.2f}°, '
f'θ2={result.final_theta2_deg:.2f}°'
)
else:
self.get_logger().warn(f'执行失败: {result.message}')
# 本示例发完一个 Goal 就退出;常驻节点通常不在这里 shutdown
rclpy.shutdown()


def main(args=None):
rclpy.init(args=args)
node = MoveToActionClient()

x = float(sys.argv[1]) if len(sys.argv) > 1 else 1.2
y = float(sys.argv[2]) if len(sys.argv) > 2 else 0.8

node.send_goal(x, y)
# spin() 是必须的:Future 完成事件、Feedback 消息、Result 回调
# 都需要事件循环来调度,没有 spin() 所有回调都不会执行
rclpy.spin(node)

6.5.3 注册入口点

setup.pyconsole_scripts 里追加(当前完整内容):

'console_scripts': [
'hello_node = my_robot_basics.hello_node:main',
'kinematics_node = my_robot_basics.kinematics_node:main',
'kinematics_publisher = my_robot_basics.kinematics_publisher:main',
'kinematics_subscriber = my_robot_basics.kinematics_subscriber:main',
'ik_service_node = my_robot_basics.ik_service_node:main',
'ik_client_node = my_robot_basics.ik_client_node:main',
'move_to_action_server = my_robot_basics.move_to_action_server:main', # 新增
'move_to_action_client = my_robot_basics.move_to_action_client:main', # 新增
],

6.5.4 构建并运行

cd $ROS_WS
colcon build --packages-select my_robot_basics
source install/setup.bash

终端 1

ros2 run my_robot_basics move_to_action_server

终端 2

ros2 run my_robot_basics move_to_action_client 1.2 0.8

预期输出(终端 2):

[INFO] [...] [move_to_action_client]: 发送目标: (1.2, 0.8)
[INFO] [...] [move_to_action_client]: Goal 已接受,等待结果...
[INFO] [...] [move_to_action_client]: [反馈] 执行中 1/5... (20%)
[INFO] [...] [move_to_action_client]: [反馈] 执行中 2/5... (40%)
[INFO] [...] [move_to_action_client]: [反馈] 执行中 3/5... (60%)
[INFO] [...] [move_to_action_client]: [反馈] 执行中 4/5... (80%)
[INFO] [...] [move_to_action_client]: [反馈] 执行中 5/5... (100%)
[INFO] [...] [move_to_action_client]: 执行完成: θ1=28.96°, θ2=62.18°

6.6 用 CLI 观察 Service 和 Action

前提:以下命令需要对应节点正在运行才能看到结果。/solve_ik 需要 ik_service_node 在线,/move_to 需要 move_to_action_server 在线。如果节点已经退出,ros2 service listros2 action list 里就不会出现这些名称——这是正常现象,不是安装问题。

建议在一个终端保持节点运行,另一个终端执行 CLI 命令:

# 终端1:启动服务端
cd $ROS_WS && source install/setup.bash
ros2 run my_robot_basics ik_service_node

# 终端2:执行以下 CLI 命令
# 查看所有 Service(能看到 /solve_ik 说明服务端在线)
ros2 service list

# 查看 Service 接口类型
ros2 service type /solve_ik

# 直接调用 Service
ros2 service call /solve_ik my_robot_interfaces/srv/SolveIK \
"{x: 1.0, y: 0.5, l1: 1.0, l2: 0.8}"

# 查看所有 Action(能看到 /move_to 说明 Action 服务端在线)
ros2 action list

# 查看 Action 详情
ros2 action info /move_to

# 直接发送 Action Goal
ros2 action send_goal /move_to my_robot_interfaces/action/MoveTo \
"{x: 1.0, y: 0.5}" --feedback

--feedback 参数会在终端实时打印 Feedback,非常适合调试。


6.7 常见错误

6.7.1 错误一:接口包没有先构建

ModuleNotFoundError: No module named 'my_robot_interfaces'

必须先构建接口包,再构建使用它的功能包:

colcon build --packages-select my_robot_interfaces
colcon build --packages-select my_robot_basics
source install/setup.bash

6.7.2 错误二:Service 调用卡住不返回

原因通常是服务端没有启动,或者服务端抛出异常没有返回 response。

检查:

# 确认服务存在
ros2 service list | grep solve_ik

# 确认服务端节点在运行
ros2 node list

6.7.3 错误三:Action Feedback 收不到

客户端必须在 send_goal_async 时传入 feedback_callback,否则 Feedback 会被丢弃:

# 错误:没有注册 feedback_callback
send_goal_future = self._client.send_goal_async(goal)

# 正确:注册 feedback_callback
send_goal_future = self._client.send_goal_async(
goal,
feedback_callback=self.feedback_callback,
)

6.8 练习题

6.8.1 练习 1:扩展 SolveIK 返回双解

修改 ik_service_node.py,让响应同时返回肘上解和肘下解的四个角度值,并更新 .srv 文件定义。

参考答案

SolveIK.srv 中增加字段:

bool success
float64 elbow_down_theta1
float64 elbow_down_theta2
float64 elbow_up_theta1
float64 elbow_up_theta2
string message

服务端回调中:

solutions = inverse_kinematics(request.x, request.y, request.l1, request.l2)
response.elbow_down_theta1, response.elbow_down_theta2 = solutions['elbow_down']
response.elbow_up_theta1, response.elbow_up_theta2 = solutions['elbow_up']
response.success = True

6.8.2 练习 2:用 CLI 测试不可达目标

# 目标点超出工作空间(L1+L2=1.8,距离 > 1.8)
ros2 service call /solve_ik my_robot_interfaces/srv/SolveIK \
"{x: 2.0, y: 0.0, l1: 1.0, l2: 0.8}"

观察响应中 success: falsemessage 字段的内容。


6.8.3 练习 3:说明三种通信方式的选择理由

用自己的话回答:

  1. 为什么 IK 求解用 Service 而不是 Topic?
  2. 为什么机械臂执行运动用 Action 而不是 Service?
  3. 什么情况下 Topic 比 Service 更合适?

本讲核心总结

概念一句话理解
Service一问一答,像函数调用,适合立即返回的短时计算
Action有 Goal/Feedback/Result,适合耗时任务,支持进度反馈和取消
.srv 文件--- 上面是请求,下面是响应
.action 文件三段:Goal / Result / Feedback,用 --- 分隔
接口包自定义接口必须放在独立的 ament_cmake 包里,先于功能包构建
CLI 验证ros2 service callros2 action send_goal --feedback 是最快的调试手段

参考代码

本讲对应的完整参考代码位于:

ros_ws/reference/src/my_robot_interfaces/
├── srv/SolveIK.srv ← 6.2 Service 接口定义
├── action/MoveTo.action ← 6.4 Action 接口定义
├── CMakeLists.txt ← rosidl 代码生成配置
└── package.xml

ros_ws/reference/src/my_robot_basics/my_robot_basics/
├── ik_service_node.py ← 6.3 IK Service 服务端
├── ik_client_node.py ← 6.3 IK Service 客户端
├── move_to_action_server.py ← 6.5 MoveTo Action 服务端
└── move_to_action_client.py ← 6.5 MoveTo Action 客户端

使用方式

# 接口包文件
cp $ROS_WS/reference/src/my_robot_interfaces/srv/SolveIK.srv \
$ROS_WS/src/my_robot_interfaces/srv/
cp $ROS_WS/reference/src/my_robot_interfaces/action/MoveTo.action \
$ROS_WS/src/my_robot_interfaces/action/

# 节点文件(4个)
for f in ik_service_node ik_client_node move_to_action_server move_to_action_client; do
cp $ROS_WS/reference/src/my_robot_basics/my_robot_basics/${f}.py \
$ROS_WS/src/my_robot_basics/my_robot_basics/
done

[注意] 接口包必须先于功能包构建:colcon build --packages-select my_robot_interfaces


下一讲预告

7. 参数系统与 Launch 文件

现在我们有了多个节点,每次启动都要开好几个终端,连杆长度也硬编码在代码里。下一讲我们会:

  • 把连杆长度、发布频率等配置提取为 ROS2 参数
  • 用 YAML 文件统一管理参数
  • 写一个 Launch 文件,一条命令启动整个系统
  • 理解参数和 Launch 如何让系统从"能跑"变成"能维护"