# 项目五：RGB-D 物体6D位姿估计与抓取

## 项目概述

构建基于RGB-D的物体位姿估计流水线，用于机器人抓取：
- 点云预处理（滤波、降采样、法向量估计）
- 全局配准（FPFH特征 + RANSAC）
- ICP精炼
- 抓取姿态生成
- 评估与可视化

## 难度：★★★★☆ (4/5)
## 预估时间：3-4周

---

## 1. 项目结构

```
rgbd_pose/
├── data/
│   ├── models/          # 物体CAD模型 (ply/obj)
│   ├── scenes/          # 场景点云
│   └── results/         # 评估结果
├── src/
│   ├── preprocess.py    # 点云预处理
│   ├── global_reg.py    # 全局粗配准
│   ├── icp_refine.py    # ICP精配准
│   ├── grasp_gen.py     # 抓取姿态生成
│   ├── evaluate.py      # 评估脚本
│   └── visualize.py     # 可视化
├── config/
│   └── params.yaml
└── requirements.txt
```

---

## 2. 核心代码

### 2.1 点云预处理

```python
import open3d as o3d
import numpy as np

class PointCloudProcessor:
    """RGB-D点云预处理流水线"""

    def __init__(self, voxel_size=0.005):
        self.voxel_size = voxel_size

    def load_rgbd_to_pointcloud(self, rgb_path, depth_path, K, depth_scale=1000.0):
        """从RGB-D图像生成点云"""
        color = o3d.io.read_image(rgb_path)
        depth = o3d.io.read_image(depth_path)

        rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
            color, depth,
            depth_scale=depth_scale,
            depth_trunc=3.0,
            convert_rgb_to_intensity=False
        )

        intrinsics = o3d.camera.PinholeCameraIntrinsic(
            width=color.width, height=color.height,
            fx=K[0,0], fy=K[1,1], cx=K[0,2], cy=K[1,2]
        )

        return o3d.geometry.PointCloud.create_from_rgbd_image(
            rgbd, intrinsics
        )

    def preprocess(self, pcd, crop_box=None):
        """预处理流水线"""
        # 1. 裁剪工作空间
        if crop_box is not None:
            pcd = pcd.crop(
                o3d.geometry.AxisAlignedBoundingBox(*crop_box)
            )

        # 2. 降采样（保持几何结构）
        pcd = pcd.voxel_down_sample(self.voxel_size)

        # 3. 统计滤波去除离群点
        pcd, _ = pcd.remove_statistical_outlier(
            nb_neighbors=30, std_ratio=2.0
        )

        # 4. 估计法向量
        pcd.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(
                radius=self.voxel_size * 2, max_nn=30
            )
        )

        # 5. 法向量一致性定向（朝向相机）
        pcd.orient_normals_towards_camera_location(
            np.array([0, 0, 0])
        )

        return pcd

    def segment_plane_and_objects(self, pcd, distance_threshold=0.01,
                                   ransac_n=3, num_iterations=1000):
        """RANSAC平面分割：分离桌面和物体"""
        # 分割最大平面（桌面）
        plane_model, inliers = pcd.segment_plane(
            distance_threshold=distance_threshold,
            ransac_n=ransac_n,
            num_iterations=num_iterations
        )
        [a, b, c, d] = plane_model
        print(f"平面方程: {a:.3f}x + {b:.3f}y + {c:.3f}z + {d:.3f} = 0")

        # 桌面点云和物体点云
        plane_pcd = pcd.select_by_index(inliers)
        objects_pcd = pcd.select_by_index(inliers, invert=True)

        return plane_pcd, objects_pcd

    def cluster_objects(self, objects_pcd, eps=0.02, min_points=100):
        """DBSCAN聚类：分离单个物体"""
        labels = np.array(
            objects_pcd.cluster_dbscan(eps=eps, min_points=min_points)
        )

        clusters = []
        for label in np.unique(labels):
            if label == -1:  # 噪声
                continue
            cluster = objects_pcd.select_by_index(
                np.where(labels == label)[0]
            )
            clusters.append(cluster)

        print(f"找到 {len(clusters)} 个物体簇")
        return clusters
```

### 2.2 全局配准 (FPFH + RANSAC)

```python
class GlobalRegistrator:
    """FPFH特征 + RANSAC全局粗配准"""

    def __init__(self, voxel_size=0.005):
        self.voxel_size = voxel_size

    def compute_fpfh(self, pcd):
        """计算FPFH特征"""
        radius_normal = self.voxel_size * 2
        radius_feature = self.voxel_size * 5

        pcd.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(
                radius=radius_normal, max_nn=30
            )
        )

        fpfh = o3d.pipelines.registration.compute_fpfh_feature(
            pcd,
            o3d.geometry.KDTreeSearchParamHybrid(
                radius=radius_feature, max_nn=100
            )
        )
        return fpfh

    def register(self, source_pcd, target_pcd):
        """FPFH + RANSAC 全局配准"""
        source_fpfh = self.compute_fpfh(source_pcd)
        target_fpfh = self.compute_fpfh(target_pcd)

        distance_threshold = self.voxel_size * 1.5

        result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
            source_pcd, target_pcd,
            source_fpfh, target_fpfh,
            mutual_filter=True,
            max_correspondence_distance=distance_threshold,
            estimation_method=o3d.pipelines.registration.
                TransformationEstimationPointToPoint(False),
            ransac_n=3,
            checkers=[
                o3d.pipelines.registration.
                    CorrespondenceCheckerBasedOnEdgeLength(0.9),
                o3d.pipelines.registration.
                    CorrespondenceCheckerBasedOnDistance(distance_threshold),
            ],
            criteria=o3d.pipelines.registration.
                RANSACConvergenceCriteria(1000000, 500)
        )

        print(f"RANSAC配准: fit={result.fitness:.4f}, rmse={result.inlier_rmse:.4f}")
        return result.transformation, result
```

### 2.3 ICP精配准

```python
class ICPRefiner:
    """ICP多阶段精配准"""

    def __init__(self):
        self.results = []

    def refine(self, source, target, init_transform,
               max_correspondence_distances=[0.05, 0.02, 0.01, 0.005]):
        """多分辨率ICP"""
        current_transform = init_transform

        for i, dist in enumerate(max_correspondence_distances):
            reg = o3d.pipelines.registration.registration_icp(
                source, target, dist, current_transform,
                o3d.pipelines.registration.
                    TransformationEstimationPointToPoint(),
                o3d.pipelines.registration.
                    ICPConvergenceCriteria(
                        relative_fitness=1e-6,
                        relative_rmse=1e-6,
                        max_iteration=100
                    )
            )

            current_transform = reg.transformation
            self.results.append({
                'level': i,
                'distance_threshold': dist,
                'fitness': reg.fitness,
                'inlier_rmse': reg.inlier_rmse,
            })
            print(f"  Level {i}: dist={dist:.3f}, "
                  f"fitness={reg.fitness:.4f}, rmse={reg.inlier_rmse:.4f}")

        return current_transform

    def colored_icp(self, source, target, init_transform):
        """彩色ICP（利用颜色信息）"""
        # 体素降采样
        source_down = source.voxel_down_sample(0.005)
        target_down = target.voxel_down_sample(0.005)

        source_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=0.01, max_nn=30)
        )
        target_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=0.01, max_nn=30)
        )

        result = o3d.pipelines.registration.registration_colored_icp(
            source_down, target_down, 0.02, init_transform,
            o3d.pipelines.registration.
                TransformationEstimationForColoredICP(),
            o3d.pipelines.registration.
                ICPConvergenceCriteria(
                    relative_fitness=1e-6, relative_rmse=1e-6,
                    max_iteration=100
                )
        )
        return result.transformation
```

### 2.4 抓取姿态生成

```python
class GraspGenerator:
    """从物体位姿生成抓取候选"""

    def __init__(self, gripper_width=0.08):
        self.gripper_width = gripper_width

    def generate_top_grasps(self, object_pose, n_angles=8):
        """生成顶部抓取候选"""
        grasps = []
        center = object_pose[:3, 3]

        for i in range(n_angles):
            angle = 2 * np.pi * i / n_angles
            # 旋转绕z轴的抓取姿态
            R_grasp = np.array([
                [np.cos(angle), -np.sin(angle), 0],
                [np.sin(angle),  np.cos(angle), 0],
                [0,              0,             1]
            ])

            T_grasp = np.eye(4)
            T_grasp[:3, :3] = R_grasp
            T_grasp[:3, 3] = center + np.array([0, 0, 0.1])  # 上方10cm接近

            grasps.append(T_grasp)

        return grasps

    def score_grasp(self, grasp_pose, object_pcd, table_pcd):
        """启发式抓取评分"""
        score = 0.0

        # 1. 接近方向：抓取器Z轴应接近垂直
        approach_dir = grasp_pose[:3, 2]
        gravity_dir = np.array([0, 0, -1])
        alignment = abs(np.dot(approach_dir, gravity_dir))
        score += alignment * 3

        # 2. 远离桌面碰撞
        grasp_height = grasp_pose[2, 3]
        if grasp_height > 0.05:  # 高于桌面5cm
            score += 2

        # 3. 物体中心距离
        obj_center = np.mean(np.asarray(object_pcd.points), axis=0)
        dist_to_center = np.linalg.norm(grasp_pose[:3, 3] - obj_center)
        score += np.exp(-dist_to_center * 10) * 2

        return score
```

### 2.5 端到端位姿估计流水线

```python
class PoseEstimationPipeline:
    """完整6D位姿估计流水线"""

    def __init__(self, model_paths, voxel_size=0.005):
        """
        model_paths: dict {物体名: 模型点云路径}
        """
        self.processor = PointCloudProcessor(voxel_size)
        self.global_reg = GlobalRegistrator(voxel_size)
        self.icp = ICPRefiner()

        # 加载物体模型并预处理
        self.models = {}
        for name, path in model_paths.items():
            model = o3d.io.read_point_cloud(path)
            model = self.processor.preprocess(model)
            # 模型可能尺度不一致，归一化到单位球内
            bbox = model.get_axis_aligned_bounding_box()
            scale = 1.0 / max(bbox.get_extent())
            model.scale(scale, center=model.get_center())
            self.models[name] = model

        print(f"加载了 {len(self.models)} 个物体模型")

    def estimate(self, scene_pcd, target_objects=None):
        """
        估计场景中物体的6D位姿
        返回: list of {name, T, fitness, rmse}
        """
        if target_objects is None:
            target_objects = list(self.models.keys())

        # 预处理
        scene_pcd = self.processor.preprocess(scene_pcd)

        # 分割物体
        _, objects_pcd = self.processor.segment_plane_and_objects(scene_pcd)
        clusters = self.processor.cluster_objects(objects_pcd)

        results = []
        for cluster in clusters:
            best_fitness = 0
            best_result = None

            for name in target_objects:
                model = self.models[name]

                # 全局配准
                T_ransac, ransac_result = self.global_reg.register(
                    model, cluster
                )

                # ICP精炼
                T_refined = self.icp.refine(model, cluster, T_ransac)

                # 评估
                source_temp = o3d.geometry.PointCloud(model)
                source_temp.transform(T_refined)
                eval_result = o3d.pipelines.registration.evaluate_registration(
                    source_temp, cluster, 0.01, T_refined
                )

                if eval_result.fitness > best_fitness:
                    best_fitness = eval_result.fitness
                    best_result = {
                        'name': name,
                        'transform': T_refined,
                        'fitness': eval_result.fitness,
                        'rmse': eval_result.inlier_rmse,
                    }

            if best_result and best_result['fitness'] > 0.3:
                results.append(best_result)
                print(f"物体: {best_result['name']}, "
                      f"fitness={best_result['fitness']:.3f}")

        return results
```

---

## 3. 评估指标

```python
def evaluate_pose(estimated, ground_truth):
    """评估位姿估计精度"""
    R_est, t_est = estimated[:3,:3], estimated[:3,3]
    R_gt, t_gt = ground_truth[:3,:3], ground_truth[:3,3]

    # 平移误差
    translation_error = np.linalg.norm(t_est - t_gt)

    # 旋转误差 (角度)
    R_diff = R_est @ R_gt.T
    # 从旋转矩阵提取角度
    trace = np.clip((np.trace(R_diff) - 1) / 2, -1, 1)
    rotation_error = np.arccos(trace) * 180 / np.pi

    # ADD-S指标 (Average Distance of model points)
    # 用模型点云采样计算

    return {
        'translation_error_mm': translation_error * 1000,
        'rotation_error_deg': rotation_error,
    }
```

---

## 4. 验收标准

1. **点云预处理**：能正确降采样、去噪、分割桌面和物体（IoU > 0.85）
2. **位姿估计**：在BOP挑战格式的数据集上，ADD-S < 2cm 且 < 5° 的物体 > 70%
3. **ICP收敛**：粗配准后的ICP能在30步内收敛，最终RMSE < 5mm
4. **端到端Demo**：任给一张RGB-D图像 → 输出物体类型和6D位姿 → 可视化

---

## 5. 数据集

- **BOP Challenge**: https://bop.felk.cvut.cz/datasets/
- **YCB-Video**: 21个日常物体的视频序列
- **LineMOD**: 15个无纹理物体的标准基准

## 参考资源

- [Open3D Tutorial](http://www.open3d.org/docs/release/tutorial/pipelines/global_registration.html)
- [BOP Toolkit](https://github.com/thodan/bop_toolkit)
- [ICP Paper](https://ieeexplore.ieee.org/document/121791)
