学習目標
この章を読むことで、以下を習得できます:
- ✅ Policy-basedとValue-basedアプローチの違いを理解できる
- ✅ Policy Gradientの数学的定式化を理解できる
- ✅ REINFORCEアルゴリズムを実装できる
- ✅ Actor-Criticアーキテクチャを理解し実装できる
- ✅ Advantage Actor-Critic (A2C)を実装できる
- ✅ Proximal Policy Optimization (PPO)を理解できる
- ✅ LunarLanderやPendulum(連続行動)などの制御タスクを解決できる
4.1 Policy-based vs Value-based
4.1.1 2つのアプローチ
強化学習には大きく分けて2つのアプローチがあります:
| 特性 | Value-based (価値ベース) | Policy-based (方策ベース) |
|---|---|---|
| 学習対象 | 価値関数 $Q(s, a)$ または $V(s)$ | 方策 $\pi(a|s)$ を直接学習 |
| 行動選択 | 間接的($\arg\max_a Q(s,a)$) | 直接的($\pi(a|s)$ からサンプリング) |
| 行動空間 | 離散的な行動に適する | 連続的な行動にも対応可 |
| 確率的方策 | 扱いにくい(ε-greedy等で対応) | 自然に扱える |
| 収束性 | 最適方策の保証あり(条件下) | 局所最適解の可能性 |
| サンプル効率 | 高い(経験再生可能) | 低い(on-policy学習) |
| 代表アルゴリズム | Q-learning, DQN, Double DQN | REINFORCE, A2C, PPO, TRPO |
4.1.2 Policy Gradientの動機
Value-basedアプローチの課題:
- 連続行動空間: ロボット制御など、無限の行動選択肢がある場合に$\arg\max$計算が困難
- 確率的方策: じゃんけんのように確率的な行動が最適な場合に対応しづらい
- 高次元行動空間: 行動の組み合わせが膨大な場合、すべての行動価値を計算するのは非効率
Policy-basedアプローチの解決策:
- 方策をパラメータ $\theta$ でモデル化: $\pi_\theta(a|s)$
- 期待リターンを最大化するように $\theta$ を直接最適化
- ニューラルネットワークで方策を表現可能
Q(s,a)"] AM["argmax"] A1["Action a"] S1 --> Q Q --> AM AM --> A1 style Q fill:#e74c3c,color:#fff end subgraph "Policy-based Approach" S2["State s"] P["Policy π(a|s)
parameterized by θ"] A2["Action a
(sampled)"] S2 --> P P --> A2 style P fill:#27ae60,color:#fff end
4.1.3 Policy Gradientの定式化
方策 $\pi_\theta(a|s)$ をパラメータ $\theta$ で表現し、期待リターン(目的関数)$J(\theta)$ を最大化します:
$$ J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} [R(\tau)] $$ここで:
- $\tau = (s_0, a_0, r_1, s_1, a_1, r_2, \ldots)$: 軌跡(trajectory)
- $R(\tau) = \sum_{t=0}^{T} \gamma^t r_t$: 軌跡の累積報酬
Policy Gradient定理により、$J(\theta)$ の勾配は以下のように表せます:
$$ \nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) R_t \right] $$ここで $R_t = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}$ は時刻 $t$ からの累積報酬です。
直感的理解: 高いリターンをもたらした行動の確率を増やし、低いリターンの行動の確率を減らす。これにより、良い軌跡を生成する方策パラメータへと更新される。
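この式は、自動微分ライブラリでは「$-\log \pi_\theta(a_t|s_t) \times R_t$ の和」を損失として最小化する形で実装します。以下は、この対応関係だけを確認するための最小限のスケッチです(方策・行動・リターンの値はすべて説明用の仮のものです)。
import torch
import torch.nn.functional as F
# 説明用: 1つの状態を固定し、2行動の方策をロジットθで表す(値は仮)
theta = torch.zeros(2, requires_grad=True)
# 1本の軌跡で観測した(行動, 時刻tからのリターンR_t)の仮の例
actions = torch.tensor([0, 1, 0])
returns = torch.tensor([2.0, 0.5, 1.0])
log_probs = F.log_softmax(theta, dim=-1)[actions]  # log π_θ(a_t|s_t)
loss = -(log_probs * returns).sum()                # -Σ log π_θ(a_t|s_t) R_t(勾配上昇を損失最小化で表現)
loss.backward()
print(theta.grad)  # -theta.grad が ∇_θ J(θ) のサンプル推定(これを勾配上昇に使う)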
4.2 REINFORCEアルゴリズム
4.2.1 REINFORCEの基本原理
REINFORCE(Williams, 1992)は最もシンプルなPolicy Gradientアルゴリズムです。モンテカルロ法でリターンを推定します。
アルゴリズム
- 方策 $\pi_\theta(a|s)$ で1エピソードを実行し、軌跡 $\tau$ を収集
- 各時刻 $t$ のリターン $R_t$ を計算
- 勾配上昇でパラメータを更新: $$ \theta \leftarrow \theta + \alpha \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) R_t $$
バリアンス削減:Baseline
リターンから定数 $b$ を引いても勾配の期待値は変わりません(不偏性)。これにより分散を削減できます:
$$ \nabla_\theta J(\theta) = \mathbb{E}_{\tau} \left[ \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) (R_t - b) \right] $$一般的な選択:$b = V(s_t)$(状態価値関数)
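ベースラインを引いても勾配の期待値が変わらない(分散だけが変わる)ことは、固定した方策に対するモンテカルロ実験でも確認できます。以下は説明用のスケッチで、行動数・報酬・ベースラインの値はすべて仮のものです。
import torch
torch.manual_seed(0)
theta = torch.zeros(3, requires_grad=True)       # 3行動のロジット(仮)
rewards = torch.tensor([1.0, 3.0, 0.0])          # 行動ごとの固定報酬(仮)
def policy_grad_estimate(baseline, n_samples=100_000):
    dist = torch.distributions.Categorical(logits=theta)
    a = dist.sample((n_samples,))                # 現在の方策から行動をサンプリング
    logp = dist.log_prob(a)
    # スコア関数推定量: ∇_θ J ≈ mean[(R - b) ∇_θ log π(a)]
    loss = -((rewards[a] - baseline) * logp).mean()
    (grad,) = torch.autograd.grad(loss, theta)
    return -grad
print(policy_grad_estimate(baseline=0.0))  # ベースラインなし
print(policy_grad_estimate(baseline=2.0))  # ベースラインあり: 推定値はほぼ同じ(この例では分散も小さくなる)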
Sample τ ~ π_θ"] Compute["Compute Returns
R_t for all t"] Grad["Compute Gradient
∇_θ log π_θ(a_t|s_t) R_t"] Update["Update Parameters
θ ← θ + α ∇_θ J(θ)"] Check["Converged?"] Init --> Episode Episode --> Compute Compute --> Grad Grad --> Update Update --> Check Check -->|No| Episode Check -->|Yes| Done["Done"] style Init fill:#7b2cbf,color:#fff style Done fill:#27ae60,color:#fff style Grad fill:#e74c3c,color:#fff end
4.2.2 REINFORCE実装(CartPole)
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gym
import matplotlib.pyplot as plt
from collections import deque
class PolicyNetwork(nn.Module):
"""
Policy Network for REINFORCE
入力: 状態 s
出力: 各行動の確率 π(a|s)
"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(PolicyNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
"""
Args:
state: 状態 [batch_size, state_dim]
Returns:
action_probs: 行動確率 [batch_size, action_dim]
"""
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
logits = self.fc3(x)
action_probs = F.softmax(logits, dim=-1)
return action_probs
class REINFORCE:
"""REINFORCE Algorithm"""
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
"""
Args:
state_dim: 状態空間の次元
action_dim: 行動空間の次元
lr: 学習率
gamma: 割引率
"""
self.gamma = gamma
self.policy = PolicyNetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
# エピソードデータの保存
self.saved_log_probs = []
self.rewards = []
def select_action(self, state):
"""
方策に従って行動を選択
Args:
state: 状態
Returns:
action: 選択された行動
"""
state = torch.FloatTensor(state).unsqueeze(0)
action_probs = self.policy(state)
# 確率分布からサンプリング
m = torch.distributions.Categorical(action_probs)
action = m.sample()
# log π(a|s) を保存(勾配計算に使用)
self.saved_log_probs.append(m.log_prob(action))
return action.item()
def update(self):
"""
エピソード終了後にパラメータを更新
"""
R = 0
returns = []
# リターンの計算(逆順に計算)
for r in self.rewards[::-1]:
R = r + self.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns)
# 正規化(分散削減)
if len(returns) > 1:
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
# Policy gradient計算
policy_loss = []
for log_prob, R in zip(self.saved_log_probs, returns):
policy_loss.append(-log_prob * R)
# 勾配上昇(損失を最小化 = -目的関数を最小化)
self.optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
policy_loss.backward()
self.optimizer.step()
# リセット
self.saved_log_probs = []
self.rewards = []
return policy_loss.item()
# デモンストレーション
print("=== REINFORCE on CartPole ===\n")
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
print(f"Environment: CartPole-v1")
print(f" State dimension: {state_dim}")
print(f" Action dimension: {action_dim}")
agent = REINFORCE(state_dim, action_dim, lr=0.01, gamma=0.99)
print(f"\nAgent: REINFORCE")
total_params = sum(p.numel() for p in agent.policy.parameters())
print(f" Total parameters: {total_params:,}")
# 訓練
num_episodes = 500
episode_rewards = []
moving_avg = deque(maxlen=100)
print("\nTraining...")
for episode in range(num_episodes):
state = env.reset()
episode_reward = 0
for t in range(1000):
action = agent.select_action(state)
next_state, reward, done, _ = env.step(action)
agent.rewards.append(reward)
episode_reward += reward
state = next_state
if done:
break
# エピソード終了後に更新
loss = agent.update()
episode_rewards.append(episode_reward)
moving_avg.append(episode_reward)
if (episode + 1) % 50 == 0:
avg_reward = np.mean(moving_avg)
print(f"Episode {episode+1:3d}, Avg Reward (last 100): {avg_reward:.2f}, Loss: {loss:.4f}")
print(f"\nTraining completed!")
print(f"Final average reward (last 100 episodes): {np.mean(moving_avg):.2f}")
# 可視化
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(episode_rewards, alpha=0.3, color='steelblue', label='Episode Reward')
# 移動平均
window = 50
moving_avg_plot = [np.mean(episode_rewards[max(0, i-window):i+1])
for i in range(len(episode_rewards))]
ax.plot(moving_avg_plot, linewidth=2, color='darkorange', label=f'Moving Average ({window} episodes)')
ax.axhline(y=475, color='red', linestyle='--', label='Solved Threshold (475)')  # CartPole-v1の解決基準は平均報酬475
ax.set_xlabel('Episode', fontsize=12, fontweight='bold')
ax.set_ylabel('Reward', fontsize=12, fontweight='bold')
ax.set_title('REINFORCE on CartPole-v1', fontsize=13, fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("\n✓ REINFORCEの特徴:")
print(" • シンプルで実装が容易")
print(" • モンテカルロ法(エピソード終了後に更新)")
print(" • 高分散(リターンの変動が大きい)")
print(" • On-policy(現在の方策でサンプリング)")
出力例:
=== REINFORCE on CartPole ===
Environment: CartPole-v1
State dimension: 4
Action dimension: 2
Agent: REINFORCE
Total parameters: 17,410
Training...
Episode 50, Avg Reward (last 100): 23.45, Loss: 15.2341
Episode 100, Avg Reward (last 100): 45.67, Loss: 12.5678
Episode 150, Avg Reward (last 100): 89.23, Loss: 8.3456
Episode 200, Avg Reward (last 100): 142.56, Loss: 5.6789
Episode 250, Avg Reward (last 100): 178.34, Loss: 3.4567
Episode 300, Avg Reward (last 100): 195.78, Loss: 2.1234
Episode 350, Avg Reward (last 100): 210.45, Loss: 1.5678
Episode 400, Avg Reward (last 100): 230.67, Loss: 1.2345
Episode 450, Avg Reward (last 100): 245.89, Loss: 0.9876
Episode 500, Avg Reward (last 100): 260.34, Loss: 0.7654
Training completed!
Final average reward (last 100 episodes): 260.34
✓ REINFORCEの特徴:
• シンプルで実装が容易
• モンテカルロ法(エピソード終了後に更新)
• 高分散(リターンの変動が大きい)
• On-policy(現在の方策でサンプリング)
4.2.3 REINFORCEの課題
REINFORCEには以下の課題があります:
- 高分散: リターン $R_t$ の分散が大きく、学習が不安定
- サンプル非効率: エピソード終了まで更新できない
- モンテカルロ法: 長いエピソードでは学習が遅い
解決策: Actor-Criticアーキテクチャでリターンを価値関数で推定
4.3 Actor-Critic法
4.3.1 Actor-Criticの原理
Actor-Criticは、Policy Gradient(Actor)とValue-based(Critic)を組み合わせた手法です。
| コンポーネント | 役割 | 出力 |
|---|---|---|
| Actor | 方策 $\pi_\theta(a|s)$ を学習 | 行動確率分布 |
| Critic | 価値関数 $V_\phi(s)$ を学習 | 状態価値推定 |
利点
- 低分散: Criticによるベースライン $V(s)$ で分散削減
- TD学習: エピソード途中でも更新可能(TD誤差使用)
- 効率的: モンテカルロ法より学習が速い
更新式
TD誤差(Advantage):
$$ A_t = r_t + \gamma V_\phi(s_{t+1}) - V_\phi(s_t) $$
Actorの更新:
$$ \theta \leftarrow \theta + \alpha_\theta \nabla_\theta \log \pi_\theta(a_t|s_t) A_t $$
Criticの更新:
$$ \phi \leftarrow \phi - \alpha_\phi \nabla_\phi \left( r_t + \gamma V_\phi(s_{t+1}) - V_\phi(s_t) \right)^2 $$
(図: Actor-Criticの構成 — 状態 s_t を Actor π_θ(a|s) と Critic V_φ(s) が受け取り、Actorが出力した行動 a_t を環境に入力して報酬 r_t と次状態 s_{t+1} を得る。TD誤差 A_t = r_t + γV(s_{t+1}) - V(s_t) を用いて、Actorは θ ← θ + α ∇_θ log π A_t、Criticは φ ← φ - α ∇_φ A_t² の方向に更新される)
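数値を入れて確認してみます。例えば $r_t = 1.0$, $\gamma = 0.99$, $V_\phi(s_t) = 2.0$, $V_\phi(s_{t+1}) = 2.5$ という仮の値では、
$$ A_t = 1.0 + 0.99 \times 2.5 - 2.0 = 1.475 > 0 $$
となるので、Actorはこの行動 $a_t$ の確率を上げる方向に、Criticは $V_\phi(s_t)$ をTDターゲット $1.0 + 0.99 \times 2.5 = 3.475$ に近づける方向に更新されます(数値は説明用の仮のものです)。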
4.3.2 Actor-Critic実装
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gym
class ActorCriticNetwork(nn.Module):
"""
Actor-Critic Network
共有のfeature extractorを持ち、ActorとCriticの2つのヘッドを持つ
"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(ActorCriticNetwork, self).__init__()
# 共有層
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
# Actorヘッド(方策)
self.actor_head = nn.Linear(hidden_dim, action_dim)
# Criticヘッド(価値関数)
self.critic_head = nn.Linear(hidden_dim, 1)
def forward(self, state):
"""
Args:
state: 状態 [batch_size, state_dim]
Returns:
action_probs: 行動確率 [batch_size, action_dim]
state_value: 状態価値 [batch_size, 1]
"""
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
# Actor出力
logits = self.actor_head(x)
action_probs = F.softmax(logits, dim=-1)
# Critic出力
state_value = self.critic_head(x)
return action_probs, state_value
class ActorCritic:
"""Actor-Critic Algorithm"""
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
"""
Args:
state_dim: 状態空間の次元
action_dim: 行動空間の次元
lr: 学習率
gamma: 割引率
"""
self.gamma = gamma
self.network = ActorCriticNetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, state):
"""
方策に従って行動を選択
Args:
state: 状態
Returns:
action: 選択された行動
log_prob: log π(a|s)
state_value: V(s)
"""
state = torch.FloatTensor(state).unsqueeze(0)
action_probs, state_value = self.network(state)
# 確率分布からサンプリング
m = torch.distributions.Categorical(action_probs)
action = m.sample()
log_prob = m.log_prob(action)
return action.item(), log_prob, state_value
def update(self, log_prob, state_value, reward, next_state, done):
"""
1ステップごとにパラメータを更新(TD学習)
Args:
log_prob: log π(a|s)
state_value: V(s)
reward: 報酬 r
next_state: 次の状態 s'
done: エピソード終了フラグ
Returns:
loss: 損失値
"""
# 次の状態の価値推定
if done:
next_value = torch.tensor([0.0])
else:
next_state = torch.FloatTensor(next_state).unsqueeze(0)
with torch.no_grad():
_, next_value = self.network(next_state)
# TD誤差(Advantage)
td_target = reward + self.gamma * next_value
td_error = td_target - state_value
# Actor損失: -log π(a|s) * A
actor_loss = -log_prob * td_error.detach()
# Critic損失: (TD誤差)^2
critic_loss = td_error.pow(2)
# 総損失
loss = actor_loss + critic_loss
# 更新
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
# デモンストレーション
print("=== Actor-Critic on CartPole ===\n")
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = ActorCritic(state_dim, action_dim, lr=0.001, gamma=0.99)
print(f"Environment: CartPole-v1")
print(f"Agent: Actor-Critic")
total_params = sum(p.numel() for p in agent.network.parameters())
print(f" Total parameters: {total_params:,}")
# 訓練
num_episodes = 300
episode_rewards = []
print("\nTraining...")
for episode in range(num_episodes):
state = env.reset()
episode_reward = 0
episode_loss = 0
steps = 0
for t in range(1000):
action, log_prob, state_value = agent.select_action(state)
next_state, reward, done, _ = env.step(action)
# 1ステップごとに更新
loss = agent.update(log_prob, state_value, reward, next_state, done)
episode_reward += reward
episode_loss += loss
steps += 1
state = next_state
if done:
break
episode_rewards.append(episode_reward)
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-100:])
avg_loss = episode_loss / steps
print(f"Episode {episode+1:3d}, Avg Reward: {avg_reward:.2f}, Avg Loss: {avg_loss:.4f}")
print(f"\nTraining completed!")
print(f"Final average reward (last 100 episodes): {np.mean(episode_rewards[-100:]):.2f}")
print("\n✓ Actor-Criticの特徴:")
print(" • ActorとCriticの2つのネットワーク")
print(" • TD学習(1ステップごとに更新)")
print(" • REINFORCEより低分散")
print(" • より安定した学習")
出力例:
=== Actor-Critic on CartPole ===
Environment: CartPole-v1
Agent: Actor-Critic
Total parameters: 17,539
Training...
Episode 50, Avg Reward: 45.67, Avg Loss: 2.3456
Episode 100, Avg Reward: 98.23, Avg Loss: 1.5678
Episode 150, Avg Reward: 165.45, Avg Loss: 0.9876
Episode 200, Avg Reward: 210.34, Avg Loss: 0.6543
Episode 250, Avg Reward: 245.67, Avg Loss: 0.4321
Episode 300, Avg Reward: 280.89, Avg Loss: 0.2987
Training completed!
Final average reward (last 100 episodes): 280.89
✓ Actor-Criticの特徴:
• ActorとCriticの2つのネットワーク
• TD学習(1ステップごとに更新)
• REINFORCEより低分散
• より安定した学習
4.4 Advantage Actor-Critic (A2C)
4.4.1 A2Cの改善点
A2C (Advantage Actor-Critic)は、Actor-Criticの改良版で以下の特徴があります:
- n-step returns: 複数ステップ先まで見たリターンを使用
- Parallel environments: 複数環境で同時にサンプリング(データ多様性)
- Entropy regularization: 探索を促進
- Generalized Advantage Estimation (GAE): バイアスと分散のトレードオフを調整
n-step Returns
1ステップTDではなく、$n$ステップ先までの報酬を使用:
$$ R_t^{(n)} = r_t + \gamma r_{t+1} + \gamma^2 r_{t+2} + \cdots + \gamma^{n-1} r_{t+n-1} + \gamma^n V(s_{t+n}) $$
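例えば $n=3$, $\gamma=0.9$ で、観測した報酬が $r_t=1, r_{t+1}=0, r_{t+2}=2$、ブートストラップに使う推定値が $V(s_{t+3})=5$ という仮の値なら、
$$ R_t^{(3)} = 1 + 0.9 \times 0 + 0.81 \times 2 + 0.729 \times 5 = 6.265 $$
となります。$n$ を大きくするほど実測報酬の比重が増え(低バイアス・高分散)、小さくするほどCriticの推定に頼る(高バイアス・低分散)というトレードオフがあります。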
Entropy Regularization
方策のエントロピーを目的関数に加え、探索を促進:
$$ J(\theta) = \mathbb{E} \left[ \sum_t \log \pi_\theta(a_t|s_t) A_t + \beta H(\pi_\theta(\cdot|s_t)) \right] $$ここで $H(\pi) = -\sum_a \pi(a|s) \log \pi(a|s)$ はエントロピー。
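エントロピー項は `torch.distributions` で簡単に計算できます。以下は、鋭い方策と平坦な方策のエントロピーの違いと、損失への組み込み方だけを示す説明用スケッチです(ロジット・係数・損失値は仮のものです)。
import torch
from torch.distributions import Categorical
# ほぼ決定的な方策と、ほぼ一様な方策のエントロピーを比較(ロジットは仮の値)
sharp = Categorical(logits=torch.tensor([4.0, 0.0, 0.0]))
flat = Categorical(logits=torch.tensor([0.1, 0.0, -0.1]))
print(sharp.entropy(), flat.entropy())  # flatの方が大きい(3行動の最大値は log 3 ≈ 1.10)
# A2Cの総損失への組み込み方(係数・損失値は仮)
beta = 0.01
actor_loss = torch.tensor(0.5)
critic_loss = torch.tensor(1.2)
entropy_bonus = flat.entropy()
loss = actor_loss + 0.5 * critic_loss - beta * entropy_bonus  # -βH を足す = エントロピーを高く保つ方向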
4.4.2 A2C実装
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gym
from torch.distributions import Categorical
class A2CNetwork(nn.Module):
"""A2C Network with shared feature extractor"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super(A2CNetwork, self).__init__()
# 共有特徴抽出層
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
# Actor
self.actor = nn.Linear(hidden_dim, action_dim)
# Critic
self.critic = nn.Linear(hidden_dim, 1)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
action_logits = self.actor(x)
state_value = self.critic(x)
return action_logits, state_value
class A2C:
"""
Advantage Actor-Critic (A2C)
Features:
- n-step returns
- Entropy regularization
- Parallel environment support
"""
def __init__(self, state_dim, action_dim, lr=0.0007, gamma=0.99,
n_steps=5, entropy_coef=0.01, value_coef=0.5):
"""
Args:
state_dim: 状態空間の次元
action_dim: 行動空間の次元
lr: 学習率
gamma: 割引率
n_steps: n-step returns
entropy_coef: エントロピー正則化係数
value_coef: 価値損失の係数
"""
self.gamma = gamma
self.n_steps = n_steps
self.entropy_coef = entropy_coef
self.value_coef = value_coef
self.network = A2CNetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, state):
"""行動選択"""
state = torch.FloatTensor(state).unsqueeze(0)
action_logits, state_value = self.network(state)
# 行動分布
dist = Categorical(logits=action_logits)
action = dist.sample()
return action.item(), dist.log_prob(action), dist.entropy(), state_value
def compute_returns(self, rewards, values, dones, next_value):
"""
n-step returnsの計算
Args:
rewards: 報酬列 [n_steps]
values: 状態価値列 [n_steps]
dones: 終了フラグ列 [n_steps]
next_value: 最後の状態の次の状態価値
Returns:
returns: n-step returns [n_steps]
advantages: Advantage [n_steps]
"""
returns = []
R = next_value
# 逆順に計算
for step in reversed(range(len(rewards))):
R = rewards[step] + self.gamma * R * (1 - dones[step])
returns.insert(0, R)
returns = torch.tensor(returns, dtype=torch.float32)
        values = torch.cat(values).squeeze(-1)  # [n_steps, 1] → [n_steps](returnsと形状を揃えて意図しないブロードキャストを防ぐ)
# Advantage = Returns - V(s)
advantages = returns - values.detach()
return returns, advantages
def update(self, log_probs, entropies, values, returns, advantages):
"""
パラメータ更新
Args:
log_probs: log π(a|s) のリスト
entropies: エントロピーのリスト
values: V(s) のリスト
returns: n-step returns
advantages: Advantage
"""
log_probs = torch.cat(log_probs)
entropies = torch.cat(entropies)
        values = torch.cat(values).squeeze(-1)  # [n_steps, 1] → [n_steps] に揃える
# Actor損失: -log π(a|s) * A
actor_loss = -(log_probs * advantages.detach()).mean()
# Critic損失: MSE(returns, V(s))
critic_loss = F.mse_loss(values, returns)
# エントロピー正則化
entropy_loss = -entropies.mean()
# 総損失
loss = actor_loss + self.value_coef * critic_loss + self.entropy_coef * entropy_loss
# 更新
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
self.optimizer.step()
return loss.item(), actor_loss.item(), critic_loss.item(), entropy_loss.item()
# デモンストレーション
print("=== A2C on CartPole ===\n")
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = A2C(state_dim, action_dim, lr=0.0007, gamma=0.99, n_steps=5)
print(f"Environment: CartPole-v1")
print(f"Agent: A2C")
print(f" n_steps: {agent.n_steps}")
print(f" entropy_coef: {agent.entropy_coef}")
print(f" value_coef: {agent.value_coef}")
total_params = sum(p.numel() for p in agent.network.parameters())
print(f" Total parameters: {total_params:,}")
# 訓練
num_episodes = 500
episode_rewards = []
print("\nTraining...")
for episode in range(num_episodes):
state = env.reset()
episode_reward = 0
# n-stepデータの収集
log_probs = []
entropies = []
values = []
rewards = []
dones = []
done = False
while not done:
action, log_prob, entropy, value = agent.select_action(state)
next_state, reward, done, _ = env.step(action)
log_probs.append(log_prob)
entropies.append(entropy)
values.append(value)
rewards.append(reward)
dones.append(done)
episode_reward += reward
state = next_state
# n-stepごとまたはエピソード終了時に更新
if len(rewards) >= agent.n_steps or done:
# 次の状態の価値
if done:
next_value = 0
else:
next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
with torch.no_grad():
_, next_value = agent.network(next_state_tensor)
next_value = next_value.item()
# Returns and Advantagesの計算
returns, advantages = agent.compute_returns(rewards, values, dones, next_value)
# 更新
loss, actor_loss, critic_loss, entropy_loss = agent.update(
log_probs, entropies, values, returns, advantages
)
# リセット
log_probs = []
entropies = []
values = []
rewards = []
dones = []
episode_rewards.append(episode_reward)
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-100:])
print(f"Episode {episode+1:3d}, Avg Reward: {avg_reward:.2f}, "
f"Loss: {loss:.4f} (Actor: {actor_loss:.4f}, Critic: {critic_loss:.4f}, Entropy: {entropy_loss:.4f})")
print(f"\nTraining completed!")
print(f"Final average reward (last 100 episodes): {np.mean(episode_rewards[-100:]):.2f}")
print("\n✓ A2Cの特徴:")
print(" • n-step returns(より正確なリターン推定)")
print(" • エントロピー正則化(探索促進)")
print(" • 勾配クリッピング(安定性向上)")
print(" • 並列環境対応(この例では1環境)")
出力例:
=== A2C on CartPole ===
Environment: CartPole-v1
Agent: A2C
n_steps: 5
entropy_coef: 0.01
value_coef: 0.5
Total parameters: 67,843
Training...
Episode 50, Avg Reward: 56.78, Loss: 1.8765 (Actor: 0.5432, Critic: 2.6543, Entropy: -0.5678)
Episode 100, Avg Reward: 112.34, Loss: 1.2345 (Actor: 0.3456, Critic: 1.7654, Entropy: -0.4321)
Episode 150, Avg Reward: 178.56, Loss: 0.8765 (Actor: 0.2345, Critic: 1.2987, Entropy: -0.3456)
Episode 200, Avg Reward: 220.45, Loss: 0.6543 (Actor: 0.1876, Critic: 0.9876, Entropy: -0.2987)
Episode 250, Avg Reward: 265.78, Loss: 0.4987 (Actor: 0.1432, Critic: 0.7654, Entropy: -0.2456)
Episode 300, Avg Reward: 295.34, Loss: 0.3876 (Actor: 0.1098, Critic: 0.6321, Entropy: -0.2134)
Episode 350, Avg Reward: 320.67, Loss: 0.2987 (Actor: 0.0876, Critic: 0.5234, Entropy: -0.1876)
Episode 400, Avg Reward: 340.23, Loss: 0.2345 (Actor: 0.0654, Critic: 0.4321, Entropy: -0.1654)
Episode 450, Avg Reward: 355.89, Loss: 0.1876 (Actor: 0.0543, Critic: 0.3654, Entropy: -0.1432)
Episode 500, Avg Reward: 370.45, Loss: 0.1543 (Actor: 0.0432, Critic: 0.3098, Entropy: -0.1234)
Training completed!
Final average reward (last 100 episodes): 370.45
✓ A2Cの特徴:
• n-step returns(より正確なリターン推定)
• エントロピー正則化(探索促進)
• 勾配クリッピング(安定性向上)
• 並列環境対応(この例では1環境)
4.5 Proximal Policy Optimization (PPO)
4.5.1 PPOの動機
Policy Gradientの課題:
- 大きなパラメータ更新: 方策が大きく変化しすぎると性能が悪化
- サンプル効率: on-policy学習は非効率
PPO (Proximal Policy Optimization)の解決策:
- Clipped objective: 方策の変化を制限
- Multiple epochs: 同じデータで複数回更新(off-policyに近い)
- Trust region: 安全な更新範囲内で最適化
4.5.2 PPOのClipped Objective
PPOの目的関数は以下のようにクリップされた確率比を使用します:
$$ L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min(r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) A_t) \right] $$ここで:
- $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{\text{old}}}(a_t|s_t)}$: 確率比
- $\epsilon$: クリッピング範囲(通常 0.1 〜 0.2)
- $A_t$: Advantage
直感的理解:
- Advantageが正の場合: 確率比が $1+\epsilon$ を超えても目的関数はそれ以上増えない(その行動の確率を過度に上げる更新を抑制)
- Advantageが負の場合: 確率比が $1-\epsilon$ を下回っても目的関数は改善しない(その行動の確率を過度に下げる更新を抑制)
π_old(a|s)"] NewPolicy["New Policy
π_θ(a|s)"] Ratio["Probability Ratio
r = π_θ / π_old"] Clip["Clipping
clip(r, 1-ε, 1+ε)"] Advantage["Advantage
A_t"] Obj1["Objective 1
r × A"] Obj2["Objective 2
clip(r) × A"] Min["min(Obj1, Obj2)"] Loss["PPO Loss
-E[min(...)]"] OldPolicy --> Ratio NewPolicy --> Ratio Ratio --> Obj1 Ratio --> Clip Clip --> Obj2 Advantage --> Obj1 Advantage --> Obj2 Obj1 --> Min Obj2 --> Min Min --> Loss style OldPolicy fill:#7b2cbf,color:#fff style NewPolicy fill:#27ae60,color:#fff style Loss fill:#e74c3c,color:#fff end
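クリッピングの挙動は、いくつかの確率比 $r_t(\theta)$ に対して2つの項を実際に計算してみると分かりやすいです。以下は $\epsilon=0.2$ での動作だけを確認する説明用スケッチです(確率比・Advantageの値は仮のものです)。
import torch
eps = 0.2
ratio = torch.tensor([0.5, 0.9, 1.0, 1.1, 1.5])   # r_t(θ) の仮の値
def clipped_objective(ratio, advantage):
    surr1 = ratio * advantage
    surr2 = torch.clamp(ratio, 1 - eps, 1 + eps) * advantage
    return torch.min(surr1, surr2)
print(clipped_objective(ratio, torch.tensor(2.0)))   # A>0: r > 1+ε では 1.2×2.0 = 2.4 で頭打ち
print(clipped_objective(ratio, torch.tensor(-2.0)))  # A<0: r < 1-ε では 0.8×(-2.0) = -1.6 で一定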
4.5.3 PPO実装
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gym
from torch.distributions import Categorical
class PPONetwork(nn.Module):
"""PPO Network"""
def __init__(self, state_dim, action_dim, hidden_dim=64):
super(PPONetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.actor = nn.Linear(hidden_dim, action_dim)
self.critic = nn.Linear(hidden_dim, 1)
def forward(self, state):
        x = torch.tanh(self.fc1(state))  # F.tanhは非推奨のためtorch.tanhを使用
        x = torch.tanh(self.fc2(x))
action_logits = self.actor(x)
state_value = self.critic(x)
return action_logits, state_value
class PPO:
"""
Proximal Policy Optimization (PPO)
Features:
- Clipped objective
- Multiple epochs per update
- GAE (Generalized Advantage Estimation)
"""
def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
epsilon=0.2, gae_lambda=0.95, epochs=10, batch_size=64):
"""
Args:
state_dim: 状態空間の次元
action_dim: 行動空間の次元
lr: 学習率
gamma: 割引率
epsilon: クリッピング範囲
gae_lambda: GAE λ
epochs: 1回のデータ収集あたりの更新回数
batch_size: ミニバッチサイズ
"""
self.gamma = gamma
self.epsilon = epsilon
self.gae_lambda = gae_lambda
self.epochs = epochs
self.batch_size = batch_size
self.network = PPONetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, state):
"""行動選択"""
state = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
action_logits, state_value = self.network(state)
dist = Categorical(logits=action_logits)
action = dist.sample()
log_prob = dist.log_prob(action)
return action.item(), log_prob.item(), state_value.item()
def compute_gae(self, rewards, values, dones, next_value):
"""
Generalized Advantage Estimation (GAE)
Args:
rewards: 報酬列
values: 状態価値列
dones: 終了フラグ列
next_value: 最後の次状態の価値
Returns:
advantages: GAE Advantage
returns: リターン
"""
advantages = []
gae = 0
values = values + [next_value]
for step in reversed(range(len(rewards))):
delta = rewards[step] + self.gamma * values[step + 1] * (1 - dones[step]) - values[step]
gae = delta + self.gamma * self.gae_lambda * (1 - dones[step]) * gae
advantages.insert(0, gae)
advantages = torch.tensor(advantages, dtype=torch.float32)
returns = advantages + torch.tensor(values[:-1], dtype=torch.float32)
return advantages, returns
def update(self, states, actions, old_log_probs, returns, advantages):
"""
PPO更新(Multiple epochs)
Args:
states: 状態列
actions: 行動列
old_log_probs: 旧方策のlog確率
returns: リターン
advantages: Advantage
"""
states = torch.FloatTensor(states)
actions = torch.LongTensor(actions)
old_log_probs = torch.FloatTensor(old_log_probs)
returns = returns.detach()
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
dataset_size = states.size(0)
for epoch in range(self.epochs):
# ミニバッチでの更新
indices = np.random.permutation(dataset_size)
for start in range(0, dataset_size, self.batch_size):
end = start + self.batch_size
batch_indices = indices[start:end]
batch_states = states[batch_indices]
batch_actions = actions[batch_indices]
batch_old_log_probs = old_log_probs[batch_indices]
batch_returns = returns[batch_indices]
batch_advantages = advantages[batch_indices]
# 現在の方策の評価
action_logits, state_values = self.network(batch_states)
dist = Categorical(logits=action_logits)
new_log_probs = dist.log_prob(batch_actions)
entropy = dist.entropy()
# 確率比
ratio = torch.exp(new_log_probs - batch_old_log_probs)
# Clipped objective
surr1 = ratio * batch_advantages
surr2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * batch_advantages
actor_loss = -torch.min(surr1, surr2).mean()
# Critic損失
critic_loss = F.mse_loss(state_values.squeeze(), batch_returns)
# エントロピーボーナス
entropy_loss = -entropy.mean()
# 総損失
loss = actor_loss + 0.5 * critic_loss + 0.01 * entropy_loss
# 更新
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
self.optimizer.step()
# デモンストレーション
print("=== PPO on CartPole ===\n")
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = PPO(state_dim, action_dim, lr=3e-4, gamma=0.99, epsilon=0.2, epochs=10)
print(f"Environment: CartPole-v1")
print(f"Agent: PPO")
print(f" epsilon (clip): {agent.epsilon}")
print(f" gae_lambda: {agent.gae_lambda}")
print(f" epochs per update: {agent.epochs}")
total_params = sum(p.numel() for p in agent.network.parameters())
print(f" Total parameters: {total_params:,}")
# 訓練
num_iterations = 100
update_timesteps = 2048 # データ収集ステップ数
episode_rewards = []
print("\nTraining...")
total_timesteps = 0
for iteration in range(num_iterations):
# データ収集
states_list = []
actions_list = []
log_probs_list = []
rewards_list = []
values_list = []
dones_list = []
state = env.reset()
episode_reward = 0
for _ in range(update_timesteps):
action, log_prob, value = agent.select_action(state)
next_state, reward, done, _ = env.step(action)
states_list.append(state)
actions_list.append(action)
log_probs_list.append(log_prob)
rewards_list.append(reward)
values_list.append(value)
dones_list.append(done)
episode_reward += reward
total_timesteps += 1
state = next_state
if done:
episode_rewards.append(episode_reward)
episode_reward = 0
state = env.reset()
# 最後の状態の価値
_, _, next_value = agent.select_action(state)
# GAEの計算
advantages, returns = agent.compute_gae(rewards_list, values_list, dones_list, next_value)
# PPO更新
agent.update(states_list, actions_list, log_probs_list, returns, advantages)
if (iteration + 1) % 10 == 0:
avg_reward = np.mean(episode_rewards[-100:]) if len(episode_rewards) >= 100 else np.mean(episode_rewards)
print(f"Iteration {iteration+1:3d}, Timesteps: {total_timesteps}, Avg Reward: {avg_reward:.2f}")
print(f"\nTraining completed!")
print(f"Total timesteps: {total_timesteps}")
print(f"Final average reward (last 100 episodes): {np.mean(episode_rewards[-100:]):.2f}")
print("\n✓ PPOの特徴:")
print(" • Clipped objective(安全な方策更新)")
print(" • Multiple epochs(データ再利用)")
print(" • GAE(バイアス-分散トレードオフ)")
print(" • 現代的なPolicy Gradientのデファクトスタンダード")
print(" • OpenAI Five、ChatGPTのRLHFなどに使用")
出力例:
=== PPO on CartPole ===
Environment: CartPole-v1
Agent: PPO
epsilon (clip): 0.2
gae_lambda: 0.95
epochs per update: 10
Total parameters: 4,675
Training...
Iteration 10, Timesteps: 20480, Avg Reward: 78.45
Iteration 20, Timesteps: 40960, Avg Reward: 145.67
Iteration 30, Timesteps: 61440, Avg Reward: 210.34
Iteration 40, Timesteps: 81920, Avg Reward: 265.89
Iteration 50, Timesteps: 102400, Avg Reward: 310.45
Iteration 60, Timesteps: 122880, Avg Reward: 345.67
Iteration 70, Timesteps: 143360, Avg Reward: 380.23
Iteration 80, Timesteps: 163840, Avg Reward: 405.78
Iteration 90, Timesteps: 184320, Avg Reward: 425.34
Iteration 100, Timesteps: 204800, Avg Reward: 440.56
Training completed!
Total timesteps: 204800
Final average reward (last 100 episodes): 440.56
✓ PPOの特徴:
• Clipped objective(安全な方策更新)
• Multiple epochs(データ再利用)
• GAE(バイアス-分散トレードオフ)
• 現代的なPolicy Gradientのデファクトスタンダード
• OpenAI Five、ChatGPTのRLHFなどに使用
4.6 実践:LunarLanderの着陸制御
4.6.1 LunarLander環境
LunarLander-v2は、月面着陸船を制御するタスクです。
| 項目 | 値 |
|---|---|
| 状態空間 | 8次元(位置、速度、角度、角速度、脚接地) |
| 行動空間 | 離散4行動(何もしない、左エンジン噴射、メインエンジン噴射、右エンジン噴射) |
| 目標 | 着陸パッドに安全に着陸(200点以上で解決) |
| 報酬 | 着陸成功: +100〜+140、墜落: -100、燃料消費: マイナス |
4.6.2 PPOによるLunarLander学習
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gym
from torch.distributions import Categorical
import matplotlib.pyplot as plt
# PPOクラスは前のセクションと同じ(省略)
# LunarLanderでの訓練
print("=== PPO on LunarLander-v2 ===\n")
env = gym.make('LunarLander-v2')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
print(f"Environment: LunarLander-v2")
print(f" State dimension: {state_dim}")
print(f" Action dimension: {action_dim}")
print(f" Solved threshold: 200")
agent = PPO(state_dim, action_dim, lr=3e-4, gamma=0.99, epsilon=0.2, epochs=10, batch_size=64)
print(f"\nAgent: PPO")
total_params = sum(p.numel() for p in agent.network.parameters())
print(f" Total parameters: {total_params:,}")
# 訓練設定
num_iterations = 300
update_timesteps = 2048
episode_rewards = []
all_episode_rewards = []
print("\nTraining...")
total_timesteps = 0
best_avg_reward = -float('inf')
for iteration in range(num_iterations):
# データ収集
states_list = []
actions_list = []
log_probs_list = []
rewards_list = []
values_list = []
dones_list = []
state = env.reset()
episode_reward = 0
for _ in range(update_timesteps):
action, log_prob, value = agent.select_action(state)
next_state, reward, done, _ = env.step(action)
states_list.append(state)
actions_list.append(action)
log_probs_list.append(log_prob)
rewards_list.append(reward)
values_list.append(value)
dones_list.append(done)
episode_reward += reward
total_timesteps += 1
state = next_state
if done:
all_episode_rewards.append(episode_reward)
episode_reward = 0
state = env.reset()
# 最後の状態の価値
_, _, next_value = agent.select_action(state)
# GAEの計算
advantages, returns = agent.compute_gae(rewards_list, values_list, dones_list, next_value)
# PPO更新
agent.update(states_list, actions_list, log_probs_list, returns, advantages)
# 評価
if (iteration + 1) % 10 == 0:
avg_reward = np.mean(all_episode_rewards[-100:]) if len(all_episode_rewards) >= 100 else np.mean(all_episode_rewards)
episode_rewards.append(avg_reward)
if avg_reward > best_avg_reward:
best_avg_reward = avg_reward
print(f"Iteration {iteration+1:3d}, Timesteps: {total_timesteps}, "
f"Avg Reward: {avg_reward:.2f}, Best: {best_avg_reward:.2f}")
if avg_reward >= 200:
print(f"\n🎉 Solved! Average reward {avg_reward:.2f} >= 200")
break
print(f"\nTraining completed!")
print(f"Total timesteps: {total_timesteps}")
print(f"Best average reward: {best_avg_reward:.2f}")
# 可視化
fig, ax = plt.subplots(figsize=(10, 5))
# 全エピソードの報酬
ax.plot(all_episode_rewards, alpha=0.3, color='steelblue', label='Episode Reward')
# 移動平均(100エピソード)
window = 100
moving_avg = [np.mean(all_episode_rewards[max(0, i-window):i+1])
for i in range(len(all_episode_rewards))]
ax.plot(moving_avg, linewidth=2, color='darkorange', label=f'Moving Average ({window})')
ax.axhline(y=200, color='red', linestyle='--', linewidth=2, label='Solved Threshold (200)')
ax.set_xlabel('Episode', fontsize=12, fontweight='bold')
ax.set_ylabel('Reward', fontsize=12, fontweight='bold')
ax.set_title('PPO on LunarLander-v2', fontsize=13, fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("\n✓ LunarLanderタスク完了")
print("✓ PPOによる安定した学習")
print("✓ 典型的な解決時間: 100-200万ステップ")
出力例:
=== PPO on LunarLander-v2 ===
Environment: LunarLander-v2
State dimension: 8
Action dimension: 4
Solved threshold: 200
Agent: PPO
Total parameters: 5,061
Training...
Iteration 10, Timesteps: 20480, Avg Reward: -145.67, Best: -145.67
Iteration 20, Timesteps: 40960, Avg Reward: -89.34, Best: -89.34
Iteration 30, Timesteps: 61440, Avg Reward: -45.23, Best: -45.23
Iteration 40, Timesteps: 81920, Avg Reward: 12.56, Best: 12.56
Iteration 50, Timesteps: 102400, Avg Reward: 56.78, Best: 56.78
Iteration 60, Timesteps: 122880, Avg Reward: 98.45, Best: 98.45
Iteration 70, Timesteps: 143360, Avg Reward: 134.67, Best: 134.67
Iteration 80, Timesteps: 163840, Avg Reward: 165.89, Best: 165.89
Iteration 90, Timesteps: 184320, Avg Reward: 185.34, Best: 185.34
Iteration 100, Timesteps: 204800, Avg Reward: 202.56, Best: 202.56
🎉 Solved! Average reward 202.56 >= 200
Training completed!
Total timesteps: 204800
Best average reward: 202.56
✓ LunarLanderタスク完了
✓ PPOによる安定した学習
✓ 典型的な解決ステップ数: 数十万〜100万ステップ程度(ハイパーパラメータに依存)
4.7 連続行動空間とGaussian Policy
4.7.1 連続行動空間の扱い
これまでは離散行動空間(CartPole、LunarLander)を扱いましたが、ロボット制御などでは連続行動空間が必要です。
Gaussian Policy:
行動を正規分布からサンプリング:
$$ \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)^2) $$ここで:
- $\mu_\theta(s)$: 平均(ニューラルネットワークで出力)
- $\sigma_\theta(s)$: 標準偏差(学習可能パラメータまたは固定値)
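各次元が独立(対角共分散)と仮定すると、対数確率は次元ごとの和になります。これが後の実装で `log_prob(...).sum(dim=-1)` としている部分に対応します:
$$ \log \pi_\theta(a|s) = -\sum_{i=1}^{d} \left[ \frac{(a_i - \mu_{\theta,i}(s))^2}{2\sigma_{\theta,i}(s)^2} + \log \sigma_{\theta,i}(s) + \frac{1}{2}\log 2\pi \right] $$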
4.7.2 Gaussian Policy実装
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np
class ContinuousPolicyNetwork(nn.Module):
"""
Continuous action space用のPolicy Network
出力: 平均μと標準偏差σ
"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super(ContinuousPolicyNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
# 平均μ
self.mu_head = nn.Linear(hidden_dim, action_dim)
# 標準偏差σ(log scaleで学習)
self.log_std_head = nn.Linear(hidden_dim, action_dim)
# Critic
self.value_head = nn.Linear(hidden_dim, 1)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
# 平均μ
mu = self.mu_head(x)
# 標準偏差σ(正の値を保証)
log_std = self.log_std_head(x)
log_std = torch.clamp(log_std, min=-20, max=2) # 数値安定性のためクリップ
std = torch.exp(log_std)
# 状態価値
value = self.value_head(x)
return mu, std, value
class ContinuousPPO:
"""連続行動空間用のPPO"""
def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99, epsilon=0.2):
self.gamma = gamma
self.epsilon = epsilon
self.network = ContinuousPolicyNetwork(state_dim, action_dim)
self.optimizer = torch.optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, state):
"""
連続行動のサンプリング
Returns:
action: サンプリングされた行動
log_prob: log π(a|s)
value: V(s)
"""
state = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
mu, std, value = self.network(state)
# 正規分布からサンプリング
dist = Normal(mu, std)
action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)  # 各次元の対数確率の和(独立次元の同時確率の対数)
        return action.squeeze(0).numpy(), log_prob.item(), value.item()  # squeeze(0)で形状 (action_dim,) を保つ
def evaluate_actions(self, states, actions):
"""
既存の行動を評価(PPO更新用)
Returns:
log_probs: log π(a|s)
values: V(s)
entropy: エントロピー
"""
mu, std, values = self.network(states)
dist = Normal(mu, std)
log_probs = dist.log_prob(actions).sum(dim=-1)
entropy = dist.entropy().sum(dim=-1)
return log_probs, values.squeeze(), entropy
# デモンストレーション
print("=== Continuous Action Space PPO ===\n")
# サンプル環境(例: Pendulum-v1)
state_dim = 3
action_dim = 1
agent = ContinuousPPO(state_dim, action_dim, lr=3e-4)
print(f"State dimension: {state_dim}")
print(f"Action dimension: {action_dim} (continuous)")
# サンプル状態
state = np.random.randn(state_dim)
# 行動選択
action, log_prob, value = agent.select_action(state)
print(f"\nSample state: {state}")
print(f"Sampled action: {action}")
print(f"Log probability: {log_prob:.4f}")
print(f"State value: {value:.4f}")
# 複数回サンプリング(確率的)
print("\nMultiple samples from same state:")
for i in range(5):
action, _, _ = agent.select_action(state)
print(f" Sample {i+1}: action = {action[0]:.4f}")
print("\n✓ Gaussian Policyの特徴:")
print(" • 連続行動空間に対応")
print(" • 平均μと標準偏差σを学習")
print(" • ロボット制御、自動運転などに適用可能")
print(" • 探索は標準偏差σで制御")
# 実際のPendulum環境での使用例
print("\n=== PPO on Pendulum-v1 (Continuous Control) ===")
import gym
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
print(f"\nEnvironment: Pendulum-v1")
print(f" State space: {env.observation_space}")
print(f" Action space: {env.action_space}")
agent = ContinuousPPO(state_dim, action_dim, lr=3e-4)
print(f"\nAgent initialized for continuous control")
total_params = sum(p.numel() for p in agent.network.parameters())
print(f" Total parameters: {total_params:,}")
# 1エピソードのテスト
state = env.reset()
episode_reward = 0
for t in range(200):
action, log_prob, value = agent.select_action(state)
    # Pendulumの行動範囲は[-2, 2]なので範囲内にクリップ
action_scaled = np.clip(action, -2.0, 2.0)
next_state, reward, done, _ = env.step(action_scaled)
episode_reward += reward
state = next_state
print(f"\nTest episode reward: {episode_reward:.2f}")
print("\n✓ 連続制御タスクでのPPO動作確認完了")
出力例:
=== Continuous Action Space PPO ===
State dimension: 3
Action dimension: 1 (continuous)
Sample state: [ 0.4967 -0.1383 0.6477]
Sampled action: [0.8732]
Log probability: -1.2345
State value: 0.1234
Multiple samples from same state:
Sample 1: action = 0.7654
Sample 2: action = 0.9123
Sample 3: action = 0.8456
Sample 4: action = 0.8901
Sample 5: action = 0.8234
✓ Gaussian Policyの特徴:
• 連続行動空間に対応
• 平均μと標準偏差σを学習
• ロボット制御、自動運転などに適用可能
• 探索は標準偏差σで制御
=== PPO on Pendulum-v1 (Continuous Control) ===
Environment: Pendulum-v1
State space: Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)
Action space: Box(-2.0, 2.0, (1,), float32)
Agent initialized for continuous control
Total parameters: 67,587
Test episode reward: -1234.56
✓ 連続制御タスクでのPPO動作確認完了
4.8 まとめと発展トピック
本章で学んだこと
| トピック | 重要ポイント |
|---|---|
| Policy Gradient | 方策を直接最適化、連続行動対応、確率的方策 |
| REINFORCE | 最もシンプルなPG、モンテカルロ法、高分散 |
| Actor-Critic | ActorとCriticの組み合わせ、TD学習、低分散 |
| A2C | n-step returns、エントロピー正則化、並列環境 |
| PPO | Clipped objective、安全な更新、デファクトスタンダード |
| 連続制御 | Gaussian Policy、μとσの学習、ロボット制御 |
アルゴリズム比較
| アルゴリズム | 更新 | 分散 | サンプル効率 | 実装難易度 |
|---|---|---|---|---|
| REINFORCE | エピソード終了後 | 高 | 低 | 易 |
| Actor-Critic | 1ステップごと | 中 | 中 | 中 |
| A2C | n-stepごと | 中 | 中 | 中 |
| PPO | バッチ(複数epoch) | 低 | 高 | 中 |
発展トピック
Trust Region Policy Optimization (TRPO)
PPOの前身。KL divergenceで方策更新を制約。理論的保証が強いが、計算コストが高い。2次最適化、Fisher情報行列の計算が必要。
Soft Actor-Critic (SAC)
オフポリシーのActor-Critic。エントロピー最大化を目的関数に組み込み、ロバストな学習を実現。連続制御タスクで高性能。経験再生を使用しサンプル効率が高い。
Deterministic Policy Gradient (DPG / DDPG)
決定的方策(確率的でない)のPolicy Gradient。連続行動空間に特化。Actor-Criticアーキテクチャとオフポリシー学習。ロボット制御で広く使用。
Twin Delayed DDPG (TD3)
DDPGの改良版。2つのCriticネットワーク(Twin)、Actor更新の遅延、ターゲット方策のノイズ追加。過大評価バイアスを軽減。
Generalized Advantage Estimation (GAE)
Advantageの推定手法。λパラメータでバイアスと分散のトレードオフを調整。TD(λ)のPolicy Gradient版。PPOやA2Cで標準的に使用。
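参考までに、GAEは1ステップTD誤差 $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ を指数重みで足し合わせたものとして定義されます:
$$ \hat{A}_t^{\mathrm{GAE}(\gamma, \lambda)} = \sum_{l=0}^{\infty} (\gamma \lambda)^l \delta_{t+l} $$
$\lambda = 0$ で1ステップTD(低分散・高バイアス)、$\lambda = 1$ でモンテカルロ的なAdvantage(高分散・低バイアス)に一致し、4.5.3の `compute_gae` の逆順再帰($\mathrm{gae} = \delta + \gamma\lambda \cdot \mathrm{gae}$)はこの式を実装したものです。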
Multi-Agent Reinforcement Learning (MARL)
複数エージェントの協調・競争学習。MAPPO、QMIX、MADDPG等のアルゴリズム。ゲームAI、ロボット群制御、交通システムに応用。
演習問題
演習 4.1: REINFORCEの改善
課題: REINFORCEにベースライン(状態価値関数)を追加し、分散削減効果を検証してください。
実装内容:
- Criticネットワークの追加
- Advantage = R_t - V(s_t) の計算
- ベースラインあり・なしの学習曲線比較
演習 4.2: A2Cの並列環境実装
課題: 複数環境を並列に実行するA2Cを実装してください。
実装要件:
- multiprocessingまたはvectorized environmentsの使用
- 4〜16個の並列環境
- 学習速度とサンプル効率の改善を確認
演習 4.3: PPOのハイパーパラメータチューニング
課題: LunarLanderでPPOのハイパーパラメータを最適化してください。
調整パラメータ: epsilon (clip), learning rate, batch_size, epochs, GAE lambda
評価: 収束速度、最終性能、安定性
演習 4.4: Gaussian PolicyでPendulum制御
課題: 連続制御タスクPendulum-v1をPPOで解いてください。
実装内容:
- Gaussian Policyの実装
- 標準偏差σの減衰スケジュール
- -200以上の平均報酬を達成
演習 4.5: Atariゲームへの適用
課題: PPOをAtariゲーム(例: Pong)に適用してください。
実装要件:
- CNNベースのPolicy Network
- フレームスタッキング(4フレーム)
- 報酬クリッピング、Frame skipping
- 人間レベルの性能を目指す
演習 4.6: カスタム環境での応用
課題: 自分でOpenAI Gym環境を作成し、PPOで学習させてください。
例:
- 簡単な迷路ナビゲーション
- リソース管理ゲーム
- 簡単なロボットアーム制御
実装: gym.Envを継承した環境クラス、適切な報酬設計、PPOでの学習
次章予告
第5章では、モデルベース強化学習を学びます。環境のモデルを学習し、計画と学習を組み合わせた高度なアプローチを探ります。
次章のトピック:
・モデルベースvsモデルフリー
・環境モデルの学習(World Models)
・Planning手法(MCTS、MuZero)
・Dyna-Q、Model-based RL
・想像上での学習(Dreamer)
・サンプル効率の大幅改善
・実装:モデル学習とプランニング