
Decision Logic and Control Systems

The decision system is the "cerebral cortex" of a humanoid robot: it understands tasks, plans actions, and makes decisions. Going from high-level task decomposition down to low-level action execution requires a multi-layer decision architecture. This article takes a close look at the core technologies and engineering implementation of humanoid-robot decision systems.

1. Decision System Architecture

1.1 Hierarchical Decision Architecture

Humanoid robot decision architecture (three-layer model)

┌─────────────────────────────────────┐
│ Task Level                          │
│ Time scale: seconds                 │
│ ─────────────────────────────────── │
│ Functions:                          │
│ - Understand natural-language       │
│   instructions                      │
│ - Task decomposition and planning   │
│ - Resource scheduling               │
│ - Exception handling                │
│                                     │
│ Techniques: LLMs, task planners,    │
│   behavior trees                    │
└──────────────┬──────────────────────┘

┌─────────────────────────────────────┐
│ Skill Level                         │
│ Time scale: 100 ms to seconds       │
│ ─────────────────────────────────── │
│ Functions:                          │
│ - Motion-primitive execution        │
│ - Motion planning                   │
│ - Perception-action closed loop     │
│ - Online adjustment                 │
│                                     │
│ Techniques: motion primitives, MPC, │
│   WBC                               │
└──────────────┬──────────────────────┘

┌─────────────────────────────────────┐
│ Execution Level                     │
│ Time scale: milliseconds            │
│ ─────────────────────────────────── │
│ Functions:                          │
│ - Joint torque control              │
│ - Hybrid force/position control     │
│ - Reflex behaviors                  │
│ - Safety monitoring                 │
│                                     │
│ Techniques: impedance control,      │
│   admittance control                │
└─────────────────────────────────────┘

Information flow:
- Downward: task → skill → execution (commands)
- Upward: execution → skill → task (feedback)

1.2 Decision Frequencies

Layer            Decision rate   Latency budget   Typical algorithms
Task level       1-10 Hz         < 500 ms         LLM, behavior tree
Skill level      50-200 Hz       < 50 ms          MPC, motion planner
Execution level  500-1000 Hz     < 5 ms           Impedance control, force control
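
To make these rates concrete, here is a minimal single-thread sketch in which the execution layer ticks every cycle and the slower layers tick every Nth cycle. The step functions and the exact divisors are illustrative placeholders, not interfaces from this article.

import time

# Illustrative multi-rate loop: the execution layer runs every cycle,
# the skill layer every 10th cycle, the task layer every 500th cycle.
EXEC_HZ, SKILL_DIV, TASK_DIV = 1000, 10, 500   # 1000 Hz, 100 Hz, 2 Hz

def execution_step():  # placeholder: joint torque / impedance control
    pass

def skill_step():      # placeholder: MPC / motion-primitive update
    pass

def task_step():       # placeholder: re-plan, behavior-tree tick
    pass

def control_loop(duration_s=1.0):
    period = 1.0 / EXEC_HZ
    for cycle in range(int(duration_s * EXEC_HZ)):
        t0 = time.perf_counter()
        execution_step()                    # every 1 ms
        if cycle % SKILL_DIV == 0:
            skill_step()                    # every 10 ms
        if cycle % TASK_DIV == 0:
            task_step()                     # every 500 ms
        # Sleep off the remainder of the 1 ms period (best effort)
        time.sleep(max(0.0, period - (time.perf_counter() - t0)))

if __name__ == "__main__":
    control_loop(0.1)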

1.3 Decision Content at Each Layer

Task-level decisions

Task-level decision example

Input: "Bring me the water bottle on the table"

Decision process:
1. Semantic understanding (LLM)
   - Object: water bottle
   - Location: on the table
   - Actions: pick up, hand over
   - Target: the user

2. Task decomposition
   - Subtask 1: walk to the table
   - Subtask 2: identify the water bottle
   - Subtask 3: grasp the water bottle
   - Subtask 4: walk to the user
   - Subtask 5: hand over the water bottle

3. Resource scheduling
   - Navigation module: subtasks 1, 4
   - Vision module: subtask 2
   - Manipulation module: subtasks 3, 5

4. Exception-handling contingencies
   - Bottle not found → search
   - Grasp fails → retry
   - User moves → track
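
One way to make this decomposition machine-readable is to attach to each subtask the module that executes it and a recovery hint. The sketch below is only an illustration; the Subtask/TaskPlan dataclasses and their field names are assumptions, not structures defined elsewhere in this article.

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Subtask:
    name: str                          # e.g. "walk to the table"
    module: str                        # subsystem that executes it
    on_failure: Optional[str] = None   # recovery hint

@dataclass
class TaskPlan:
    instruction: str
    subtasks: List[Subtask] = field(default_factory=list)

fetch_bottle = TaskPlan(
    instruction="Bring me the water bottle on the table",
    subtasks=[
        Subtask("walk to the table",          module="navigation"),
        Subtask("identify the water bottle",  module="vision",       on_failure="search"),
        Subtask("grasp the water bottle",     module="manipulation", on_failure="retry"),
        Subtask("walk to the user",           module="navigation",   on_failure="track user"),
        Subtask("hand over the water bottle", module="manipulation"),
    ],
)

for i, st in enumerate(fetch_bottle.subtasks, 1):
    print(f"Subtask {i}: {st.name} ({st.module})")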

Skill-level decisions

Skill-level decision example

Subtask: "grasp the water bottle"

Decision content:
1. Choose a grasp strategy
   - Top grasp vs. side grasp
   - One-handed vs. two-handed hold

2. Plan the grasp pose
   - Arm joint angles
   - Hand opening angle
   - Approach trajectory

3. Force-control parameters
   - Grip force: 50 N
   - Maximum force: 100 N (anti-slip)
   - Compliance: medium

4. Online adjustment
   - Adjust position from visual feedback
   - Adjust grip force from force feedback
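
As a rough sketch of the online adjustment in step 4, the snippet below raises the grip force when slip is detected while never exceeding the safety bound. The GraspParams structure, the 20% boost, and the slip flag are illustrative assumptions, not values from a real controller.

from dataclasses import dataclass

@dataclass
class GraspParams:
    grip_force_n: float = 50.0     # nominal grip force (N)
    max_force_n: float = 100.0     # hard upper bound (N)
    compliance: str = "medium"

def adjust_grip_force(params: GraspParams, measured_force_n: float,
                      slip_detected: bool) -> float:
    """Simple online adjustment: boost force on slip, otherwise hold the
    nominal value, never exceeding the safety bound."""
    target = params.grip_force_n
    if slip_detected:
        target = measured_force_n * 1.2     # illustrative 20% boost
    return min(target, params.max_force_n)

params = GraspParams()
print(adjust_grip_force(params, measured_force_n=48.0, slip_detected=True))   # 57.6
print(adjust_grip_force(params, measured_force_n=95.0, slip_detected=True))   # capped at 100.0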

2. Task Planning

2.1 Task Representation

PDDL (Planning Domain Definition Language)

; PDDL domain definition
(define (domain humanoid-robot)
  
  ; Type definitions
  (:types 
    location object robot hand)
  
  ; Predicate definitions
  (:predicates
    (at ?r - robot ?l - location)
    (holding ?r - robot ?o - object ?h - hand)
    (on ?o - object ?l - location)
    (reachable ?l1 - location ?l2 - location)
    (graspable ?o - object)
    (empty ?h - hand))
  
  ; Action definitions
  (:action move
    :parameters (?r - robot ?from ?to - location)
    :precondition (and 
      (at ?r ?from) 
      (reachable ?from ?to))
    :effect (and 
      (at ?r ?to) 
      (not (at ?r ?from))))
  
  (:action pick
    :parameters (?r - robot ?o - object ?h - hand ?l - location)
    :precondition (and
      (at ?r ?l)
      (on ?o ?l)
      (graspable ?o)
      (empty ?h))
    :effect (and
      (holding ?r ?o ?h)
      (not (on ?o ?l))
      (not (empty ?h))))
  
  (:action place
    :parameters (?r - robot ?o - object ?h - hand ?l - location)
    :precondition (and
      (at ?r ?l)
      (holding ?r ?o ?h))
    :effect (and
      (on ?o ?l)
      (empty ?h)
      (not (holding ?r ?o ?h)))))

; PDDL problem definition
(define (problem pick-and-place)
  (:domain humanoid-robot)
  (:objects 
    robot1 - robot
    hand-left hand-right - hand
    table counter - location
    bottle - object)
  (:init
    (at robot1 counter)
    (on bottle table)
    (reachable counter table)
    (graspable bottle)
    (empty hand-left)
    (empty hand-right))
  (:goal (holding robot1 bottle hand-left)))
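
Once the domain and problem are saved to files (say domain.pddl and problem.pddl), they can be fed to an off-the-shelf planner. Below is a sketch of calling Fast Downward from Python; the planner path and the search configuration are assumptions about a local installation, not part of this article.

import subprocess

# Invoke the Fast Downward planner on the domain/problem files above.
# Assumes fast-downward.py is on PATH; the planner prints the search log
# and writes the resulting plan to a file (by default named sas_plan).
result = subprocess.run(
    ["fast-downward.py", "domain.pddl", "problem.pddl",
     "--search", "astar(lmcut())"],
    capture_output=True, text=True)
print(result.stdout)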

2.2 Planning Algorithms

A* search:

import heapq
import itertools
from typing import List, Dict, Optional

class TaskPlanner:
    def __init__(self):
        self.heuristic = self.default_heuristic
    
    def plan(self, initial_state, goal_state, actions):
        """
        A* task planning. States are dicts of fluents; actions are
        domain-specific and interpreted by is_applicable()/apply_action().
        """
        counter = itertools.count()  # tie-breaker so the heap never compares dicts
        # Priority queue entries: (f_score, tie_breaker, state, path)
        open_set = [(0, next(counter), initial_state, [])]
        visited = set()
        
        while open_set:
            f, _, current, path = heapq.heappop(open_set)
            
            # Goal test
            if self.is_goal(current, goal_state):
                return path
            
            # Skip already-expanded states
            if str(current) in visited:
                continue
            visited.add(str(current))
            
            # Expand applicable actions
            for action in actions:
                if self.is_applicable(current, action):
                    next_state = self.apply_action(current, action)
                    g = len(path) + 1  # unit action cost
                    h = self.heuristic(next_state, goal_state)
                    f = g + h
                    new_path = path + [action]
                    heapq.heappush(open_set, (f, next(counter), next_state, new_path))
        
        return None  # no solution
    
    def is_goal(self, state, goal):
        """A state satisfies the goal if every goal fluent holds in it."""
        return all(state.get(k) == v for k, v in goal.items())
    
    def is_applicable(self, state, action):
        """Domain-specific precondition check (provided by a subclass)."""
        raise NotImplementedError
    
    def apply_action(self, state, action):
        """Domain-specific transition model (provided by a subclass)."""
        raise NotImplementedError
    
    def default_heuristic(self, state, goal):
        """
        Default heuristic: number of unsatisfied goal fluents.
        """
        return len(set(goal.items()) - set(state.items()))

# Usage example (assumes a subclass implementing is_applicable/apply_action)
planner = TaskPlanner()
plan = planner.plan(
    initial_state={'robot_at': 'counter', 'bottle_on': 'table'},
    goal_state={'holding': 'bottle'},
    actions=['move', 'pick', 'place']
)

HTN (Hierarchical Task Network) planning

class HTNPlanner:
    """
    Hierarchical Task Network planner.
    """
    def __init__(self):
        # Method library: high-level task → subtask decomposition
        self.methods = {
            'fetch_object': [
                {
                    'precondition': lambda s: s['object_visible'],
                    'subtasks': ['navigate_to', 'pick_object', 'navigate_back', 'place_object']
                }
            ],
            'navigate_to': [
                {
                    'precondition': lambda s: True,
                    'subtasks': ['plan_path', 'follow_path']
                }
            ],
            'pick_object': [
                {
                    'precondition': lambda s: s['reachable'],
                    'subtasks': ['approach_object', 'grasp_object']
                }
            ]
        }
        
        # Primitive action library
        self.primitives = {
            'plan_path': self.plan_path,
            'follow_path': self.follow_path,
            'approach_object': self.approach_object,
            'grasp_object': self.grasp_object,
            'navigate_back': self.navigate_back,   # needed by fetch_object above
            'place_object': self.place_object,
            # ...
        }
    
    # Placeholder primitives (real ones would call the robot's subsystems)
    def plan_path(self): return True
    def follow_path(self): return True
    def approach_object(self): return True
    def grasp_object(self): return True
    def navigate_back(self): return True
    def place_object(self): return True
    
    def plan(self, task, state):
        """
        Main HTN planning routine.
        """
        if task in self.primitives:
            # Primitive action: directly executable
            return [task]
        
        if task not in self.methods:
            return None
        
        # Try each applicable method
        for method in self.methods[task]:
            if method['precondition'](state):
                # Recursively plan the subtasks
                plan = []
                for subtask in method['subtasks']:
                    subplan = self.plan(subtask, state)
                    if subplan is None:
                        break
                    plan.extend(subplan)
                else:
                    return plan
        
        return None  # no solution
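
A quick usage sketch for the planner above, with an illustrative world state whose keys match the preconditions in the method library:

planner = HTNPlanner()
state = {'object_visible': True, 'reachable': True}
print(planner.plan('fetch_object', state))
# → ['plan_path', 'follow_path', 'approach_object', 'grasp_object',
#    'navigate_back', 'place_object']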

2.3 Task Execution Monitoring

class TaskExecutor:
    """
    Task execution and monitoring.
    """
    def __init__(self, goal_state=None, available_actions=None):
        self.current_task = None
        self.execution_log = []
        self.goal_state = goal_state              # used when replanning
        self.available_actions = available_actions
    
    def execute(self, plan, state_estimator):
        """
        Execute a plan while monitoring each step.
        """
        for i, action in enumerate(plan):
            self.current_task = (i, action)
            
            # Execute the action
            success = self.execute_action(action)
            
            # Log the step
            self.execution_log.append({
                'action': action,
                'success': success,
                'state': state_estimator.get_state()
            })
            
            # Handle failures
            if not success:
                # Try a recovery strategy first
                recovery = self.recover_from_failure(action, state_estimator.get_state())
                if recovery:
                    # Re-execute the failed action
                    success = self.execute_action(action)
                
                if not success:
                    # Still failing: replan from the current state
                    new_plan = self.replan(state_estimator)
                    if new_plan is None:
                        return False
                    return self.execute(new_plan, state_estimator)
        
        return True
    
    def recover_from_failure(self, action, state):
        """
        Failure-recovery strategies.
        """
        if action == 'grasp_object':
            # Grasp failed: adjust the grasp pose and retry
            if state['grasp_attempts'] < 3:
                return {'adjust_grasp_pose': True}
        
        if action == 'navigate_to':
            # Navigation failed: re-localize
            if state['localization_confidence'] < 0.8:
                return {'relocalize': True}
        
        return None
    
    def replan(self, state_estimator):
        """
        Replan from the current estimated state.
        """
        planner = TaskPlanner()
        new_plan = planner.plan(
            initial_state=state_estimator.get_state(),
            goal_state=self.goal_state,
            actions=self.available_actions
        )
        return new_plan

3. Behavior Trees

3.1 Behavior Tree Basics

Behavior tree node types

├── Control nodes
│   ├── Sequence
│   │   └── Runs children in order; succeeds only if all succeed
│   ├── Selector
│   │   └── Runs children in order; succeeds as soon as one succeeds
│   ├── Parallel
│   │   └── Runs children concurrently
│   └── Decorator
│       ├── Inverter
│       ├── Repeater
│       └── Timeout

├── Condition nodes
│   └── Return True/False; do not change state

└── Action nodes
    └── Execute a concrete action; return Success/Running/Failure

3.2 Behavior Tree Implementation

from enum import Enum
from typing import List, Optional

class NodeStatus(Enum):
    SUCCESS = "success"
    RUNNING = "running"
    FAILURE = "failure"

class BehaviorNode:
    """行为树节点基类"""
    def __init__(self, name: str):
        self.name = name
    
    def tick(self) -> NodeStatus:
        raise NotImplementedError

class SequenceNode(BehaviorNode):
    """序列节点:全部成功才成功"""
    def __init__(self, name: str, children: List[BehaviorNode]):
        super().__init__(name)
        self.children = children
        self.current_child = 0
    
    def tick(self) -> NodeStatus:
        while self.current_child < len(self.children):
            child = self.children[self.current_child]
            status = child.tick()
            
            if status == NodeStatus.RUNNING:
                return NodeStatus.RUNNING
            elif status == NodeStatus.FAILURE:
                self.current_child = 0  # reset
                return NodeStatus.FAILURE
            
            self.current_child += 1
        
        self.current_child = 0  # reset
        return NodeStatus.SUCCESS

class SelectorNode(BehaviorNode):
    """选择节点:一个成功就成功"""
    def __init__(self, name: str, children: List[BehaviorNode]):
        super().__init__(name)
        self.children = children
        self.current_child = 0
    
    def tick(self) -> NodeStatus:
        while self.current_child < len(self.children):
            child = self.children[self.current_child]
            status = child.tick()
            
            if status == NodeStatus.RUNNING:
                return NodeStatus.RUNNING
            elif status == NodeStatus.SUCCESS:
                self.current_child = 0  # reset
                return NodeStatus.SUCCESS
            
            self.current_child += 1
        
        self.current_child = 0  # reset
        return NodeStatus.FAILURE

class ActionNode(BehaviorNode):
    """动作节点"""
    def __init__(self, name: str, action_func):
        super().__init__(name)
        self.action_func = action_func
    
    def tick(self) -> NodeStatus:
        success = self.action_func()
        if success:
            return NodeStatus.SUCCESS
        else:
            return NodeStatus.FAILURE

class ConditionNode(BehaviorNode):
    """条件节点"""
    def __init__(self, name: str, condition_func):
        super().__init__(name)
        self.condition_func = condition_func
    
    def tick(self) -> NodeStatus:
        if self.condition_func():
            return NodeStatus.SUCCESS
        else:
            return NodeStatus.FAILURE
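
The decorator nodes listed in 3.1 fit the same base class. As a small sketch (not part of the listing above), an Inverter decorator could look like this:

class InverterNode(BehaviorNode):
    """Decorator: flips SUCCESS and FAILURE of its single child; RUNNING passes through."""
    def __init__(self, name: str, child: BehaviorNode):
        super().__init__(name)
        self.child = child
    
    def tick(self) -> NodeStatus:
        status = self.child.tick()
        if status == NodeStatus.SUCCESS:
            return NodeStatus.FAILURE
        if status == NodeStatus.FAILURE:
            return NodeStatus.SUCCESS
        return NodeStatus.RUNNING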

3.3 Humanoid Behavior Tree Example

import time

# The subsystem handles below (robot, vision, kinematics, navigation,
# manipulation) are assumed to be initialized elsewhere.

def build_fetch_object_behavior():
    """
    Build the "fetch object" behavior tree.
    """
    # Condition checks
    has_object = ConditionNode("HasObject", lambda: robot.has_object())
    object_visible = ConditionNode("ObjectVisible", lambda: vision.detect_object())
    object_reachable = ConditionNode("ObjectReachable", lambda: kinematics.is_reachable())
    
    # Actions
    navigate = ActionNode("Navigate", navigation.goto_location)
    approach = ActionNode("Approach", manipulation.approach_object)
    grasp = ActionNode("Grasp", manipulation.grasp_object)
    retreat = ActionNode("Retreat", manipulation.retreat_arm)
    
    # Subtree: grasp the object
    grasp_sequence = SequenceNode("GraspSequence", [
        object_reachable,
        approach,
        grasp,
        retreat
    ])
    
    # Subtree: navigate to the object
    navigate_to_object = SequenceNode("NavigateToObject", [
        object_visible,
        navigate
    ])
    
    # Root: fetch the object
    fetch_tree = SelectorNode("FetchObject", [
        # If the robot already holds the object, succeed immediately
        has_object,
        # Otherwise run the fetch sequence
        SequenceNode("FetchProcess", [
            navigate_to_object,
            grasp_sequence
        ])
    ])
    
    return fetch_tree

# Run the behavior tree
behavior_tree = build_fetch_object_behavior()
while True:
    status = behavior_tree.tick()
    if status != NodeStatus.RUNNING:
        break
    time.sleep(0.1)  # 10 Hz tick rate

3.4 Advantages of Behavior Trees

Behavior trees vs. finite state machines

Property        Behavior tree                              Finite state machine
Modularity      High (nodes are reusable)                  Low (states are tightly coupled)
Extensibility   High (just add nodes)                      Low (state-count explosion)
Debugging       Easy (execution path can be visualized)    Hard
Concurrency     Supported (Parallel node)                  Not supported
Priorities      Built in (Selector ordering)               Needs extra logic

4. Reinforcement Learning for Decision Making

4.1 Applications of RL in Decision Making

Typical applications

RL application areas

├── Locomotion
│   ├── Learning walking gaits
│   ├── Adapting to complex terrain
│   └── Fall recovery

├── Manipulation
│   ├── Learning grasp strategies
│   ├── Tool use
│   └── Fine manipulation

└── High-level decision making
    ├── Learning task priorities
    ├── Human-robot interaction policies
    └── Multi-robot collaboration

4.2 Choosing an RL Algorithm

PPO (Proximal Policy Optimization)

import torch
import torch.nn as nn
import torch.optim as optim

class PPOAgent:
    """
    PPO agent.
    """
    def __init__(self, state_dim, action_dim, device='cuda'):
        self.device = device
        
        # Actor-critic networks
        self.actor = self.build_network(state_dim, action_dim).to(device)
        self.critic = self.build_network(state_dim, 1).to(device)
        
        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
        
        # PPO hyperparameters
        self.clip_epsilon = 0.2
        self.gamma = 0.99
        self.lam = 0.95
        self.action_std = 0.1  # fixed exploration noise
    
    def build_network(self, in_dim, out_dim):
        """Small MLP shared by actor and critic."""
        return nn.Sequential(
            nn.Linear(in_dim, 256), nn.Tanh(),
            nn.Linear(256, 256), nn.Tanh(),
            nn.Linear(256, out_dim))
    
    def select_action(self, state):
        """Sample an action from a Gaussian around the actor output."""
        state = torch.FloatTensor(state).to(self.device)
        with torch.no_grad():
            action_mean = self.actor(state)
        action = torch.normal(action_mean, self.action_std).cpu().numpy()
        return action
    
    def get_log_probs(self, states, actions):
        """Log-probability of the actions under the current Gaussian policy."""
        action_mean = self.actor(states)
        dist = torch.distributions.Normal(action_mean, self.action_std)
        return dist.log_prob(actions).sum(dim=-1)
    
    def update(self, states, actions, rewards, next_states, dones):
        """
        PPO update (expects tensors already on self.device).
        """
        # Advantage estimation with GAE (see the sketch after this listing)
        advantages = self.compute_gae(states, rewards, next_states, dones)
        
        # Freeze the behavior policy's log-probs and the value targets once,
        # before the networks change
        with torch.no_grad():
            returns = advantages + self.critic(states).squeeze(-1)
            old_log_probs = self.get_log_probs(states, actions)
        
        # Several PPO epochs on the same batch
        for _ in range(10):
            # Policy ratio against the frozen old policy
            new_log_probs = self.get_log_probs(states, actions)
            ratio = torch.exp(new_log_probs - old_log_probs)
            
            # Clipped surrogate objective
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            
            # Value loss against the GAE returns
            values = self.critic(states).squeeze(-1)
            critic_loss = nn.MSELoss()(values, returns)
            
            # Optimize
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()
            
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()
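
compute_gae is called in update() above but not defined. The sketch below is one plausible implementation of Generalized Advantage Estimation for a single time-ordered batch; the 1-D tensor shapes and the monkey-patch attachment at the end are assumptions made so the example stays self-contained.

import torch

def compute_gae(self, states, rewards, next_states, dones):
    """Generalized Advantage Estimation over one time-ordered batch of transitions."""
    with torch.no_grad():
        values = self.critic(states).squeeze(-1)
        next_values = self.critic(next_states).squeeze(-1)
    advantages = torch.zeros_like(rewards)
    gae = 0.0
    for t in reversed(range(len(rewards))):
        not_done = 1.0 - dones[t].float()
        delta = rewards[t] + self.gamma * next_values[t] * not_done - values[t]
        gae = delta + self.gamma * self.lam * not_done * gae
        advantages[t] = gae
    return advantages  # often normalized before the policy update

# Attach to the class so self.compute_gae(...) in update() resolves
PPOAgent.compute_gae = compute_gae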

SAC (Soft Actor-Critic)

class SACAgent:
    """
    SAC agent (well suited to continuous control).
    StochasticActor and CriticNetwork are assumed to be defined elsewhere:
    the actor's sample() returns (action, log_prob), and the critics map
    (state, action) to a Q-value.
    """
    def __init__(self, state_dim, action_dim, action_range):
        self.action_range = action_range
        
        # Actor (policy network)
        self.actor = StochasticActor(state_dim, action_dim)
        
        # Critics (twin Q-networks) and their targets
        self.critic1 = CriticNetwork(state_dim, action_dim)
        self.critic2 = CriticNetwork(state_dim, action_dim)
        self.critic_target1 = CriticNetwork(state_dim, action_dim)
        self.critic_target2 = CriticNetwork(state_dim, action_dim)
        
        # Entropy temperature coefficient
        self.alpha = 0.2
    
    def select_action(self, state, eval=False):
        """Select an action (entropy-regularized policy); a full implementation
        would return the deterministic mean action when eval=True."""
        state = torch.FloatTensor(state)
        with torch.no_grad():
            action, _ = self.actor.sample(state)
        return action.cpu().numpy()
    
    def update(self, replay_buffer, batch_size=256):
        """One SAC update step."""
        # Sample a batch of transitions
        states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
        
        # Update the critics (a sketch of update_critic follows this listing)
        q_loss = self.update_critic(states, actions, rewards, next_states, dones)
        
        # Update the actor
        policy_loss = self.update_actor(states)
        
        # Update the temperature coefficient
        alpha_loss = self.update_alpha()
        
        return q_loss, policy_loss, alpha_loss
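
update_critic is only referenced above. Below is a sketch of the standard SAC critic update with a clipped double-Q, entropy-regularized target, under the same assumptions about StochasticActor and CriticNetwork stated in the class docstring; rewards and dones are assumed shaped to broadcast against the critic outputs (e.g. (batch, 1)).

import torch
import torch.nn.functional as F

def update_critic(self, states, actions, rewards, next_states, dones, gamma=0.99):
    """Clipped double-Q SAC critic loss with the entropy-regularized target."""
    with torch.no_grad():
        next_actions, next_log_probs = self.actor.sample(next_states)
        q1_next = self.critic_target1(next_states, next_actions)
        q2_next = self.critic_target2(next_states, next_actions)
        q_next = torch.min(q1_next, q2_next) - self.alpha * next_log_probs
        target = rewards + gamma * (1.0 - dones) * q_next
    q1 = self.critic1(states, actions)
    q2 = self.critic2(states, actions)
    q_loss = F.mse_loss(q1, target) + F.mse_loss(q2, target)
    # Optimizer steps and target-network soft updates are omitted in this sketch
    return q_loss

SACAgent.update_critic = update_critic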

4.3 Sim2Real Transfer

class Sim2RealTransfer:
    """
    Simulation-to-reality transfer.
    """
    def __init__(self):
        # DomainRandomization: see the sketch after this listing
        self.domain_randomization = DomainRandomization()
    
    def train_in_simulation(self, env, agent, replay_buffer, num_episodes=10000):
        """
        Train in simulation with domain randomization.
        """
        for episode in range(num_episodes):
            # Randomize the environment parameters for this episode
            self.domain_randomization.randomize(env)
            
            # Collect a trajectory
            state = env.reset()
            done = False
            while not done:
                action = agent.select_action(state)
                next_state, reward, done, _ = env.step(action)
                replay_buffer.add(state, action, reward, next_state, done)
                state = next_state
            
            # Update the policy
            agent.update(replay_buffer)
        
        return agent
    
    def deploy_to_real(self, agent, real_env):
        """
        Deploy to the real robot.
        """
        # No fine-tuning needed (if domain randomization was broad enough)
        state = real_env.reset()
        done = False
        while not done:
            action = agent.select_action(state, eval=True)
            next_state, reward, done, _ = real_env.step(action)
            state = next_state
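
DomainRandomization is used above without a definition. A minimal sketch follows; the parameter names, ranges, and the env.set_parameters() hook are illustrative assumptions, not a specific simulator API.

import random

class DomainRandomization:
    """Resample physics and sensing parameters each episode so the policy
    does not overfit to one simulator configuration."""
    def __init__(self):
        # (low, high) ranges, purely illustrative
        self.ranges = {
            'ground_friction':   (0.5, 1.2),
            'link_mass_scale':   (0.8, 1.2),
            'motor_strength':    (0.8, 1.1),
            'sensor_noise_std':  (0.0, 0.02),
            'control_latency_s': (0.0, 0.02),
        }
    
    def randomize(self, env):
        params = {k: random.uniform(lo, hi) for k, (lo, hi) in self.ranges.items()}
        env.set_parameters(params)   # hypothetical simulator hook
        return params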

5. Multimodal Decision Fusion

5.1 VLA Decision Architecture

VLA (Vision-Language-Action) decision architecture

Inputs:
├── Vision: camera images
├── Language: natural-language instructions
└── Proprioception: joint states, force sensors

Processing:
├── Vision Encoder: extract visual features
├── Language Encoder: extract language features
├── Fusion Layer: multimodal fusion
└── Action Head: output actions

Output:
└── Joint torque / position commands

End-to-end training:
- Data: human demonstrations (vision + language → actions)
- Loss: action prediction error
- Optimization: gradient descent
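
A minimal PyTorch sketch of this encode-fuse-decode pattern follows. The tiny encoders stand in for the pretrained vision and language backbones a real VLA model would use, and all layer sizes and dimensions are illustrative.

import torch
import torch.nn as nn

class TinyVLA(nn.Module):
    """Toy vision-language-action model: encode each modality, fuse, predict an action."""
    def __init__(self, vocab_size=1000, proprio_dim=32, action_dim=20, d=128):
        super().__init__()
        # Stand-in for a pretrained vision backbone
        self.vision_encoder = nn.Sequential(
            nn.Conv2d(3, 16, 5, stride=4), nn.ReLU(),
            nn.Conv2d(16, 32, 5, stride=4), nn.ReLU(),
            nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Linear(32, d))
        # Stand-in for a pretrained language encoder (mean-pooled token embeddings)
        self.token_embed = nn.Embedding(vocab_size, d)
        # Proprioception encoder
        self.proprio_encoder = nn.Linear(proprio_dim, d)
        # Fusion layer and action head
        self.fusion = nn.Sequential(nn.Linear(3 * d, d), nn.ReLU())
        self.action_head = nn.Linear(d, action_dim)
    
    def forward(self, image, tokens, proprio):
        v = self.vision_encoder(image)               # (B, d) visual features
        l = self.token_embed(tokens).mean(dim=1)     # (B, d) language features
        p = self.proprio_encoder(proprio)            # (B, d) proprioceptive features
        fused = self.fusion(torch.cat([v, l, p], dim=-1))
        return self.action_head(fused)               # (B, action_dim) joint commands

# Behavior-cloning style loss on demonstration data
model = TinyVLA()
img = torch.randn(4, 3, 224, 224)
tok = torch.randint(0, 1000, (4, 16))
pro = torch.randn(4, 32)
target_action = torch.randn(4, 20)
loss = nn.MSELoss()(model(img, tok, pro), target_action)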

5.2 Handling Decision Uncertainty

import torch

class UncertaintyAwareDecision:
    """
    Uncertainty-aware decision making.
    self.policy and request_human_help() are assumed to be provided elsewhere.
    """
    def __init__(self):
        self.uncertainty_threshold = 0.5
    
    def decide(self, state):
        """
        Make a decision while accounting for uncertainty.
        """
        # Get the action distribution from the policy
        action_dist = self.policy.get_distribution(state)
        
        # Quantify uncertainty via the distribution's entropy
        uncertainty = self.compute_entropy(action_dist)
        
        if uncertainty > self.uncertainty_threshold:
            # High uncertainty: ask a human for help
            return self.request_human_help(state)
        else:
            # Low uncertainty: decide autonomously
            return action_dist.sample()
    
    def compute_entropy(self, dist):
        """Compute the entropy of the action distribution."""
        if isinstance(dist, torch.distributions.Normal):
            return dist.entropy().mean().item()
        # other distributions ...

6. Summary

6.1 Key Points

  1. Hierarchical decision making: task level - skill level - execution level
  2. Task planning: PDDL, HTN, A* search
  3. Behavior trees: a modular, extensible decision framework
  4. Reinforcement learning: learning complex decisions from data
  5. VLA fusion: vision-language-action, end to end

6.2 Technology Trends

Evolution of decision technologies

2010s:
├── Rule-based systems
├── Finite state machines
└── Classical planning (PDDL)

2020s:
├── Behavior trees (mainstream)
├── Reinforcement learning
└── End-to-end VLA

2030s (projected):
├── Large-model decision making (LLM + robot)
├── World-model prediction
└── Human-like reasoning

The decision system is at the heart of humanoid-robot intelligence. As large language models and VLA models mature, humanoid decision making will evolve from "pre-programmed" toward "autonomous understanding and autonomous learning", ultimately approaching human-like decision intelligence.

