# 项目二：机器人抓取仿真

## 项目概述

本项目将实现一个完整的机器人抓取系统，包括：
- 使用PyBullet或Gazebo进行物理仿真
- 实现基于视觉的抓取检测网络
- 训练深度强化学习策略进行抓取
- 实现Sim-to-Real的基本概念

**难度**：⭐⭐⭐⭐  
**预计时间**：3-4周  
**前置知识**：Python、深度学习、ROS基础

---

## 目录

1. [项目环境搭建](#1-项目环境搭建)
2. [PyBullet物理仿真基础](#2-pybullet物理仿真基础)
3. [抓取检测网络实现](#3-抓取检测网络实现)
4. [视觉伺服控制](#4-视觉伺服控制)
5. [强化学习抓取策略](#5-强化学习抓取策略)
6. [Sim-to-Real迁移](#6-sim-to-real迁移)

---

## 1. 项目环境搭建

### 1.1 安装依赖

创建 `projects/robot_grasping/requirements.txt`：

```text
# 物理仿真
pybullet>=3.2.0

# 深度学习
torch>=1.10.0
torchvision>=0.11.0

# 视觉
opencv-python>=4.5.0
pillow>=8.3.0

# 工具库
numpy>=1.21.0
matplotlib>=3.4.0
scipy>=1.7.0
pyyaml>=6.0
```

安装：

```bash
cd projects/robot_grasping
pip install -r requirements.txt
```

### 1.2 项目目录结构

```
robot_grasping/
├── envs/
│   ├── __init__.py
│   └── grasping_env.py      # 抓取仿真环境
├── models/
│   ├── __init__.py
│   ├── graspnet.py          # 抓取检测网络
│   └── policy.py            # 抓取策略网络
├── utils/
│   ├── __init__.py
│   └── transforms.py        # 坐标变换工具
├── train_graspnet.py         # 训练抓取网络
├── train_rl.py              # 训练RL策略
├── evaluate.py              # 评估
└── configs/
    └── default.yaml          # 配置文件
```

---

## 2. PyBullet物理仿真基础

### 2.1 PyBullet简介

PyBullet是一个Python接口，用于Bullet物理引擎。它提供：
- 刚体动力学仿真
- 碰撞检测
- 正逆运动学
- URDF/SDF模型加载

### 2.2 创建抓取仿真环境

创建 `envs/grasping_env.py`：

```python
"""
机器人抓取仿真环境
使用PyBullet实现物理仿真
"""

import pybullet as p
import pybullet_data
import numpy as np
from gymnasium import spaces
import os


class RobotGraspingEnv:
    """
    机器人抓取仿真环境
    
    包含：
    - 机械臂模型
    - 目标物体
    - 相机系统
    """
    
    def __init__(self, render=False, gui=True):
        """
        初始化仿真环境
        
        参数:
            render: 是否渲染
            gui: 是否使用GUI界面
        """
        self.render = render
        self.gui = gui
        
        # 连接物理引擎
        if gui:
            self.physics_client = p.connect(p.GUI)
        else:
            self.physics_client = p.connect(p.DIRECT)
        
        # 设置搜索路径
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        
        # 仿真参数
        self.time_step = 1.0 / 240.0  # 240Hz
        self.max_iterations = 500
        
        # 初始化环境
        self._setup_world()
        self._setup_robot()
        self._setup_objects()
        self._setup_camera()
        
        # 状态和动作空间
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(224, 224, 3), dtype=np.uint8
        )
        self.action_space = spaces.Box(
            low=-1, high=1, shape=(7,), dtype=np.float32
        )
    
    def _setup_world(self):
        """设置物理世界"""
        # 重力
        p.setGravity(0, 0, -9.81)
        
        # 时间步
        p.setTimeStep(self.time_step)
        
        # 渲染选项
        p.configureDebugVisualizer(p.COV_ENABLE_TINY_RENDERER, 0)
        p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
        
        # 添加地面
        plane_id = p.loadURDF("plane.urdf")
        p.changeVisualShape(plane_id, -1, rgbaColor=[0.5, 0.5, 0.5, 1])
    
    def _setup_robot(self):
        """设置机械臂"""
        # 加载Franka Panda机械臂
        robot_path = os.path.join(
            pybullet_data.getDataPath(), 
            "franka_panda/panda.urdf"
        )
        
        self.robot_id = p.loadURDF(
            robot_path,
            basePosition=[0, 0, 0],
            baseOrientation=[0, 0, 0, 1],
            useMaximalCoordinates=False
        )
        
        # 机械臂配置
        self.num_joints = p.getNumJoints(self.robot_id)
        
        # 获取关节信息
        self.joint_indices = []
        self.joint_limits = []
        
        for i in range(self.num_joints):
            joint_info = p.getJointInfo(self.robot_id, i)
            if joint_info[2] != p.JOINT_FIXED:
                self.joint_indices.append(i)
                lower = joint_info[8]
                upper = joint_info[9]
                self.joint_limits.append((lower, upper))
        
        # 设置控制模式
        p.setJointMotorControlArray(
            self.robot_id,
            self.joint_indices,
            p.POSITION_CONTROL,
            forces=[500] * len(self.joint_indices)
        )
        
        # 末端执行器索引（最后一个关节）
        self.end_effector_index = 11  # Panda的末端
    
    def _setup_objects(self):
        """设置目标物体"""
        # 随机生成物体位置
        self.object_ids = []
        
        # 添加一个立方体作为抓取目标
        object_position = [
            0.4 + np.random.uniform(-0.1, 0.1),
            0.0 + np.random.uniform(-0.1, 0.1),
            0.05
        ]
        
        object_orientation = p.getQuaternionFromEuler([
            0, 0, np.random.uniform(0, np.pi)
        ])
        
        collision_shape = p.createCollisionShape(
            p.GEOM_BOX,
            halfExtents=[0.03, 0.03, 0.03]
        )
        
        visual_shape = p.createVisualShape(
            p.GEOM_BOX,
            halfExtents=[0.03, 0.03, 0.03],
            rgbaColor=[1, 0.2, 0.2, 1]
        )
        
        self.object_id = p.createMultiBody(
            baseMass=0.1,
            baseCollisionShapeIndex=collision_shape,
            baseVisualShapeIndex=visual_shape,
            basePosition=object_position,
            baseOrientation=object_orientation
        )
        
        self.object_ids.append(self.object_id)
    
    def _setup_camera(self):
        """设置相机参数"""
        self.camera_config = {
            'width': 224,
            'height': 224,
            'fov': 60,
            'near': 0.01,
            'far': 10.0,
            'eye_position': [0.5, 0, 0.8],
            'target_position': [0.3, 0, 0],
            'up_vector': [0, 0, 1]
        }
    
    def reset(self):
        """重置环境"""
        # 重置机械臂到初始位置
        initial_positions = [0, -0.785, 0, -2.356, 0, 1.571, 0.785, 0, 0]
        for i, pos in enumerate(initial_positions[:len(self.joint_indices)]):
            p.resetJointState(self.robot_id, self.joint_indices[i], pos)
        
        # 移除旧物体
        for obj_id in self.object_ids:
            p.removeBody(obj_id)
        
        # 添加新物体
        self._setup_objects()
        
        # 执行几步仿真稳定
        for _ in range(100):
            p.stepSimulation()
        
        # 获取观察
        observation = self._get_observation()
        
        return observation, {}
    
    def step(self, action):
        """执行一步仿真"""
        # 解码动作
        target_positions = self._decode_action(action)
        
        # 控制机械臂
        p.setJointMotorControlArray(
            self.robot_id,
            self.joint_indices[:7],  # 前7个关节
            p.POSITION_CONTROL,
            targetPositions=target_positions,
            forces=[500] * 7
        )
        
        # 仿真一步
        for _ in range(10):  # 10个子步骤
            p.stepSimulation()
        
        # 获取观察和奖励
        observation = self._get_observation()
        reward = self._calculate_reward()
        
        # 检查完成
        done = self._check_done()
        
        info = {
            'object_position': self._get_object_position(),
            'end_effector_position': self._get_end_effector_position()
        }
        
        return observation, reward, done, False, info
    
    def _decode_action(self, action):
        """将动作向量解码为关节目标位置"""
        # 动作 = 目标关节角度（相对于当前）
        current_positions = []
        for joint_idx in self.joint_indices[:7]:
            state = p.getJointState(self.robot_id, joint_idx)
            current_positions.append(state[0])
        
        target_positions = np.array(current_positions) + np.array(action[:7]) * 0.1
        return target_positions.tolist()
    
    def _get_observation(self):
        """获取RGB图像观察"""
        # 计算相机视图矩阵
        view_matrix = p.computeViewMatrixFromYawPitchRoll(
            cameraTargetPosition=self.camera_config['target_position'],
            distance=self.camera_config['width'] / 2,
            yaw=45,
            pitch=-30,
            roll=0,
            upAxisIndex=2
        )
        
        # 计算投影矩阵
        proj_matrix = p.computeProjectionMatrixFOV(
            fov=self.camera_config['fov'],
            aspect=self.camera_config['width'] / self.camera_config['height'],
            nearVal=self.camera_config['near'],
            farVal=self.camera_config['far']
        )
        
        # 渲染图像
        (_, _, img, _, _) = p.getCameraImage(
            width=self.camera_config['width'],
            height=self.camera_config['height'],
            viewMatrix=view_matrix,
            projectionMatrix=proj_matrix,
            renderer=p.ER_BULLET_HARDWARE_OPENGL
        )
        
        # 处理图像（RGBA转RGB）
        img = np.array(img, dtype=np.uint8)
        img = img[:, :, :3]  # 去掉alpha通道
        
        return img
    
    def _calculate_reward(self):
        """计算奖励"""
        object_pos = self._get_object_position()
        ee_pos = self._get_end_effector_position()
        
        # 抓取判定：末端执行器靠近物体
        distance = np.linalg.norm(np.array(object_pos) - np.array(ee_pos))
        
        reward = 0
        
        if distance < 0.05:  # 抓取范围
            reward += 5.0
        
        # 惩罚距离
        reward -= distance * 0.5
        
        # 物体高度奖励
        if object_pos[2] > 0.1:  # 举起物体
            reward += object_pos[2] * 10
        
        return reward
    
    def _check_done(self):
        """检查是否完成"""
        object_pos = self._get_object_position()
        
        # 检查物体是否被举起
        if object_pos[2] > 0.15:
            return True
        
        return False
    
    def _get_object_position(self):
        """获取物体位置"""
        pos, orn = p.getBasePositionAndOrientation(self.object_id)
        return pos
    
    def _get_end_effector_position(self):
        """获取末端执行器位置"""
        link_state = p.getLinkState(self.robot_id, self.end_effector_index)
        return link_state[0]
    
    def get_gripper_width(self):
        """获取夹爪开度"""
        # Panda的夹爪是最后一个关节
        gripper_joint = self.joint_indices[-1]
        state = p.getJointState(self.robot_id, gripper_joint)
        return state[0]
    
    def close(self):
        """关闭环境"""
        p.disconnect()
    
    def render(self):
        """渲染当前状态"""
        return self._get_observation()


def test_environment():
    """测试环境"""
    print("创建抓取仿真环境...")
    env = RobotGraspingEnv(render=True, gui=True)
    
    print("测试重置...")
    obs, _ = env.reset()
    print(f"观察形状: {obs.shape}")
    
    print("测试执行动作...")
    for i in range(20):
        action = np.random.uniform(-0.5, 0.5, 7)
        obs, reward, done, _, info = env.step(action)
        
        if i % 5 == 0:
            print(f"步骤{i}: reward={reward:.3f}, done={done}")
        
        if done:
            print("任务完成!")
            break
    
    env.close()
    print("环境测试完成!")


if __name__ == '__main__':
    test_environment()
```

### 2.3 运行测试

```bash
python -m envs.grasping_env
```

---

## 3. 抓取检测网络实现

### 3.1 GraspNet架构

抓取检测网络预测每个像素位置的最佳抓取参数：
- 抓取质量分数
- 抓取角度（θ）
- 抓取宽度（w）

创建 `models/graspnet.py`：

```python
"""
抓取检测网络 (GraspNet)
基于RGB-D图像预测抓取参数
"""

import torch
import torch.nn as nn
import torch.nn.functional as F


class GraspNet(nn.Module):
    """
    抓取检测网络
    
    输入: RGB-D图像
    输出: 
        - quality: 抓取质量分数 (H, W, 1)
        - angle: 抓取角度 (H, W, 18) - 0-180度，每10度一个通道
        - width: 抓取宽度 (H, W, 1)
    """
    
    def __init__(self, input_channels=4, num_angle_bins=18):
        super().__init__()
        
        self.num_angle_bins = num_angle_bins
        
        # 编码器（ResNet风格）
        self.encoder = nn.Sequential(
            # Block 1
            self._conv_block(input_channels, 32, kernel_size=3, stride=2),
            self._conv_block(32, 32, kernel_size=3, stride=1),
            
            # Block 2
            self._conv_block(32, 64, kernel_size=3, stride=2),
            self._conv_block(64, 64, kernel_size=3, stride=1),
            
            # Block 3
            self._conv_block(64, 128, kernel_size=3, stride=2),
            self._conv_block(128, 128, kernel_size=3, stride=1),
            
            # Block 4
            self._conv_block(128, 256, kernel_size=3, stride=2),
            self._conv_block(256, 256, kernel_size=3, stride=1),
        )
        
        # 解码器
        self.decoder = nn.Sequential(
            # Decoder Block 1
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            
            # Decoder Block 2
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            
            # Decoder Block 3
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            
            # Decoder Block 4
            nn.ConvTranspose2d(32, 32, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
        )
        
        # 预测头
        self.quality_head = nn.Conv2d(32, 1, kernel_size=1)
        self.angle_head = nn.Conv2d(32, num_angle_bins, kernel_size=1)
        self.width_head = nn.Conv2d(32, 1, kernel_size=1)
    
    def _conv_block(self, in_channels, out_channels, kernel_size=3, stride=1):
        """卷积块"""
        padding = kernel_size // 2
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size, 1, padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x):
        """
        前向传播
        
        参数:
            x: 输入张量 (B, C, H, W)
               C = 4 (RGB + Depth)
        
        返回:
            quality: 抓取质量 (B, 1, H, W)
            angle: 抓取角度 (B, 18, H, W)
            width: 抓取宽度 (B, 1, H, W)
        """
        # 编码
        features = self.encoder(x)
        
        # 解码
        decoded = self.decoder(features)
        
        # 预测
        quality = torch.sigmoid(self.quality_head(decoded))
        angle = self.angle_head(decoded)  # softmax在损失函数中
        width = torch.sigmoid(self.width_head(decoded)) * 0.15  # 最大宽度15cm
        
        return quality, angle, width
    
    def predict_grasp(self, x):
        """
        预测最佳抓取
        
        参数:
            x: 输入图像 (1, C, H, W)
        
        返回:
            best_grasp: (x, y, theta, width) 归一化到[0,1]
        """
        self.eval()
        with torch.no_grad():
            quality, angle, width = self.forward(x)
        
        # 找到最高质量的位置
        quality = quality.squeeze().cpu().numpy()
        angle_map = angle.squeeze().cpu().numpy()
        width_map = width.squeeze().cpu().numpy()
        
        # 质量最大的位置
        idx = np.argmax(quality)
        y, x = idx // quality.shape[1], idx % quality.shape[1]
        
        # 角度
        angle_probs = angle_map[:, y, x]
        angle_idx = np.argmax(angle_probs)
        theta = angle_idx * (180 / self.num_angle_bins)
        
        # 宽度
        w = width_map[0, y, x]
        
        return {
            'x': x / quality.shape[1],
            'y': y / quality.shape[0],
            'theta': theta / 180,
            'width': w
        }


class GraspLoss(nn.Module):
    """抓取检测损失函数"""
    
    def __init__(self, angle_weight=1.0, width_weight=0.5):
        super().__init__()
        self.angle_weight = angle_weight
        self.width_weight = width_weight
    
    def forward(self, pred_quality, pred_angle, pred_width, 
                target_quality, target_angle, target_width, target_mask):
        """
        计算损失
        
        参数:
            pred_*: 预测值
            target_*: 目标值
            target_mask: 有效样本掩码
        """
        # 质量损失（二元交叉熵）
        quality_loss = F.binary_cross_entropy(
            pred_quality, target_quality, reduction='none'
        )
        quality_loss = (quality_loss * target_mask).sum() / (target_mask.sum() + 1e-6)
        
        # 角度损失（交叉熵）
        angle_loss = F.cross_entropy(
            pred_angle, target_angle.long(), reduction='none'
        )
        angle_loss = (angle_loss * target_mask.squeeze(1)).sum() / (target_mask.sum() + 1e-6)
        
        # 宽度损失（L1）
        width_loss = F.l1_loss(
            pred_width, target_width, reduction='none'
        )
        width_loss = (width_loss * target_mask).sum() / (target_mask.sum() + 1e-6)
        
        # 总损失
        total_loss = quality_loss + self.angle_weight * angle_loss + self.width_weight * width_loss
        
        return total_loss, {
            'quality_loss': quality_loss.item(),
            'angle_loss': angle_loss.item(),
            'width_loss': width_loss.item()
        }


def create_model(input_channels=4, num_angle_bins=18):
    """创建抓取网络模型"""
    model = GraspNet(input_channels=input_channels, num_angle_bins=num_angle_bins)
    return model


# 测试模型
if __name__ == '__main__':
    model = create_model()
    print(f"模型参数数量: {sum(p.numel() for p in model.parameters()):,}")
    
    # 测试前向传播
    x = torch.randn(2, 4, 224, 224)
    quality, angle, width = model(x)
    
    print(f"输入: {x.shape}")
    print(f"质量: {quality.shape}")
    print(f"角度: {angle.shape}")
    print(f"宽度: {width.shape}")
```

---

## 4. 视觉伺服控制

### 4.1 视觉伺服基础

视觉伺服（Visual Servoing）使用视觉反馈控制机器人运动。

**两种控制策略**：
1. **位置-based (PBVS)**：估计目标3D位置，计算期望关节角度
2. **图像-based (IBVS)**：直接在图像空间控制

创建 `utils/visual_servo.py`：

```python
"""
视觉伺服控制器
"""

import numpy as np
import torch
from scipy.spatial.transform import Rotation as R


class ImageBasedVisualServo:
    """
    基于图像的视觉伺服控制器
    
    控制末端执行器跟踪图像中的目标点
    """
    
    def __init__(self, camera_config, lambda_=0.5):
        """
        初始化视觉伺服控制器
        
        参数:
            camera_config: 相机内参和配置
            lambda_: 控制增益
        """
        self.K = camera_config['intrinsics']  # 相机内参矩阵
        self.lambda_ = lambda_
        
        # 交互矩阵参数
        self.Z = 0.5  # 假设深度
    
    def compute_interaction_matrix(self, point, Z=None):
        """
        计算交互矩阵（2×6）
        
        参数:
            point: 图像点 (u, v)
            Z: 点的深度
        
        返回:
            L: 交互矩阵
        """
        if Z is None:
            Z = self.Z
        
        u, v = point
        
        L = np.array([
            [-1/Z, 0, u/Z, u*v, -(1+u**2), v],
            [0, -1/Z, v/Z, 1+v**2, -u*v, -u]
        ])
        
        return L
    
    def compute_velocity(self, current_points, desired_points, Z=None):
        """
        计算末端执行器速度
        
        参数:
            current_points: 当前图像点
            desired_points: 期望图像点
            Z: 深度
        
        返回:
            v: 线速度和角速度 (6,)
        """
        # 计算图像误差
        error = np.array(current_points) - np.array(desired_points)
        error = error.flatten()
        
        # 堆叠交互矩阵
        L_total = []
        for cp in current_points:
            L_total.append(self.compute_interaction_matrix(cp, Z))
        L_total = np.vstack(L_total)
        
        # 计算伪逆
        try:
            L_pinv = np.linalg.pinv(L_total)
            v = -self.lambda_ * L_pinv @ error
        except:
            v = np.zeros(6)
        
        return v
    
    def compute_joint_velocities(self, end_effector_pose, velocity, jacobian):
        """
        将末端速度转换为关节速度
        
        参数:
            end_effector_pose: 末端执行器位姿 (x, y, z, roll, pitch, yaw)
            velocity: 末端速度 (vx, vy, vz, wx, wy, wz)
            jacobian: 雅可比矩阵
        
        返回:
            q_dot: 关节速度
        """
        # 速度到齐次变换
        T = self.pose_to_matrix(end_effector_pose)
        
        # 提取旋转部分
        R_ee = T[:3, :3]
        
        # 速度转换（从末端坐标系到基座坐标系）
        v_linear = R_ee @ velocity[:3]
        v_angular = R_ee @ velocity[3:]
        
        # 组合速度
        v_cartesian = np.concatenate([v_linear, v_angular])
        
        # 计算关节速度
        q_dot = np.linalg.pinv(jacobian) @ v_cartesian
        
        return q_dot
    
    @staticmethod
    def pose_to_matrix(pose):
        """将位姿转为齐次变换矩阵"""
        position = pose[:3]
        euler = pose[3:]
        
        rotation = R.from_euler('xyz', euler)
        R_matrix = rotation.as_matrix()
        
        T = np.eye(4)
        T[:3, :3] = R_matrix
        T[:3, 3] = position
        
        return T
    
    def pixel_to_3d(self, u, v, Z=None):
        """
        将像素坐标转换为3D点
        
        参数:
            u, v: 像素坐标
            Z: 深度
        
        返回:
            point_3d: 3D点 (x, y, z)
        """
        if Z is None:
            Z = self.Z
        
        fx, fy = self.K[0, 0], self.K[1, 1]
        cx, cy = self.K[0, 2], self.K[1, 2]
        
        x = (u - cx) * Z / fx
        y = (v - cy) * Z / fy
        
        return np.array([x, y, Z])


class PositionBasedVisualServo:
    """
    基于位置的视觉伺服控制器
    
    估计目标3D位置，控制机械臂到达
    """
    
    def __init__(self, camera_config, lambda_=0.5):
        self.K = camera_config['intrinsics']
        self.lambda_ = lambda_
    
    def estimate_3d_position(self, point, depth_map):
        """
        从深度图估计3D位置
        
        参数:
            point: 图像点 (u, v)
            depth_map: 深度图
        
        返回:
            position_3d: 3D位置
        """
        u, v = int(point[0]), int(point[1])
        
        if v < depth_map.shape[0] and u < depth_map.shape[1]:
            Z = depth_map[v, u]
        else:
            Z = 0.5  # 默认深度
        
        fx, fy = self.K[0, 0], self.K[1, 1]
        cx, cy = self.K[0, 2], self.K[1, 2]
        
        x = (u - cx) * Z / fx
        y = (v - cy) * Z / fy
        
        return np.array([x, y, Z])
    
    def compute_end_effector_velocity(self, current_pos, target_pos):
        """
        计算末端执行器速度
        
        参数:
            current_pos: 当前末端位置
            target_pos: 目标位置
        
        返回:
            velocity: 末端速度 (vx, vy, vz, wx, wy, wz)
        """
        # 位置误差
        position_error = np.array(target_pos) - np.array(current_pos)
        
        # 线速度（比例控制）
        v_linear = self.lambda_ * position_error
        
        # 角速度（设为0，简化为纯位置控制）
        v_angular = np.zeros(3)
        
        return np.concatenate([v_linear, v_angular])


class HybridVisualServo:
    """
    混合视觉伺服控制器
    结合位置和图像控制
    """
    
    def __init__(self, camera_config, lambda_=0.5, alpha=0.5):
        self.position_controller = PositionBasedVisualServo(camera_config, lambda_)
        self.image_controller = ImageBasedVisualServo(camera_config, lambda_)
        self.alpha = alpha  # 混合权重
    
    def compute_velocity(self, current_pos, current_pixels, 
                        target_pos, target_pixels, depth_map):
        """
        计算混合速度
        
        参数:
            current_pos: 当前末端位置
            current_pixels: 当前图像点
            target_pos: 目标位置
            target_pixels: 目标图像点
            depth_map: 深度图
        """
        # 位置控制
        v_position = self.position_controller.compute_end_effector_velocity(
            current_pos, target_pos
        )
        
        # 图像控制
        target_3d = self.position_controller.estimate_3d_position(target_pixels, depth_map)
        current_3d = self.position_controller.estimate_3d_position(current_pixels, depth_map)
        
        v_image = self.image_controller.compute_velocity(
            [current_pixels], [target_pixels], 
            Z=(target_3d[2] + current_3d[2]) / 2
        )
        
        # 混合
        v_total = self.alpha * v_position + (1 - self.alpha) * np.append(v_image, np.zeros(3))
        
        return v_total
```

---

## 5. 强化学习抓取策略

### 5.1 抓取任务建模

**状态**：RGB-D图像 + 机械臂关节状态  
**动作**：末端执行器位姿变化  
**奖励**：成功抓取正奖励，距离惩罚

### 5.2 PPO抓取策略

创建 `models/grasp_policy.py`：

```python
"""
抓取强化学习策略
使用PPO算法训练
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal


class GraspPolicyNetwork(nn.Module):
    """
    抓取策略网络
    
    输入: 
        - 图像特征 (来自CNN)
        - 机械臂状态
    输出:
        - 动作均值和方差
    """
    
    def __init__(self, image_feature_dim=512, state_dim=14, action_dim=7, hidden_dim=256):
        super().__init__()
        
        self.action_dim = action_dim
        
        # 图像特征处理
        self.image_conv = nn.Sequential(
            nn.Conv2d(3, 32, 3, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, stride=2),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((7, 7)),
            nn.Flatten()
        )
        
        # 特征融合
        self.fusion = nn.Sequential(
            nn.Linear(128 * 7 * 7 + state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        
        # 动作均值
        self.mean_head = nn.Linear(hidden_dim, action_dim)
        
        # 动作方差（可学习或固定）
        self.log_std = nn.Parameter(torch.zeros(action_dim))
    
    def forward(self, image, state):
        """
        前向传播
        
        参数:
            image: 图像 (B, 3, 224, 224)
            state: 机械臂状态 (B, 14) = 7关节位置 + 7关节速度
        
        返回:
            action_mean: 动作均值
            action_std: 动作标准差
        """
        # 图像特征
        image_features = self.image_conv(image)
        
        # 融合
        combined = torch.cat([image_features, state], dim=-1)
        fused = self.fusion(combined)
        
        # 动作均值
        action_mean = self.mean_head(fused)
        
        # 动作标准差
        action_std = torch.exp(self.log_std).expand_as(action_mean)
        
        return action_mean, action_std
    
    def sample_action(self, image, state, deterministic=False):
        """
        采样动作
        
        参数:
            deterministic: 是否使用确定性策略
        """
        action_mean, action_std = self.forward(image, state)
        
        if deterministic:
            return action_mean
        
        dist = Normal(action_mean, action_std)
        action = dist.sample()
        action = torch.tanh(action)  # 限制到[-1, 1]
        
        return action
    
    def get_log_prob(self, image, state, action):
        """计算动作的对数概率"""
        action_mean, action_std = self.forward(image, state)
        
        dist = Normal(action_mean, action_std)
        log_prob = dist.log_prob(action)
        
        return log_prob.sum(dim=-1)


class GraspPPO:
    """
    抓取PPO算法实现
    """
    
    def __init__(self, image_feature_dim, state_dim, action_dim,
                 lr=3e-4, gamma=0.99, lamda=0.95, clip_epsilon=0.2,
                 value_coef=0.5, entropy_coef=0.01):
        self.gamma = gamma
        self.lamda = lamda
        self.clip_epsilon = clip_epsilon
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        
        # 网络
        self.policy = GraspPolicyNetwork(image_feature_dim, state_dim, action_dim)
        self.value_network = nn.Sequential(
            nn.Linear(128 * 7 * 7 + state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        
        # 优化器
        self.optimizer = torch.optim.Adam(
            list(self.policy.parameters()) + list(self.value_network.parameters()),
            lr=lr
        )
    
    def compute_gae(self, rewards, values, dones, next_value):
        """计算GAE"""
        advantages = []
        gae = 0
        
        next_value = torch.tensor(next_value) if not isinstance(next_value, torch.Tensor) else next_value
        
        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_val = next_value
            else:
                next_val = values[t + 1]
            
            delta = rewards[t] + self.gamma * next_val * (1 - dones[t]) - values[t]
            gae = delta + self.gamma * self.lamda * (1 - dones[t]) * gae
            advantages.insert(0, gae)
        
        returns = [adv + val for adv, val in zip(advantages, values)]
        
        return advantages, returns
    
    def update(self, batch):
        """更新策略"""
        states, actions, old_log_probs, advantages, returns = batch
        
        # 新策略
        new_log_probs = self.policy.get_log_prob(states, actions)
        entropy = self.policy.get_log_std.mean()
        
        # PPO损失
        ratio = torch.exp(new_log_probs - old_log_probs)
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
        
        policy_loss = -torch.min(surr1, surr2).mean()
        entropy_loss = -entropy * self.entropy_coef
        
        # 价值损失
        values = self.value_network(states).squeeze()
        value_loss = F.mse_loss(values, returns)
        
        # 总损失
        total_loss = policy_loss + entropy_loss + self.value_coef * value_loss
        
        # 反向传播
        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
        self.optimizer.step()
        
        return {
            'policy_loss': policy_loss.item(),
            'value_loss': value_loss.item(),
            'entropy': entropy.item()
        }
    
    def save(self, path):
        """保存模型"""
        torch.save({
            'policy': self.policy.state_dict(),
            'value': self.value_network.state_dict()
        }, path)
    
    def load(self, path):
        """加载模型"""
        checkpoint = torch.load(path)
        self.policy.load_state_dict(checkpoint['policy'])
        self.value_network.load_state_dict(checkpoint['value'])
```

---

## 6. Sim-to-Real迁移

### 6.1 Sim-to-Real挑战

仿真到真实的迁移面临以下挑战：
- **物理参数差异**：摩擦力、惯性、质量
- **视觉差异**：纹理、光照、噪声
- **控制延迟**：仿真无延迟，真实系统有延迟

### 6.2 域随机化

创建 `utils/domain_randomization.py`：

```python
"""
域随机化工具
增加仿真多样性以提高迁移成功率
"""

import numpy as np
import pybullet as p


class DomainRandomizer:
    """
    域随机化
    
    随机化仿真参数以提高泛化能力
    """
    
    def __init__(self, physics_client):
        self.physics_client = physics_client
        
        # 参数范围
        self.params = {
            'friction': (0.5, 2.0),
            'mass': (0.8, 1.2),
            'gravity': (9.5, 10.0),
            'camera_noise': (0, 0.05),
            'light_intensity': (0.7, 1.3)
        }
    
    def randomize(self):
        """应用随机化"""
        self._randomize_physics()
        self._randomize_visual()
    
    def _randomize_physics(self):
        """随机化物理参数"""
        # 摩擦力
        friction = np.random.uniform(*self.params['friction'])
        for i in range(p.getNumBodies()):
            p.changeDynamics(i, -1, lateralFriction=friction)
        
        # 重力
        gravity = np.random.uniform(*self.params['gravity'])
        p.setGravity(0, 0, -gravity)
    
    def _randomize_visual(self):
        """随机化视觉参数"""
        # 可以在这里修改渲染参数
        pass
    
    def get_camera_noise(self):
        """获取相机噪声"""
        return np.random.normal(0, self.params['camera_noise'][1], size=(224, 224, 3))
    
    def apply_camera_noise(self, image):
        """对图像添加噪声"""
        noise = self.get_camera_noise()
        noisy_image = np.clip(image + noise * 255, 0, 255).astype(np.uint8)
        return noisy_image


class TextureRandomizer:
    """纹理随机化"""
    
    def __init__(self):
        self.textures = []
    
    def load_textures(self, texture_paths):
        """加载纹理"""
        for path in texture_paths:
            texture_id = p.loadTexture(path)
            self.textures.append(texture_id)
    
    def apply_random_texture(self, body_id):
        """应用随机纹理"""
        if self.textures:
            texture_id = np.random.choice(self.textures)
            p.changeVisualShape(
                body_id, -1,
                textureUniqueId=texture_id
            )
```

### 6.3 域适应

创建 `utils/adaptation.py`：

```python
"""
域适应方法
"""

import torch
import torch.nn as nn


class DomainAdaptationNetwork(nn.Module):
    """
    域适应网络
    
    学习域不变特征
    """
    
    def __init__(self, feature_dim=256):
        super().__init__()
        
        # 特征提取器
        self.feature_extractor = nn.Sequential(
            nn.Conv2d(3, 32, 3, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, stride=2),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((7, 7)),
            nn.Flatten(),
            nn.Linear(128 * 7 * 7, feature_dim)
        )
        
        # 域分类器（用于对抗训练）
        self.domain_classifier = nn.Sequential(
            nn.Linear(feature_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )
    
    def forward(self, x, alpha=1.0):
        """
        前向传播
        
        参数:
            x: 输入图像
            alpha: 梯度反转强度
        """
        features = self.feature_extractor(x)
        
        # 域预测（带梯度反转）
        if self.training:
            domain_output = self.domain_classifier(features)
            # 梯度反转
            domain_output = domain_output * -alpha
        else:
            domain_output = None
        
        return features, domain_output


def train_domain_adaptation(source_data, target_data, model, epochs=100):
    """
    训练域适应
    
    目标是最小化源域和目标域之间的差异
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.BCEWithLogitsLoss()
    
    for epoch in range(epochs):
        # 源域损失
        source_features, source_domain = model(source_data)
        source_loss = criterion(source_domain, torch.ones(len(source_data), 1))
        
        # 目标域损失
        target_features, target_domain = model(target_data)
        target_loss = criterion(target_domain, torch.zeros(len(target_data), 1))
        
        # 特征对齐损失
        feature_loss = torch.mean(torch.abs(source_features - target_features))
        
        # 总损失
        total_loss = source_loss + target_loss + 0.1 * feature_loss
        
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}: loss={total_loss.item():.4f}")
```

---

## 项目总结

### 完成清单

1. ✅ 创建PyBullet抓取仿真环境
2. ✅ 实现抓取检测网络（GraspNet）
3. ✅ 实现视觉伺服控制器
4. ✅ 训练PPO抓取策略
5. ✅ 域随机化和Sim-to-Real基础

### 扩展方向

1. **更真实的仿真**：使用Gazebo + MoveIt
2. **更好的抓取检测**：使用深度学习的抓取检测
3. **多指抓取**：扩展到灵巧手抓取
4. **实际部署**：在真实机器人上测试

### 参考资源

- [PyBullet文档](https://docs.google.com/document/d/10sXE4F_s漏接)
- [GraspNet论文](https://arxiv.org/abs/1811.06426)
- [Visual Servoing书籍](http://www.robotics-book.org/)

---

**下一步**：继续学习阶段七的具身智能核心算法
