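# Project 1: Robot Navigation with Reinforcement Learning

## Project Overview

In this project you will build a robot navigation system based on deep reinforcement learning from scratch. You will learn:
- How to model robot navigation as a Markov decision process (MDP)
- How to implement the Q-Learning and DQN algorithms
- How to train and evaluate agents in a Gymnasium environment
- The basic concepts behind Sim-to-Real transfer

**Difficulty**: ⭐⭐⭐  
**Estimated time**: 2 weeks  
**Prerequisites**: Python basics, linear algebra, reinforcement learning fundamentals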

---

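## Table of Contents

1. [Project Environment Setup](#1-project-environment-setup)
2. [Problem Definition and Modeling](#2-problem-definition-and-modeling)
3. [Q-Learning Implementation](#3-q-learning-implementation)
4. [Deep Q-Network (DQN) Implementation](#4-deep-q-network-dqn-implementation)
5. [Training and Evaluation](#5-training-and-evaluation)
6. [Advanced: The PPO Algorithm](#6-advanced-the-ppo-algorithm)
7. [Visualization and Debugging](#7-visualization-and-debugging)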

---

## 1. Project Environment Setup

### 1.1 Create the Project Directory

```bash
cd ~/Documents/具身智能
mkdir -p projects/robot_navigation
cd projects/robot_navigation
```

### 1.2 Create a Virtual Environment (Recommended)

```bash
# Create the virtual environment
python -m venv venv

# Activate it
# macOS/Linux:
source venv/bin/activate
# Windows:
# venv\Scripts\activate
```

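### 1.3 Install Dependencies

Create `requirements.txt`:

```text
# Reinforcement learning core
gymnasium>=0.26.0
stable-baselines3>=2.0.0  # 2.0 is the first release built on Gymnasium instead of Gym

# Deep learning framework (PyTorch, also required by Stable-Baselines3)
torch>=1.10.0

# Visualization
matplotlib>=3.4.0
pygame>=2.0.0

# Data processing
numpy>=1.21.0
```

Install the dependencies:

```bash
pip install -r requirements.txt
```

### 1.4 Verify the Installation

Create `check_env.py`: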

```python
#!/usr/bin/env python3
"""Verify that the environment is installed correctly."""

import sys
print(f"Python version: {sys.version}")

# Check the core libraries
try:
    import gymnasium
    print(f"✓ Gymnasium version: {gymnasium.__version__}")
except ImportError:
    print("✗ Gymnasium is not installed")

try:
    import torch
    print(f"✓ PyTorch version: {torch.__version__}")
    print(f"  CUDA available: {torch.cuda.is_available()}")
except ImportError:
    print("✗ PyTorch is not installed")

try:
    import numpy as np
    print(f"✓ NumPy version: {np.__version__}")
except ImportError:
    print("✗ NumPy is not installed")

try:
    import matplotlib
    print(f"✓ Matplotlib version: {matplotlib.__version__}")
except ImportError:
    print("✗ Matplotlib is not installed")

print("\nEnvironment check complete!")
```

Run it:

```bash
python check_env.py
```

---

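## 2. Problem Definition and Modeling

### 2.1 The Robot Navigation Problem

**Goal**: Move a robot from a start cell to a goal cell in a 2D grid world while avoiding obstacles.

**Environment description**:
- Grid world: a 10×10 grid
- State: the robot's position (row, column)
- Actions: four directions (up, down, left, right)
- Reward: a large positive reward for reaching the goal, a penalty for hitting an obstacle, and a small per-step penalty that shrinks as the robot gets closer to the goal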
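
The state and action parts of this MDP can be sketched in a few lines. This is only an illustrative sketch: the helper names (`pos_to_state`, `state_to_pos`, `move`) and the `ACTION_DELTAS` table are assumptions made for this example, the action numbering 0=up, 1=down, 2=left, 3=right is assumed, and obstacles and rewards are left out so the transition logic stays visible.

```python
from typing import Tuple

GRID_SIZE = 10  # 10×10 grid, as in the environment description

# Assumed action numbering: index -> (row delta, column delta)
ACTION_DELTAS = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}


def pos_to_state(row: int, col: int, grid_size: int = GRID_SIZE) -> int:
    """Encode a (row, col) cell as a single integer in [0, grid_size**2)."""
    return row * grid_size + col


def state_to_pos(state: int, grid_size: int = GRID_SIZE) -> Tuple[int, int]:
    """Decode an integer state back into (row, col)."""
    return divmod(state, grid_size)


def move(state: int, action: int, grid_size: int = GRID_SIZE) -> int:
    """Deterministic transition on an obstacle-free grid, clamped at the border."""
    row, col = state_to_pos(state, grid_size)
    d_row, d_col = ACTION_DELTAS[action]
    new_row = min(max(row + d_row, 0), grid_size - 1)
    new_col = min(max(col + d_col, 0), grid_size - 1)
    return pos_to_state(new_row, new_col, grid_size)


if __name__ == "__main__":
    s = pos_to_state(2, 3)
    print(s, state_to_pos(s), move(s, 3))
```

Encoding the position as one integer is what lets a tabular method index a Q-table directly by state.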

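### 2.2 Create a Custom Gymnasium Environment

Create `envs/grid_world.py` (create the `envs/` directory first):

```python
"""
Custom grid-world navigation environment,
implemented against the Gymnasium API.
"""

import numpy as np
import gymnasium as gym
from gymnasium import spaces


class GridWorldEnv(gym.Env):
    """
    10×10 grid-world navigation environment.

    Observation space: robot position encoded as a cell index (0-99)
    Action space: 0=up, 1=down, 2=left, 3=right

    Note: the goal cell is re-sampled on every reset but is not part of the
    observation, so a tabular agent can only learn to head for the goal region.
    """

    # Only 'human' rendering is implemented in this class
    metadata = {'render_modes': ['human'], 'render_fps': 10}

    def __init__(self, grid_size=10, max_steps=100, render_mode=None):
        super().__init__()

        self.grid_size = grid_size
        self.max_steps = max_steps
        self.render_mode = render_mode

        # Observation space: one index per cell (grid_size × grid_size)
        self.observation_space = spaces.Discrete(grid_size * grid_size)

        # Action space: 4 directions
        self.action_space = spaces.Discrete(4)

        # Grid layout (0 = free cell, 1 = obstacle)
        self.grid = np.zeros((grid_size, grid_size), dtype=np.int8)

        # Place the obstacles
        self._setup_obstacles()

        # Episode state
        self.agent_pos = None
        self.goal_pos = None
        self.steps = None

        # Rendering handles
        self.window = None
        self.clock = None

    def _setup_obstacles(self):
        """Define the obstacle layout."""
        # L-shaped wall
        self.grid[3, 3:7] = 1
        self.grid[3:8, 3] = 1

        # Scattered obstacles
        self.grid[1, 5] = 1
        self.grid[5, 1] = 1
        self.grid[7, 8] = 1
        self.grid[8, 2] = 1

    def _pos_to_state(self, pos):
        """Convert a (row, col) position to a state index."""
        return pos[0] * self.grid_size + pos[1]

    def _state_to_pos(self, state):
        """Convert a state index to a (row, col) position."""
        return (state // self.grid_size, state % self.grid_size)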
    
    def reset(self, seed=None, options=None):
        """Reset the environment and return (observation, info)."""
        super().reset(seed=seed)

        # Sample the start cell from the top-left 3×3 corner
        # (free of obstacles in the default layout)
        self.agent_pos = (
            int(self.np_random.integers(0, 3)),
            int(self.np_random.integers(0, 3))
        )

        # Sample the goal cell from the opposite 3×3 corner,
        # re-sampling until it does not land on an obstacle
        while True:
            self.goal_pos = (
                int(self.np_random.integers(self.grid_size - 3, self.grid_size)),
                int(self.np_random.integers(self.grid_size - 3, self.grid_size))
            )
            if self.grid[self.goal_pos] != 1:
                break

        self.steps = 0

        observation = self._pos_to_state(self.agent_pos)
        info = {}

        if self.render_mode == 'human':
            self._render_frame()

        return observation, info
    
    def step(self, action):
        """Apply one action and return (observation, reward, terminated, truncated, info)."""
        row, col = self.agent_pos

        # Action mapping: 0=up, 1=down, 2=left, 3=right (clamped at the grid border)
        new_row, new_col = row, col
        if action == 0:  # up
            new_row = max(0, row - 1)
        elif action == 1:  # down
            new_row = min(self.grid_size - 1, row + 1)
        elif action == 2:  # left
            new_col = max(0, col - 1)
        elif action == 3:  # right
            new_col = min(self.grid_size - 1, col + 1)

        terminated = False

        # Collision check
        if self.grid[new_row, new_col] == 1:
            # Hit an obstacle: stay in place and take a penalty
            reward = -1.0
        else:
            self.agent_pos = (new_row, new_col)

            if self.agent_pos == self.goal_pos:
                # Reached the goal
                reward = 10.0
                terminated = True
            else:
                # Small per-step penalty, reduced the closer the agent is to the goal
                distance = abs(new_row - self.goal_pos[0]) + abs(new_col - self.goal_pos[1])
                reward = -0.1
                reward += 0.1 * (self.grid_size * 2 - distance) / (self.grid_size * 2)

        self.steps += 1

        # Time limit: report it as truncation rather than termination, and do not
        # overwrite the goal reward if the goal was reached on the final step
        truncated = (not terminated) and self.steps >= self.max_steps
        if truncated:
            reward = -1.0

        observation = self._pos_to_state(self.agent_pos)
        info = {
            'agent_pos': self.agent_pos,
            'goal_pos': self.goal_pos,
            'steps': self.steps
        }

        if self.render_mode == 'human':
            self._render_frame()

        return observation, reward, terminated, truncated, info
    
    def render(self):
        """Render the environment (only 'human' mode is implemented)."""
        if self.render_mode == 'human':
            return self._render_frame()
        return None

    def _render_frame(self):
        """Draw one frame with pygame."""
        import pygame  # imported lazily so headless training does not need pygame

        if self.window is None:
            pygame.init()
            self.window = pygame.display.set_mode((500, 500))
            pygame.display.set_caption("Grid World Navigation")

        if self.clock is None:
            self.clock = pygame.time.Clock()

        # Let pygame process window events so the window stays responsive
        pygame.event.pump()

        # Clear the screen
        self.window.fill((30, 30, 40))

        # Draw the grid
        cell_size = 500 // self.grid_size

        for row in range(self.grid_size):
            for col in range(self.grid_size):
                rect = pygame.Rect(col * cell_size, row * cell_size, cell_size, cell_size)

                if self.grid[row, col] == 1:
                    pygame.draw.rect(self.window, (100, 100, 100), rect)
                elif (row, col) == self.goal_pos:
                    pygame.draw.rect(self.window, (0, 200, 100), rect)
                else:
                    pygame.draw.rect(self.window, (60, 60, 80), rect, 1)

        # Mark the top-left corner cell (0, 0); the actual start cell is sampled near it
        start = (0, 0)
        start_rect = pygame.Rect(start[1] * cell_size, start[0] * cell_size, cell_size, cell_size)
        pygame.draw.rect(self.window, (100, 100, 200), start_rect)

        # Draw the agent as a circle centered in its cell
        agent_rect = pygame.Rect(
            self.agent_pos[1] * cell_size + 5,
            self.agent_pos[0] * cell_size + 5,
            cell_size - 10,
            cell_size - 10
        )
        pygame.draw.circle(self.window, (255, 200, 0), agent_rect.center, cell_size // 3)

        pygame.display.flip()
        self.clock.tick(self.metadata['render_fps'])

    def close(self):
        """Close the environment and release the pygame window."""
        if self.window is not None:
            import pygame
            pygame.quit()
            self.window = None
            self.clock = None

    def get_state(self):
        """Return the current internal state (for debugging)."""
        return {
            'agent_pos': self.agent_pos,
            'goal_pos': self.goal_pos,
            'steps': self.steps,
            'grid': self.grid.copy()
        }
```

### 2.3 Test the Environment

Create `test_env.py`:

```python
"""Smoke test for the custom environment."""

import sys
sys.path.append('.')

from envs.grid_world import GridWorldEnv

def test_environment():
    """Exercise the basic environment API."""
    print("=" * 50)
    print("Testing GridWorldEnv")
    print("=" * 50)

    # Create the environment
    env = GridWorldEnv(grid_size=10, render_mode=None)

    print(f"Observation space: {env.observation_space}")
    print(f"Action space: {env.action_space}")
    print(f"Grid size: {env.grid_size}×{env.grid_size}")

    # Reset the environment
    observation, info = env.reset()
    state = env.get_state()

    print("\nInitial state:")
    print(f"  Agent position: {state['agent_pos']}")
    print(f"  Goal position: {state['goal_pos']}")
    print(f"  Observation: {observation}")

    # Try a few random actions
    print("\nTaking 10 random actions:")
    total_reward = 0

    for step in range(10):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)
        total_reward += reward

        state = env.get_state()
        print(f"  Step {step+1}: action={action}, reward={reward:.2f}, "
              f"position={state['agent_pos']}, terminated={terminated}")

        if terminated or truncated:
            print(f"  Episode ended. Total reward: {total_reward:.2f}")
            break

    env.close()
    print("\nEnvironment test complete!")

if __name__ == '__main__':
    test_environment()
```

Run the test:

```bash
python test_env.py
```

---

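## 3. Q-Learning Implementation

### 3.1 How the Algorithm Works

Q-Learning is a value-based reinforcement learning algorithm that learns an optimal policy by iteratively updating a table of Q-values.

**Q-value update rule**: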
```
Q(s, a) ← Q(s, a) + α[r + γ max_a' Q(s', a') - Q(s, a)]
```

where:
- α: learning rate
- γ: discount factor
- r: immediate reward
- max_a' Q(s', a'): the largest Q-value available in the next state
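
To make the update rule concrete, here is a single update worked through in code. The function name `q_update` and the sample numbers are made up for illustration; only the formula itself comes from the rule above.

```python
def q_update(q_sa: float, reward: float, max_next_q: float,
             alpha: float, gamma: float) -> float:
    """One tabular Q-Learning update for a single (state, action) pair."""
    td_target = reward + gamma * max_next_q   # r + γ max_a' Q(s', a')
    td_error = td_target - q_sa               # how far the current estimate is off
    return q_sa + alpha * td_error            # move a fraction α toward the target


if __name__ == "__main__":
    # Illustrative numbers: Q(s, a) = 0, r = -0.1, max_a' Q(s', a') = 2.0
    new_q = q_update(q_sa=0.0, reward=-0.1, max_next_q=2.0, alpha=0.1, gamma=0.95)
    # TD target = -0.1 + 0.95 * 2.0 = 1.8, so the new value is about 0.1 * 1.8 = 0.18
    print(f"updated Q(s, a) = {new_q:.4f}")
```

With α = 0.1 the estimate moves only 10% of the way toward the target, which is why many visits to the same state-action pair are needed before the table settles.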

### 3.2 Implementing Q-Learning

Create `agents/q_learning.py` (create the `agents/` directory first):

```python
"""
Q-Learning agent implementation
"""

import numpy as np


class QLearningAgent:
    """
    Q-Learning reinforcement learning agent.

    Stores Q-values in a table, so it is suited to discrete state and action spaces.
    """

    def __init__(self, n_states, n_actions, learning_rate=0.1, gamma=0.99,
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        """
        Initialize the Q-Learning agent.

        Args:
            n_states: size of the state space
            n_actions: size of the action space
            learning_rate: learning rate (α)
            gamma: discount factor (γ)
            epsilon: initial exploration rate
            epsilon_decay: multiplicative decay applied to epsilon after each episode
            epsilon_min: lower bound on the exploration rate
        """
        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Initialize the Q-table with zeros
        self.q_table = np.zeros((n_states, n_actions))

        # Training statistics
        self.training_history = []

    def select_action(self, state):
        """
        Select an action with the ε-greedy policy.

        Args:
            state: current state

        Returns:
            action: the selected action
        """
        if np.random.random() < self.epsilon:
            # Explore: pick a random action
            return int(np.random.randint(self.n_actions))
        else:
            # Exploit: pick the action with the highest Q-value
            return int(np.argmax(self.q_table[state]))
    
    def update(self, state, action, reward, next_state, terminated=False):
        """
        Update one Q-value.

        Q(s, a) ← Q(s, a) + α[r + γ max_a' Q(s', a') - Q(s, a)]

        Args:
            state: current state
            action: action taken
            reward: reward received
            next_state: next state
            terminated: True if next_state is terminal (no bootstrapping from it)
        """
        current_q = self.q_table[state, action]
        # A terminal state has no future return, so its bootstrap term is zero
        max_next_q = 0.0 if terminated else np.max(self.q_table[next_state])

        # TD error
        td_error = reward + self.gamma * max_next_q - current_q

        # Update the Q-value
        self.q_table[state, action] = current_q + self.lr * td_error

        return td_error

    def decay_epsilon(self):
        """Decay the exploration rate."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def train(self, env, n_episodes=500, max_steps=100):
        """
        Train the agent.

        Args:
            env: Gymnasium environment
            n_episodes: number of training episodes
            max_steps: maximum number of steps per episode

        Returns:
            episode_rewards: list with the total reward of each episode
        """
        episode_rewards = []

        for episode in range(n_episodes):
            state, _ = env.reset()
            total_reward = 0
            td_errors = []

            for step in range(max_steps):
                # Select an action
                action = self.select_action(state)

                # Take the action
                next_state, reward, terminated, truncated, info = env.step(action)

                # Update the Q-value
                td_error = self.update(state, action, reward, next_state, terminated)
                td_errors.append(abs(td_error))

                total_reward += reward
                state = next_state

                if terminated or truncated:
                    break

            # Decay the exploration rate once per episode
            self.decay_epsilon()

            episode_rewards.append(total_reward)

            # Record training history
            self.training_history.append({
                'episode': episode,
                'reward': total_reward,
                'epsilon': self.epsilon,
                'mean_td_error': np.mean(td_errors) if td_errors else 0
            })

            # Print progress
            if (episode + 1) % 50 == 0:
                recent_avg = np.mean(episode_rewards[-50:])
                print(f"Episode {episode+1}/{n_episodes} | "
                      f"avg reward: {recent_avg:.2f} | "
                      f"ε: {self.epsilon:.4f}")

        return episode_rewards
    
    def get_policy(self):
        """
        Return the current greedy policy.

        Returns:
            policy: the best action for each state
        """
        return np.argmax(self.q_table, axis=1)

    def evaluate(self, env, n_episodes=10, max_steps=100):
        """
        Evaluate the trained agent.

        Args:
            env: Gymnasium environment
            n_episodes: number of evaluation episodes
            max_steps: maximum number of steps per episode

        Returns:
            mean_reward: average total reward per episode
            success_rate: fraction of episodes that reached the goal
        """
        episode_rewards = []
        successes = 0

        for episode in range(n_episodes):
            state, _ = env.reset()
            total_reward = 0

            for step in range(max_steps):
                action = int(np.argmax(self.q_table[state]))  # greedy policy
                next_state, reward, terminated, truncated, _ = env.step(action)

                total_reward += reward
                state = next_state

                if terminated:
                    if reward > 0:  # the episode ended by reaching the goal
                        successes += 1
                    break
                if truncated:
                    break

            episode_rewards.append(total_reward)

        return np.mean(episode_rewards), successes / n_episodes


def visualize_q_table(q_table, grid_size=10):
    """
    Visualize the Q-table.

    Args:
        q_table: table of Q-values
        grid_size: side length of the grid
    """
    import matplotlib.pyplot as plt

    # Best action and largest Q-value for each state
    best_actions = np.argmax(q_table, axis=1).reshape(grid_size, grid_size)
    max_q_values = np.max(q_table, axis=1).reshape(grid_size, grid_size)

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Best-action map
    ax = axes[0]
    im = ax.imshow(best_actions, cmap='viridis', aspect='auto')
    ax.set_title('Best action (0=up, 1=down, 2=left, 3=right)')
    ax.set_xlabel('Column')
    ax.set_ylabel('Row')
    plt.colorbar(im, ax=ax)

    # Q-value map
    ax = axes[1]
    im = ax.imshow(max_q_values, cmap='plasma', aspect='auto')
    ax.set_title('Max Q-value')
    ax.set_xlabel('Column')
    ax.set_ylabel('Row')
    plt.colorbar(im, ax=ax)

    plt.tight_layout()
    plt.savefig('q_table_visualization.png', dpi=150)
    plt.show()
    print("Q-table visualization saved to q_table_visualization.png")


if __name__ == '__main__':
    # Q-Learning smoke test
    from envs.grid_world import GridWorldEnv

    print("=" * 50)
    print("Q-Learning test run")
    print("=" * 50)

    # Create the environment
    env = GridWorldEnv(grid_size=10)

    # Create the agent
    agent = QLearningAgent(
        n_states=env.observation_space.n,
        n_actions=env.action_space.n,
        learning_rate=0.1,
        gamma=0.95,
        epsilon=1.0,
        epsilon_decay=0.99,
        epsilon_min=0.01
    )

    # Train
    print("\nStarting training...")
    rewards = agent.train(env, n_episodes=500, max_steps=100)

    # Evaluate
    print("\nEvaluating the trained agent...")
    mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
    print(f"Mean reward: {mean_reward:.2f}")
    print(f"Success rate: {success_rate:.2%}")

    # Visualize the Q-table
    visualize_q_table(agent.q_table)

    env.close()
```

### 3.3 Run Q-Learning

Run the script as a module from the project root so that the `envs` package can be imported:

```bash
python -m agents.q_learning
```
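
The exploration schedule decides how much of the training budget is spent exploring. The sketch below estimates how many episodes the multiplicative decay needs to reach `epsilon_min`; the function names `episodes_until_min` and `epsilon_after` are hypothetical helpers written for this example, not part of the agent class.

```python
import math


def episodes_until_min(epsilon_start: float, epsilon_decay: float,
                       epsilon_min: float) -> int:
    """Number of per-episode decays until epsilon first reaches epsilon_min or below."""
    if epsilon_start <= epsilon_min:
        return 0
    # Solve epsilon_start * decay**n <= epsilon_min for the smallest integer n
    return math.ceil(math.log(epsilon_min / epsilon_start) / math.log(epsilon_decay))


def epsilon_after(n_episodes: int, epsilon_start: float, epsilon_decay: float,
                  epsilon_min: float) -> float:
    """Exploration rate after n_episodes decays, clipped at epsilon_min."""
    return max(epsilon_min, epsilon_start * epsilon_decay ** n_episodes)


if __name__ == "__main__":
    for decay in (0.99, 0.995):
        n = episodes_until_min(1.0, decay, 0.01)
        eps_500 = epsilon_after(500, 1.0, decay, 0.01)
        print(f"decay={decay}: reaches 0.01 after {n} episodes, "
              f"epsilon after 500 episodes = {eps_500:.4f}")
```

With the script's settings (`epsilon_decay=0.99`), epsilon reaches its floor after 459 of the 500 episodes. With the class default of 0.995 it would take 919 episodes, so a 500-episode run would end while the agent is still exploring about 8% of the time.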

---

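## 4. Deep Q-Network (DQN) Implementation

### 4.1 How DQN Works

When the state space is very large (images, for example), tabular Q-Learning is no longer practical. DQN uses a deep neural network to approximate the Q-function.

**Key techniques in DQN**:
1. **Experience Replay**: store transitions and update on randomly sampled batches
2. **Target Network**: periodically copy the network parameters into a separate network to stabilize training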
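
The two techniques meet in the regression target that the Q-network is trained toward. As a rough sketch, written in NumPy instead of PyTorch to keep it small, the target for a sampled batch could be computed as follows. The function name `td_targets` and the sample batch are assumptions for illustration.

```python
import numpy as np


def td_targets(rewards: np.ndarray, next_q_target: np.ndarray,
               dones: np.ndarray, gamma: float = 0.99) -> np.ndarray:
    """
    Compute DQN regression targets for a batch of replayed transitions.

    rewards:       shape (batch,)
    next_q_target: shape (batch, n_actions), Q-values of the next states
                   as predicted by the *target* network
    dones:         shape (batch,), 1.0 where the next state is terminal
    """
    max_next_q = next_q_target.max(axis=1)
    # Terminal transitions keep only the immediate reward
    return rewards + gamma * max_next_q * (1.0 - dones)


if __name__ == "__main__":
    rewards = np.array([1.0, 10.0])
    next_q = np.array([[0.5, 2.0],
                       [3.0, 1.0]])
    dones = np.array([0.0, 1.0])
    print(td_targets(rewards, next_q, dones, gamma=0.5))
```

Because `next_q_target` comes from a network whose weights change only at each periodic copy, the targets stay fixed between copies, which is the stabilizing effect the second technique aims for.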

### 4.2 Implementing DQN

Create `agents/dqn.py`:

```python
"""
Deep Q-Network (DQN) implementation
"""

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, namedtuple
import random


# A single stored transition
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'next_state', 'done'))


class ReplayBuffer:
    """
    Experience replay buffer.
    Stores transitions so that random sampling can break the correlation
    between consecutive samples.
    """

    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Add one transition to the buffer."""
        self.buffer.append(Transition(state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Sample a random batch and return it as tensors."""
        batch = random.sample(self.buffer, batch_size)

        states = torch.tensor(np.array([t.state for t in batch]), dtype=torch.float32)
        actions = torch.tensor([t.action for t in batch], dtype=torch.long)
        rewards = torch.tensor([t.reward for t in batch], dtype=torch.float32)
        next_states = torch.tensor(np.array([t.next_state for t in batch]), dtype=torch.float32)
        dones = torch.tensor([t.done for t in batch], dtype=torch.float32)

        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)


class QNetwork(nn.Module):
    """
    Q-network.
    Approximates Q(s, a): takes a state vector and outputs one value per action.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()

        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        return self.network(x)


class DQNAgent:
    """
    DQN agent
    """
    
    def __init__(self, state_dim, action_dim, hidden_dim=128,
                 learning_rate=0.001, gamma=0.99, epsilon=1.0,
                 epsilon_decay=0.995, epsilon_min=0.01,
                 target_update_freq=10, replay_buffer_size=10000):

        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        # Number of gradient updates between target-network syncs
        self.target_update_freq = target_update_freq

        # Device
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        # Q-network and target network (the target starts as an exact copy)
        self.q_network = QNetwork(state_dim, action_dim, hidden_dim).to(self.device)
        self.target_network = QNetwork(state_dim, action_dim, hidden_dim).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())

        # Optimizer
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)

        # Experience replay buffer
        self.replay_buffer = ReplayBuffer(replay_buffer_size)

        # Training statistics
        self.training_history = {'rewards': [], 'losses': []}
        self.update_count = 0

    def select_action(self, state, training=True):
        """Select an action with the ε-greedy policy (greedy when training=False)."""
        if training and np.random.random() < self.epsilon:
            return int(np.random.randint(self.action_dim))
        else:
            with torch.no_grad():
                state_tensor = torch.as_tensor(state, dtype=torch.float32)
                state_tensor = state_tensor.unsqueeze(0).to(self.device)
                q_values = self.q_network(state_tensor)
                return q_values.argmax().item()

    def update(self, batch_size=64):
        """Run one gradient update on the Q-network; returns the loss, or None if skipped."""
        if len(self.replay_buffer) < batch_size:
            return None

        # Sample a batch
        states, actions, rewards, next_states, dones = \
            self.replay_buffer.sample(batch_size)

        states = states.to(self.device)
        actions = actions.to(self.device)
        rewards = rewards.to(self.device)
        next_states = next_states.to(self.device)
        dones = dones.to(self.device)

        # Q-values of the actions actually taken; squeeze(1) keeps the batch
        # dimension even when batch_size == 1
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Target Q-values (from the target network)
        with torch.no_grad():
            max_next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + self.gamma * max_next_q * (1 - dones)

        # Loss
        loss = nn.functional.mse_loss(current_q, target_q)

        # Backpropagation with gradient clipping
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), 1.0)
        self.optimizer.step()

        # Periodically sync the target network
        self.update_count += 1
        if self.update_count % self.target_update_freq == 0:
            self.target_network.load_state_dict(self.q_network.state_dict())

        return loss.item()

    def decay_epsilon(self):
        """Decay the exploration rate."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
    
    def train(self, env, n_episodes=500, max_steps=100, batch_size=64):
        """Train the agent."""
        episode_rewards = []
        losses = []
        # One-hot lookup table: row i is the encoding of state index i
        one_hot = np.eye(env.observation_space.n, dtype=np.float32)

        for episode in range(n_episodes):
            state, _ = env.reset()
            state = one_hot[state]  # one-hot encode the discrete state

            total_reward = 0
            episode_loss = []

            for step in range(max_steps):
                # Select an action
                action = self.select_action(state)

                # Take the action
                next_state, reward, terminated, truncated, _ = env.step(action)
                next_state_onehot = one_hot[next_state]

                done = terminated or truncated

                # Store the transition. Only `terminated` is stored as the done flag,
                # so a time-limit cutoff does not zero out the bootstrap term
                self.replay_buffer.push(state, action, reward, next_state_onehot, terminated)

                # Update the network
                loss = self.update(batch_size)
                if loss is not None:
                    episode_loss.append(loss)

                total_reward += reward
                state = next_state_onehot

                if done:
                    break

            self.decay_epsilon()
            episode_rewards.append(total_reward)
            losses.append(np.mean(episode_loss) if episode_loss else 0)

            if (episode + 1) % 50 == 0:
                recent_avg = np.mean(episode_rewards[-50:])
                print(f"Episode {episode+1}/{n_episodes} | "
                      f"avg reward: {recent_avg:.2f} | "
                      f"ε: {self.epsilon:.4f}")

        self.training_history = {
            'rewards': episode_rewards,
            'losses': losses
        }

        return episode_rewards

    def evaluate(self, env, n_episodes=10, max_steps=100):
        """Evaluate the agent with the greedy policy."""
        episode_rewards = []
        successes = 0
        one_hot = np.eye(env.observation_space.n, dtype=np.float32)

        for episode in range(n_episodes):
            state, _ = env.reset()
            state = one_hot[state]
            total_reward = 0

            for step in range(max_steps):
                action = self.select_action(state, training=False)
                next_state, reward, terminated, truncated, _ = env.step(action)
                next_state = one_hot[next_state]

                total_reward += reward
                state = next_state

                if terminated:
                    if reward > 0:  # the episode ended by reaching the goal
                        successes += 1
                    break
                if truncated:
                    break

            episode_rewards.append(total_reward)

        return np.mean(episode_rewards), successes / n_episodes


def plot_training_history(history):
    """Plot the training curves."""
    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Reward curve
    axes[0].plot(history['rewards'])
    axes[0].set_xlabel('Episode')
    axes[0].set_ylabel('Total Reward')
    axes[0].set_title('Training Rewards')
    axes[0].grid(True)

    # Loss curve
    axes[1].plot(history['losses'])
    axes[1].set_xlabel('Episode')
    axes[1].set_ylabel('Loss')
    axes[1].set_title('Training Loss')
    axes[1].grid(True)

    plt.tight_layout()
    plt.savefig('dqn_training.png', dpi=150)
    plt.show()


if __name__ == '__main__':
    from envs.grid_world import GridWorldEnv

    print("=" * 50)
    print("Deep Q-Network (DQN) test run")
    print("=" * 50)

    # Create the environment
    env = GridWorldEnv(grid_size=10)

    # Create the agent
    state_dim = env.observation_space.n  # 100 states (one-hot input size)
    action_dim = env.action_space.n      # 4 actions

    agent = DQNAgent(
        state_dim=state_dim,
        action_dim=action_dim,
        hidden_dim=128,
        learning_rate=0.001,
        gamma=0.99,
        epsilon=1.0,
        epsilon_decay=0.99,
        epsilon_min=0.01
    )

    # Train
    print("\nStarting training...")
    rewards = agent.train(env, n_episodes=500, max_steps=100, batch_size=64)

    # Evaluate
    print("\nEvaluating the trained agent...")
    mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
    print(f"Mean reward: {mean_reward:.2f}")
    print(f"Success rate: {success_rate:.2%}")

    # Plot the training curves
    plot_training_history(agent.training_history)

    env.close()
```

---

## 5. Training and Evaluation

### 5.1 Training Script

Create `train.py`:

```python
"""Training and evaluation script"""

import argparse
import numpy as np
import matplotlib.pyplot as plt
from envs.grid_world import GridWorldEnv
from agents.q_learning import QLearningAgent
from agents.dqn import DQNAgent


def plot_comparison(q_rewards, dqn_rewards, title="Algorithm Comparison"):
    """Plot the reward curves of both algorithms side by side."""
    fig = plt.figure(figsize=(12, 5))
    fig.suptitle(title)

    # Raw reward curves
    plt.subplot(1, 2, 1)
    plt.plot(q_rewards, label='Q-Learning', alpha=0.7)
    plt.plot(dqn_rewards, label='DQN', alpha=0.7)
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('Training Rewards')
    plt.legend()
    plt.grid(True)

    # Moving average
    window = 20
    plt.subplot(1, 2, 2)
    q_ma = np.convolve(q_rewards, np.ones(window)/window, mode='valid')
    dqn_ma = np.convolve(dqn_rewards, np.ones(window)/window, mode='valid')
    plt.plot(q_ma, label='Q-Learning (MA)', linewidth=2)
    plt.plot(dqn_ma, label='DQN (MA)', linewidth=2)
    plt.xlabel('Episode')
    plt.ylabel('Moving Average Reward')
    plt.title(f'{window}-Episode Moving Average')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.savefig('algorithm_comparison.png', dpi=150)
    plt.show()


def main():
    parser = argparse.ArgumentParser(description='Train a reinforcement learning agent')
    parser.add_argument('--algorithm', type=str, default='dqn',
                        choices=['qlearning', 'dqn'],
                        help='algorithm to train')
    parser.add_argument('--episodes', type=int, default=500,
                        help='number of training episodes')
    args = parser.parse_args()

    # Create the environment
    env = GridWorldEnv(grid_size=10)

    if args.algorithm == 'qlearning':
        print("Training the Q-Learning agent...")
        agent = QLearningAgent(
            n_states=env.observation_space.n,
            n_actions=env.action_space.n,
            learning_rate=0.1,
            gamma=0.95,
            epsilon=1.0,
            epsilon_decay=0.99,
            epsilon_min=0.01
        )
        rewards = agent.train(env, n_episodes=args.episodes, max_steps=100)
        mean_reward, success_rate = agent.evaluate(env, n_episodes=50)

    else:  # dqn
        print("Training the DQN agent...")
        agent = DQNAgent(
            state_dim=env.observation_space.n,
            action_dim=env.action_space.n,
            hidden_dim=128,
            learning_rate=0.001,
            gamma=0.99,
            epsilon=1.0,
            epsilon_decay=0.99,
            epsilon_min=0.01
        )
        rewards = agent.train(env, n_episodes=args.episodes, max_steps=100, batch_size=64)
        mean_reward, success_rate = agent.evaluate(env, n_episodes=50)

    print("\nFinal results:")
    print(f"  Mean reward: {mean_reward:.2f}")
    print(f"  Success rate: {success_rate:.2%}")

    env.close()


if __name__ == '__main__':
    main()
```

### 5.2 Run Training

```bash
# Train Q-Learning
python train.py --algorithm qlearning --episodes 500

# Train DQN
python train.py --algorithm dqn --episodes 500
```

---

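## 6. Advanced: The PPO Algorithm

### 6.1 How PPO Works

PPO (Proximal Policy Optimization) is a policy-gradient algorithm that improves training stability by limiting how far each update can move the policy.

**Core idea**:
```
L^CLIP(θ) = E[ min(r(θ) * A_t, clip(r(θ), 1-ε, 1+ε) * A_t) ]

where r(θ) = π_θ(a|s) / π_θ_old(a|s) is the probability ratio, A_t is the advantage estimate, and ε is the clip range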
```
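
As a rough numerical illustration of the clipped objective, the sketch below evaluates it for a small batch with NumPy. The function name `clipped_surrogate` and the sample ratios and advantages are assumptions for this example. A real implementation would compute the ratios from policy log-probabilities and maximize this quantity by gradient ascent.

```python
import numpy as np


def clipped_surrogate(ratio: np.ndarray, advantage: np.ndarray,
                      clip_eps: float = 0.2) -> float:
    """Mean of min(r * A, clip(r, 1 - eps, 1 + eps) * A) over a batch."""
    unclipped = ratio * advantage
    clipped = np.clip(ratio, 1.0 - clip_eps, 1.0 + clip_eps) * advantage
    # Taking the element-wise minimum makes the objective a pessimistic bound
    return float(np.minimum(unclipped, clipped).mean())


if __name__ == "__main__":
    ratio = np.array([1.5, 0.5])       # new policy probability / old policy probability
    advantage = np.array([1.0, -1.0])  # one good action, one bad action
    print(clipped_surrogate(ratio, advantage, clip_eps=0.2))
```

In the first sample the ratio 1.5 is clipped to 1.2, so raising the probability of a good action beyond the clip range earns no extra objective. In the second sample the ratio 0.5 is clipped up to 0.8 and the minimum picks the clipped term, so pushing a bad action's probability below the range gains nothing either.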

### 6.2 Training PPO with Stable-Baselines3

Create `train_ppo.py`. The script trains on the built-in `CartPole-v1` task as a quick demonstration:

```python
"""
Train PPO with Stable-Baselines3
"""

import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env


def main():
    # Vectorized environment (speeds up data collection)
    env = make_vec_env('CartPole-v1', n_envs=4)

    # Create the PPO model
    model = PPO(
        'MlpPolicy',
        env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.01,
        verbose=1
    )

    # Train (passing progress_bar=True additionally requires the tqdm and rich packages)
    print("Training the PPO agent...")
    model.learn(total_timesteps=100000)

    # Evaluate
    print("\nEvaluating the trained agent...")
    eval_env = gym.make('CartPole-v1')

    total_reward = 0.0
    n_episodes = 10

    for episode in range(n_episodes):
        obs, _ = eval_env.reset()
        episode_reward = 0.0
        done = False

        while not done:
            # Use the deterministic action for evaluation
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _ = eval_env.step(int(action))
            episode_reward += reward
            done = terminated or truncated

        total_reward += episode_reward
        print(f"Episode {episode+1}: reward = {episode_reward}")

    print(f"\nMean reward: {total_reward / n_episodes:.2f}")
    eval_env.close()

    # Save the model
    model.save("ppo_cartpole")
    print("Model saved to ppo_cartpole.zip")


if __name__ == '__main__':
    main()
```

---

## 7. Visualization and Debugging

### 7.1 Build a Training Visualization Tool

Create `visualize.py`:

```python
"""
Training visualization utilities
"""

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle


class TrainingVisualizer:
    """Visualizes episode trajectories and learning curves."""

    def __init__(self, grid_size=10):
        self.grid_size = grid_size

    def visualize_episode(self, episode_data):
        """
        Visualize the trajectory of one episode.

        episode_data: list of (state, action, reward) tuples
        """
        fig, axes = plt.subplots(2, 5, figsize=(15, 6))
        axes = axes.flatten()

        # Hide all axes up front so unused panels stay blank
        for ax in axes:
            ax.axis('off')

        # Arrow offsets (dx, dy) per action. The y-axis is inverted below,
        # so "up" (row - 1) corresponds to a negative dy
        action_dirs = {0: (0, -0.5), 1: (0, 0.5), 2: (-0.5, 0), 3: (0.5, 0)}

        # Show only the first 10 steps
        for i, (state, action, reward) in enumerate(episode_data[:10]):
            ax = axes[i]

            # Decode the position
            row, col = state // self.grid_size, state % self.grid_size

            # Cells span [c, c+1] × [r, r+1]; row 0 is drawn at the top
            ax.set_xlim(0, self.grid_size)
            ax.set_ylim(self.grid_size, 0)
            ax.set_aspect('equal')

            # Draw the grid
            for r in range(self.grid_size):
                for c in range(self.grid_size):
                    rect = Rectangle((c, r), 1, 1, fill=False, edgecolor='gray', linewidth=0.5)
                    ax.add_patch(rect)

            # Draw the agent
            agent = Circle((col + 0.5, row + 0.5), 0.3, color='orange')
            ax.add_patch(agent)

            # Draw the action direction
            dx, dy = action_dirs[action]
            ax.annotate('', xy=(col + 0.5 + dx, row + 0.5 + dy),
                        xytext=(col + 0.5, row + 0.5),
                        arrowprops=dict(arrowstyle='->', color='red', lw=2))

            ax.set_title(f'Step {i+1}: a={action}, r={reward:.1f}')

        plt.tight_layout()
        plt.savefig('episode_visualization.png', dpi=150)
        plt.show()

    def plot_learning_curve(self, rewards, window=20):
        """Plot the raw and smoothed learning curves."""
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))

        # Raw rewards
        axes[0].plot(rewards, alpha=0.5)
        axes[0].set_xlabel('Episode')
        axes[0].set_ylabel('Total Reward')
        axes[0].set_title('Training Rewards')
        axes[0].grid(True)

        # Moving average
        ma = np.convolve(rewards, np.ones(window)/window, mode='valid')
        axes[1].plot(ma, color='orange', linewidth=2)
        axes[1].set_xlabel('Episode')
        axes[1].set_ylabel(f'{window}-Episode Moving Average')
        axes[1].set_title('Learning Curve (Smoothed)')
        axes[1].grid(True)

        plt.tight_layout()
        plt.savefig('learning_curve.png', dpi=150)
        plt.show()


if __name__ == '__main__':
    # Visualization demo on synthetic data

    # Generate a mock episode
    episode_data = []
    state = 0
    for i in range(15):
        action = np.random.randint(4)
        reward = np.random.randn()
        episode_data.append((state, action, reward))
        state = (state + np.random.randint(1, 4)) % 100

    visualizer = TrainingVisualizer(grid_size=10)
    visualizer.visualize_episode(episode_data)

    # Mock learning curve (random data, for demonstration only)
    rewards = np.cumsum(np.random.randn(500)) + 50
    rewards = rewards - np.arange(500) * 0.1 + np.random.rand(500) * 20
    visualizer.plot_learning_curve(rewards)
```

---

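## Summary and Next Steps

### Project Summary

After completing this project, you will be able to:
1. ✅ Create a custom Gymnasium environment
2. ✅ Implement and train the Q-Learning algorithm
3. ✅ Implement and train the DQN algorithm
4. ✅ Visualize and evaluate the training process

### Suggested Extensions

1. **Environment extensions**:
   - Add random obstacle generation
   - Build a 3D environment
   - Add sensor noise

2. **Algorithm extensions**:
   - Implement Double DQN
   - Implement Dueling DQN
   - Implement PPO yourself (Section 6 relies on Stable-Baselines3)

3. **Application extensions**:
   - Migrate to the Gazebo simulator
   - Add visual input (CNN)
   - Implement multi-agent cooperation

### References

- [Gymnasium documentation](https://gymnasium.farama.org/)
- [Stable-Baselines3 documentation](https://stable-baselines3.readthedocs.io/)
- [Reinforcement Learning: An Introduction](http://incompleteideas.net/book/the-book-2nd.html)

---

**Next**: [Project 2: Robot Grasping Simulation](./project2-robot-grasping.md)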
