Skip to content
清晨的一缕阳光
返回

ROS2 实战教程

ROS2 实战教程

ROS2(Robot Operating System 2)是新一代机器人操作系统,解决了 ROS1 的实时性、安全性、分布式等局限。人形机器人作为复杂机器人系统,广泛采用 ROS2 作为软件框架。本文从基础到进阶,系统讲解 ROS2 的核心概念与人形机器人应用实战。

一、ROS2 基础

1.1 ROS2 架构

核心概念

ROS2 核心架构

├── 节点(Node)
│   └── 最小执行单元,完成特定功能

├── 话题(Topic)
│   └── 发布/订阅模式,单向通信

├── 服务(Service)
│   └── 请求/响应模式,双向同步通信

├── 动作(Action)
│   └── 长时间运行任务,可取消、可反馈

├── 参数(Parameter)
│   └── 运行时配置

├── 启动文件(Launch)
│   └── 多节点启动配置

└── 包(Package)
    └── 代码组织单元

DDS 中间件

ROS2 使用 DDS(Data Distribution Service)

├── 特点
│   ├── 去中心化(无主节点)
│   ├── 实时性(QoS 可配置)
│   ├── 安全性(加密、认证)
│   └── 分布式(多机通信)

├── 实现
│   ├── Fast DDS(默认)
│   ├── Cyclone DDS
│   ├── Connext DDS
│   └── OpenDDS

└── QoS 策略
    ├── 可靠性:可靠/尽力而为
    ├── 持久性:保持最后/瞬态
    ├── 历史深度:消息缓存数量
    └── 截止时间:消息超时

1.2 安装 ROS2

Ubuntu 22.04 安装 Humble

# 1. 设置区域
sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8

# 2. 添加源
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key \
  -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) \
  signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] \
  http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) \
  main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null

# 3. 安装
sudo apt update
sudo apt install ros-humble-desktop  # 完整版
# 或
sudo apt install ros-humble-ros-base  # 基础版

# 4. 环境配置
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
source ~/.bashrc

# 5. 验证
ros2 --version

Windows 10/11 安装

# 1. 下载 ROS2 Humble 安装包
# https://github.com/ros2/ros2/releases

# 2. 安装(双击 .msi)

# 3. 环境配置(PowerShell)
echo "Call C:\dev\ros2_humble\local_setup.bat" >> $PROFILE

# 4. 验证
ros2 --version

1.3 工作空间

创建工作空间

# 创建工作空间
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws

# 初始化
colcon build
source install/setup.bash

# 目录结构
ros2_ws/
├── src/           # 源代码
├── build/         # 编译中间文件
├── install/       # 安装文件
└── log/           # 日志

创建 ROS2 包

cd ~/ros2_ws/src

# 创建 Python 包
ros2 pkg create --build-type ament_python my_robot_package

# 创建 C++ 包
ros2 pkg create --build-type ament_cmake my_robot_package

# 包结构(Python)
my_robot_package/
├── my_robot_package/
   ├── __init__.py
   └── my_node.py      # 节点代码
├── test/
├── package.xml         # 包描述
├── setup.cfg
└── setup.py            # 安装配置

二、ROS2 节点

2.1 Python 节点

最小节点

#!/usr/bin/env python3
# my_robot_package/my_node.py

import rclpy
from rclpy.node import Node

class MyNode(Node):
    """
    最小 ROS2 节点
    """
    def __init__(self):
        super().__init__('my_node')
        self.get_logger().info('My Node Started!')

def main(args=None):
    rclpy.init(args=args)
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

运行节点

# 编译
cd ~/ros2_ws
colcon build

# 运行
source install/setup.bash
ros2 run my_robot_package my_node

2.2 发布器节点

#!/usr/bin/env python3
# 发布器节点

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from sensor_msgs.msg import JointState
import numpy as np

class JointStatePublisher(Node):
    """
    关节状态发布器
    模拟人形机器人关节角度
    """
    def __init__(self):
        super().__init__('joint_state_publisher')
        
        # 创建发布器
        self.publisher_ = self.create_publisher(
            JointState,
            '/joint_states',
            10  # QoS 历史深度
        )
        
        # 定时器(100Hz)
        self.timer = self.create_timer(0.01, self.timer_callback)
        
        # 关节名称
        self.joint_names = [
            'left_hip_yaw', 'left_hip_pitch', 'left_hip_roll',
            'left_knee', 'left_ankle_pitch', 'left_ankle_roll',
            'right_hip_yaw', 'right_hip_pitch', 'right_hip_roll',
            'right_knee', 'right_ankle_pitch', 'right_ankle_roll',
            # ... 更多关节
        ]
        
        self.t = 0  # 时间
    
    def timer_callback(self):
        # 创建消息
        msg = JointState()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.name = self.joint_names
        
        # 生成正弦波关节角度(模拟行走)
        self.t += 0.01
        msg.position = [
            0.1 * np.sin(2 * np.pi * 1.0 * self.t),  # 左髋偏航
            0.2 * np.sin(2 * np.pi * 1.0 * self.t),  # 左髋俯仰
            0.05 * np.cos(2 * np.pi * 1.0 * self.t), # 左髋滚转
            0.3 * np.sin(2 * np.pi * 1.0 * self.t),  # 左膝
            # ... 更多关节
        ] * len(self.joint_names)
        
        msg.velocity = [0.0] * len(self.joint_names)
        msg.effort = [0.0] * len(self.joint_names)
        
        # 发布
        self.publisher_.publish(msg)
        # self.get_logger().debug(f'Publishing: {msg.position[0]:.3f}')

def main(args=None):
    rclpy.init(args=args)
    node = JointStatePublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

2.3 订阅器节点

#!/usr/bin/env python3
# 订阅器节点

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
import numpy as np

class JointStateSubscriber(Node):
    """
    关节状态订阅器
    监控关节角度
    """
    def __init__(self):
        super().__init__('joint_state_subscriber')
        
        # 创建订阅器
        self.subscription = self.create_subscription(
            JointState,
            '/joint_states',
            self.joint_states_callback,
            10
        )
        
        # 关节限位
        self.joint_limits = {
            'left_hip_pitch': (-0.5, 2.0),
            'left_knee': (0.0, 2.5),
            # ...
        }
        
        self.latest_states = {}
    
    def joint_states_callback(self, msg):
        """
        回调函数
        """
        # 存储最新状态
        for name, position in zip(msg.name, msg.position):
            self.latest_states[name] = position
        
        # 检查限位
        self.check_limits()
        
        # 打印
        # self.get_logger().info(
        #     f'Received {len(msg.name)} joint states'
        # )
    
    def check_limits(self):
        """
        检查关节是否超限
        """
        for name, (min_val, max_val) in self.joint_limits.items():
            if name in self.latest_states:
                pos = self.latest_states[name]
                if pos < min_val or pos > max_val:
                    self.get_logger().warn(
                        f'Joint {name} out of limits: {pos:.3f}'
                    )

def main(args=None):
    rclpy.init(args=args)
    node = JointStateSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

三、话题通信

3.1 自定义消息

定义消息

# my_robot_package/msg/JointCommand.msg

string joint_name
float64 position
float64 velocity
float64 effort
uint8 mode

package.xml 添加依赖

<depend>rosidl_default_generators</depend>
<member_of_group>rosidl_interface_packages</member_of_group>

setup.py 配置

from setuptools import setup

package_name = 'my_robot_package'

setup(
    name=package_name,
    version='0.1.0',
    packages=[package_name],
    install_requires=['setuptools'],
    zip_safe=True,
    author='Your Name',
    author_email='your.email@example.com',
    description='Humanoid Robot Package',
    license='Apache License 2.0',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'joint_controller = my_robot_package.joint_controller:main',
        ],
    },
)

3.2 话题示例

命令行操作

# 查看话题列表
ros2 topic list

# 查看话题类型
ros2 topic type /joint_states

# 查看话题信息
ros2 topic info /joint_states

# 发布消息
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
  '{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.5}}'

# 持续发布
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
  '{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.5}}' \
  -r 10

# 查看话题内容
ros2 topic echo /joint_states

# 查看话题带宽
ros2 topic hz /joint_states
ros2 topic bw /joint_states

四、服务通信

4.1 自定义服务

定义服务

# my_robot_package/srv/Walk.srv

float64 step_length
float64 step_width
float32 duration
---
bool success
string message

4.2 服务服务器

#!/usr/bin/env python3
# 服务服务器

import rclpy
from rclpy.node import Node
from my_robot_package.srv import Walk

class WalkService(Node):
    """
    行走服务服务器
    """
    def __init__(self):
        super().__init__('walk_service')
        
        # 创建服务
        self.server = self.create_service(
            Walk,
            'walk',
            self.walk_callback
        )
        
        self.get_logger().info('Walk Service Ready')
    
    def walk_callback(self, request, response):
        """
        服务回调
        """
        step_length = request.step_length
        step_width = request.step_width
        duration = request.duration
        
        # 验证参数
        if step_length < 0 or step_length > 1.0:
            response.success = False
            response.message = 'Invalid step length'
            return response
        
        # 执行行走(简化)
        self.get_logger().info(
            f'Walking: length={step_length}, width={step_width}, '
            f'duration={duration}s'
        )
        
        # 模拟行走
        # 实际应调用运动控制器
        
        response.success = True
        response.message = 'Walking started'
        
        return response

def main(args=None):
    rclpy.init(args=args)
    node = WalkService()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

4.3 服务客户端

#!/usr/bin/env python3
# 服务客户端

import rclpy
from rclpy.node import Node
from my_robot_package.srv import Walk

class WalkClient(Node):
    """
    行走服务客户端
    """
    def __init__(self):
        super().__init__('walk_client')
        
        # 创建客户端
        self.client = self.create_client(Walk, 'walk')
        
        # 等待服务
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for walk service...')
    
    def send_request(self, step_length, step_width, duration):
        """
        发送请求
        """
        request = Walk.Request()
        request.step_length = step_length
        request.step_width = step_width
        request.duration = float(duration)
        
        # 异步调用
        future = self.client.call_async(request)
        rclpy.spin_until_future_complete(self, future)
        
        if future.result() is not None:
            response = future.result()
            self.get_logger().info(
                f'Result: {response.success}, {response.message}'
            )
            return response
        else:
            self.get_logger().error('Service call failed')
            return None

def main(args=None):
    rclpy.init(args=args)
    node = WalkClient()
    
    # 发送请求
    node.send_request(
        step_length=0.5,
        step_width=0.2,
        duration=10
    )
    
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

五、动作通信

5.1 自定义动作

定义动作

# my_robot_package/action/WalkTo.action

float64 x
float64 y
float64 yaw
---
float64 distance_remaining
float32 estimated_time
---
string status
float32 progress

5.2 动作服务器

#!/usr/bin/env python3
# 动作服务器

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from my_robot_package.action import WalkTo
import time

class WalkToServer(Node):
    """
    行走至目标动作服务器
    """
    def __init__(self):
        super().__init__('walk_to_server')
        
        # 创建动作服务器
        self.server = ActionServer(
            self,
            WalkTo,
            'walk_to',
            self.walk_to_callback
        )
        
        self.get_logger().info('WalkTo Action Server Ready')
    
    def walk_to_callback(self, goal_handle):
        """
        动作回调
        """
        self.get_logger().info('Received walk goal')
        
        # 获取目标
        target_x = goal_handle.request.x
        target_y = goal_handle.request.y
        
        # 执行动作
        feedback_msg = WalkTo.Feedback()
        result = WalkTo.Result()
        
        # 模拟行走过程
        distance = 10.0  # 初始距离
        for i in range(10):
            # 检查是否取消
            if goal_handle.is_cancel_requested:
                self.get_logger().info('Goal cancelled')
                goal_handle.canceled()
                result.distance_remaining = distance
                return result
            
            # 更新进度
            distance -= 1.0
            feedback_msg.distance_remaining = distance
            feedback_msg.estimated_time = distance * 2.0
            goal_handle.publish_feedback(feedback_msg)
            
            self.get_logger().debug(f'Distance remaining: {distance}')
            
            time.sleep(1)  # 模拟行走
        
        # 完成
        goal_handle.succeed()
        result.distance_remaining = 0.0
        self.get_logger().info('Goal succeeded')
        
        return result

def main(args=None):
    rclpy.init(args=args)
    node = WalkToServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

5.3 动作客户端

#!/usr/bin/env python3
# 动作客户端

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from my_robot_package.action import WalkTo

class WalkToClient(Node):
    """
    行走至目标动作客户端
    """
    def __init__(self):
        super().__init__('walk_to_client')
        
        # 创建动作客户端
        self.client = ActionClient(
            self,
            WalkTo,
            'walk_to'
        )
        
        self.get_logger().info('Waiting for action server...')
        self.client.wait_for_server()
    
    def send_goal(self, x, y, yaw):
        """
        发送目标
        """
        goal_msg = WalkTo.Goal()
        goal_msg.x = x
        goal_msg.y = y
        goal_msg.yaw = yaw
        
        # 异步发送
        future = self.client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )
        
        future.add_done_callback(self.goal_response_callback)
    
    def goal_response_callback(self, future):
        """
        目标响应回调
        """
        goal_handle = future.result()
        
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return
        
        self.get_logger().info('Goal accepted')
        
        # 等待结果
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self.result_callback)
    
    def feedback_callback(self, feedback_msg):
        """
        反馈回调
        """
        feedback = feedback_msg.feedback
        self.get_logger().info(
            f'Distance remaining: {feedback.distance_remaining:.2f}m'
        )
    
    def result_callback(self, future):
        """
        结果回调
        """
        result = future.result().result
        self.get_logger().info(
            f'Final distance: {result.distance_remaining}'
        )

def main(args=None):
    rclpy.init(args=args)
    node = WalkToClient()
    
    # 发送目标
    node.send_goal(x=5.0, y=3.0, yaw=0.5)
    
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

六、人形机器人应用

6.1 典型节点架构

人形机器人 ROS2 节点架构

┌─────────────────────────────────────────┐
│ 感知层                                  │
│ ├── /camera/image_raw(相机驱动)       │
│ ├── /imu/data(IMU 驱动)                │
│ ├── /joint_states(编码器读取)         │
│ └── /force_torque(力传感器)           │
└──────────────┬──────────────────────────┘

┌──────────────▼──────────────────────────┐
│ 规划层                                  │
│ ├── /slam/map(SLAM)                   │
│ ├── /navigation/path(路径规划)        │
│ └── /motion/plan(运动规划)            │
└──────────────┬──────────────────────────┘

┌──────────────▼──────────────────────────┐
│ 控制层                                  │
│ ├── /joint_commands(关节控制)         │
│ ├── /balance(平衡控制)                │
│ └── /wbc(全身控制)                    │
└──────────────┬──────────────────────────┘

┌──────────────▼──────────────────────────┐
│ 执行层                                  │
│ └── /motor_commands(电机驱动)         │
└─────────────────────────────────────────┘

6.2 Launch 文件

# my_robot_package/launch/humanoid.launch.py

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node

def generate_launch_description():
    # 参数
    use_sim_time = LaunchConfiguration('use_sim_time', default='false')
    
    return LaunchDescription([
        # 参数声明
        DeclareLaunchArgument(
            'use_sim_time',
            default_value='false',
            description='Use simulation clock'
        ),
        
        # 关节状态发布器
        Node(
            package='my_robot_package',
            executable='joint_state_publisher',
            name='joint_state_publisher',
            parameters=[{'use_sim_time': use_sim_time}]
        ),
        
        # 关节控制器
        Node(
            package='my_robot_package',
            executable='joint_controller',
            name='joint_controller',
            parameters=[{'use_sim_time': use_sim_time}]
        ),
        
        # 平衡控制器
        Node(
            package='my_robot_package',
            executable='balance_controller',
            name='balance_controller',
            parameters=[{'use_sim_time': use_sim_time}]
        ),
        
        # RViz 可视化
        Node(
            package='rviz2',
            executable='rviz2',
            name='rviz2',
            arguments=['-d', 'path/to/config.rviz']
        ),
    ])

运行 Launch

ros2 launch my_robot_package humanoid.launch.py

6.3 URDF 模型

<!-- my_robot_package/urdf/humanoid.urdf -->

<?xml version="1.0"?>
<robot name="humanoid">
  
  <!-- 基座链接 -->
  <link name="base_link">
    <visual>
      <geometry>
        <box size="0.2 0.3 0.5"/>
      </geometry>
      <origin xyz="0 0 0.5"/>
    </visual>
    <collision>
      <geometry>
        <box size="0.2 0.3 0.5"/>
      </geometry>
    </collision>
    <inertial>
      <mass value="10"/>
      <inertia ixx="1.0" ixy="0.0" ixz="0.0" 
               iyy="1.0" iyz="0.0" izz="1.0"/>
    </inertial>
  </link>
  
  <!-- 左大腿 -->
  <link name="left_thigh">
    <visual>
      <geometry>
        <cylinder length="0.4" radius="0.08"/>
      </geometry>
    </visual>
  </link>
  
  <!-- 左髋关节 -->
  <joint name="left_hip_pitch" type="revolute">
    <parent link="base_link"/>
    <child link="left_thigh"/>
    <origin xyz="0 -0.15 0" rpy="0 0 0"/>
    <axis xyz="0 1 0"/>
    <limit lower="-0.5" upper="2.0" effort="100" velocity="5.0"/>
  </joint>
  
  <!-- 更多链接和关节... -->
  
</robot>

七、调试工具

7.1 RQT

# 安装
sudo apt install ros-humble-rqt*

# 启动
rqt

# 常用视图
rqt_graph      # 节点图
rqt_plot       # 数据绘图
rqt_console    # 日志控制台
rqt_robot_monitor  # 机器人监控

7.2 RViz2

# 启动
rviz2

# 配置
# - 添加 RobotModel 显示 URDF
# - 添加 TF 显示坐标系
# - 添加 PointStamped 显示点击

7.3 ROS2 命令

# 节点
ros2 node list
ros2 node info /node_name

# 话题
ros2 topic list
ros2 topic echo /topic_name
ros2 topic pub /topic_name ...

# 服务
ros2 service list
ros2 service call /service_name ...

# 动作
ros2 action list
ros2 action send /action_name ...

# 参数
ros2 param list
ros2 param get /node param_name
ros2 param set /node param_name value

# 包
ros2 pkg list
ros2 pkg executables

八、总结

8.1 核心要点

  1. 节点是基础:最小执行单元
  2. 话题最常用:传感器数据发布
  3. 服务用于请求:同步调用
  4. 动作用于长任务:可取消、可反馈
  5. DDS 是核心:去中心化通信

8.2 学习资源

ROS2 是人形机器人软件开发的标准框架。掌握 ROS2,是进入机器人行业的必备技能。从基础节点开始,逐步掌握话题、服务、动作,最终能够构建复杂的人形机器人系统。


参考资料


分享这篇文章到:

上一篇文章
Redis Cluster 原理详解
下一篇文章
Redis 消息队列实现方案