本章では、現代の強化学習の基盤となる方策勾配法(Policy Gradient Methods)を解説します。なぜ直接的な方策最適化が強力なのかを理解し、数学的基礎をマスターし、Stable-Baselines3を用いたPPOの実装まで、最先端のアルゴリズムを学びます。
学習目標
本章を読むことで、以下ができるようになります:
- 価値ベース手法の限界と方策勾配法の利点を理解する
- 方策勾配定理を導出し、説明できる
- ベースライン付きREINFORCEアルゴリズムを実装する
- Actor-Criticアーキテクチャを理解し、実装する
- エントロピー正則化を用いたAdvantage Actor-Critic(A2C)を実装する
- PPOをマスターする: クリッピング、信頼領域、GAE
- ガウス方策による連続行動空間を扱う
- 実践的なRL応用にStable-Baselines3を使用する
4.1 なぜ方策勾配法なのか?
4.1.1 価値ベース手法の限界
第2章と第3章では、価値ベース手法(Q学習、DQN)を学びました。強力な手法ですが、固有の限界があります:
| 限界 | 説明 | 影響 |
|---|---|---|
| 離散行動のみ | $\arg\max_a Q(s,a)$演算が必要 | 連続制御(ロボティクス)を扱えない |
| 決定論的方策 | $\epsilon$-greedyは回避策 | 最適な確率的方策を学習できない |
| 高次元行動 | 指数的な行動空間 | 組合せ爆発 |
| 小さな方策変化 | Q値の変化が大きな方策シフトを引き起こす | 学習の不安定性 |
「じゃんけんでは、最適な方策は一様ランダムです。価値ベース手法はこれを自然に表現するのに苦労しますが、方策勾配法はエレガントに扱えます。」
4.1.2 直接的な方策最適化
方策勾配法は、パラメータ$\theta$で表現された方策$\pi_\theta(a|s)$を直接最適化します:
4.1.3 方策勾配法の利点
- 連続行動空間: ロボットの関節角度やステアリングを自然に扱える
- 確率的方策: 組み込みの探索、部分観測に最適
- 滑らかな最適化: $\theta$の小さな変化は方策の小さな変化につながる
- 収束保証: 明確に定義された目的関数での勾配上昇
4.1.4 方策のパラメータ化
離散行動の場合、softmaxを使用:
$$ \pi_\theta(a|s) = \frac{\exp(f_\theta(s, a))}{\sum_{a'} \exp(f_\theta(s, a'))} $$連続行動の場合、ガウス分布を使用:
$$ \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)^2) $$4.2 方策勾配定理
4.2.1 目的関数
期待収益を最大化したい:
$$ J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}[R(\tau)] = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \gamma^t r_t\right] $$ここで$\tau = (s_0, a_0, r_1, s_1, a_1, \ldots)$は方策$\pi_\theta$でサンプリングされた軌道です。
4.2.2 方策勾配定理
重要な洞察: 環境のダイナミクスを知らなくても勾配$\nabla_\theta J(\theta)$を計算できます!
$$ \nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s) \cdot Q^{\pi_\theta}(s, a)\right] $$または、軌道を使った等価な形式:
$$ \nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) \cdot G_t\right] $$ここで$G_t = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}$は時刻$t$からの収益です。
4.2.3 数式の直感的理解
- $\nabla_\theta \log \pi_\theta(a|s)$: 行動$a$の確率を増加させる方向
- $Q^{\pi}(s,a)$または$G_t$: その行動はどれくらい良かったか?
- 結果: 良い行動の確率を増加させ、悪い行動の確率を減少させる
4.3 REINFORCEアルゴリズム
4.3.1 モンテカルロ方策勾配
REINFORCE(Williams, 1992)は、完全なエピソードを使用して方策勾配を推定します:
- $\pi_\theta$を使って軌道$\tau$をサンプリング
- 各タイムステップの収益$G_t$を計算
- 更新: $\theta \leftarrow \theta + \alpha \sum_{t} \nabla_\theta \log \pi_\theta(a_t|s_t) G_t$
4.3.2 高分散問題
REINFORCEは$G_t$がエピソード間で大きく変動するため、高分散に悩まされます。解決策はベースライン減算です。
$$ \nabla_\theta J(\theta) = \mathbb{E}\left[\nabla_\theta \log \pi_\theta(a_t|s_t) (G_t - b(s_t))\right] $$最適なベースラインは$b(s) = V^{\pi}(s)$で、これによりアドバンテージが得られます:
$$ A^{\pi}(s, a) = Q^{\pi}(s, a) - V^{\pi}(s) $$4.3.3 REINFORCEの実装
# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0
# - matplotlib>=3.7.0
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
from collections import deque
print("=== REINFORCE アルゴリズムの実装 ===\n")
class PolicyNetwork(nn.Module):
"""
REINFORCE用の方策ネットワーク
状態が与えられると行動確率を出力します。
"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(PolicyNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
logits = self.fc3(x)
return F.softmax(logits, dim=-1)
class ValueNetwork(nn.Module):
"""
ベースライン用の価値ネットワーク
分散を減らすためにV(s)を推定します。
"""
def __init__(self, state_dim, hidden_dim=128):
super(ValueNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, 1)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
return self.fc3(x)
class REINFORCEWithBaseline:
"""学習されたベースライン(価値関数)付きREINFORCE"""
def __init__(self, state_dim, action_dim, lr_policy=0.001, lr_value=0.001, gamma=0.99):
self.gamma = gamma
self.policy = PolicyNetwork(state_dim, action_dim)
self.value = ValueNetwork(state_dim)
self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=lr_policy)
self.value_optimizer = optim.Adam(self.value.parameters(), lr=lr_value)
# エピソード保存
self.saved_log_probs = []
self.saved_values = []
self.rewards = []
def select_action(self, state):
"""行動を選択し、対数確率を保存"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs = self.policy(state_tensor)
value = self.value(state_tensor)
dist = torch.distributions.Categorical(action_probs)
action = dist.sample()
self.saved_log_probs.append(dist.log_prob(action))
self.saved_values.append(value)
return action.item()
def update(self):
"""エピソード終了後に方策と価値ネットワークを更新"""
R = 0
returns = []
# 収益を計算(逆順)
for r in reversed(self.rewards):
R = r + self.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns, dtype=torch.float32)
# 安定性のため収益を正規化
if len(returns) > 1:
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
# 損失を計算
policy_losses = []
value_losses = []
for log_prob, value, G in zip(self.saved_log_probs, self.saved_values, returns):
advantage = G - value.squeeze().detach()
policy_losses.append(-log_prob * advantage)
value_losses.append(F.mse_loss(value.squeeze(), G))
# 方策を更新
self.policy_optimizer.zero_grad()
policy_loss = torch.stack(policy_losses).sum()
policy_loss.backward()
self.policy_optimizer.step()
# 価値関数を更新
self.value_optimizer.zero_grad()
value_loss = torch.stack(value_losses).sum()
value_loss.backward()
self.value_optimizer.step()
# エピソードデータをクリア
self.saved_log_probs = []
self.saved_values = []
self.rewards = []
return policy_loss.item(), value_loss.item()
# 訓練
print("環境: CartPole-v1")
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = REINFORCEWithBaseline(state_dim, action_dim, lr_policy=0.01, lr_value=0.01)
print(f" 状態次元: {state_dim}")
print(f" 行動次元: {action_dim}")
print(f" 方策パラメータ数: {sum(p.numel() for p in agent.policy.parameters()):,}")
print(f" 価値パラメータ数: {sum(p.numel() for p in agent.value.parameters()):,}")
num_episodes = 500
episode_rewards = []
moving_avg = deque(maxlen=100)
print("\nベースライン付きREINFORCEを訓練中...")
for episode in range(num_episodes):
state, _ = env.reset()
episode_reward = 0
for t in range(500):
action = agent.select_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
agent.rewards.append(reward)
episode_reward += reward
state = next_state
if done:
break
policy_loss, value_loss = agent.update()
episode_rewards.append(episode_reward)
moving_avg.append(episode_reward)
if (episode + 1) % 100 == 0:
avg = np.mean(moving_avg)
print(f"エピソード {episode+1:3d} | 平均報酬: {avg:.1f} | 方策損失: {policy_loss:.3f}")
env.close()
print(f"\n最終平均(直近100): {np.mean(moving_avg):.1f}")
print("REINFORCE訓練完了!")
期待される出力:
=== REINFORCE アルゴリズムの実装 ===
環境: CartPole-v1
状態次元: 4
行動次元: 2
方策パラメータ数: 17,026
価値パラメータ数: 16,897
ベースライン付きREINFORCEを訓練中...
エピソード 100 | 平均報酬: 45.2 | 方策損失: 12.345
エピソード 200 | 平均報酬: 156.8 | 方策損失: 5.678
エピソード 300 | 平均報酬: 287.3 | 方策損失: 2.345
エピソード 400 | 平均報酬: 412.6 | 方策損失: 1.234
エピソード 500 | 平均報酬: 478.9 | 方策損失: 0.567
最終平均(直近100): 478.9
REINFORCE訓練完了!
4.4 Actor-Critic手法
4.4.1 方策学習と価値学習の統合
Actor-Critic手法は2つのコンポーネントを使用します:
- Actor: 行動を選択する方策ネットワーク$\pi_\theta(a|s)$
- Critic: 状態を評価する価値ネットワーク$V_\phi(s)$
REINFORCE(エピソード完了を待つ)とは異なり、Actor-CriticはTD学習を使用してオンライン更新を行います。
4.4.2 アドバンテージ推定としてのTD誤差
1ステップTD誤差は、アドバンテージの不偏推定量として機能します:
$$ \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) \approx A(s_t, a_t) $$4.4.3 Actor-Criticの実装
# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym
print("=== Actor-Criticの実装 ===\n")
class ActorCriticNetwork(nn.Module):
"""
ActorとCriticヘッドを持つ共有ネットワーク
初期層を共有することでサンプル効率が向上します。
"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(ActorCriticNetwork, self).__init__()
# 共有層
self.shared = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
# Actorヘッド
self.actor = nn.Linear(hidden_dim, action_dim)
# Criticヘッド
self.critic = nn.Linear(hidden_dim, 1)
def forward(self, state):
features = self.shared(state)
action_probs = F.softmax(self.actor(features), dim=-1)
state_value = self.critic(features)
return action_probs, state_value
class ActorCritic:
"""TD学習を用いた1ステップActor-Critic"""
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
self.gamma = gamma
self.network = ActorCriticNetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, state):
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs, value = self.network(state_tensor)
dist = torch.distributions.Categorical(action_probs)
action = dist.sample()
return action.item(), dist.log_prob(action), value
def update(self, log_prob, value, reward, next_state, done):
"""各ステップでのTD更新"""
# 次状態の価値を計算
if done:
next_value = torch.tensor([0.0])
else:
next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
with torch.no_grad():
_, next_value = self.network(next_state_tensor)
# TD誤差(アドバンテージ推定)
td_target = reward + self.gamma * next_value * (1 - float(done))
td_error = td_target - value
# 損失
actor_loss = -log_prob * td_error.detach() # 方策勾配
critic_loss = td_error.pow(2) # 価値関数MSE
loss = actor_loss + 0.5 * critic_loss
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
# 訓練
print("CartPole-v1でActor-Criticを訓練中...")
env = gym.make('CartPole-v1')
agent = ActorCritic(state_dim=4, action_dim=2, lr=0.002)
num_episodes = 300
episode_rewards = []
for episode in range(num_episodes):
state, _ = env.reset()
episode_reward = 0
for t in range(500):
action, log_prob, value = agent.select_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
agent.update(log_prob, value, reward, next_state, done)
episode_reward += reward
state = next_state
if done:
break
episode_rewards.append(episode_reward)
if (episode + 1) % 50 == 0:
avg = np.mean(episode_rewards[-100:])
print(f"エピソード {episode+1:3d} | 平均報酬: {avg:.1f}")
env.close()
print(f"\n最終平均: {np.mean(episode_rewards[-100:]):.1f}")
print("\nREINFORCEに対するActor-Criticの利点:")
print(" - 各ステップで更新(エピソード終了ではない)")
print(" - 低分散(MCではなくTDを使用)")
print(" - 継続タスクに対応")
4.5 A2C(Advantage Actor-Critic)
4.5.1 基本的なActor-Criticからの改良
A2C(Advantage Actor-Critic)は以下の強化を加えます:
- nステップ収益: バイアス-分散トレードオフのバランス
- エントロピー正則化: 探索を促進
- 並列環境: サンプル多様性の向上
4.5.2 Nステップ収益
1ステップTDの代わりに、nステップ収益を使用:
$$ G_t^{(n)} = r_t + \gamma r_{t+1} + \cdots + \gamma^{n-1} r_{t+n-1} + \gamma^n V(s_{t+n}) $$4.5.3 エントロピー正則化
探索を促進するためにエントロピーボーナスを追加:
$$ L = -\mathbb{E}[\log \pi(a|s) A(s,a)] + \beta H(\pi(\cdot|s)) $$ここで$H(\pi) = -\sum_a \pi(a|s) \log \pi(a|s)$はエントロピーです。
4.5.4 A2Cの実装
# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym
print("=== A2C(Advantage Actor-Critic)の実装 ===\n")
class A2CNetwork(nn.Module):
"""より大きな容量を持つA2Cネットワーク"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super(A2CNetwork, self).__init__()
self.shared = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
self.actor = nn.Linear(hidden_dim, action_dim)
self.critic = nn.Linear(hidden_dim, 1)
def forward(self, state):
features = self.shared(state)
logits = self.actor(features)
value = self.critic(features)
return logits, value
class A2C:
"""
nステップ収益とエントロピー正則化を持つAdvantage Actor-Critic
"""
def __init__(self, state_dim, action_dim, lr=0.0007, gamma=0.99,
n_steps=5, entropy_coef=0.01, value_coef=0.5):
self.gamma = gamma
self.n_steps = n_steps
self.entropy_coef = entropy_coef
self.value_coef = value_coef
self.network = A2CNetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, state):
state_tensor = torch.FloatTensor(state).unsqueeze(0)
logits, value = self.network(state_tensor)
dist = torch.distributions.Categorical(logits=logits)
action = dist.sample()
return (action.item(), dist.log_prob(action),
dist.entropy(), value)
def compute_returns(self, rewards, values, dones, next_value):
"""nステップ収益とアドバンテージを計算"""
returns = []
R = next_value
for step in reversed(range(len(rewards))):
R = rewards[step] + self.gamma * R * (1 - dones[step])
returns.insert(0, R)
returns = torch.tensor(returns, dtype=torch.float32)
values = torch.cat(values).squeeze()
advantages = returns - values.detach()
return returns, advantages
def update(self, log_probs, entropies, values, returns, advantages):
"""エントロピー正則化を用いたA2C更新"""
log_probs = torch.cat(log_probs)
entropies = torch.cat(entropies)
values = torch.cat(values).squeeze()
# アドバンテージを用いたActor損失
actor_loss = -(log_probs * advantages.detach()).mean()
# Critic損失
critic_loss = F.mse_loss(values, returns)
# エントロピーボーナス(エントロピーを最大化するため負)
entropy_loss = -entropies.mean()
# 統合損失
loss = (actor_loss +
self.value_coef * critic_loss +
self.entropy_coef * entropy_loss)
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
self.optimizer.step()
return actor_loss.item(), critic_loss.item(), entropies.mean().item()
# 訓練
print("CartPole-v1でA2Cを訓練中...")
env = gym.make('CartPole-v1')
agent = A2C(state_dim=4, action_dim=2, n_steps=5, entropy_coef=0.01)
print(f" n_steps: {agent.n_steps}")
print(f" entropy_coef: {agent.entropy_coef}")
print(f" value_coef: {agent.value_coef}")
num_episodes = 500
episode_rewards = []
for episode in range(num_episodes):
state, _ = env.reset()
episode_reward = 0
log_probs, entropies, values, rewards, dones = [], [], [], [], []
done = False
while not done:
action, log_prob, entropy, value = agent.select_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
log_probs.append(log_prob)
entropies.append(entropy)
values.append(value)
rewards.append(reward)
dones.append(float(done))
episode_reward += reward
state = next_state
# n_stepsごとまたはエピソード終了時に更新
if len(rewards) >= agent.n_steps or done:
if done:
next_value = 0
else:
with torch.no_grad():
_, next_value = agent.network(
torch.FloatTensor(next_state).unsqueeze(0))
next_value = next_value.item()
returns, advantages = agent.compute_returns(
rewards, values, dones, next_value)
actor_loss, critic_loss, entropy = agent.update(
log_probs, entropies, values, returns, advantages)
log_probs, entropies, values, rewards, dones = [], [], [], [], []
episode_rewards.append(episode_reward)
if (episode + 1) % 100 == 0:
avg = np.mean(episode_rewards[-100:])
print(f"エピソード {episode+1:3d} | 平均: {avg:.1f} | "
f"Actor損失: {actor_loss:.3f} | エントロピー: {entropy:.3f}")
env.close()
print(f"\n最終平均: {np.mean(episode_rewards[-100:]):.1f}")
4.6 PPO(Proximal Policy Optimization)
4.6.1 なぜ信頼領域が重要か
方策勾配の根本的な問題: 大きな更新は良い方策を破壊する可能性がある。
「方策空間で大きすぎるステップを踏むと、性能が壊滅的に崩壊し、二度と回復しない可能性があります。」
TRPO(Trust Region Policy Optimization)は、KLダイバージェンスを制約することでこれに対処しました:
$$ \max_\theta \mathbb{E}\left[\frac{\pi_\theta(a|s)}{\pi_{\theta_{old}}(a|s)} A(s,a)\right] \quad \text{s.t. } D_{KL}(\pi_{\theta_{old}} || \pi_\theta) \leq \delta $$しかし、TRPOは複雑な二次最適化(共役勾配法、Fisher情報行列)を必要とします。
4.6.2 PPOのクリップ目的関数
PPOは、より単純なクリップ目的関数で同様の安定性を実現します:
$$ L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) A_t\right)\right] $$ここで確率比は:
$$ r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)} $$4.6.3 クリッピングの仕組み
クリップ関数は比率が1から離れすぎるのを防ぎます:
- $A_t > 0$(良い行動)の場合: 比率は$[1, 1+\epsilon]$にクリップ
- $A_t < 0$(悪い行動)の場合: 比率は$[1-\epsilon, 1]$にクリップ
4.6.4 汎化アドバンテージ推定(GAE)
PPOは通常、アドバンテージ推定にGAEを使用し、バイアスと分散のバランスを取ります:
$$ \hat{A}_t^{GAE(\gamma, \lambda)} = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l} $$ここで$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$はTD誤差です。
- $\lambda = 0$: 1ステップTD(低分散、高バイアス)
- $\lambda = 1$: モンテカルロ(高分散、バイアスなし)
- $\lambda \approx 0.95$: 良いバランス(一般的に使用)
4.6.5 完全なPPO実装
# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym
print("=== PPO(Proximal Policy Optimization)の実装 ===\n")
class PPONetwork(nn.Module):
"""PPO Actor-Criticネットワーク"""
def __init__(self, state_dim, action_dim, hidden_dim=64):
super(PPONetwork, self).__init__()
self.shared = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh()
)
self.actor = nn.Linear(hidden_dim, action_dim)
self.critic = nn.Linear(hidden_dim, 1)
def forward(self, state):
features = self.shared(state)
logits = self.actor(features)
value = self.critic(features)
return logits, value
def get_action_and_value(self, state, action=None):
logits, value = self.forward(state)
dist = torch.distributions.Categorical(logits=logits)
if action is None:
action = dist.sample()
return action, dist.log_prob(action), dist.entropy(), value
class PPO:
"""
以下を備えたProximal Policy Optimization:
- クリップ目的関数
- アドバンテージ推定のためのGAE
- 更新ごとに複数エポック
"""
def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
gae_lambda=0.95, clip_epsilon=0.2, epochs=10,
batch_size=64, entropy_coef=0.01, value_coef=0.5):
self.gamma = gamma
self.gae_lambda = gae_lambda
self.clip_epsilon = clip_epsilon
self.epochs = epochs
self.batch_size = batch_size
self.entropy_coef = entropy_coef
self.value_coef = value_coef
self.network = PPONetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, state):
"""データ収集用の行動選択"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
action, log_prob, _, value = self.network.get_action_and_value(
state_tensor)
return action.item(), log_prob.item(), value.item()
def compute_gae(self, rewards, values, dones, next_value):
"""
汎化アドバンテージ推定を計算
GAEは低バイアスを維持しながら分散を削減します。
"""
advantages = []
gae = 0
values = values + [next_value]
for step in reversed(range(len(rewards))):
# TD誤差
delta = (rewards[step] +
self.gamma * values[step + 1] * (1 - dones[step]) -
values[step])
# GAE
gae = delta + self.gamma * self.gae_lambda * (1 - dones[step]) * gae
advantages.insert(0, gae)
advantages = torch.tensor(advantages, dtype=torch.float32)
returns = advantages + torch.tensor(values[:-1], dtype=torch.float32)
return advantages, returns
def update(self, states, actions, old_log_probs, returns, advantages):
"""
クリップ目的関数によるPPO更新
収集されたデータに対して複数エポックを実行します。
"""
states = torch.FloatTensor(np.array(states))
actions = torch.LongTensor(actions)
old_log_probs = torch.FloatTensor(old_log_probs)
# アドバンテージを正規化
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
dataset_size = len(states)
for epoch in range(self.epochs):
# シャッフルしてミニバッチを作成
indices = np.random.permutation(dataset_size)
for start in range(0, dataset_size, self.batch_size):
end = start + self.batch_size
batch_idx = indices[start:end]
batch_states = states[batch_idx]
batch_actions = actions[batch_idx]
batch_old_log_probs = old_log_probs[batch_idx]
batch_returns = returns[batch_idx]
batch_advantages = advantages[batch_idx]
# 現在の方策の値を取得
_, new_log_probs, entropy, values = \
self.network.get_action_and_value(batch_states, batch_actions)
# 確率比
ratio = torch.exp(new_log_probs - batch_old_log_probs)
# クリップされた代理目的関数
surr1 = ratio * batch_advantages
surr2 = torch.clamp(ratio,
1 - self.clip_epsilon,
1 + self.clip_epsilon) * batch_advantages
actor_loss = -torch.min(surr1, surr2).mean()
# 価値損失
critic_loss = F.mse_loss(values.squeeze(), batch_returns)
# エントロピーボーナス
entropy_loss = -entropy.mean()
# 総損失
loss = (actor_loss +
self.value_coef * critic_loss +
self.entropy_coef * entropy_loss)
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
self.optimizer.step()
return actor_loss.item(), critic_loss.item()
# 訓練
print("CartPole-v1でPPOを訓練\n")
env = gym.make('CartPole-v1')
agent = PPO(state_dim=4, action_dim=2, lr=3e-4, epochs=10)
print(f"ハイパーパラメータ:")
print(f" clip_epsilon: {agent.clip_epsilon}")
print(f" gae_lambda: {agent.gae_lambda}")
print(f" 更新あたりのエポック数: {agent.epochs}")
print(f" batch_size: {agent.batch_size}")
num_iterations = 100
rollout_steps = 2048
episode_rewards = []
all_rewards = []
print("\n訓練中...")
total_steps = 0
for iteration in range(num_iterations):
# ロールアウトデータを収集
states, actions, log_probs, rewards, values, dones = [], [], [], [], [], []
state, _ = env.reset()
episode_reward = 0
for _ in range(rollout_steps):
action, log_prob, value = agent.select_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
states.append(state)
actions.append(action)
log_probs.append(log_prob)
rewards.append(reward)
values.append(value)
dones.append(float(done))
episode_reward += reward
total_steps += 1
state = next_state
if done:
all_rewards.append(episode_reward)
episode_reward = 0
state, _ = env.reset()
# GAEと収益を計算
_, _, next_value = agent.select_action(state)
advantages, returns = agent.compute_gae(rewards, values, dones, next_value)
# PPO更新
actor_loss, critic_loss = agent.update(
states, actions, log_probs, returns, advantages)
if (iteration + 1) % 10 == 0:
avg_reward = np.mean(all_rewards[-100:]) if all_rewards else 0
print(f"反復 {iteration+1:3d} | ステップ: {total_steps:6d} | "
f"平均報酬: {avg_reward:.1f} | "
f"Actor損失: {actor_loss:.4f}")
env.close()
print(f"\n最終平均: {np.mean(all_rewards[-100:]):.1f}")
print("\nPPOの主要な特徴:")
print(" - クリップ目的関数が破壊的な更新を防止")
print(" - 複数エポックがサンプル効率を向上")
print(" - GAEがアドバンテージ推定のバイアス-分散をバランス")
print(" - シンプルな実装でありながら非常に効果的")
期待される出力:
=== PPO(Proximal Policy Optimization)の実装 ===
CartPole-v1でPPOを訓練
ハイパーパラメータ:
clip_epsilon: 0.2
gae_lambda: 0.95
更新あたりのエポック数: 10
batch_size: 64
訓練中...
反復 10 | ステップ: 20480 | 平均報酬: 156.3 | Actor損失: 0.0234
反復 20 | ステップ: 40960 | 平均報酬: 287.5 | Actor損失: 0.0156
反復 30 | ステップ: 61440 | 平均報酬: 398.2 | Actor損失: 0.0089
反復 40 | ステップ: 81920 | 平均報酬: 456.7 | Actor損失: 0.0045
反復 50 | ステップ: 102400 | 平均報酬: 482.1 | Actor損失: 0.0023
...
反復 100 | ステップ: 204800 | 平均報酬: 498.7 | Actor損失: 0.0012
最終平均: 498.7
PPOの主要な特徴:
- クリップ目的関数が破壊的な更新を防止
- 複数エポックがサンプル効率を向上
- GAEがアドバンテージ推定のバイアス-分散をバランス
- シンプルな実装でありながら非常に効果的
4.6.6 なぜPPOがこれほど人気なのか
| 側面 | PPOの利点 |
|---|---|
| 安定性 | クリッピングが壊滅的な更新を防止 |
| シンプルさ | 一次最適化のみ(Fisher行列不要) |
| サンプル効率 | 複数エポックが収集データを再利用 |
| 汎用性 | チューニングなしで多様なタスクに対応 |
| スケーラビリティ | 環境間で容易に並列化 |
注目すべきPPO応用例:
- OpenAI Five(Dota 2)
- ChatGPT RLHF(人間のフィードバックからの強化学習)
- DeepMindロボティクス研究
- Waymo自動運転
4.7 連続行動空間
4.7.1 ガウス方策
連続行動(例: ロボットの関節トルク)には、ガウス分布を使用します:
$$ \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)^2) $$ネットワークは以下を出力します:
- $\mu_\theta(s)$: 平均行動(どの行動を取るか)
- $\sigma_\theta(s)$: 標準偏差(探索の量)
4.7.2 ガウス方策の実装
# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gymnasium as gym
print("=== ガウス方策による連続行動空間 ===\n")
class ContinuousPolicyNetwork(nn.Module):
"""
連続行動用の方策ネットワーク
ガウス分布の平均とlog_stdを出力します。
"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super(ContinuousPolicyNetwork, self).__init__()
self.shared = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
# 平均ヘッド
self.mu = nn.Linear(hidden_dim, action_dim)
# Log std(学習パラメータまたは状態依存)
self.log_std = nn.Linear(hidden_dim, action_dim)
# 価値ヘッド
self.value = nn.Linear(hidden_dim, 1)
def forward(self, state):
features = self.shared(state)
mu = self.mu(features)
log_std = self.log_std(features)
log_std = torch.clamp(log_std, -20, 2) # 数値安定性
std = torch.exp(log_std)
value = self.value(features)
return mu, std, value
class ContinuousPPO:
"""連続行動空間用のPPO"""
def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99):
self.gamma = gamma
self.action_dim = action_dim
self.network = ContinuousPolicyNetwork(state_dim, action_dim)
self.optimizer = torch.optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, state):
state_tensor = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
mu, std, value = self.network(state_tensor)
# ガウス分布からサンプリング
dist = torch.distributions.Normal(mu, std)
action = dist.sample()
log_prob = dist.log_prob(action).sum(dim=-1) # 行動次元で合計
return action.squeeze().numpy(), log_prob.item(), value.item()
def evaluate_actions(self, states, actions):
mu, std, values = self.network(states)
dist = torch.distributions.Normal(mu, std)
log_probs = dist.log_prob(actions).sum(dim=-1)
entropy = dist.entropy().sum(dim=-1)
return log_probs, entropy, values
# Pendulum環境でのデモンストレーション
print("Pendulum-v1(連続制御)でテスト\n")
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
print(f"環境: Pendulum-v1")
print(f" 状態空間: {env.observation_space}")
print(f" 行動空間: {env.action_space}")
agent = ContinuousPPO(state_dim, action_dim)
# テストエピソード
state, _ = env.reset()
episode_reward = 0
print("\nガウス方策から行動をサンプリング:")
for step in range(5):
action, log_prob, value = agent.select_action(state)
# 行動を有効範囲にクリップ
action_clipped = np.clip(action, -2.0, 2.0)
print(f" ステップ {step}: 行動={action_clipped[0]:.3f}, "
f"log_prob={log_prob:.3f}, 価値={value:.3f}")
next_state, reward, terminated, truncated, _ = env.step(action_clipped)
episode_reward += reward
state = next_state
env.close()
print(f"\nテストエピソード報酬: {episode_reward:.1f}")
print("\nガウス方策の特徴:")
print(" - 連続行動に自然(関節トルク、ステアリング)")
print(" - サンプリングによる探索(stdが探索を制御)")
print(" - 再パラメータ化トリックが勾配フローを可能に")
4.8 Stable-Baselines3の実践例
4.8.1 Stable-Baselines3の紹介
Stable-Baselines3(SB3)は、信頼性が高く、十分にテストされたRLアルゴリズムの実装です。ベストプラクティスに従った本番環境対応のコードを提供します。
4.8.2 Stable-Baselines3でのPPO
# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - stable-baselines3>=2.1.0
# - tensorboard>=2.14.0(オプション、ログ用)
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback
import numpy as np
print("=== Stable-Baselines3 PPOの例 ===\n")
# 環境を作成
env_id = "LunarLander-v2"
print(f"環境: {env_id}")
# 単純な訓練用の単一環境
env = gym.make(env_id)
print(f" 観測空間: {env.observation_space}")
print(f" 行動空間: {env.action_space}")
env.close()
# より良いパフォーマンスのためのベクトル化環境を作成
n_envs = 4
vec_env = make_vec_env(env_id, n_envs=n_envs)
print(f" 並列環境数: {n_envs}")
# カスタムハイパーパラメータでPPOモデルを作成
model = PPO(
"MlpPolicy", # 方策ネットワークタイプ
vec_env,
learning_rate=3e-4, # 学習率
n_steps=2048, # 更新あたりの環境ごとのステップ数
batch_size=64, # ミニバッチサイズ
n_epochs=10, # 更新あたりのエポック数
gamma=0.99, # 割引率
gae_lambda=0.95, # GAE lambda
clip_range=0.2, # PPOクリップ範囲
ent_coef=0.01, # エントロピー係数
vf_coef=0.5, # 価値関数係数
verbose=1,
tensorboard_log="./ppo_lunarlander_tensorboard/"
)
print("\nPPOモデル設定:")
print(f" 方策: {model.policy.__class__.__name__}")
print(f" 学習率: {model.learning_rate}")
print(f" クリップ範囲: {model.clip_range}")
print(f" GAE lambda: {model.gae_lambda}")
# 訓練
print("\nPPOを訓練中...")
total_timesteps = 100000
model.learn(
total_timesteps=total_timesteps,
progress_bar=True # tqdmが必要
)
print("\n訓練完了!")
# 評価
print("\n訓練済みモデルを評価中...")
eval_env = gym.make(env_id)
mean_reward, std_reward = evaluate_policy(
model,
eval_env,
n_eval_episodes=10,
deterministic=True
)
print(f"平均報酬: {mean_reward:.2f} +/- {std_reward:.2f}")
print(f"(LunarLanderの解決閾値: 200)")
# モデルを保存
model.save("ppo_lunarlander")
print("\nモデルを 'ppo_lunarlander.zip' に保存しました")
# 読み込みとテスト
print("\n保存したモデルを読み込んでテスト中...")
loaded_model = PPO.load("ppo_lunarlander")
# レンダリング情報付きのテストエピソード
state, _ = eval_env.reset()
episode_reward = 0
done = False
while not done:
action, _ = loaded_model.predict(state, deterministic=True)
state, reward, terminated, truncated, _ = eval_env.step(action)
done = terminated or truncated
episode_reward += reward
print(f"テストエピソード報酬: {episode_reward:.2f}")
vec_env.close()
eval_env.close()
print("\nSB3 PPOの特徴:")
print(" - 本番環境対応の実装")
print(" - 並列化のためのベクトル化環境")
print(" - 組み込みのTensorBoardログ")
print(" - 簡単な保存/読み込み機能")
print(" - 充実したドキュメントとコミュニティ")
期待される出力:
=== Stable-Baselines3 PPOの例 ===
環境: LunarLander-v2
観測空間: Box([-inf, ...], [inf, ...], (8,), float32)
行動空間: Discrete(4)
並列環境数: 4
PPOモデル設定:
方策: ActorCriticPolicy
学習率: 0.0003
クリップ範囲: 0.2
GAE lambda: 0.95
PPOを訓練中...
| rollout/ | |
| ep_len_mean | 89.3 |
| ep_rew_mean | -156 |
| time/ | |
| fps | 1245 |
| iterations | 1 |
...
| rollout/ | |
| ep_len_mean | 287 |
| ep_rew_mean | 234 |
訓練完了!
訓練済みモデルを評価中...
平均報酬: 256.34 +/- 23.12
(LunarLanderの解決閾値: 200)
モデルを 'ppo_lunarlander.zip' に保存しました
保存したモデルを読み込んでテスト中...
テストエピソード報酬: 267.45
4.8.3 TensorBoardモニタリング
# TensorBoardログを表示するには、ターミナルで実行:
# tensorboard --logdir ./ppo_lunarlander_tensorboard/
# 監視すべき主要メトリクス:
# - rollout/ep_rew_mean: 平均エピソード報酬
# - rollout/ep_len_mean: 平均エピソード長
# - train/policy_loss: Actor損失
# - train/value_loss: Critic損失
# - train/entropy_loss: 探索のためのエントロピー
# - train/approx_kl: KLダイバージェンス(小さいままであるべき)
# - train/clip_fraction: クリッピングが発動する頻度
print("TensorBoardメトリクスガイド:")
print(" ep_rew_mean: 時間とともに増加すべき")
print(" approx_kl: 高すぎる場合(>0.02)、学習率を下げる")
print(" clip_fraction: 約10-20%が典型的")
print(" entropy: 方策が決定論的になるにつれて減少すべき")
まとめ
章のまとめ
| トピック | 重要ポイント |
|---|---|
| 方策勾配 | $\pi_\theta$の直接最適化; 連続行動を扱える |
| REINFORCE | モンテカルロPG; 高分散; 安定性のためベースラインを使用 |
| Actor-Critic | 方策学習と価値学習を統合; TDベースの更新 |
| A2C | Nステップ収益; エントロピー正則化; 同期訓練 |
| PPO | クリップ目的関数; GAE; 複数エポック; 業界標準 |
| 連続制御 | 学習された平均とstdを持つガウス方策 |
アルゴリズム比較
| アルゴリズム | 更新タイプ | 分散 | サンプル効率 | 複雑さ |
|---|---|---|---|---|
| REINFORCE | エピソード終了 | 高 | 低 | シンプル |
| Actor-Critic | 各ステップ | 中 | 中 | 中 |
| A2C | Nステップ | 中 | 中 | 中 |
| PPO | バッチ(複数エポック) | 低 | 高 | 中 |
演習問題
演習4.1: ベースライン比較
課題: CartPoleでベースラインあり/なしのREINFORCEを比較する。
ステップ:
- バニラREINFORCE(ベースラインなし)を実装
- 学習されたV(s)ベースライン付きREINFORCEを実装
- 学習曲線をプロットし、分散を比較
演習4.2: PPOハイパーパラメータ研究
課題: LunarLanderでPPOのハイパーパラメータ感度を調査する。
変動させるパラメータ:
- clip_epsilon: [0.1, 0.2, 0.3]
- n_epochs: [3, 10, 20]
- gae_lambda: [0.9, 0.95, 0.99]
演習4.3: 連続制御
課題: ガウス方策を用いてPendulum-v1でPPOを訓練する。
要件:
- 適切な行動スケーリングを実装
- 平均報酬-200以上を達成
- 行動分布の進化を可視化
演習4.4: エントロピースケジューリング
課題: エントロピー係数のアニーリングを実装する。
アプローチ:
- 探索のため高エントロピー(0.1)から開始
- 活用のため低エントロピー(0.001)に減衰
- 固定エントロピー係数と比較
演習4.5: SB3カスタム環境
課題: カスタムGymnasium環境でPPOを訓練する。
ステップ:
- シンプルなカスタム環境を作成(例: ゴールに到達)
- SB3のcheck_env()を使用して検証
- PPOで訓練しパフォーマンスを評価
演習4.6: マルチ環境訓練
課題: 単一環境と並列環境の訓練を比較する。
メトリクス:
- 報酬閾値に達するまでの実時間
- サンプル効率(総環境ステップ数)
- 学習の安定性(報酬の分散)
次章プレビュー
第5章では、RLの高度な応用と最前線を探求します:
予定トピック:
- モデルベースRLとワールドモデル
- マルチエージェント強化学習
- オフラインRLと模倣学習
- 人間のフィードバックからの強化学習(RLHF)
- 実世界展開の課題
- 現在の研究動向