Decision Logic and Control Systems
The decision system is the humanoid robot's "cerebral cortex": it understands tasks, plans actions, and makes decisions. Going from high-level task decomposition down to low-level motion execution requires a multi-layer decision architecture. This article walks through the core technologies and engineering implementation of humanoid-robot decision systems.
1. Decision System Architecture
1.1 Layered Decision Architecture
```
Humanoid robot decision architecture (three-layer model)

┌─ Task Level ─────────────────────────────────────
│  Time scale: seconds
│  Responsibilities:
│    - Understand natural-language instructions
│    - Task decomposition and planning
│    - Resource scheduling
│    - Exception handling
│  Techniques: LLMs, task planners, behavior trees
└──────────────────────┬──────────────────────────
                       ▼
┌─ Skill Level ────────────────────────────────────
│  Time scale: 100 ms to seconds
│  Responsibilities:
│    - Motion-primitive execution
│    - Motion planning
│    - Perception-action closed loop
│    - Online adjustment
│  Techniques: motion primitives, MPC, WBC
└──────────────────────┬──────────────────────────
                       ▼
┌─ Execution Level ────────────────────────────────
│  Time scale: milliseconds
│  Responsibilities:
│    - Joint torque control
│    - Hybrid force/position control
│    - Reflex behaviors
│    - Safety monitoring
│  Techniques: impedance control, admittance control
└──────────────────────────────────────────────────
```
Information flow:
- Downward: task → skill → execution (commands)
- Upward: execution → skill → task (feedback)
1.2 Decision Rates
| Layer | Decision rate | Latency budget | Typical algorithms |
|---|---|---|---|
| Task level | 1-10 Hz | <500 ms | LLM, behavior trees |
| Skill level | 50-200 Hz | <50 ms | MPC, motion planners |
| Execution level | 500-1000 Hz | <5 ms | Impedance control, force control |
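To make these rates concrete, here is a minimal single-process sketch of the three layers ticking at different frequencies. The `task_layer`, `skill_layer`, and `execution_layer` callables are placeholder assumptions; a real stack would run each layer in its own thread, process, or controller with shared state.

```python
import time


def run_layered_control(task_layer, skill_layer, execution_layer,
                        task_hz=5, skill_hz=100, exec_hz=500, duration_s=1.0):
    """Sketch of the three decision layers running at different rates.

    task_layer(), skill_layer(goal) and execution_layer(plan) are assumed
    callables supplied by the robot stack.
    """
    t0 = time.monotonic()
    next_task = next_skill = next_exec = t0
    goal = plan = command = None
    while time.monotonic() - t0 < duration_s:
        now = time.monotonic()
        if now >= next_task:            # ~1-10 Hz: task understanding / decomposition
            goal = task_layer()
            next_task = now + 1.0 / task_hz
        if now >= next_skill:           # ~50-200 Hz: motion planning / MPC
            plan = skill_layer(goal)
            next_skill = now + 1.0 / skill_hz
        if now >= next_exec:            # ~500-1000 Hz: joint-level control
            command = execution_layer(plan)
            next_exec = now + 1.0 / exec_hz
        time.sleep(0.0005)              # yield briefly between checks
    return command
```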
1.3 What Each Layer Decides
Task-level decisions (a minimal code sketch of this hand-off follows the example):

Task-level decision example

Input: "Bring me the water bottle on the table."

Decision process:
1. Semantic understanding (LLM)
   - Object: water bottle
   - Location: on the table
   - Actions: pick up, hand over
   - Recipient: the user
2. Task decomposition
   - Subtask 1: walk to the table
   - Subtask 2: detect the bottle
   - Subtask 3: grasp the bottle
   - Subtask 4: walk to the user
   - Subtask 5: hand over the bottle
3. Resource scheduling
   - Navigation module: subtasks 1 and 4
   - Vision module: subtask 2
   - Manipulation module: subtasks 3 and 5
4. Contingency plans
   - Bottle not found → search
   - Grasp fails → retry
   - User moves → track
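As a rough illustration (not a fixed interface), the task layer's output for this example could be handed to the skill layer as a plain data structure; all field names below are illustrative assumptions.

```python
# Sketch of a task-layer decision handed to the skill layer:
# a subtask list, a resource assignment, and a contingency table.
task_plan = {
    'instruction': 'Bring me the water bottle on the table',
    'subtasks': [
        {'id': 1, 'skill': 'navigate', 'target': 'table'},
        {'id': 2, 'skill': 'detect',   'target': 'water_bottle'},
        {'id': 3, 'skill': 'grasp',    'target': 'water_bottle'},
        {'id': 4, 'skill': 'navigate', 'target': 'user'},
        {'id': 5, 'skill': 'handover', 'target': 'user'},
    ],
    'resources': {'navigate': 'navigation', 'detect': 'vision',
                  'grasp': 'manipulation', 'handover': 'manipulation'},
    'contingencies': {
        'object_not_found': 'search_area',
        'grasp_failed': 'retry_grasp',
        'user_moved': 'track_user',
    },
}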
Skill-level decisions (a parameter-structure sketch follows the example):

Skill-level decision example

Subtask: "grasp the water bottle"

Decisions to make:
1. Choose a grasp strategy
   - Top grasp vs. side grasp
   - One-handed vs. two-handed hold
2. Plan the grasp pose
   - Arm joint angles
   - Hand aperture
   - Approach trajectory
3. Force-control parameters
   - Grip force: 50 N
   - Force limit: 100 N (anti-slip)
   - Compliance: medium
4. Online adjustment
   - Correct the pose from visual feedback
   - Adapt the grip force from force feedback
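A minimal sketch of the skill-layer grasp decision as a parameter structure, mirroring the values in the example above. The dataclass fields are illustrative assumptions rather than a standard message format.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class GraspDecision:
    """Parameters the skill layer settles on before executing a grasp."""
    strategy: str = 'top_grasp'            # 'top_grasp' or 'side_grasp'
    hands: str = 'single'                  # 'single' or 'bimanual'
    arm_joint_angles: List[float] = field(default_factory=list)
    hand_aperture: float = 0.08            # meters, pre-grasp opening
    grip_force: float = 50.0               # N, nominal grip force
    max_force: float = 100.0               # N, anti-slip limit
    compliance: str = 'medium'             # qualitative compliance setting
```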
2. Task Planning
2.1 Task Representation
PDDL (Planning Domain Definition Language):
```lisp
; PDDL domain definition
(define (domain humanoid-robot)
  ; Types
  (:types
    location object robot hand)
  ; Predicates (note: empty must be declared since pick/place use it)
  (:predicates
    (at ?r - robot ?l - location)
    (holding ?r - robot ?o - object ?h - hand)
    (on ?o - object ?l - location)
    (reachable ?l1 - location ?l2 - location)
    (graspable ?o - object)
    (empty ?h - hand))
  ; Actions
  (:action move
    :parameters (?r - robot ?from ?to - location)
    :precondition (and
      (at ?r ?from)
      (reachable ?from ?to))
    :effect (and
      (at ?r ?to)
      (not (at ?r ?from))))
  (:action pick
    :parameters (?r - robot ?o - object ?h - hand ?l - location)
    :precondition (and
      (at ?r ?l)
      (on ?o ?l)
      (graspable ?o)
      (empty ?h))
    :effect (and
      (holding ?r ?o ?h)
      (not (on ?o ?l))
      (not (empty ?h))))
  (:action place
    :parameters (?r - robot ?o - object ?h - hand ?l - location)
    :precondition (and
      (at ?r ?l)
      (holding ?r ?o ?h))
    :effect (and
      (on ?o ?l)
      (empty ?h)
      (not (holding ?r ?o ?h)))))

; PDDL problem definition
(define (problem pick-and-place)
  (:domain humanoid-robot)
  (:objects
    robot1 - robot
    hand-left hand-right - hand
    table counter - location
    bottle - object)
  (:init
    (at robot1 counter)
    (on bottle table)
    (reachable counter table)
    (graspable bottle)
    (empty hand-left)
    (empty hand-right))
  (:goal (holding robot1 bottle hand-left)))
```
2.2 Planning Algorithms
A* search:
```python
import heapq
from typing import Dict, List, Optional


class TaskPlanner:
    def __init__(self):
        self.heuristic = self.default_heuristic

    def plan(self, initial_state: Dict, goal_state: Dict,
             actions: List[Dict]) -> Optional[List[str]]:
        """A* task planning over dictionary-valued states."""
        # Priority queue entries: (f_score, tie_breaker, state, path).
        # The tie breaker keeps heapq from trying to compare dict states.
        counter = 0
        open_set = [(0, counter, initial_state, [])]
        visited = set()
        while open_set:
            _, _, current, path = heapq.heappop(open_set)
            # Goal test
            if self.is_goal(current, goal_state):
                return path
            # Skip states we have already expanded
            key = str(sorted(current.items()))
            if key in visited:
                continue
            visited.add(key)
            # Expand all applicable actions
            for action in actions:
                if self.is_applicable(current, action):
                    next_state = self.apply_action(current, action)
                    g = len(path) + 1                          # path cost so far
                    h = self.heuristic(next_state, goal_state)  # estimated cost to go
                    counter += 1
                    heapq.heappush(open_set, (g + h, counter, next_state,
                                              path + [action['name']]))
        return None  # no plan found

    def is_goal(self, state: Dict, goal: Dict) -> bool:
        return all(state.get(k) == v for k, v in goal.items())

    def is_applicable(self, state: Dict, action: Dict) -> bool:
        return all(state.get(k) == v for k, v in action['precondition'].items())

    def apply_action(self, state: Dict, action: Dict) -> Dict:
        next_state = dict(state)
        next_state.update(action['effect'])
        return next_state

    def default_heuristic(self, state: Dict, goal: Dict) -> int:
        """Default heuristic: number of unsatisfied goal literals."""
        return sum(1 for k, v in goal.items() if state.get(k) != v)


# Usage example: actions are dicts carrying a name, preconditions and effects
planner = TaskPlanner()
plan = planner.plan(
    initial_state={'robot_at': 'counter', 'bottle_on': 'table', 'holding': None},
    goal_state={'holding': 'bottle'},
    actions=[
        {'name': 'move_to_table', 'precondition': {'robot_at': 'counter'},
         'effect': {'robot_at': 'table'}},
        {'name': 'pick_bottle', 'precondition': {'robot_at': 'table', 'bottle_on': 'table'},
         'effect': {'holding': 'bottle', 'bottle_on': None}},
    ],
)
print(plan)  # ['move_to_table', 'pick_bottle']
```
HTN (Hierarchical Task Network) planning:
```python
class HTNPlanner:
    """Hierarchical Task Network planner."""

    def __init__(self):
        # Method library: how each high-level task decomposes into subtasks
        self.methods = {
            'fetch_object': [
                {
                    'precondition': lambda s: s['object_visible'],
                    'subtasks': ['navigate_to', 'pick_object',
                                 'navigate_back', 'place_object'],
                }
            ],
            'navigate_to': [
                {
                    'precondition': lambda s: True,
                    'subtasks': ['plan_path', 'follow_path'],
                }
            ],
            'pick_object': [
                {
                    'precondition': lambda s: s['reachable'],
                    'subtasks': ['approach_object', 'grasp_object'],
                }
            ],
        }
        # Primitive action library. Placeholders keep the sketch runnable;
        # in a real system these are bound to the robot's skill implementations.
        noop = lambda: True
        self.primitives = {
            'plan_path': noop,
            'follow_path': noop,
            'approach_object': noop,
            'grasp_object': noop,
            'navigate_back': noop,
            'place_object': noop,
            # ...
        }

    def plan(self, task, state):
        """HTN planning: recursively decompose until only primitives remain."""
        if task in self.primitives:
            # Primitive action: emit it directly
            return [task]
        if task not in self.methods:
            return None
        # Try each applicable method in turn
        for method in self.methods[task]:
            if method['precondition'](state):
                # Recursively plan each subtask
                plan = []
                for subtask in method['subtasks']:
                    subplan = self.plan(subtask, state)
                    if subplan is None:
                        break
                    plan.extend(subplan)
                else:
                    return plan
        return None  # no valid decomposition found
```
2.3 Task Execution Monitoring
```python
class TaskExecutor:
    """Task execution and monitoring."""

    def __init__(self, goal_state=None, available_actions=None):
        self.current_task = None
        self.execution_log = []
        self.goal_state = goal_state
        self.available_actions = available_actions or []

    def execute(self, plan, state_estimator):
        """Execute a plan while monitoring every step."""
        for i, action in enumerate(plan):
            self.current_task = (i, action)
            # Execute the action
            success = self.execute_action(action)
            # Log the outcome
            self.execution_log.append({
                'action': action,
                'success': success,
                'state': state_estimator.get_state(),
            })
            # Handle failure
            if not success:
                # Try a local recovery first
                recovery = self.recover_from_failure(action, state_estimator.get_state())
                if recovery:
                    # Retry the action after recovery
                    success = self.execute_action(action)
                if not success:
                    # Step failed for good: replan from the current state
                    new_plan = self.replan(state_estimator)
                    return self.execute(new_plan, state_estimator)
        return True

    def execute_action(self, action):
        """Dispatch the action to the skill layer (robot-specific)."""
        raise NotImplementedError

    def recover_from_failure(self, action, state):
        """Failure-recovery strategies."""
        if action == 'grasp_object':
            # Grasp failed: adjust the grasp pose and retry (up to 3 attempts)
            if state['grasp_attempts'] < 3:
                return {'adjust_grasp_pose': True}
        if action == 'navigate_to':
            # Navigation failed: re-localize if confidence is low
            if state['localization_confidence'] < 0.8:
                return {'relocalize': True}
        return None

    def replan(self, state_estimator):
        """Replan from the current state."""
        planner = TaskPlanner()
        return planner.plan(
            initial_state=state_estimator.get_state(),
            goal_state=self.goal_state,
            actions=self.available_actions,
        )
```
3. Behavior Trees
3.1 Behavior Tree Basics
Behavior tree node types:
```
Behavior tree node types
├── Control nodes
│   ├── Sequence
│   │   └── Ticks children in order; succeeds only if all succeed
│   ├── Selector
│   │   └── Ticks children in order; succeeds as soon as one succeeds
│   ├── Parallel
│   │   └── Ticks children concurrently
│   └── Decorators
│       ├── Inverter
│       ├── Repeater
│       └── Timeout
│
├── Condition nodes
│   └── Return True/False without changing state
│
└── Action nodes
    └── Execute a concrete action; return Success/Running/Failure
```
3.2 Behavior Tree Implementation
```python
from enum import Enum
from typing import List


class NodeStatus(Enum):
    SUCCESS = "success"
    RUNNING = "running"
    FAILURE = "failure"


class BehaviorNode:
    """Base class for behavior tree nodes."""

    def __init__(self, name: str):
        self.name = name

    def tick(self) -> NodeStatus:
        raise NotImplementedError


class SequenceNode(BehaviorNode):
    """Sequence node: succeeds only if every child succeeds."""

    def __init__(self, name: str, children: List[BehaviorNode]):
        super().__init__(name)
        self.children = children
        self.current_child = 0

    def tick(self) -> NodeStatus:
        while self.current_child < len(self.children):
            child = self.children[self.current_child]
            status = child.tick()
            if status == NodeStatus.RUNNING:
                return NodeStatus.RUNNING
            elif status == NodeStatus.FAILURE:
                self.current_child = 0  # reset
                return NodeStatus.FAILURE
            self.current_child += 1
        self.current_child = 0  # reset
        return NodeStatus.SUCCESS


class SelectorNode(BehaviorNode):
    """Selector node: succeeds as soon as one child succeeds."""

    def __init__(self, name: str, children: List[BehaviorNode]):
        super().__init__(name)
        self.children = children
        self.current_child = 0

    def tick(self) -> NodeStatus:
        while self.current_child < len(self.children):
            child = self.children[self.current_child]
            status = child.tick()
            if status == NodeStatus.RUNNING:
                return NodeStatus.RUNNING
            elif status == NodeStatus.SUCCESS:
                self.current_child = 0  # reset
                return NodeStatus.SUCCESS
            self.current_child += 1
        self.current_child = 0  # reset
        return NodeStatus.FAILURE


class ActionNode(BehaviorNode):
    """Action node: runs a concrete action."""

    def __init__(self, name: str, action_func):
        super().__init__(name)
        self.action_func = action_func

    def tick(self) -> NodeStatus:
        success = self.action_func()
        return NodeStatus.SUCCESS if success else NodeStatus.FAILURE


class ConditionNode(BehaviorNode):
    """Condition node: checks a predicate without side effects."""

    def __init__(self, name: str, condition_func):
        super().__init__(name)
        self.condition_func = condition_func

    def tick(self) -> NodeStatus:
        return NodeStatus.SUCCESS if self.condition_func() else NodeStatus.FAILURE
```
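Section 3.1 also lists decorator nodes (Inverter, Repeater, Timeout) that the implementation above omits. A minimal sketch of two of them, built on the same `BehaviorNode` and `NodeStatus` classes, might look like this:

```python
class InverterNode(BehaviorNode):
    """Decorator: swap the child's Success/Failure, pass Running through."""

    def __init__(self, name: str, child: BehaviorNode):
        super().__init__(name)
        self.child = child

    def tick(self) -> NodeStatus:
        status = self.child.tick()
        if status == NodeStatus.SUCCESS:
            return NodeStatus.FAILURE
        if status == NodeStatus.FAILURE:
            return NodeStatus.SUCCESS
        return NodeStatus.RUNNING


class RepeaterNode(BehaviorNode):
    """Decorator: re-tick the child up to num_repeats times."""

    def __init__(self, name: str, child: BehaviorNode, num_repeats: int = 3):
        super().__init__(name)
        self.child = child
        self.num_repeats = num_repeats
        self.count = 0

    def tick(self) -> NodeStatus:
        status = self.child.tick()
        if status == NodeStatus.RUNNING:
            return NodeStatus.RUNNING
        self.count += 1
        if self.count >= self.num_repeats:
            self.count = 0          # done repeating, report the last result
            return status
        return NodeStatus.RUNNING   # keep repeating on the next tick
```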
3.3 A Humanoid-Robot Behavior Tree Example
```python
import time


def build_fetch_object_behavior():
    """Build the "fetch object" behavior tree.

    robot, vision, kinematics, navigation and manipulation are assumed to be
    the robot's subsystem interfaces, provided elsewhere.
    """
    # Condition checks
    has_object = ConditionNode("HasObject", lambda: robot.has_object())
    object_visible = ConditionNode("ObjectVisible", lambda: vision.detect_object())
    object_reachable = ConditionNode("ObjectReachable", lambda: kinematics.is_reachable())

    # Actions
    navigate = ActionNode("Navigate", navigation.goto_location)
    approach = ActionNode("Approach", manipulation.approach_object)
    grasp = ActionNode("Grasp", manipulation.grasp_object)
    retreat = ActionNode("Retreat", manipulation.retreat_arm)

    # Subtree: grasp the object
    grasp_sequence = SequenceNode("GraspSequence", [
        object_reachable,
        approach,
        grasp,
        retreat,
    ])

    # Subtree: navigate to the object
    navigate_to_object = SequenceNode("NavigateToObject", [
        object_visible,
        navigate,
    ])

    # Root: fetch the object
    fetch_tree = SelectorNode("FetchObject", [
        # Already holding the object: succeed immediately
        has_object,
        # Otherwise run the fetch sequence
        SequenceNode("FetchProcess", [
            navigate_to_object,
            grasp_sequence,
        ]),
    ])
    return fetch_tree


# Tick the behavior tree
behavior_tree = build_fetch_object_behavior()
while True:
    status = behavior_tree.tick()
    if status != NodeStatus.RUNNING:
        break
    time.sleep(0.1)  # 10 Hz
```
3.4 Why Behavior Trees
Compared with finite state machines:
| Property | Behavior tree | Finite state machine |
|---|---|---|
| Modularity | High (nodes are reusable) | Low (states are tightly coupled) |
| Extensibility | High (just add nodes) | Low (state explosion) |
| Debugging | Easy (execution path can be visualized) | Hard |
| Concurrency | Supported (Parallel node) | Not supported |
| Prioritization | Built in (Selector ordering) | Needs extra logic |
4. Reinforcement Learning for Decision Making
4.1 Where RL Fits in Decision Making
Suitable scenarios:
- Sequential decision making in complex environments
- Tasks where rules are hard to hand-design
- Skills that must be learned from data
Typical applications (a reward-shaping sketch for the locomotion case follows the tree):
```
RL application areas
├── Locomotion
│   ├── Learning walking gaits
│   ├── Adapting to complex terrain
│   └── Fall recovery
│
├── Manipulation
│   ├── Learning grasp strategies
│   ├── Tool use
│   └── Fine manipulation
│
└── High-level decision making
    ├── Learning task priorities
    ├── Human-robot interaction policies
    └── Multi-robot cooperation
```
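For the locomotion case, much of the engineering effort goes into reward shaping. The sketch below is an illustrative walking reward; the terms and weights are assumptions made for this sketch rather than any specific published formulation, and real rewards add many more shaping terms (foot clearance, contact schedule, action smoothness, and so on).

```python
import numpy as np


def walking_reward(base_lin_vel, target_vel, joint_torques, base_height, fallen):
    """Illustrative walking reward for RL locomotion (numpy array inputs)."""
    # Track the commanded planar velocity
    vel_error = np.linalg.norm(base_lin_vel[:2] - target_vel[:2])
    r_tracking = np.exp(-4.0 * vel_error ** 2)
    # Penalize energy use (squared joint torques)
    r_energy = -1e-4 * float(np.sum(np.square(joint_torques)))
    # Penalize dropping the torso below a nominal height
    r_height = -2.0 * max(0.0, 0.6 - base_height)
    # Small alive bonus, large penalty on falling
    r_alive = -10.0 if fallen else 0.1
    return r_tracking + r_energy + r_height + r_alive
```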
4.2 Choosing an RL Algorithm
PPO (Proximal Policy Optimization):
```python
import torch
import torch.nn as nn
import torch.optim as optim


class PPOAgent:
    """PPO agent."""

    def __init__(self, state_dim, action_dim, device='cuda'):
        self.device = device
        # Actor-critic networks
        self.actor = self.build_network(state_dim, action_dim).to(device)
        self.critic = self.build_network(state_dim, 1).to(device)
        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
        # PPO hyper-parameters
        self.clip_epsilon = 0.2
        self.gamma = 0.99
        self.lam = 0.95
        self.action_std = 0.1

    def build_network(self, in_dim, out_dim, hidden=256):
        """Simple MLP used for both actor and critic."""
        return nn.Sequential(
            nn.Linear(in_dim, hidden), nn.Tanh(),
            nn.Linear(hidden, hidden), nn.Tanh(),
            nn.Linear(hidden, out_dim),
        )

    def select_action(self, state):
        """Sample an action from a Gaussian around the actor output."""
        state = torch.FloatTensor(state).to(self.device)
        with torch.no_grad():
            action_mean = self.actor(state)
            action = torch.normal(action_mean, self.action_std).cpu().numpy()
        return action

    def get_log_probs(self, states, actions):
        """Log-probability of actions under the current Gaussian policy."""
        mean = self.actor(states)
        dist = torch.distributions.Normal(mean, self.action_std)
        return dist.log_prob(actions).sum(dim=-1)

    def update(self, states, actions, rewards, next_states, dones):
        """One PPO update from a rollout batch."""
        # Advantage estimation (GAE); compute_gae is sketched after this block
        with torch.no_grad():
            values = self.critic(states).squeeze(-1)
            next_values = self.critic(next_states).squeeze(-1)
            advantages = self.compute_gae(rewards, values, next_values, dones)
            returns = advantages + values
            # Freeze the behaviour policy's log-probs before the update loop
            old_log_probs = self.get_log_probs(states, actions)
        # Several epochs of clipped policy-gradient updates
        for _ in range(10):
            new_log_probs = self.get_log_probs(states, actions)
            ratio = torch.exp(new_log_probs - old_log_probs)
            # Clipped surrogate objective
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_epsilon,
                                1 + self.clip_epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            # Value loss against the estimated returns (not raw rewards)
            value_pred = self.critic(states).squeeze(-1)
            critic_loss = nn.MSELoss()(value_pred, returns)
            # Gradient steps
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()
```
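The update above relies on `compute_gae`, which the sketch leaves undefined. A minimal standalone version of Generalized Advantage Estimation, assuming the `(rewards, values, next_values, dones)` convention used above, could look like this; bound as a method it would additionally take `self` and read `gamma`/`lam` from the agent.

```python
import torch


def compute_gae(rewards, values, next_values, dones, gamma=0.99, lam=0.95):
    """Generalized Advantage Estimation over one rollout (1-D tensors in time order)."""
    advantages = torch.zeros_like(rewards)
    gae = 0.0
    for t in reversed(range(len(rewards))):
        # TD residual; (1 - done) cuts the bootstrap at episode boundaries
        delta = rewards[t] + gamma * next_values[t] * (1 - dones[t]) - values[t]
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages[t] = gae
    # Normalizing advantages usually stabilizes the PPO update
    return (advantages - advantages.mean()) / (advantages.std() + 1e-8)
```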
SAC (Soft Actor-Critic):
```python
class SACAgent:
    """SAC agent (well suited to continuous control).

    StochasticActor and CriticNetwork are assumed network classes; the
    update_critic target computation is sketched after this block.
    """

    def __init__(self, state_dim, action_dim, action_range):
        self.action_range = action_range
        # Actor (stochastic policy network)
        self.actor = StochasticActor(state_dim, action_dim)
        # Critics (double Q networks) and their targets
        self.critic1 = CriticNetwork(state_dim, action_dim)
        self.critic2 = CriticNetwork(state_dim, action_dim)
        self.critic_target1 = CriticNetwork(state_dim, action_dim)
        self.critic_target2 = CriticNetwork(state_dim, action_dim)
        # Entropy temperature coefficient
        self.alpha = 0.2

    def select_action(self, state, evaluate=False):
        """Select an action (the stochastic policy carries the entropy term)."""
        state = torch.FloatTensor(state)
        with torch.no_grad():
            # For evaluation one would typically use the policy mean instead
            action, _ = self.actor.sample(state)
        return action.cpu().numpy()

    def update(self, replay_buffer, batch_size=256):
        """One SAC update step."""
        # Sample a batch of transitions
        states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
        # Update the critics against the soft Bellman target
        q_loss = self.update_critic(states, actions, rewards, next_states, dones)
        # Update the actor to maximize the soft Q-value
        policy_loss = self.update_actor(states)
        # Update the entropy temperature
        alpha_loss = self.update_alpha()
        return q_loss, policy_loss, alpha_loss
```
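`update_critic` is referenced above but not shown. Its core is the soft Bellman target; the sketch below assumes the `StochasticActor.sample` interface returns `(action, log_prob)` and that the critics output a `(batch, 1)` Q-value.

```python
import torch


def soft_critic_targets(rewards, next_states, dones, actor,
                        critic_target1, critic_target2, alpha, gamma=0.99):
    """Soft Bellman targets used by the SAC critic update (a sketch)."""
    with torch.no_grad():
        next_actions, next_log_probs = actor.sample(next_states)
        # Clipped double-Q: take the minimum of the two target critics
        q1 = critic_target1(next_states, next_actions)
        q2 = critic_target2(next_states, next_actions)
        min_q = torch.min(q1, q2).squeeze(-1)
        # Entropy-regularized target: reward + discounted soft value
        targets = rewards + gamma * (1 - dones) * (min_q - alpha * next_log_probs)
    return targets
```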
4.3 Sim2Real Transfer
```python
class Sim2RealTransfer:
    """Simulation-to-real transfer."""

    def __init__(self, replay_buffer):
        # DomainRandomization is sketched after this block
        self.domain_randomization = DomainRandomization()
        self.replay_buffer = replay_buffer

    def train_in_simulation(self, env, agent, num_episodes=10000):
        """Train in simulation with domain randomization."""
        for episode in range(num_episodes):
            # Randomize the environment parameters for this episode
            self.domain_randomization.randomize(env)
            # Collect a trajectory
            state = env.reset()
            done = False
            while not done:
                action = agent.select_action(state)
                next_state, reward, done, _ = env.step(action)
                self.replay_buffer.add(state, action, reward, next_state, done)
                state = next_state
            # Update the policy
            agent.update(self.replay_buffer)
        return agent

    def deploy_to_real(self, agent, real_env):
        """Deploy on the real robot."""
        # No fine-tuning needed if domain randomization was broad enough
        state = real_env.reset()
        done = False
        while not done:
            action = agent.select_action(state, evaluate=True)
            next_state, reward, done, _ = real_env.step(action)
            state = next_state
```
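`DomainRandomization` is used above without a definition. A minimal sketch is given below; the parameter names, ranges, and the `env.set_parameters` call are illustrative assumptions rather than a specific simulator API.

```python
import random


class DomainRandomization:
    """Randomize physical and sensing parameters between training episodes."""

    def __init__(self):
        # Ranges over which parameters are uniformly sampled (illustrative)
        self.ranges = {
            'ground_friction': (0.4, 1.2),
            'link_mass_scale': (0.8, 1.2),
            'motor_strength_scale': (0.8, 1.2),
            'sensor_noise_std': (0.0, 0.02),
            'control_latency_ms': (0.0, 20.0),
        }

    def randomize(self, env):
        """Sample one value per parameter and push it into the simulator."""
        params = {k: random.uniform(lo, hi) for k, (lo, hi) in self.ranges.items()}
        env.set_parameters(params)
        return params
```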
5. Multimodal Decision Fusion
5.1 VLA Decision Architecture
```
VLA (Vision-Language-Action) decision architecture

Inputs:
├── Vision: camera images
├── Language: natural-language instructions
└── Proprioception: joint states, force sensors

Processing:
├── Vision encoder: extracts visual features
├── Language encoder: extracts language features
├── Fusion layer: multimodal fusion
└── Action head: outputs actions

Output:
└── Joint torque / position commands
```
End-to-end training (a minimal forward-pass sketch follows):
- Data: human demonstrations (vision + language → action)
- Loss: action-prediction error
- Optimization: gradient descent
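A minimal forward-pass sketch of such a VLA-style policy is shown below. It is not any specific published model; the feature dimensions, fusion scheme, and action head are illustrative assumptions, and the encoders merely stand in for pretrained vision and language backbones.

```python
import torch
import torch.nn as nn


class SimpleVLAPolicy(nn.Module):
    """Toy VLA-style policy: encode each modality, fuse, predict actions."""

    def __init__(self, vision_dim=512, lang_dim=512, proprio_dim=32,
                 fused_dim=512, action_dim=23):
        super().__init__()
        self.vision_encoder = nn.Sequential(    # stands in for a ViT/CNN backbone
            nn.Flatten(), nn.LazyLinear(vision_dim), nn.ReLU())
        self.lang_encoder = nn.Sequential(      # stands in for a text encoder
            nn.LazyLinear(lang_dim), nn.ReLU())
        self.proprio_encoder = nn.Sequential(
            nn.Linear(proprio_dim, 64), nn.ReLU())
        self.fusion = nn.Sequential(
            nn.Linear(vision_dim + lang_dim + 64, fused_dim), nn.ReLU())
        self.action_head = nn.Linear(fused_dim, action_dim)

    def forward(self, image, text_embedding, proprio):
        v = self.vision_encoder(image)
        l = self.lang_encoder(text_embedding)
        p = self.proprio_encoder(proprio)
        fused = self.fusion(torch.cat([v, l, p], dim=-1))
        return self.action_head(fused)  # joint position / torque targets
```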
5.2 Handling Decision Uncertainty
```python
import torch


class UncertaintyAwareDecision:
    """Decision making that accounts for its own uncertainty."""

    def __init__(self, policy, uncertainty_threshold=0.5):
        self.policy = policy
        self.uncertainty_threshold = uncertainty_threshold

    def decide(self, state):
        """Decide autonomously, deferring to a human when uncertainty is high."""
        # Get the action distribution from the policy
        action_dist = self.policy.get_distribution(state)
        # Quantify uncertainty as the entropy of that distribution
        uncertainty = self.compute_entropy(action_dist)
        if uncertainty > self.uncertainty_threshold:
            # High uncertainty: ask a human for help
            return self.request_human_help(state)
        else:
            # Low uncertainty: act autonomously
            return action_dist.sample()

    def request_human_help(self, state):
        """Hand control back to a human operator (interface not shown)."""
        ...

    def compute_entropy(self, dist):
        """Entropy of the action distribution."""
        if isinstance(dist, torch.distributions.Normal):
            return dist.entropy().mean().item()
        # other distribution types ...
```
6. Summary
6.1 Key Takeaways
- Layered decision making: task level, skill level, execution level
- Task planning: PDDL, HTN, A* search
- Behavior trees: a modular, extensible decision framework
- Reinforcement learning: learning complex decision making from data
- VLA fusion: end-to-end vision-language-action
6.2 Technology Trends
```
Evolution of decision technology
2010s:
├── Rule-based systems
├── Finite state machines
└── Classical planning (PDDL)
2020s:
├── Behavior trees (mainstream)
├── Reinforcement learning
└── End-to-end VLA
2030s (projected):
├── Foundation-model decision making (LLM + robots)
├── World-model prediction
└── Human-like reasoning
```
The decision system is the core of humanoid-robot intelligence. As large language models and VLA models advance, humanoid decision making will evolve from "pre-programmed" toward "autonomous understanding and autonomous learning", ultimately approaching human-like decision intelligence.