EN | JP | Last sync: 2026-01-10

第4章: 方策勾配法

直接的な方策最適化: REINFORCE、Actor-Critic、A2C、PPOの理論と実装

読了時間: 30-35分 難易度: 中級〜上級 コード例: 8 演習: 6

本章では、現代の強化学習の基盤となる方策勾配法(Policy Gradient Methods)を解説します。なぜ直接的な方策最適化が強力なのかを理解し、数学的基礎をマスターし、Stable-Baselines3を用いたPPOの実装まで、最先端のアルゴリズムを学びます。

学習目標

本章を読むことで、以下ができるようになります:


4.1 なぜ方策勾配法なのか?

4.1.1 価値ベース手法の限界

第2章と第3章では、価値ベース手法(Q学習、DQN)を学びました。強力な手法ですが、固有の限界があります:

限界 説明 影響
離散行動のみ $\arg\max_a Q(s,a)$演算が必要 連続制御(ロボティクス)を扱えない
決定論的方策 $\epsilon$-greedyは回避策 最適な確率的方策を学習できない
高次元行動 指数的な行動空間 組合せ爆発
小さな方策変化 Q値の変化が大きな方策シフトを引き起こす 学習の不安定性

「じゃんけんでは、最適な方策は一様ランダムです。価値ベース手法はこれを自然に表現するのに苦労しますが、方策勾配法はエレガントに扱えます。」

4.1.2 直接的な方策最適化

方策勾配法は、パラメータ$\theta$で表現された方策$\pi_\theta(a|s)$を直接最適化します:

graph LR subgraph "価値ベース(DQN)" S1["状態 s"] --> Q["Q(s,a)"] Q --> AM["argmax"] AM --> A1["行動 a"] style Q fill:#e74c3c,color:#fff end subgraph "方策ベース" S2["状態 s"] --> P["方策 pi(a|s; theta)"] P --> A2["行動 a(サンプリング)"] style P fill:#27ae60,color:#fff end

4.1.3 方策勾配法の利点

  1. 連続行動空間: ロボットの関節角度やステアリングを自然に扱える
  2. 確率的方策: 組み込みの探索、部分観測に最適
  3. 滑らかな最適化: $\theta$の小さな変化は方策の小さな変化につながる
  4. 収束保証: 明確に定義された目的関数での勾配上昇

4.1.4 方策のパラメータ化

離散行動の場合、softmaxを使用:

$$ \pi_\theta(a|s) = \frac{\exp(f_\theta(s, a))}{\sum_{a'} \exp(f_\theta(s, a'))} $$

連続行動の場合、ガウス分布を使用:

$$ \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)^2) $$

4.2 方策勾配定理

4.2.1 目的関数

期待収益を最大化したい:

$$ J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}[R(\tau)] = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \gamma^t r_t\right] $$

ここで$\tau = (s_0, a_0, r_1, s_1, a_1, \ldots)$は方策$\pi_\theta$でサンプリングされた軌道です。

4.2.2 方策勾配定理

重要な洞察: 環境のダイナミクスを知らなくても勾配$\nabla_\theta J(\theta)$を計算できます!

$$ \nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s) \cdot Q^{\pi_\theta}(s, a)\right] $$

または、軌道を使った等価な形式:

$$ \nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) \cdot G_t\right] $$

ここで$G_t = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}$は時刻$t$からの収益です。

4.2.3 数式の直感的理解

graph TB subgraph "方策勾配更新" Action["行動 a を実行"] Reward["G_t を観測"] GoodAction{{"G_t > 0?"}} Increase["pi(a|s) を増加"] Decrease["pi(a|s) を減少"] Action --> Reward Reward --> GoodAction GoodAction -->|Yes| Increase GoodAction -->|No| Decrease style Increase fill:#27ae60,color:#fff style Decrease fill:#e74c3c,color:#fff end

4.3 REINFORCEアルゴリズム

4.3.1 モンテカルロ方策勾配

REINFORCE(Williams, 1992)は、完全なエピソードを使用して方策勾配を推定します:

  1. $\pi_\theta$を使って軌道$\tau$をサンプリング
  2. 各タイムステップの収益$G_t$を計算
  3. 更新: $\theta \leftarrow \theta + \alpha \sum_{t} \nabla_\theta \log \pi_\theta(a_t|s_t) G_t$

4.3.2 高分散問題

REINFORCEは$G_t$がエピソード間で大きく変動するため、高分散に悩まされます。解決策はベースライン減算です。

$$ \nabla_\theta J(\theta) = \mathbb{E}\left[\nabla_\theta \log \pi_\theta(a_t|s_t) (G_t - b(s_t))\right] $$

最適なベースラインは$b(s) = V^{\pi}(s)$で、これによりアドバンテージが得られます:

$$ A^{\pi}(s, a) = Q^{\pi}(s, a) - V^{\pi}(s) $$

4.3.3 REINFORCEの実装

# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0
# - matplotlib>=3.7.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
from collections import deque

print("=== REINFORCE アルゴリズムの実装 ===\n")

class PolicyNetwork(nn.Module):
    """
    REINFORCE用の方策ネットワーク

    状態が与えられると行動確率を出力します。
    """
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        logits = self.fc3(x)
        return F.softmax(logits, dim=-1)


class ValueNetwork(nn.Module):
    """
    ベースライン用の価値ネットワーク

    分散を減らすためにV(s)を推定します。
    """
    def __init__(self, state_dim, hidden_dim=128):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class REINFORCEWithBaseline:
    """学習されたベースライン(価値関数)付きREINFORCE"""

    def __init__(self, state_dim, action_dim, lr_policy=0.001, lr_value=0.001, gamma=0.99):
        self.gamma = gamma
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.value = ValueNetwork(state_dim)

        self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=lr_policy)
        self.value_optimizer = optim.Adam(self.value.parameters(), lr=lr_value)

        # エピソード保存
        self.saved_log_probs = []
        self.saved_values = []
        self.rewards = []

    def select_action(self, state):
        """行動を選択し、対数確率を保存"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.policy(state_tensor)
        value = self.value(state_tensor)

        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()

        self.saved_log_probs.append(dist.log_prob(action))
        self.saved_values.append(value)

        return action.item()

    def update(self):
        """エピソード終了後に方策と価値ネットワークを更新"""
        R = 0
        returns = []

        # 収益を計算(逆順)
        for r in reversed(self.rewards):
            R = r + self.gamma * R
            returns.insert(0, R)

        returns = torch.tensor(returns, dtype=torch.float32)

        # 安定性のため収益を正規化
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # 損失を計算
        policy_losses = []
        value_losses = []

        for log_prob, value, G in zip(self.saved_log_probs, self.saved_values, returns):
            advantage = G - value.squeeze().detach()
            policy_losses.append(-log_prob * advantage)
            value_losses.append(F.mse_loss(value.squeeze(), G))

        # 方策を更新
        self.policy_optimizer.zero_grad()
        policy_loss = torch.stack(policy_losses).sum()
        policy_loss.backward()
        self.policy_optimizer.step()

        # 価値関数を更新
        self.value_optimizer.zero_grad()
        value_loss = torch.stack(value_losses).sum()
        value_loss.backward()
        self.value_optimizer.step()

        # エピソードデータをクリア
        self.saved_log_probs = []
        self.saved_values = []
        self.rewards = []

        return policy_loss.item(), value_loss.item()


# 訓練
print("環境: CartPole-v1")
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

agent = REINFORCEWithBaseline(state_dim, action_dim, lr_policy=0.01, lr_value=0.01)

print(f"  状態次元: {state_dim}")
print(f"  行動次元: {action_dim}")
print(f"  方策パラメータ数: {sum(p.numel() for p in agent.policy.parameters()):,}")
print(f"  価値パラメータ数: {sum(p.numel() for p in agent.value.parameters()):,}")

num_episodes = 500
episode_rewards = []
moving_avg = deque(maxlen=100)

print("\nベースライン付きREINFORCEを訓練中...")
for episode in range(num_episodes):
    state, _ = env.reset()
    episode_reward = 0

    for t in range(500):
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        agent.rewards.append(reward)
        episode_reward += reward
        state = next_state

        if done:
            break

    policy_loss, value_loss = agent.update()

    episode_rewards.append(episode_reward)
    moving_avg.append(episode_reward)

    if (episode + 1) % 100 == 0:
        avg = np.mean(moving_avg)
        print(f"エピソード {episode+1:3d} | 平均報酬: {avg:.1f} | 方策損失: {policy_loss:.3f}")

env.close()
print(f"\n最終平均(直近100): {np.mean(moving_avg):.1f}")
print("REINFORCE訓練完了!")

期待される出力:

=== REINFORCE アルゴリズムの実装 ===

環境: CartPole-v1
  状態次元: 4
  行動次元: 2
  方策パラメータ数: 17,026
  価値パラメータ数: 16,897

ベースライン付きREINFORCEを訓練中...
エピソード 100 | 平均報酬: 45.2 | 方策損失: 12.345
エピソード 200 | 平均報酬: 156.8 | 方策損失: 5.678
エピソード 300 | 平均報酬: 287.3 | 方策損失: 2.345
エピソード 400 | 平均報酬: 412.6 | 方策損失: 1.234
エピソード 500 | 平均報酬: 478.9 | 方策損失: 0.567

最終平均(直近100): 478.9
REINFORCE訓練完了!

4.4 Actor-Critic手法

4.4.1 方策学習と価値学習の統合

Actor-Critic手法は2つのコンポーネントを使用します:

REINFORCE(エピソード完了を待つ)とは異なり、Actor-CriticはTD学習を使用してオンライン更新を行います。

graph TB subgraph "Actor-Criticアーキテクチャ" State["状態 s_t"] --> Actor["Actor pi_theta(a|s)"] State --> Critic["Critic V_phi(s)"] Actor --> Action["行動 a_t"] Critic --> Value["価値 V(s_t)"] Action --> Env["環境"] Env --> Reward["r_t, s_{t+1}"] Reward --> TDError["TD誤差: delta = r + gamma*V(s') - V(s)"] Value --> TDError TDError --> ActorUpdate["Actor: theta += alpha * delta * grad log pi"] TDError --> CriticUpdate["Critic: phi -= beta * delta * grad V"] style Actor fill:#27ae60,color:#fff style Critic fill:#3498db,color:#fff style TDError fill:#f39c12,color:#fff end

4.4.2 アドバンテージ推定としてのTD誤差

1ステップTD誤差は、アドバンテージの不偏推定量として機能します:

$$ \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) \approx A(s_t, a_t) $$

4.4.3 Actor-Criticの実装

# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym

print("=== Actor-Criticの実装 ===\n")

class ActorCriticNetwork(nn.Module):
    """
    ActorとCriticヘッドを持つ共有ネットワーク

    初期層を共有することでサンプル効率が向上します。
    """
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(ActorCriticNetwork, self).__init__()

        # 共有層
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )

        # Actorヘッド
        self.actor = nn.Linear(hidden_dim, action_dim)

        # Criticヘッド
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)
        action_probs = F.softmax(self.actor(features), dim=-1)
        state_value = self.critic(features)
        return action_probs, state_value


class ActorCritic:
    """TD学習を用いた1ステップActor-Critic"""

    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        self.gamma = gamma
        self.network = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs, value = self.network(state_tensor)

        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()

        return action.item(), dist.log_prob(action), value

    def update(self, log_prob, value, reward, next_state, done):
        """各ステップでのTD更新"""
        # 次状態の価値を計算
        if done:
            next_value = torch.tensor([0.0])
        else:
            next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
            with torch.no_grad():
                _, next_value = self.network(next_state_tensor)

        # TD誤差(アドバンテージ推定)
        td_target = reward + self.gamma * next_value * (1 - float(done))
        td_error = td_target - value

        # 損失
        actor_loss = -log_prob * td_error.detach()  # 方策勾配
        critic_loss = td_error.pow(2)  # 価値関数MSE

        loss = actor_loss + 0.5 * critic_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()


# 訓練
print("CartPole-v1でActor-Criticを訓練中...")
env = gym.make('CartPole-v1')
agent = ActorCritic(state_dim=4, action_dim=2, lr=0.002)

num_episodes = 300
episode_rewards = []

for episode in range(num_episodes):
    state, _ = env.reset()
    episode_reward = 0

    for t in range(500):
        action, log_prob, value = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        agent.update(log_prob, value, reward, next_state, done)

        episode_reward += reward
        state = next_state

        if done:
            break

    episode_rewards.append(episode_reward)

    if (episode + 1) % 50 == 0:
        avg = np.mean(episode_rewards[-100:])
        print(f"エピソード {episode+1:3d} | 平均報酬: {avg:.1f}")

env.close()
print(f"\n最終平均: {np.mean(episode_rewards[-100:]):.1f}")
print("\nREINFORCEに対するActor-Criticの利点:")
print("  - 各ステップで更新(エピソード終了ではない)")
print("  - 低分散(MCではなくTDを使用)")
print("  - 継続タスクに対応")

4.5 A2C(Advantage Actor-Critic)

4.5.1 基本的なActor-Criticからの改良

A2C(Advantage Actor-Critic)は以下の強化を加えます:

4.5.2 Nステップ収益

1ステップTDの代わりに、nステップ収益を使用:

$$ G_t^{(n)} = r_t + \gamma r_{t+1} + \cdots + \gamma^{n-1} r_{t+n-1} + \gamma^n V(s_{t+n}) $$

4.5.3 エントロピー正則化

探索を促進するためにエントロピーボーナスを追加:

$$ L = -\mathbb{E}[\log \pi(a|s) A(s,a)] + \beta H(\pi(\cdot|s)) $$

ここで$H(\pi) = -\sum_a \pi(a|s) \log \pi(a|s)$はエントロピーです。

4.5.4 A2Cの実装

# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym

print("=== A2C(Advantage Actor-Critic)の実装 ===\n")

class A2CNetwork(nn.Module):
    """より大きな容量を持つA2Cネットワーク"""

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(A2CNetwork, self).__init__()

        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )

        self.actor = nn.Linear(hidden_dim, action_dim)
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)
        logits = self.actor(features)
        value = self.critic(features)
        return logits, value


class A2C:
    """
    nステップ収益とエントロピー正則化を持つAdvantage Actor-Critic
    """

    def __init__(self, state_dim, action_dim, lr=0.0007, gamma=0.99,
                 n_steps=5, entropy_coef=0.01, value_coef=0.5):
        self.gamma = gamma
        self.n_steps = n_steps
        self.entropy_coef = entropy_coef
        self.value_coef = value_coef

        self.network = A2CNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        logits, value = self.network(state_tensor)

        dist = torch.distributions.Categorical(logits=logits)
        action = dist.sample()

        return (action.item(), dist.log_prob(action),
                dist.entropy(), value)

    def compute_returns(self, rewards, values, dones, next_value):
        """nステップ収益とアドバンテージを計算"""
        returns = []
        R = next_value

        for step in reversed(range(len(rewards))):
            R = rewards[step] + self.gamma * R * (1 - dones[step])
            returns.insert(0, R)

        returns = torch.tensor(returns, dtype=torch.float32)
        values = torch.cat(values).squeeze()
        advantages = returns - values.detach()

        return returns, advantages

    def update(self, log_probs, entropies, values, returns, advantages):
        """エントロピー正則化を用いたA2C更新"""
        log_probs = torch.cat(log_probs)
        entropies = torch.cat(entropies)
        values = torch.cat(values).squeeze()

        # アドバンテージを用いたActor損失
        actor_loss = -(log_probs * advantages.detach()).mean()

        # Critic損失
        critic_loss = F.mse_loss(values, returns)

        # エントロピーボーナス(エントロピーを最大化するため負)
        entropy_loss = -entropies.mean()

        # 統合損失
        loss = (actor_loss +
                self.value_coef * critic_loss +
                self.entropy_coef * entropy_loss)

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
        self.optimizer.step()

        return actor_loss.item(), critic_loss.item(), entropies.mean().item()


# 訓練
print("CartPole-v1でA2Cを訓練中...")
env = gym.make('CartPole-v1')
agent = A2C(state_dim=4, action_dim=2, n_steps=5, entropy_coef=0.01)

print(f"  n_steps: {agent.n_steps}")
print(f"  entropy_coef: {agent.entropy_coef}")
print(f"  value_coef: {agent.value_coef}")

num_episodes = 500
episode_rewards = []

for episode in range(num_episodes):
    state, _ = env.reset()
    episode_reward = 0

    log_probs, entropies, values, rewards, dones = [], [], [], [], []

    done = False
    while not done:
        action, log_prob, entropy, value = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        log_probs.append(log_prob)
        entropies.append(entropy)
        values.append(value)
        rewards.append(reward)
        dones.append(float(done))

        episode_reward += reward
        state = next_state

        # n_stepsごとまたはエピソード終了時に更新
        if len(rewards) >= agent.n_steps or done:
            if done:
                next_value = 0
            else:
                with torch.no_grad():
                    _, next_value = agent.network(
                        torch.FloatTensor(next_state).unsqueeze(0))
                    next_value = next_value.item()

            returns, advantages = agent.compute_returns(
                rewards, values, dones, next_value)
            actor_loss, critic_loss, entropy = agent.update(
                log_probs, entropies, values, returns, advantages)

            log_probs, entropies, values, rewards, dones = [], [], [], [], []

    episode_rewards.append(episode_reward)

    if (episode + 1) % 100 == 0:
        avg = np.mean(episode_rewards[-100:])
        print(f"エピソード {episode+1:3d} | 平均: {avg:.1f} | "
              f"Actor損失: {actor_loss:.3f} | エントロピー: {entropy:.3f}")

env.close()
print(f"\n最終平均: {np.mean(episode_rewards[-100:]):.1f}")

4.6 PPO(Proximal Policy Optimization)

4.6.1 なぜ信頼領域が重要か

方策勾配の根本的な問題: 大きな更新は良い方策を破壊する可能性がある

「方策空間で大きすぎるステップを踏むと、性能が壊滅的に崩壊し、二度と回復しない可能性があります。」

TRPO(Trust Region Policy Optimization)は、KLダイバージェンスを制約することでこれに対処しました:

$$ \max_\theta \mathbb{E}\left[\frac{\pi_\theta(a|s)}{\pi_{\theta_{old}}(a|s)} A(s,a)\right] \quad \text{s.t. } D_{KL}(\pi_{\theta_{old}} || \pi_\theta) \leq \delta $$

しかし、TRPOは複雑な二次最適化(共役勾配法、Fisher情報行列)を必要とします。

4.6.2 PPOのクリップ目的関数

PPOは、より単純なクリップ目的関数で同様の安定性を実現します:

$$ L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) A_t\right)\right] $$

ここで確率比は:

$$ r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)} $$

4.6.3 クリッピングの仕組み

クリップ関数は比率が1から離れすぎるのを防ぎます:

graph TB subgraph "PPOクリッピングメカニズム" Ratio["確率比 r(theta)"] Advantage{{"A > 0?"}} GoodAction["良い行動: piを増加したい"] BadAction["悪い行動: piを減少したい"] ClipHigh["1+epsilonでクリップ(増加しすぎを防止)"] ClipLow["1-epsilonでクリップ(減少しすぎを防止)"] Min["クリップ済みと未クリップの最小値を取る"] Ratio --> Advantage Advantage -->|Yes| GoodAction Advantage -->|No| BadAction GoodAction --> ClipHigh BadAction --> ClipLow ClipHigh --> Min ClipLow --> Min style ClipHigh fill:#27ae60,color:#fff style ClipLow fill:#e74c3c,color:#fff end

4.6.4 汎化アドバンテージ推定(GAE)

PPOは通常、アドバンテージ推定にGAEを使用し、バイアスと分散のバランスを取ります:

$$ \hat{A}_t^{GAE(\gamma, \lambda)} = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l} $$

ここで$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$はTD誤差です。

4.6.5 完全なPPO実装

# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym

print("=== PPO(Proximal Policy Optimization)の実装 ===\n")

class PPONetwork(nn.Module):
    """PPO Actor-Criticネットワーク"""

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(PPONetwork, self).__init__()

        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh()
        )

        self.actor = nn.Linear(hidden_dim, action_dim)
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)
        logits = self.actor(features)
        value = self.critic(features)
        return logits, value

    def get_action_and_value(self, state, action=None):
        logits, value = self.forward(state)
        dist = torch.distributions.Categorical(logits=logits)

        if action is None:
            action = dist.sample()

        return action, dist.log_prob(action), dist.entropy(), value


class PPO:
    """
    以下を備えたProximal Policy Optimization:
    - クリップ目的関数
    - アドバンテージ推定のためのGAE
    - 更新ごとに複数エポック
    """

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 gae_lambda=0.95, clip_epsilon=0.2, epochs=10,
                 batch_size=64, entropy_coef=0.01, value_coef=0.5):

        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.epochs = epochs
        self.batch_size = batch_size
        self.entropy_coef = entropy_coef
        self.value_coef = value_coef

        self.network = PPONetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        """データ収集用の行動選択"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
            action, log_prob, _, value = self.network.get_action_and_value(
                state_tensor)

        return action.item(), log_prob.item(), value.item()

    def compute_gae(self, rewards, values, dones, next_value):
        """
        汎化アドバンテージ推定を計算

        GAEは低バイアスを維持しながら分散を削減します。
        """
        advantages = []
        gae = 0

        values = values + [next_value]

        for step in reversed(range(len(rewards))):
            # TD誤差
            delta = (rewards[step] +
                    self.gamma * values[step + 1] * (1 - dones[step]) -
                    values[step])

            # GAE
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[step]) * gae
            advantages.insert(0, gae)

        advantages = torch.tensor(advantages, dtype=torch.float32)
        returns = advantages + torch.tensor(values[:-1], dtype=torch.float32)

        return advantages, returns

    def update(self, states, actions, old_log_probs, returns, advantages):
        """
        クリップ目的関数によるPPO更新

        収集されたデータに対して複数エポックを実行します。
        """
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        old_log_probs = torch.FloatTensor(old_log_probs)

        # アドバンテージを正規化
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        dataset_size = len(states)

        for epoch in range(self.epochs):
            # シャッフルしてミニバッチを作成
            indices = np.random.permutation(dataset_size)

            for start in range(0, dataset_size, self.batch_size):
                end = start + self.batch_size
                batch_idx = indices[start:end]

                batch_states = states[batch_idx]
                batch_actions = actions[batch_idx]
                batch_old_log_probs = old_log_probs[batch_idx]
                batch_returns = returns[batch_idx]
                batch_advantages = advantages[batch_idx]

                # 現在の方策の値を取得
                _, new_log_probs, entropy, values = \
                    self.network.get_action_and_value(batch_states, batch_actions)

                # 確率比
                ratio = torch.exp(new_log_probs - batch_old_log_probs)

                # クリップされた代理目的関数
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio,
                                   1 - self.clip_epsilon,
                                   1 + self.clip_epsilon) * batch_advantages

                actor_loss = -torch.min(surr1, surr2).mean()

                # 価値損失
                critic_loss = F.mse_loss(values.squeeze(), batch_returns)

                # エントロピーボーナス
                entropy_loss = -entropy.mean()

                # 総損失
                loss = (actor_loss +
                       self.value_coef * critic_loss +
                       self.entropy_coef * entropy_loss)

                self.optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
                self.optimizer.step()

        return actor_loss.item(), critic_loss.item()


# 訓練
print("CartPole-v1でPPOを訓練\n")
env = gym.make('CartPole-v1')
agent = PPO(state_dim=4, action_dim=2, lr=3e-4, epochs=10)

print(f"ハイパーパラメータ:")
print(f"  clip_epsilon: {agent.clip_epsilon}")
print(f"  gae_lambda: {agent.gae_lambda}")
print(f"  更新あたりのエポック数: {agent.epochs}")
print(f"  batch_size: {agent.batch_size}")

num_iterations = 100
rollout_steps = 2048
episode_rewards = []
all_rewards = []

print("\n訓練中...")
total_steps = 0

for iteration in range(num_iterations):
    # ロールアウトデータを収集
    states, actions, log_probs, rewards, values, dones = [], [], [], [], [], []

    state, _ = env.reset()
    episode_reward = 0

    for _ in range(rollout_steps):
        action, log_prob, value = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        states.append(state)
        actions.append(action)
        log_probs.append(log_prob)
        rewards.append(reward)
        values.append(value)
        dones.append(float(done))

        episode_reward += reward
        total_steps += 1

        state = next_state

        if done:
            all_rewards.append(episode_reward)
            episode_reward = 0
            state, _ = env.reset()

    # GAEと収益を計算
    _, _, next_value = agent.select_action(state)
    advantages, returns = agent.compute_gae(rewards, values, dones, next_value)

    # PPO更新
    actor_loss, critic_loss = agent.update(
        states, actions, log_probs, returns, advantages)

    if (iteration + 1) % 10 == 0:
        avg_reward = np.mean(all_rewards[-100:]) if all_rewards else 0
        print(f"反復 {iteration+1:3d} | ステップ: {total_steps:6d} | "
              f"平均報酬: {avg_reward:.1f} | "
              f"Actor損失: {actor_loss:.4f}")

env.close()
print(f"\n最終平均: {np.mean(all_rewards[-100:]):.1f}")
print("\nPPOの主要な特徴:")
print("  - クリップ目的関数が破壊的な更新を防止")
print("  - 複数エポックがサンプル効率を向上")
print("  - GAEがアドバンテージ推定のバイアス-分散をバランス")
print("  - シンプルな実装でありながら非常に効果的")

期待される出力:

=== PPO(Proximal Policy Optimization)の実装 ===

CartPole-v1でPPOを訓練

ハイパーパラメータ:
  clip_epsilon: 0.2
  gae_lambda: 0.95
  更新あたりのエポック数: 10
  batch_size: 64

訓練中...
反復  10 | ステップ:  20480 | 平均報酬: 156.3 | Actor損失: 0.0234
反復  20 | ステップ:  40960 | 平均報酬: 287.5 | Actor損失: 0.0156
反復  30 | ステップ:  61440 | 平均報酬: 398.2 | Actor損失: 0.0089
反復  40 | ステップ:  81920 | 平均報酬: 456.7 | Actor損失: 0.0045
反復  50 | ステップ: 102400 | 平均報酬: 482.1 | Actor損失: 0.0023
...
反復 100 | ステップ: 204800 | 平均報酬: 498.7 | Actor損失: 0.0012

最終平均: 498.7

PPOの主要な特徴:
  - クリップ目的関数が破壊的な更新を防止
  - 複数エポックがサンプル効率を向上
  - GAEがアドバンテージ推定のバイアス-分散をバランス
  - シンプルな実装でありながら非常に効果的

4.6.6 なぜPPOがこれほど人気なのか

側面 PPOの利点
安定性 クリッピングが壊滅的な更新を防止
シンプルさ 一次最適化のみ(Fisher行列不要)
サンプル効率 複数エポックが収集データを再利用
汎用性 チューニングなしで多様なタスクに対応
スケーラビリティ 環境間で容易に並列化

注目すべきPPO応用例:


4.7 連続行動空間

4.7.1 ガウス方策

連続行動(例: ロボットの関節トルク)には、ガウス分布を使用します:

$$ \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)^2) $$

ネットワークは以下を出力します:

4.7.2 ガウス方策の実装

# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gymnasium as gym

print("=== ガウス方策による連続行動空間 ===\n")

class ContinuousPolicyNetwork(nn.Module):
    """
    連続行動用の方策ネットワーク

    ガウス分布の平均とlog_stdを出力します。
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(ContinuousPolicyNetwork, self).__init__()

        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )

        # 平均ヘッド
        self.mu = nn.Linear(hidden_dim, action_dim)

        # Log std(学習パラメータまたは状態依存)
        self.log_std = nn.Linear(hidden_dim, action_dim)

        # 価値ヘッド
        self.value = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)

        mu = self.mu(features)
        log_std = self.log_std(features)
        log_std = torch.clamp(log_std, -20, 2)  # 数値安定性
        std = torch.exp(log_std)

        value = self.value(features)

        return mu, std, value


class ContinuousPPO:
    """連続行動空間用のPPO"""

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99):
        self.gamma = gamma
        self.action_dim = action_dim

        self.network = ContinuousPolicyNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
            mu, std, value = self.network(state_tensor)

        # ガウス分布からサンプリング
        dist = torch.distributions.Normal(mu, std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)  # 行動次元で合計

        return action.squeeze().numpy(), log_prob.item(), value.item()

    def evaluate_actions(self, states, actions):
        mu, std, values = self.network(states)

        dist = torch.distributions.Normal(mu, std)
        log_probs = dist.log_prob(actions).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)

        return log_probs, entropy, values


# Pendulum環境でのデモンストレーション
print("Pendulum-v1(連続制御)でテスト\n")

env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]

print(f"環境: Pendulum-v1")
print(f"  状態空間: {env.observation_space}")
print(f"  行動空間: {env.action_space}")

agent = ContinuousPPO(state_dim, action_dim)

# テストエピソード
state, _ = env.reset()
episode_reward = 0

print("\nガウス方策から行動をサンプリング:")
for step in range(5):
    action, log_prob, value = agent.select_action(state)

    # 行動を有効範囲にクリップ
    action_clipped = np.clip(action, -2.0, 2.0)

    print(f"  ステップ {step}: 行動={action_clipped[0]:.3f}, "
          f"log_prob={log_prob:.3f}, 価値={value:.3f}")

    next_state, reward, terminated, truncated, _ = env.step(action_clipped)
    episode_reward += reward
    state = next_state

env.close()
print(f"\nテストエピソード報酬: {episode_reward:.1f}")
print("\nガウス方策の特徴:")
print("  - 連続行動に自然(関節トルク、ステアリング)")
print("  - サンプリングによる探索(stdが探索を制御)")
print("  - 再パラメータ化トリックが勾配フローを可能に")

4.8 Stable-Baselines3の実践例

4.8.1 Stable-Baselines3の紹介

Stable-Baselines3(SB3)は、信頼性が高く、十分にテストされたRLアルゴリズムの実装です。ベストプラクティスに従った本番環境対応のコードを提供します。

4.8.2 Stable-Baselines3でのPPO

# 必要なライブラリ:
# - Python 3.9+
# - gymnasium>=0.29.0
# - stable-baselines3>=2.1.0
# - tensorboard>=2.14.0(オプション、ログ用)

import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback
import numpy as np

print("=== Stable-Baselines3 PPOの例 ===\n")

# 環境を作成
env_id = "LunarLander-v2"
print(f"環境: {env_id}")

# 単純な訓練用の単一環境
env = gym.make(env_id)
print(f"  観測空間: {env.observation_space}")
print(f"  行動空間: {env.action_space}")
env.close()

# より良いパフォーマンスのためのベクトル化環境を作成
n_envs = 4
vec_env = make_vec_env(env_id, n_envs=n_envs)
print(f"  並列環境数: {n_envs}")

# カスタムハイパーパラメータでPPOモデルを作成
model = PPO(
    "MlpPolicy",           # 方策ネットワークタイプ
    vec_env,
    learning_rate=3e-4,    # 学習率
    n_steps=2048,          # 更新あたりの環境ごとのステップ数
    batch_size=64,         # ミニバッチサイズ
    n_epochs=10,           # 更新あたりのエポック数
    gamma=0.99,            # 割引率
    gae_lambda=0.95,       # GAE lambda
    clip_range=0.2,        # PPOクリップ範囲
    ent_coef=0.01,         # エントロピー係数
    vf_coef=0.5,           # 価値関数係数
    verbose=1,
    tensorboard_log="./ppo_lunarlander_tensorboard/"
)

print("\nPPOモデル設定:")
print(f"  方策: {model.policy.__class__.__name__}")
print(f"  学習率: {model.learning_rate}")
print(f"  クリップ範囲: {model.clip_range}")
print(f"  GAE lambda: {model.gae_lambda}")

# 訓練
print("\nPPOを訓練中...")
total_timesteps = 100000

model.learn(
    total_timesteps=total_timesteps,
    progress_bar=True  # tqdmが必要
)

print("\n訓練完了!")

# 評価
print("\n訓練済みモデルを評価中...")
eval_env = gym.make(env_id)
mean_reward, std_reward = evaluate_policy(
    model,
    eval_env,
    n_eval_episodes=10,
    deterministic=True
)
print(f"平均報酬: {mean_reward:.2f} +/- {std_reward:.2f}")
print(f"(LunarLanderの解決閾値: 200)")

# モデルを保存
model.save("ppo_lunarlander")
print("\nモデルを 'ppo_lunarlander.zip' に保存しました")

# 読み込みとテスト
print("\n保存したモデルを読み込んでテスト中...")
loaded_model = PPO.load("ppo_lunarlander")

# レンダリング情報付きのテストエピソード
state, _ = eval_env.reset()
episode_reward = 0
done = False

while not done:
    action, _ = loaded_model.predict(state, deterministic=True)
    state, reward, terminated, truncated, _ = eval_env.step(action)
    done = terminated or truncated
    episode_reward += reward

print(f"テストエピソード報酬: {episode_reward:.2f}")

vec_env.close()
eval_env.close()

print("\nSB3 PPOの特徴:")
print("  - 本番環境対応の実装")
print("  - 並列化のためのベクトル化環境")
print("  - 組み込みのTensorBoardログ")
print("  - 簡単な保存/読み込み機能")
print("  - 充実したドキュメントとコミュニティ")

期待される出力:

=== Stable-Baselines3 PPOの例 ===

環境: LunarLander-v2
  観測空間: Box([-inf, ...], [inf, ...], (8,), float32)
  行動空間: Discrete(4)
  並列環境数: 4

PPOモデル設定:
  方策: ActorCriticPolicy
  学習率: 0.0003
  クリップ範囲: 0.2
  GAE lambda: 0.95

PPOを訓練中...
| rollout/           |          |
|    ep_len_mean     | 89.3     |
|    ep_rew_mean     | -156     |
| time/              |          |
|    fps             | 1245     |
|    iterations      | 1        |
...
| rollout/           |          |
|    ep_len_mean     | 287      |
|    ep_rew_mean     | 234      |

訓練完了!

訓練済みモデルを評価中...
平均報酬: 256.34 +/- 23.12
(LunarLanderの解決閾値: 200)

モデルを 'ppo_lunarlander.zip' に保存しました

保存したモデルを読み込んでテスト中...
テストエピソード報酬: 267.45

4.8.3 TensorBoardモニタリング

# TensorBoardログを表示するには、ターミナルで実行:
# tensorboard --logdir ./ppo_lunarlander_tensorboard/

# 監視すべき主要メトリクス:
# - rollout/ep_rew_mean: 平均エピソード報酬
# - rollout/ep_len_mean: 平均エピソード長
# - train/policy_loss: Actor損失
# - train/value_loss: Critic損失
# - train/entropy_loss: 探索のためのエントロピー
# - train/approx_kl: KLダイバージェンス(小さいままであるべき)
# - train/clip_fraction: クリッピングが発動する頻度

print("TensorBoardメトリクスガイド:")
print("  ep_rew_mean: 時間とともに増加すべき")
print("  approx_kl: 高すぎる場合(>0.02)、学習率を下げる")
print("  clip_fraction: 約10-20%が典型的")
print("  entropy: 方策が決定論的になるにつれて減少すべき")

まとめ

章のまとめ

トピック 重要ポイント
方策勾配 $\pi_\theta$の直接最適化; 連続行動を扱える
REINFORCE モンテカルロPG; 高分散; 安定性のためベースラインを使用
Actor-Critic 方策学習と価値学習を統合; TDベースの更新
A2C Nステップ収益; エントロピー正則化; 同期訓練
PPO クリップ目的関数; GAE; 複数エポック; 業界標準
連続制御 学習された平均とstdを持つガウス方策

アルゴリズム比較

アルゴリズム 更新タイプ 分散 サンプル効率 複雑さ
REINFORCE エピソード終了 シンプル
Actor-Critic 各ステップ
A2C Nステップ
PPO バッチ(複数エポック)

演習問題

演習4.1: ベースライン比較

課題: CartPoleでベースラインあり/なしのREINFORCEを比較する。

ステップ:

演習4.2: PPOハイパーパラメータ研究

課題: LunarLanderでPPOのハイパーパラメータ感度を調査する。

変動させるパラメータ:

演習4.3: 連続制御

課題: ガウス方策を用いてPendulum-v1でPPOを訓練する。

要件:

演習4.4: エントロピースケジューリング

課題: エントロピー係数のアニーリングを実装する。

アプローチ:

演習4.5: SB3カスタム環境

課題: カスタムGymnasium環境でPPOを訓練する。

ステップ:

演習4.6: マルチ環境訓練

課題: 単一環境と並列環境の訓練を比較する。

メトリクス:


次章プレビュー

第5章では、RLの高度な応用と最前線を探求します:

予定トピック:
- モデルベースRLとワールドモデル
- マルチエージェント強化学習
- オフラインRLと模倣学習
- 人間のフィードバックからの強化学習(RLHF)
- 実世界展開の課題
- 現在の研究動向

免責事項