JP | EN | 最終同期: 2026-01-10

第5章: 最新RLと応用

SACとRLHFからモデルベースRL、実世界展開まで

読了時間: 35-40分 難易度: 上級 コード例: 7 演習問題: 5

この章では、2025年時点で最も先進的かつ影響力のある強化学習技術を解説します。連続制御のためのSAC、ChatGPTやClaudeなど最新の言語モデルを支えるRLHF、DreamerV3やMuZeroなどのモデルベースアプローチ、Decision Transformerを用いたオフラインRL、そして実践的な展開戦略について学びます。

学習目標

この章を読むことで、以下の内容を習得できます:


5.1 SAC(Soft Actor-Critic)

5.1.1 最大エントロピー強化学習

Soft Actor-Critic(SAC)は、最大エントロピー強化学習フレームワークに基づく最先端のオフポリシーアルゴリズムです。期待リターンと方策のエントロピーの両方を最大化するように方策を最適化し、より頑健で探索的な行動を実現します。

最大エントロピー目的関数

従来のRLは期待累積報酬を最大化します:

$$ J(\pi) = \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{\infty} \gamma^t r(s_t, a_t) \right] $$

SACは探索を促進するためにエントロピーボーナスを追加します:

$$ J(\pi) = \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{\infty} \gamma^t \left( r(s_t, a_t) + \alpha \mathcal{H}(\pi(\cdot | s_t)) \right) \right] $$

ここで $\mathcal{H}(\pi) = -\sum_a \pi(a|s) \log \pi(a|s)$ は方策のエントロピー、$\alpha$ は温度パラメータです。

最大エントロピーが機能する理由

メリット 説明
探索 高いエントロピーは多様な行動を試すことを促し、早期収束を防ぐ
頑健性 確率的方策は摂動やモデル誤差に対してより頑健
マルチモーダル解 1つに収束するのではなく、複数の良い戦略を捉えることができる
転移学習 エントロピー正則化された方策は新しいタスクへの転移性が高い

5.1.2 SACアーキテクチャ

graph TB subgraph "SAC アーキテクチャ" S[状態 s] --> Actor[Actor ネットワーク
ガウス方策] S --> Q1[Q-Network 1] S --> Q2[Q-Network 2] Actor --> |行動サンプリング| A[行動 a] Actor --> |対数確率| LP[log pi] Q1 --> MIN[min Q1, Q2] Q2 --> MIN MIN --> |Soft Q値| SQ[Q - alpha * log pi] LP --> SQ SQ --> |方策更新| Actor TQ1[Target Q1] --> |ソフト更新| Q1 TQ2[Target Q2] --> |ソフト更新| Q2 end style Actor fill:#27ae60,color:#fff style Q1 fill:#e74c3c,color:#fff style Q2 fill:#e74c3c,color:#fff style MIN fill:#f39c12,color:#fff

主要コンポーネント

5.1.3 SACの目的関数

Soft Q関数の更新(ベルマンバックアップ):

$$ Q(s_t, a_t) \leftarrow r_t + \gamma \mathbb{E}_{s_{t+1}} \left[ V(s_{t+1}) \right] $$

ソフト価値関数は次のように定義されます:

$$ V(s) = \mathbb{E}_{a \sim \pi} \left[ Q(s, a) - \alpha \log \pi(a|s) \right] $$

方策の更新(エントロピー付きQを最大化):

$$ \mathcal{L}_\pi(\theta) = \mathbb{E}_{s \sim \mathcal{D}} \left[ \mathbb{E}_{a \sim \pi_\theta} \left[ \alpha \log \pi_\theta(a|s) - Q(s, a) \right] \right] $$

自動温度調整

$$ \mathcal{L}_\alpha = \mathbb{E}_{a \sim \pi} \left[ -\alpha \left( \log \pi(a|s) + \bar{\mathcal{H}} \right) \right] $$

ここで $\bar{\mathcal{H}}$ はターゲットエントロピー(連続行動では通常 $-\dim(\mathcal{A})$)です。

5.1.4 SAC実装

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0
# - torch>=2.0.0
# - gymnasium>=0.29.0

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np
from collections import deque
import random

class GaussianPolicy(nn.Module):
    """
    SAC用ガウス方策(再パラメータ化トリック使用)
    平均とlog_stdを出力し、tanhスケーリングでサンプリング
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256,
                 log_std_min=-20, log_std_max=2):
        super().__init__()
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max

        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.mean = nn.Linear(hidden_dim, action_dim)
        self.log_std = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        mean = self.mean(x)
        log_std = torch.clamp(self.log_std(x), self.log_std_min, self.log_std_max)
        return mean, log_std

    def sample(self, state):
        """再パラメータ化トリック + tanh圧縮で行動をサンプリング"""
        mean, log_std = self.forward(state)
        std = log_std.exp()

        # 再パラメータ化: a = mu + sigma * epsilon
        normal = Normal(mean, std)
        x_t = normal.rsample()  # 微分可能なサンプリング

        # tanhで[-1, 1]に圧縮
        action = torch.tanh(x_t)

        # tanh補正付き対数確率
        log_prob = normal.log_prob(x_t)
        log_prob -= torch.log(1 - action.pow(2) + 1e-6)
        log_prob = log_prob.sum(dim=-1, keepdim=True)

        return action, log_prob


class SoftQNetwork(nn.Module):
    """SAC用Q-Network: Q(s, a) -> スカラー値"""

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class SAC:
    """
    自動温度調整付きSoft Actor-Critic

    主な特徴:
    - 頑健な探索のための最大エントロピーRL
    - 過大評価を防ぐダブルQ学習
    - 経験再生を使ったオフポリシー学習
    - 自動alpha(温度)調整
    """

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 tau=0.005, alpha=0.2, auto_alpha=True):
        self.gamma = gamma
        self.tau = tau
        self.auto_alpha = auto_alpha

        # ネットワーク
        self.actor = GaussianPolicy(state_dim, action_dim)
        self.q1 = SoftQNetwork(state_dim, action_dim)
        self.q2 = SoftQNetwork(state_dim, action_dim)
        self.q1_target = SoftQNetwork(state_dim, action_dim)
        self.q2_target = SoftQNetwork(state_dim, action_dim)

        # ターゲットに重みをコピー
        self.q1_target.load_state_dict(self.q1.state_dict())
        self.q2_target.load_state_dict(self.q2.state_dict())

        # オプティマイザ
        self.actor_optim = torch.optim.Adam(self.actor.parameters(), lr=lr)
        self.q1_optim = torch.optim.Adam(self.q1.parameters(), lr=lr)
        self.q2_optim = torch.optim.Adam(self.q2.parameters(), lr=lr)

        # 自動温度調整
        if auto_alpha:
            self.target_entropy = -action_dim  # ヒューリスティック: -dim(A)
            self.log_alpha = torch.zeros(1, requires_grad=True)
            self.alpha_optim = torch.optim.Adam([self.log_alpha], lr=lr)
            self.alpha = self.log_alpha.exp().item()
        else:
            self.alpha = alpha

        # リプレイバッファ
        self.buffer = deque(maxlen=100000)

    def select_action(self, state, evaluate=False):
        """行動選択: 訓練時は確率的、評価時は決定的"""
        state = torch.FloatTensor(state).unsqueeze(0)

        if evaluate:
            with torch.no_grad():
                mean, _ = self.actor(state)
                return torch.tanh(mean).cpu().numpy()[0]
        else:
            with torch.no_grad():
                action, _ = self.actor.sample(state)
                return action.cpu().numpy()[0]

    def update(self, batch_size=256):
        """SAC更新ステップを1回実行"""
        if len(self.buffer) < batch_size:
            return

        # バッチをサンプリング
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*batch)

        state = torch.FloatTensor(np.array(state))
        action = torch.FloatTensor(np.array(action))
        reward = torch.FloatTensor(reward).unsqueeze(1)
        next_state = torch.FloatTensor(np.array(next_state))
        done = torch.FloatTensor(done).unsqueeze(1)

        # --- Q関数の更新 ---
        with torch.no_grad():
            next_action, next_log_prob = self.actor.sample(next_state)
            target_q1 = self.q1_target(next_state, next_action)
            target_q2 = self.q2_target(next_state, next_action)
            target_q = torch.min(target_q1, target_q2) - self.alpha * next_log_prob
            target_value = reward + (1 - done) * self.gamma * target_q

        q1_loss = F.mse_loss(self.q1(state, action), target_value)
        q2_loss = F.mse_loss(self.q2(state, action), target_value)

        self.q1_optim.zero_grad()
        q1_loss.backward()
        self.q1_optim.step()

        self.q2_optim.zero_grad()
        q2_loss.backward()
        self.q2_optim.step()

        # --- 方策の更新 ---
        new_action, log_prob = self.actor.sample(state)
        q1_new = self.q1(state, new_action)
        q2_new = self.q2(state, new_action)
        q_new = torch.min(q1_new, q2_new)

        actor_loss = (self.alpha * log_prob - q_new).mean()

        self.actor_optim.zero_grad()
        actor_loss.backward()
        self.actor_optim.step()

        # --- Alpha(温度)の更新 ---
        if self.auto_alpha:
            alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()

            self.alpha_optim.zero_grad()
            alpha_loss.backward()
            self.alpha_optim.step()

            self.alpha = self.log_alpha.exp().item()

        # --- ターゲットネットワークのソフト更新 ---
        for param, target_param in zip(self.q1.parameters(), self.q1_target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
        for param, target_param in zip(self.q2.parameters(), self.q2_target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)


# --- 訓練例 ---
if __name__ == "__main__":
    import gymnasium as gym

    print("SAC訓練: Pendulum-v1")
    print("=" * 50)

    env = gym.make('Pendulum-v1')
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]

    agent = SAC(state_dim, action_dim, auto_alpha=True)

    for episode in range(100):
        state, _ = env.reset()
        episode_reward = 0

        for step in range(200):
            action = agent.select_action(state)
            action_scaled = action * 2.0  # Pendulumの行動範囲 [-2, 2]

            next_state, reward, terminated, truncated, _ = env.step(action_scaled)
            done = terminated or truncated

            agent.buffer.append((state, action, reward, next_state, float(done)))
            agent.update()

            episode_reward += reward
            state = next_state

            if done:
                break

        if (episode + 1) % 10 == 0:
            print(f"エピソード {episode+1}, 報酬: {episode_reward:.1f}, Alpha: {agent.alpha:.3f}")

    print("\n訓練完了!")

SACのポイント: 再パラメータ化トリックにより、確率的サンプリングを通じてエンドツーエンドの勾配フローが可能になります。自動温度調整は探索を動的に調整します - 序盤は高いalphaで探索、後半は低いalphaで活用。SACは連続制御において最もサンプル効率の良いモデルフリーアルゴリズムの1つと考えられています。


5.2 RLHF: 人間のフィードバックからの強化学習

RLHFは、言語モデルをChatGPTやClaudeのような有用なAIアシスタントに変換した重要な技術です。このセクションでは、RLHFの仕組みとAIアラインメントにおける重要性を解説します。

5.2.1 RLHFが重要な理由

事前学習された言語モデル(GPTなど)は、次のトークンを予測するように訓練されています。しかし、この目的関数は以下を直接最適化しません:

RLHFはこのギャップを埋めます。人間の嗜好を使って「良い」出力とは何かを定義し、RLを使ってモデルをその嗜好に向けて最適化します。

5.2.2 RLHFパイプライン

flowchart TB subgraph "ステップ1: 教師ありファインチューニング(SFT)" PT[事前学習済みLLM] --> |デモンストレーションでファインチューニング| SFT[SFTモデル] HD[人間のデモンストレーション
高品質な応答] --> SFT end subgraph "ステップ2: 報酬モデル訓練" SFT --> |応答を生成| RESP[応答ペア
プロンプトxに対するy1, y2] RESP --> |人間がランク付け| PREF[嗜好データ
y1 > y2] PREF --> |訓練| RM[報酬モデル
r = RM x,y] end subgraph "ステップ3: PPOによるRLファインチューニング" SFT --> |初期化| POLICY[方策モデル] RM --> |報酬を提供| PPO[PPO訓練] POLICY --> |生成| GEN[生成された応答] GEN --> RM PPO --> |更新| POLICY KL[KLペナルティ
SFTに近づく] --> PPO end POLICY --> FINAL[RLHF調整済みモデル
ChatGPT, Claude] style PT fill:#e3f2fd style SFT fill:#fff3e0 style RM fill:#f3e5f5 style POLICY fill:#e8f5e9 style FINAL fill:#ffcdd2

5.2.3 ステップ1: 教師ありファインチューニング(SFT)

事前学習済みLLMを高品質なデモンストレーションでファインチューニングします:

$$ \mathcal{L}_{\text{SFT}} = -\mathbb{E}_{(x, y) \sim \mathcal{D}_{\text{demo}}} \left[ \log \pi_\theta(y | x) \right] $$

ここで $\mathcal{D}_{\text{demo}}$ はプロンプトに対する人間が書いた応答を含みます。

なぜ最初にSFTを行うのか

5.2.4 ステップ2: 報酬モデル訓練

比較データから人間の嗜好を予測する報酬モデルを訓練します。

データ収集

  1. 多様なデータセットからプロンプトをサンプリング
  2. SFTモデルを使って複数の応答を生成
  3. 人間のアノテーターが応答をランク付け(例:A > B)

Bradley-Terryモデル

応答 $y_1$ が $y_2$ より好まれる確率をモデル化:

$$ P(y_1 \succ y_2 | x) = \sigma(r(x, y_1) - r(x, y_2)) $$

ここで $\sigma$ はシグモイド関数、$r(x, y)$ は報酬モデルです。

報酬モデルの損失関数

$$ \mathcal{L}_{\text{RM}} = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}} \left[ \log \sigma(r(x, y_w) - r(x, y_l)) \right] $$

ここで $y_w$ は好まれる(勝ち)応答、$y_l$ は拒否された(負け)応答です。

# 報酬モデル訓練(概念的なコード)
import torch
import torch.nn as nn

class RewardModel(nn.Module):
    """
    RLHF用報酬モデル
    (プロンプト, 応答)を受け取り、スカラー報酬を出力
    """

    def __init__(self, base_model):
        super().__init__()
        self.backbone = base_model  # 事前学習済みLLM
        self.reward_head = nn.Linear(base_model.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        # 最終隠れ状態を取得
        outputs = self.backbone(input_ids, attention_mask=attention_mask)
        last_hidden = outputs.last_hidden_state[:, -1, :]  # [CLS]または最終トークン
        reward = self.reward_head(last_hidden)
        return reward.squeeze(-1)


def reward_model_loss(reward_model, chosen_ids, rejected_ids, attention_mask_c, attention_mask_r):
    """
    嗜好学習のためのBradley-Terry損失
    """
    r_chosen = reward_model(chosen_ids, attention_mask_c)
    r_rejected = reward_model(rejected_ids, attention_mask_r)

    # 損失: -log(sigmoid(r_chosen - r_rejected))
    loss = -torch.log(torch.sigmoid(r_chosen - r_rejected)).mean()
    return loss

5.2.5 ステップ3: PPOファインチューニング

PPOを使って方策(LLM)を最適化し、SFTモデルに近い状態を保ちながら報酬モデルのスコアを最大化します。

RLHF目的関数

$$ \mathcal{L}_{\text{RLHF}} = \mathbb{E}_{x \sim \mathcal{D}, y \sim \pi_\theta} \left[ r(x, y) - \beta \cdot \text{KL}(\pi_\theta(y|x) \| \pi_{\text{SFT}}(y|x)) \right] $$

KLペナルティは方策が以下を防ぎます:

言語モデルのためのPPO

第4章のPPOクリップ目的関数をトークンレベルの行動に適用します:

$$ \mathcal{L}^{\text{CLIP}}(\theta) = \mathbb{E}_t \left[ \min(r_t(\theta) \hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) \hat{A}_t) \right] $$

ここで $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\text{old}}(a_t|s_t)}$ は各トークンの確率比です。

# RLHF PPO訓練(簡略化した概念コード)
def compute_rlhf_reward(response_ids, prompt_ids, reward_model, ref_model, policy_model, beta=0.1):
    """
    KLペナルティ付きRLHF報酬を計算

    reward = r(x, y) - beta * KL(policy || reference)
    """
    # 報酬モデルから報酬を取得
    reward_score = reward_model(prompt_ids, response_ids)

    # トークンごとのKLダイバージェンスを計算
    with torch.no_grad():
        ref_logprobs = ref_model.get_log_probs(prompt_ids, response_ids)
    policy_logprobs = policy_model.get_log_probs(prompt_ids, response_ids)

    kl_div = (policy_logprobs - ref_logprobs).sum(dim=-1)

    # 合計報酬
    total_reward = reward_score - beta * kl_div
    return total_reward


def ppo_step(policy_model, old_logprobs, states, actions, advantages, epsilon=0.2):
    """
    言語モデル方策のためのPPO更新
    """
    new_logprobs = policy_model.get_log_probs(states, actions)

    # 確率比
    ratio = torch.exp(new_logprobs - old_logprobs)

    # クリップ目的関数
    surr1 = ratio * advantages
    surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantages

    loss = -torch.min(surr1, surr2).mean()
    return loss

5.2.6 DPO: Direct Preference Optimization

DPO(Direct Preference Optimization)は、報酬モデルをスキップして嗜好を直接最適化する、RLHFのよりシンプルな代替手法です。

重要な洞察

RLHF目的関数の下での最適方策は閉形式で表現できます:

$$ \pi^*(y|x) = \frac{1}{Z(x)} \pi_{\text{ref}}(y|x) \exp\left(\frac{1}{\beta} r(x, y)\right) $$

これは報酬を方策の観点から表現できることを意味します:

$$ r(x, y) = \beta \log \frac{\pi^*(y|x)}{\pi_{\text{ref}}(y|x)} + \beta \log Z(x) $$

DPO損失関数

Bradley-Terry嗜好モデルに代入すると、DPO目的関数が得られます:

$$ \mathcal{L}_{\text{DPO}}(\theta) = -\mathbb{E}_{(x, y_w, y_l)} \left[ \log \sigma \left( \beta \log \frac{\pi_\theta(y_w|x)}{\pi_{\text{ref}}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{\text{ref}}(y_l|x)} \right) \right] $$

DPO vs RLHF

観点 RLHF DPO
報酬モデル 必要(別途訓練) 不要(暗黙的)
RL訓練 オンラインサンプリングでPPO 教師あり学習のみ
複雑さ 高い(3段階) 低い(SFT後1段階)
安定性 不安定になりやすい(RLの問題) より安定(教師あり学習)
柔軟性 新しいプロンプトで反復可能 嗜好データセットに固定
# DPO実装(簡略化)
def dpo_loss(policy_model, ref_model, chosen_ids, rejected_ids, beta=0.1):
    """
    Direct Preference Optimization損失

    報酬モデル不要 - 嗜好を直接最適化
    """
    # 方策と参照モデルから対数確率を取得
    policy_chosen_logps = policy_model.get_log_probs(chosen_ids)
    policy_rejected_logps = policy_model.get_log_probs(rejected_ids)

    with torch.no_grad():
        ref_chosen_logps = ref_model.get_log_probs(chosen_ids)
        ref_rejected_logps = ref_model.get_log_probs(rejected_ids)

    # 対数比を計算
    chosen_ratio = beta * (policy_chosen_logps - ref_chosen_logps)
    rejected_ratio = beta * (policy_rejected_logps - ref_rejected_logps)

    # DPO損失: -log sigmoid(chosen_ratio - rejected_ratio)
    loss = -torch.log(torch.sigmoid(chosen_ratio - rejected_ratio)).mean()
    return loss

AIアラインメントにおけるRLHFの重要性: RLHFは単にモデルをより有用にするだけでなく、AI安全性にとって重要な技術です。人間のフィードバックから学習することで、純粋な次トークン予測から生じる有害な行動からモデルを遠ざけることができます。RLHFの「H」は、AIシステムに組み込みたい人間の価値観を表しています。


5.3 モデルベース強化学習

モデルベースRLは環境ダイナミクスのモデルを学習し、それを計画に使用することで、モデルフリー手法よりもはるかに高いサンプル効率を実現します。

5.3.1 ワールドモデルの概念

ワールドモデルは環境が行動にどう反応するかを予測します:

$$ \hat{s}_{t+1}, \hat{r}_t = f_\theta(s_t, a_t) $$

これにより、エージェントは実際に環境で行動を取らずに結果を「想像」できます。

graph LR subgraph "モデルフリーRL" E1[実環境] --> |経験| P1[方策] P1 --> |行動| E1 end subgraph "モデルベースRL" E2[実環境] --> |経験| WM[ワールドモデル] WM --> |想像された経験| P2[方策] P2 --> |行動| E2 WM --> |計画| PLAN[MCTS / MPC] PLAN --> P2 end style WM fill:#f39c12,color:#fff style PLAN fill:#9b59b6,color:#fff

モデルベースRLの利点

利点 説明
サンプル効率 環境との相互作用を10-100分の1に削減して学習
想像力 訓練用に無制限の合成経験を生成
計画 先読みして行動シーケンスを評価
転移 ワールドモデルは同じ環境の新しいタスクに転移可能

5.3.2 DreamerV3(2023-2025)

DreamerV3は、150以上の多様なタスクで優れた結果を達成した最先端のワールドモデルアルゴリズムで、Minecraftでダイヤモンドをゼロから収集することにも成功しています。

主要なイノベーション

DreamerV3アーキテクチャ

graph TB subgraph "DreamerV3 ワールドモデル" OBS[観測 x_t] --> ENC[エンコーダ CNN/MLP] ENC --> POST[事後分布 z_t
離散潜在] H[再帰状態 h_t] --> PRIOR[事前分布 z_t] A[行動 a_t-1] --> PRIOR POST --> DEC[デコーダ] H --> DEC DEC --> PRED[予測 x_t, r_t] POST --> |更新| H H --> |次ステップ| H end subgraph "想像による訓練" H --> |展開| IMAG[想像された軌道] IMAG --> ACTOR[Actor損失] IMAG --> CRITIC[Critic損失] end style POST fill:#27ae60,color:#fff style IMAG fill:#e74c3c,color:#fff

想像の中での学習

DreamerV3は完全に想像の中で方策を訓練します:

  1. 実経験を収集:環境と相互作用し、バッファに保存
  2. ワールドモデルを訓練:観測と報酬を予測することを学習
  3. 軌道を想像:現在の方策でワールドモデルを展開
  4. Actor-Criticを訓練:想像されたリターンを使って方策を更新

Minecraftダイヤモンド達成

DreamerV3は、人間のデモンストレーションやカリキュラム学習なしでMinecraftでダイヤモンドを収集した最初のアルゴリズムです。これは以下を必要とするタスクです:

5.3.3 MuZeroの原理

MuZero(DeepMind、2020)は、学習したダイナミクスモデルとモンテカルロ木探索(MCTS)を組み合わせ、ゲームのルールを知らずにAtari、チェス、将棋、囲碁で超人的なパフォーマンスを達成しました。

重要な洞察

MuZeroは3つのコンポーネントを学習します:

MuZeroアーキテクチャ

graph LR O[観測 o_t] --> |h| S0[隠れ状態 s_0] S0 --> |f| PV0[方策 p_0
価値 v_0] S0 --> |g with a_0| S1[状態 s_1
報酬 r_0] S1 --> |f| PV1[p_1, v_1] S1 --> |g with a_1| S2[状態 s_2
報酬 r_1] S2 --> |f| PV2[p_2, v_2] subgraph "MCTS計画" PV0 --> MCTS[木探索] PV1 --> MCTS PV2 --> MCTS MCTS --> ACT[行動選択] end style MCTS fill:#9b59b6,color:#fff

MuZeroがルールなしで機能する理由

モデルベース vs モデルフリーの比較

観点 モデルフリー(DQN, PPO) モデルベース(DreamerV3, MuZero)
サンプル効率 低い(数百万ステップ) 高い(10-100分の1)
計算量 ステップあたり低い 高い(計画のオーバーヘッド)
モデル誤差 誤りうるモデルがない 誤差が蓄積する可能性
汎化 タスク固有 モデルは転移可能

5.4 オフラインRLとDecision Transformer

オフラインRL(バッチRLとも呼ばれる)は、環境との相互作用なしに固定されたデータセットから方策を学習します。実世界での相互作用が高価または危険な場合に重要です。

5.4.1 オフラインRLの課題

標準的なRLアルゴリズムは分布シフトのためにオフラインデータでは失敗します:

オフラインRLを使う場面

5.4.2 Decision Transformer

Decision Transformer(2021)はRLをシーケンスモデリング問題として再定式化し、Transformerを使って望ましいリターンに条件付けて行動を生成します。

重要な洞察

価値を予測してリターンを最適化する代わりに、望ましいリターンに条件付けてそれを達成する行動を予測します。

シーケンス形式

Decision Transformerは軌道をシーケンスとしてモデル化:

$$ \tau = (\hat{R}_1, s_1, a_1, \hat{R}_2, s_2, a_2, \ldots, \hat{R}_T, s_T, a_T) $$

ここで $\hat{R}_t = \sum_{t'=t}^{T} r_{t'}$ はreturn-to-go(時刻 $t$ からの残りリターン)です。

graph LR subgraph "Decision Transformer入力" R1[Return-to-go 1] --> EMB[トークン埋め込み] S1[状態 1] --> EMB A1[行動 1] --> EMB R2[Return-to-go 2] --> EMB S2[状態 2] --> EMB end EMB --> TF[Transformer
因果的注意機構] TF --> PRED[予測行動 a_2] style TF fill:#9b59b6,color:#fff style PRED fill:#27ae60,color:#fff

訓練目的

オフライン軌道での単純な教師あり学習:

$$ \mathcal{L} = \mathbb{E}_{\tau \sim \mathcal{D}} \left[ \sum_t \| a_t - \hat{a}_t \|^2 \right] $$

ここで $\hat{a}_t$ はコンテキスト $(\hat{R}_1, s_1, a_1, \ldots, \hat{R}_t, s_t)$ が与えられたときの予測行動です。

推論:リターン条件付け

テスト時に望ましいリターンを指定すると、モデルはそれを達成する行動を生成します:

  1. $\hat{R}_1$ を望ましい総リターン(例:エキスパートレベルの性能)に設定
  2. $s_1$ を観測し、$a_1$ を予測
  3. $a_1$ を実行し、$r_1, s_2$ を観測
  4. $\hat{R}_2 = \hat{R}_1 - r_1$ に更新
  5. 繰り返し
# Decision Transformer(簡略化実装)
import torch
import torch.nn as nn

class DecisionTransformer(nn.Module):
    """
    オフラインRL用Decision Transformer
    return-to-goに条件付けて行動を生成
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128,
                 n_layers=3, n_heads=1, max_length=20):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.max_length = max_length

        # 各モダリティの埋め込み
        self.state_embed = nn.Linear(state_dim, hidden_dim)
        self.action_embed = nn.Linear(action_dim, hidden_dim)
        self.return_embed = nn.Linear(1, hidden_dim)

        # 位置埋め込み
        self.pos_embed = nn.Embedding(max_length * 3, hidden_dim)

        # Transformer
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim, nhead=n_heads,
            dim_feedforward=hidden_dim * 4, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)

        # 行動予測ヘッド
        self.action_head = nn.Linear(hidden_dim, action_dim)

    def forward(self, returns_to_go, states, actions, timesteps):
        """
        順伝播: コンテキストから行動を予測

        Args:
            returns_to_go: (batch, seq_len, 1)
            states: (batch, seq_len, state_dim)
            actions: (batch, seq_len, action_dim)
            timesteps: (batch, seq_len)
        """
        batch_size, seq_len = states.shape[0], states.shape[1]

        # 各モダリティを埋め込み
        state_embeddings = self.state_embed(states)
        action_embeddings = self.action_embed(actions)
        return_embeddings = self.return_embed(returns_to_go)

        # インターリーブ: [R_1, s_1, a_1, R_2, s_2, a_2, ...]
        # 形状: (batch, seq_len * 3, hidden_dim)
        stacked = torch.stack([return_embeddings, state_embeddings, action_embeddings], dim=2)
        stacked = stacked.reshape(batch_size, seq_len * 3, self.hidden_dim)

        # 位置埋め込みを追加
        positions = torch.arange(seq_len * 3, device=states.device).unsqueeze(0)
        stacked = stacked + self.pos_embed(positions)

        # 因果マスク(過去のみ参照可能)
        mask = nn.Transformer.generate_square_subsequent_mask(seq_len * 3).to(states.device)

        # Transformer順伝播
        output = self.transformer(stacked, mask=mask)

        # 行動を予測(状態位置から: インデックス 1, 4, 7, ...)
        state_positions = torch.arange(1, seq_len * 3, 3)
        action_preds = self.action_head(output[:, state_positions, :])

        return action_preds

    def get_action(self, returns_to_go, states, actions, timesteps):
        """現在の状態に対する行動を取得(推論モード)"""
        action_preds = self.forward(returns_to_go, states, actions, timesteps)
        return action_preds[:, -1, :]  # 最後の予測行動を返す


# 使用例
if __name__ == "__main__":
    print("Decision Transformerの例")
    print("=" * 50)

    state_dim, action_dim = 4, 2
    model = DecisionTransformer(state_dim, action_dim)

    # オフラインデータをシミュレート
    batch_size, seq_len = 8, 10
    returns = torch.randn(batch_size, seq_len, 1)
    states = torch.randn(batch_size, seq_len, state_dim)
    actions = torch.randn(batch_size, seq_len, action_dim)
    timesteps = torch.arange(seq_len).unsqueeze(0).expand(batch_size, -1)

    # 順伝播
    pred_actions = model(returns, states, actions, timesteps)
    print(f"入力形状: states {states.shape}")
    print(f"出力形状: predicted actions {pred_actions.shape}")

    # 推論: 新しい状態に対する行動を取得
    action = model.get_action(returns, states, actions, timesteps)
    print(f"推論行動: {action[0].detach().numpy()}")

Decision Transformerの結果


5.5 マルチエージェント強化学習

マルチエージェントRL(MARL)は、複数のエージェントが共有環境で学習し相互作用することを扱います。

5.5.1 CTDE: 集中訓練分散実行

MARLの主要なパラダイム:

graph TB subgraph "集中訓練" G[グローバル状態] --> CRITIC[集中Critic] O1[エージェント1の観測] --> CRITIC O2[エージェント2の観測] --> CRITIC CRITIC --> |価値推定| TRAIN[訓練信号] end subgraph "分散実行" O1_E[ローカル観測1] --> A1[エージェント1方策] O2_E[ローカル観測2] --> A2[エージェント2方策] A1 --> ACT1[行動1] A2 --> ACT2[行動2] end style CRITIC fill:#e74c3c,color:#fff style A1 fill:#27ae60,color:#fff style A2 fill:#27ae60,color:#fff

5.5.2 協調的 vs 競争的設定

設定 説明
協調的 共有報酬、チーム目標 ロボット群、StarCraftマイクロマネジメント
競争的 ゼロサム、敵対的 チェス、囲碁、ポーカー
混合 協調と競争の両方 交通、交渉、社会的ジレンマ

5.5.3 主要なアルゴリズム


5.6 安全な強化学習

実世界のRLアプリケーションでは、訓練と展開の両方で安全制約が必要です。

5.6.1 制約付きMDP

標準MDPをコスト制約で拡張:

$$ \max_\pi J(\pi) = \mathbb{E}\left[\sum_t \gamma^t r_t\right] \quad \text{s.t.} \quad C_i(\pi) = \mathbb{E}\left[\sum_t \gamma^t c_i(s_t, a_t)\right] \leq d_i $$

ここで $c_i$ はコスト関数(例:危険な行動)、$d_i$ は閾値です。

5.6.2 解法

5.6.3 実世界の安全要件

ドメイン 安全制約
ロボティクス 関節制限、衝突回避、力制限
自動運転 車線境界、最小距離、速度制限
医療 薬剤投与量制限、治療安全閾値
金融 ポジション制限、ドローダウン制約、リスク予算

5.7 実世界への応用

5.7.1 ロボティクス:Sim-to-Real転移

シミュレーションでロボットを訓練し、実世界に転移します。

主要な技術

SLIMシステム(2024)

スタンフォードのSim-to-real Learning for Manipulation:

5.7.2 自動運転

RLは以下に使用されます:

5.7.3 ゲームAI

システム ゲーム 達成
AlphaGo 囲碁 世界チャンピオン李世ドルに勝利
AlphaStar StarCraft II グランドマスターレベル、トッププロに勝利
OpenAI Five Dota 2 世界チャンピオンOGに勝利
MuZero Atari、チェス、将棋 ルールを知らずに超人的性能

5.7.4 リソース最適化


5.8 Stable-Baselines3の実践例

5.8.1 連続制御でのSAC

# Requirements:
# - Python 3.9+
# - stable-baselines3>=2.1.0
# - gymnasium>=0.29.0

"""
Stable-Baselines3 SAC例
最小限のコードでプロフェッショナルグレードのRL訓練
"""

import gymnasium as gym
from stable_baselines3 import SAC
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import numpy as np

def train_sac_pendulum():
    """モニタリングと評価付きでPendulum-v1でSACを訓練"""

    print("Stable-Baselines3でSAC訓練")
    print("=" * 50)

    # モニタリング付き環境を作成
    env = gym.make("Pendulum-v1")
    env = Monitor(env)

    # 評価用環境
    eval_env = gym.make("Pendulum-v1")
    eval_env = Monitor(eval_env)

    # 最適化されたハイパーパラメータでSAC
    model = SAC(
        "MlpPolicy",
        env,
        learning_rate=3e-4,
        buffer_size=100000,
        learning_starts=1000,
        batch_size=256,
        tau=0.005,              # ソフト更新係数
        gamma=0.99,
        train_freq=1,
        gradient_steps=1,
        ent_coef="auto",        # 自動エントロピー調整
        verbose=1,
        tensorboard_log="./sac_pendulum_tensorboard/"
    )

    # モニタリング用コールバック
    eval_callback = EvalCallback(
        eval_env,
        best_model_save_path="./logs/best_model/",
        log_path="./logs/",
        eval_freq=5000,
        deterministic=True,
        render=False
    )

    checkpoint_callback = CheckpointCallback(
        save_freq=10000,
        save_path="./logs/checkpoints/",
        name_prefix="sac_pendulum"
    )

    # 訓練
    print("\n訓練を開始...")
    model.learn(
        total_timesteps=50000,
        callback=[eval_callback, checkpoint_callback],
        progress_bar=True
    )

    # 最終性能を評価
    mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10)
    print(f"\n最終評価: {mean_reward:.2f} +/- {std_reward:.2f}")

    # 最終モデルを保存
    model.save("sac_pendulum_final")
    print("モデルを sac_pendulum_final.zip に保存しました")

    return model


def train_ppo_lunarlander():
    """ベクトル化環境でLunarLanderでPPOを訓練"""

    from stable_baselines3 import PPO
    from stable_baselines3.common.env_util import make_vec_env

    print("\nベクトル化環境でPPO訓練")
    print("=" * 50)

    # ベクトル化環境(4つの並列環境)
    env = make_vec_env("LunarLander-v2", n_envs=4)

    model = PPO(
        "MlpPolicy",
        env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        verbose=1
    )

    print("100kタイムステップで訓練中...")
    model.learn(total_timesteps=100000, progress_bar=True)

    # 評価
    eval_env = gym.make("LunarLander-v2")
    mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10)
    print(f"評価: {mean_reward:.2f} +/- {std_reward:.2f}")

    return model


def hyperparameter_tuning_example():
    """Optunaによるハイパーパラメータチューニング例(オプション)"""

    try:
        import optuna
        from stable_baselines3 import PPO

        print("\nOptunaによるハイパーパラメータチューニング")
        print("=" * 50)

        def objective(trial):
            """Optunaの目的関数"""
            lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
            n_steps = trial.suggest_categorical("n_steps", [256, 512, 1024, 2048])
            gamma = trial.suggest_float("gamma", 0.9, 0.9999)

            env = gym.make("CartPole-v1")
            model = PPO("MlpPolicy", env, learning_rate=lr, n_steps=n_steps, gamma=gamma, verbose=0)
            model.learn(total_timesteps=10000)

            mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)
            return mean_reward

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=10, show_progress_bar=True)

        print(f"\n最適ハイパーパラメータ: {study.best_params}")
        print(f"最高報酬: {study.best_value:.2f}")

    except ImportError:
        print("Optunaがインストールされていません。実行: pip install optuna")


if __name__ == "__main__":
    # 例1: PendulumでSAC
    sac_model = train_sac_pendulum()

    # 例2: LunarLanderでPPO
    ppo_model = train_ppo_lunarlander()

    # 例3: ハイパーパラメータチューニング(オプション)
    # hyperparameter_tuning_example()

    print("\n" + "=" * 50)
    print("すべての訓練例が完了しました!")
    print("TensorBoardログを表示: tensorboard --logdir ./sac_pendulum_tensorboard/")

まとめと今後の展望

学んだこと

今後の方向性(2025年以降)

方向性 説明
RL基盤モデル タスクや環境を超えて転移する事前学習モデル
RLHFの改善 Constitutional AI、議論、スケーラブルな監視
大規模Sim-to-Real シミュレーションから多様な実ロボットへのより良い転移
階層的RL 再利用可能なスキルと時間的抽象化の学習
マルチモーダルRL ロボティクスのためのビジョン・言語・行動モデル

参考資料


演習問題

演習5.1: SAC温度パラメータの分析

問題:固定alpha値(0.05、0.1、0.2、0.5)と自動調整をPendulum-v1で比較してください。

タスク

期待される洞察:自動調整は手動チューニングなしで良好な性能を達成するはずです。

演習5.2: RLHF報酬モデル

問題:テキスト分類の嗜好に対する簡単な報酬モデルを実装してください。

タスク

ヒント:事前学習済みの文エンコーダ(例:sentence-transformers)をバックボーンとして使用してください。

演習5.3: Decision Transformer実装

問題:CartPoleオフラインデータでDecision Transformerを訓練してください。

タスク

ヒント:短いコンテキスト長(5-10ステップ)から始めると反復が速くなります。

演習5.4: マルチエージェント協調タスク

問題:エージェントが協力する必要がある簡単なマルチエージェント環境を実装してください。

タスク

ヒント:マルチエージェント環境にはPettingZooを使用してください。

演習5.5: 制約付き安全なRL

問題:標準RLタスクに安全制約を追加してください。

タスク

ヒント:完全なCPOの前に、シンプルな報酬ペナルティアプローチから始めてください。


免責事項