ROS2 实战教程
ROS2(Robot Operating System 2)是新一代机器人操作系统,解决了 ROS1 的实时性、安全性、分布式等局限。人形机器人作为复杂机器人系统,广泛采用 ROS2 作为软件框架。本文从基础到进阶,系统讲解 ROS2 的核心概念与人形机器人应用实战。
一、ROS2 基础
1.1 ROS2 架构
核心概念:
ROS2 核心架构
├── 节点(Node)
│ └── 最小执行单元,完成特定功能
│
├── 话题(Topic)
│ └── 发布/订阅模式,单向通信
│
├── 服务(Service)
│ └── 请求/响应模式,双向同步通信
│
├── 动作(Action)
│ └── 长时间运行任务,可取消、可反馈
│
├── 参数(Parameter)
│ └── 运行时配置
│
├── 启动文件(Launch)
│ └── 多节点启动配置
│
└── 包(Package)
└── 代码组织单元
DDS 中间件:
ROS2 使用 DDS(Data Distribution Service)
├── 特点
│ ├── 去中心化(无主节点)
│ ├── 实时性(QoS 可配置)
│ ├── 安全性(加密、认证)
│ └── 分布式(多机通信)
│
├── 实现
│ ├── Fast DDS(默认)
│ ├── Cyclone DDS
│ ├── Connext DDS
│ └── OpenDDS
│
└── QoS 策略
├── 可靠性:可靠/尽力而为
├── 持久性:保持最后/瞬态
├── 历史深度:消息缓存数量
└── 截止时间:消息超时
1.2 安装 ROS2
Ubuntu 22.04 安装 Humble:
# 1. 设置区域
sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
# 2. 添加源
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key \
-o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) \
signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] \
http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) \
main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
# 3. 安装
sudo apt update
sudo apt install ros-humble-desktop # 完整版
# 或
sudo apt install ros-humble-ros-base # 基础版
# 4. 环境配置
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
source ~/.bashrc
# 5. 验证
ros2 --version
Windows 10/11 安装:
# 1. 下载 ROS2 Humble 安装包
# https://github.com/ros2/ros2/releases
# 2. 安装(双击 .msi)
# 3. 环境配置(PowerShell)
echo "Call C:\dev\ros2_humble\local_setup.bat" >> $PROFILE
# 4. 验证
ros2 --version
1.3 工作空间
创建工作空间:
# 创建工作空间
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
# 初始化
colcon build
source install/setup.bash
# 目录结构
ros2_ws/
├── src/ # 源代码
├── build/ # 编译中间文件
├── install/ # 安装文件
└── log/ # 日志
创建 ROS2 包:
cd ~/ros2_ws/src
# 创建 Python 包
ros2 pkg create --build-type ament_python my_robot_package
# 创建 C++ 包
ros2 pkg create --build-type ament_cmake my_robot_package
# 包结构(Python)
my_robot_package/
├── my_robot_package/
│ ├── __init__.py
│ └── my_node.py # 节点代码
├── test/
├── package.xml # 包描述
├── setup.cfg
└── setup.py # 安装配置
二、ROS2 节点
2.1 Python 节点
最小节点:
#!/usr/bin/env python3
# my_robot_package/my_node.py
import rclpy
from rclpy.node import Node
class MyNode(Node):
"""
最小 ROS2 节点
"""
def __init__(self):
super().__init__('my_node')
self.get_logger().info('My Node Started!')
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
运行节点:
# 编译
cd ~/ros2_ws
colcon build
# 运行
source install/setup.bash
ros2 run my_robot_package my_node
2.2 发布器节点
#!/usr/bin/env python3
# 发布器节点
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from sensor_msgs.msg import JointState
import numpy as np
class JointStatePublisher(Node):
"""
关节状态发布器
模拟人形机器人关节角度
"""
def __init__(self):
super().__init__('joint_state_publisher')
# 创建发布器
self.publisher_ = self.create_publisher(
JointState,
'/joint_states',
10 # QoS 历史深度
)
# 定时器(100Hz)
self.timer = self.create_timer(0.01, self.timer_callback)
# 关节名称
self.joint_names = [
'left_hip_yaw', 'left_hip_pitch', 'left_hip_roll',
'left_knee', 'left_ankle_pitch', 'left_ankle_roll',
'right_hip_yaw', 'right_hip_pitch', 'right_hip_roll',
'right_knee', 'right_ankle_pitch', 'right_ankle_roll',
# ... 更多关节
]
self.t = 0 # 时间
def timer_callback(self):
# 创建消息
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = self.joint_names
# 生成正弦波关节角度(模拟行走)
self.t += 0.01
msg.position = [
0.1 * np.sin(2 * np.pi * 1.0 * self.t), # 左髋偏航
0.2 * np.sin(2 * np.pi * 1.0 * self.t), # 左髋俯仰
0.05 * np.cos(2 * np.pi * 1.0 * self.t), # 左髋滚转
0.3 * np.sin(2 * np.pi * 1.0 * self.t), # 左膝
# ... 更多关节
] * len(self.joint_names)
msg.velocity = [0.0] * len(self.joint_names)
msg.effort = [0.0] * len(self.joint_names)
# 发布
self.publisher_.publish(msg)
# self.get_logger().debug(f'Publishing: {msg.position[0]:.3f}')
def main(args=None):
rclpy.init(args=args)
node = JointStatePublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
2.3 订阅器节点
#!/usr/bin/env python3
# 订阅器节点
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
import numpy as np
class JointStateSubscriber(Node):
"""
关节状态订阅器
监控关节角度
"""
def __init__(self):
super().__init__('joint_state_subscriber')
# 创建订阅器
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
10
)
# 关节限位
self.joint_limits = {
'left_hip_pitch': (-0.5, 2.0),
'left_knee': (0.0, 2.5),
# ...
}
self.latest_states = {}
def joint_states_callback(self, msg):
"""
回调函数
"""
# 存储最新状态
for name, position in zip(msg.name, msg.position):
self.latest_states[name] = position
# 检查限位
self.check_limits()
# 打印
# self.get_logger().info(
# f'Received {len(msg.name)} joint states'
# )
def check_limits(self):
"""
检查关节是否超限
"""
for name, (min_val, max_val) in self.joint_limits.items():
if name in self.latest_states:
pos = self.latest_states[name]
if pos < min_val or pos > max_val:
self.get_logger().warn(
f'Joint {name} out of limits: {pos:.3f}'
)
def main(args=None):
rclpy.init(args=args)
node = JointStateSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
三、话题通信
3.1 自定义消息
定义消息:
# my_robot_package/msg/JointCommand.msg
string joint_name
float64 position
float64 velocity
float64 effort
uint8 mode
package.xml 添加依赖:
<depend>rosidl_default_generators</depend>
<member_of_group>rosidl_interface_packages</member_of_group>
setup.py 配置:
from setuptools import setup
package_name = 'my_robot_package'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
install_requires=['setuptools'],
zip_safe=True,
author='Your Name',
author_email='your.email@example.com',
description='Humanoid Robot Package',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'joint_controller = my_robot_package.joint_controller:main',
],
},
)
3.2 话题示例
命令行操作:
# 查看话题列表
ros2 topic list
# 查看话题类型
ros2 topic type /joint_states
# 查看话题信息
ros2 topic info /joint_states
# 发布消息
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
'{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.5}}'
# 持续发布
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
'{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.5}}' \
-r 10
# 查看话题内容
ros2 topic echo /joint_states
# 查看话题带宽
ros2 topic hz /joint_states
ros2 topic bw /joint_states
四、服务通信
4.1 自定义服务
定义服务:
# my_robot_package/srv/Walk.srv
float64 step_length
float64 step_width
float32 duration
---
bool success
string message
4.2 服务服务器
#!/usr/bin/env python3
# 服务服务器
import rclpy
from rclpy.node import Node
from my_robot_package.srv import Walk
class WalkService(Node):
"""
行走服务服务器
"""
def __init__(self):
super().__init__('walk_service')
# 创建服务
self.server = self.create_service(
Walk,
'walk',
self.walk_callback
)
self.get_logger().info('Walk Service Ready')
def walk_callback(self, request, response):
"""
服务回调
"""
step_length = request.step_length
step_width = request.step_width
duration = request.duration
# 验证参数
if step_length < 0 or step_length > 1.0:
response.success = False
response.message = 'Invalid step length'
return response
# 执行行走(简化)
self.get_logger().info(
f'Walking: length={step_length}, width={step_width}, '
f'duration={duration}s'
)
# 模拟行走
# 实际应调用运动控制器
response.success = True
response.message = 'Walking started'
return response
def main(args=None):
rclpy.init(args=args)
node = WalkService()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
4.3 服务客户端
#!/usr/bin/env python3
# 服务客户端
import rclpy
from rclpy.node import Node
from my_robot_package.srv import Walk
class WalkClient(Node):
"""
行走服务客户端
"""
def __init__(self):
super().__init__('walk_client')
# 创建客户端
self.client = self.create_client(Walk, 'walk')
# 等待服务
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for walk service...')
def send_request(self, step_length, step_width, duration):
"""
发送请求
"""
request = Walk.Request()
request.step_length = step_length
request.step_width = step_width
request.duration = float(duration)
# 异步调用
future = self.client.call_async(request)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
response = future.result()
self.get_logger().info(
f'Result: {response.success}, {response.message}'
)
return response
else:
self.get_logger().error('Service call failed')
return None
def main(args=None):
rclpy.init(args=args)
node = WalkClient()
# 发送请求
node.send_request(
step_length=0.5,
step_width=0.2,
duration=10
)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
五、动作通信
5.1 自定义动作
定义动作:
# my_robot_package/action/WalkTo.action
float64 x
float64 y
float64 yaw
---
float64 distance_remaining
float32 estimated_time
---
string status
float32 progress
5.2 动作服务器
#!/usr/bin/env python3
# 动作服务器
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from my_robot_package.action import WalkTo
import time
class WalkToServer(Node):
"""
行走至目标动作服务器
"""
def __init__(self):
super().__init__('walk_to_server')
# 创建动作服务器
self.server = ActionServer(
self,
WalkTo,
'walk_to',
self.walk_to_callback
)
self.get_logger().info('WalkTo Action Server Ready')
def walk_to_callback(self, goal_handle):
"""
动作回调
"""
self.get_logger().info('Received walk goal')
# 获取目标
target_x = goal_handle.request.x
target_y = goal_handle.request.y
# 执行动作
feedback_msg = WalkTo.Feedback()
result = WalkTo.Result()
# 模拟行走过程
distance = 10.0 # 初始距离
for i in range(10):
# 检查是否取消
if goal_handle.is_cancel_requested:
self.get_logger().info('Goal cancelled')
goal_handle.canceled()
result.distance_remaining = distance
return result
# 更新进度
distance -= 1.0
feedback_msg.distance_remaining = distance
feedback_msg.estimated_time = distance * 2.0
goal_handle.publish_feedback(feedback_msg)
self.get_logger().debug(f'Distance remaining: {distance}')
time.sleep(1) # 模拟行走
# 完成
goal_handle.succeed()
result.distance_remaining = 0.0
self.get_logger().info('Goal succeeded')
return result
def main(args=None):
rclpy.init(args=args)
node = WalkToServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
5.3 动作客户端
#!/usr/bin/env python3
# 动作客户端
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from my_robot_package.action import WalkTo
class WalkToClient(Node):
"""
行走至目标动作客户端
"""
def __init__(self):
super().__init__('walk_to_client')
# 创建动作客户端
self.client = ActionClient(
self,
WalkTo,
'walk_to'
)
self.get_logger().info('Waiting for action server...')
self.client.wait_for_server()
def send_goal(self, x, y, yaw):
"""
发送目标
"""
goal_msg = WalkTo.Goal()
goal_msg.x = x
goal_msg.y = y
goal_msg.yaw = yaw
# 异步发送
future = self.client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""
目标响应回调
"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
# 等待结果
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.result_callback)
def feedback_callback(self, feedback_msg):
"""
反馈回调
"""
feedback = feedback_msg.feedback
self.get_logger().info(
f'Distance remaining: {feedback.distance_remaining:.2f}m'
)
def result_callback(self, future):
"""
结果回调
"""
result = future.result().result
self.get_logger().info(
f'Final distance: {result.distance_remaining}'
)
def main(args=None):
rclpy.init(args=args)
node = WalkToClient()
# 发送目标
node.send_goal(x=5.0, y=3.0, yaw=0.5)
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
六、人形机器人应用
6.1 典型节点架构
人形机器人 ROS2 节点架构
┌─────────────────────────────────────────┐
│ 感知层 │
│ ├── /camera/image_raw(相机驱动) │
│ ├── /imu/data(IMU 驱动) │
│ ├── /joint_states(编码器读取) │
│ └── /force_torque(力传感器) │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ 规划层 │
│ ├── /slam/map(SLAM) │
│ ├── /navigation/path(路径规划) │
│ └── /motion/plan(运动规划) │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ 控制层 │
│ ├── /joint_commands(关节控制) │
│ ├── /balance(平衡控制) │
│ └── /wbc(全身控制) │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ 执行层 │
│ └── /motor_commands(电机驱动) │
└─────────────────────────────────────────┘
6.2 Launch 文件
# my_robot_package/launch/humanoid.launch.py
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
# 参数
use_sim_time = LaunchConfiguration('use_sim_time', default='false')
return LaunchDescription([
# 参数声明
DeclareLaunchArgument(
'use_sim_time',
default_value='false',
description='Use simulation clock'
),
# 关节状态发布器
Node(
package='my_robot_package',
executable='joint_state_publisher',
name='joint_state_publisher',
parameters=[{'use_sim_time': use_sim_time}]
),
# 关节控制器
Node(
package='my_robot_package',
executable='joint_controller',
name='joint_controller',
parameters=[{'use_sim_time': use_sim_time}]
),
# 平衡控制器
Node(
package='my_robot_package',
executable='balance_controller',
name='balance_controller',
parameters=[{'use_sim_time': use_sim_time}]
),
# RViz 可视化
Node(
package='rviz2',
executable='rviz2',
name='rviz2',
arguments=['-d', 'path/to/config.rviz']
),
])
运行 Launch:
ros2 launch my_robot_package humanoid.launch.py
6.3 URDF 模型
<!-- my_robot_package/urdf/humanoid.urdf -->
<?xml version="1.0"?>
<robot name="humanoid">
<!-- 基座链接 -->
<link name="base_link">
<visual>
<geometry>
<box size="0.2 0.3 0.5"/>
</geometry>
<origin xyz="0 0 0.5"/>
</visual>
<collision>
<geometry>
<box size="0.2 0.3 0.5"/>
</geometry>
</collision>
<inertial>
<mass value="10"/>
<inertia ixx="1.0" ixy="0.0" ixz="0.0"
iyy="1.0" iyz="0.0" izz="1.0"/>
</inertial>
</link>
<!-- 左大腿 -->
<link name="left_thigh">
<visual>
<geometry>
<cylinder length="0.4" radius="0.08"/>
</geometry>
</visual>
</link>
<!-- 左髋关节 -->
<joint name="left_hip_pitch" type="revolute">
<parent link="base_link"/>
<child link="left_thigh"/>
<origin xyz="0 -0.15 0" rpy="0 0 0"/>
<axis xyz="0 1 0"/>
<limit lower="-0.5" upper="2.0" effort="100" velocity="5.0"/>
</joint>
<!-- 更多链接和关节... -->
</robot>
七、调试工具
7.1 RQT
# 安装
sudo apt install ros-humble-rqt*
# 启动
rqt
# 常用视图
rqt_graph # 节点图
rqt_plot # 数据绘图
rqt_console # 日志控制台
rqt_robot_monitor # 机器人监控
7.2 RViz2
# 启动
rviz2
# 配置
# - 添加 RobotModel 显示 URDF
# - 添加 TF 显示坐标系
# - 添加 PointStamped 显示点击
7.3 ROS2 命令
# 节点
ros2 node list
ros2 node info /node_name
# 话题
ros2 topic list
ros2 topic echo /topic_name
ros2 topic pub /topic_name ...
# 服务
ros2 service list
ros2 service call /service_name ...
# 动作
ros2 action list
ros2 action send /action_name ...
# 参数
ros2 param list
ros2 param get /node param_name
ros2 param set /node param_name value
# 包
ros2 pkg list
ros2 pkg executables
八、总结
8.1 核心要点
- 节点是基础:最小执行单元
- 话题最常用:传感器数据发布
- 服务用于请求:同步调用
- 动作用于长任务:可取消、可反馈
- DDS 是核心:去中心化通信
8.2 学习资源
- 官方文档:https://docs.ros.org
- 教程:ROS2 Foxy/Humble Tutorials
- 书籍:《ROS2 机器人编程》
- 实践:从简单节点开始,逐步复杂
ROS2 是人形机器人软件开发的标准框架。掌握 ROS2,是进入机器人行业的必备技能。从基础节点开始,逐步掌握话题、服务、动作,最终能够构建复杂的人形机器人系统。
参考资料:
- ROS2 官方文档
- ROS2 入门教程
- 人形机器人开源项目