この章では、2025年時点で最も先進的かつ影響力のある強化学習技術を解説します。連続制御のためのSAC、ChatGPTやClaudeなど最新の言語モデルを支えるRLHF、DreamerV3やMuZeroなどのモデルベースアプローチ、Decision Transformerを用いたオフラインRL、そして実践的な展開戦略について学びます。
学習目標
この章を読むことで、以下の内容を習得できます:
- SAC(Soft Actor-Critic)と最大エントロピー強化学習を理解する
- RLHF(人間のフィードバックからの強化学習)がChatGPTやClaudeなどのLLMをどのように調整しているかを説明できる
- DPO(Direct Preference Optimization)をRLHFの代替手法として理解する
- モデルベースRLの原理:DreamerV3とMuZeroを理解する
- Decision Transformerを用いたオフラインRLを適用できる
- マルチエージェントRLと安全な強化学習を理解する
- Stable-Baselines3を使って実践的なRLプロジェクトを実装できる
5.1 SAC(Soft Actor-Critic)
5.1.1 最大エントロピー強化学習
Soft Actor-Critic(SAC)は、最大エントロピー強化学習フレームワークに基づく最先端のオフポリシーアルゴリズムです。期待リターンと方策のエントロピーの両方を最大化するように方策を最適化し、より頑健で探索的な行動を実現します。
最大エントロピー目的関数
従来のRLは期待累積報酬を最大化します:
$$ J(\pi) = \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{\infty} \gamma^t r(s_t, a_t) \right] $$SACは探索を促進するためにエントロピーボーナスを追加します:
$$ J(\pi) = \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{\infty} \gamma^t \left( r(s_t, a_t) + \alpha \mathcal{H}(\pi(\cdot | s_t)) \right) \right] $$ここで $\mathcal{H}(\pi) = -\sum_a \pi(a|s) \log \pi(a|s)$ は方策のエントロピー、$\alpha$ は温度パラメータです。
最大エントロピーが機能する理由
| メリット | 説明 |
|---|---|
| 探索 | 高いエントロピーは多様な行動を試すことを促し、早期収束を防ぐ |
| 頑健性 | 確率的方策は摂動やモデル誤差に対してより頑健 |
| マルチモーダル解 | 1つに収束するのではなく、複数の良い戦略を捉えることができる |
| 転移学習 | エントロピー正則化された方策は新しいタスクへの転移性が高い |
5.1.2 SACアーキテクチャ
ガウス方策] S --> Q1[Q-Network 1] S --> Q2[Q-Network 2] Actor --> |行動サンプリング| A[行動 a] Actor --> |対数確率| LP[log pi] Q1 --> MIN[min Q1, Q2] Q2 --> MIN MIN --> |Soft Q値| SQ[Q - alpha * log pi] LP --> SQ SQ --> |方策更新| Actor TQ1[Target Q1] --> |ソフト更新| Q1 TQ2[Target Q2] --> |ソフト更新| Q2 end style Actor fill:#27ae60,color:#fff style Q1 fill:#e74c3c,color:#fff style Q2 fill:#e74c3c,color:#fff style MIN fill:#f39c12,color:#fff
主要コンポーネント
- Actor(ガウス方策):連続行動の平均 $\mu$ と標準偏差 $\sigma$ を出力
- ダブルQ-Network:過大評価バイアスを軽減する2つのQ関数
- 自動温度調整:エントロピーと報酬のバランスを取る最適な $\alpha$ を学習
- ソフト更新:安定性のためのターゲットネットワーク
5.1.3 SACの目的関数
Soft Q関数の更新(ベルマンバックアップ):
$$ Q(s_t, a_t) \leftarrow r_t + \gamma \mathbb{E}_{s_{t+1}} \left[ V(s_{t+1}) \right] $$ソフト価値関数は次のように定義されます:
$$ V(s) = \mathbb{E}_{a \sim \pi} \left[ Q(s, a) - \alpha \log \pi(a|s) \right] $$方策の更新(エントロピー付きQを最大化):
$$ \mathcal{L}_\pi(\theta) = \mathbb{E}_{s \sim \mathcal{D}} \left[ \mathbb{E}_{a \sim \pi_\theta} \left[ \alpha \log \pi_\theta(a|s) - Q(s, a) \right] \right] $$自動温度調整:
$$ \mathcal{L}_\alpha = \mathbb{E}_{a \sim \pi} \left[ -\alpha \left( \log \pi(a|s) + \bar{\mathcal{H}} \right) \right] $$ここで $\bar{\mathcal{H}}$ はターゲットエントロピー(連続行動では通常 $-\dim(\mathcal{A})$)です。
5.1.4 SAC実装
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0
# - torch>=2.0.0
# - gymnasium>=0.29.0
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np
from collections import deque
import random
class GaussianPolicy(nn.Module):
"""
SAC用ガウス方策(再パラメータ化トリック使用)
平均とlog_stdを出力し、tanhスケーリングでサンプリング
"""
def __init__(self, state_dim, action_dim, hidden_dim=256,
log_std_min=-20, log_std_max=2):
super().__init__()
self.log_std_min = log_std_min
self.log_std_max = log_std_max
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.mean = nn.Linear(hidden_dim, action_dim)
self.log_std = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
mean = self.mean(x)
log_std = torch.clamp(self.log_std(x), self.log_std_min, self.log_std_max)
return mean, log_std
def sample(self, state):
"""再パラメータ化トリック + tanh圧縮で行動をサンプリング"""
mean, log_std = self.forward(state)
std = log_std.exp()
# 再パラメータ化: a = mu + sigma * epsilon
normal = Normal(mean, std)
x_t = normal.rsample() # 微分可能なサンプリング
# tanhで[-1, 1]に圧縮
action = torch.tanh(x_t)
# tanh補正付き対数確率
log_prob = normal.log_prob(x_t)
log_prob -= torch.log(1 - action.pow(2) + 1e-6)
log_prob = log_prob.sum(dim=-1, keepdim=True)
return action, log_prob
class SoftQNetwork(nn.Module):
"""SAC用Q-Network: Q(s, a) -> スカラー値"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super().__init__()
self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, 1)
def forward(self, state, action):
x = torch.cat([state, action], dim=-1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
class SAC:
"""
自動温度調整付きSoft Actor-Critic
主な特徴:
- 頑健な探索のための最大エントロピーRL
- 過大評価を防ぐダブルQ学習
- 経験再生を使ったオフポリシー学習
- 自動alpha(温度)調整
"""
def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
tau=0.005, alpha=0.2, auto_alpha=True):
self.gamma = gamma
self.tau = tau
self.auto_alpha = auto_alpha
# ネットワーク
self.actor = GaussianPolicy(state_dim, action_dim)
self.q1 = SoftQNetwork(state_dim, action_dim)
self.q2 = SoftQNetwork(state_dim, action_dim)
self.q1_target = SoftQNetwork(state_dim, action_dim)
self.q2_target = SoftQNetwork(state_dim, action_dim)
# ターゲットに重みをコピー
self.q1_target.load_state_dict(self.q1.state_dict())
self.q2_target.load_state_dict(self.q2.state_dict())
# オプティマイザ
self.actor_optim = torch.optim.Adam(self.actor.parameters(), lr=lr)
self.q1_optim = torch.optim.Adam(self.q1.parameters(), lr=lr)
self.q2_optim = torch.optim.Adam(self.q2.parameters(), lr=lr)
# 自動温度調整
if auto_alpha:
self.target_entropy = -action_dim # ヒューリスティック: -dim(A)
self.log_alpha = torch.zeros(1, requires_grad=True)
self.alpha_optim = torch.optim.Adam([self.log_alpha], lr=lr)
self.alpha = self.log_alpha.exp().item()
else:
self.alpha = alpha
# リプレイバッファ
self.buffer = deque(maxlen=100000)
def select_action(self, state, evaluate=False):
"""行動選択: 訓練時は確率的、評価時は決定的"""
state = torch.FloatTensor(state).unsqueeze(0)
if evaluate:
with torch.no_grad():
mean, _ = self.actor(state)
return torch.tanh(mean).cpu().numpy()[0]
else:
with torch.no_grad():
action, _ = self.actor.sample(state)
return action.cpu().numpy()[0]
def update(self, batch_size=256):
"""SAC更新ステップを1回実行"""
if len(self.buffer) < batch_size:
return
# バッチをサンプリング
batch = random.sample(self.buffer, batch_size)
state, action, reward, next_state, done = zip(*batch)
state = torch.FloatTensor(np.array(state))
action = torch.FloatTensor(np.array(action))
reward = torch.FloatTensor(reward).unsqueeze(1)
next_state = torch.FloatTensor(np.array(next_state))
done = torch.FloatTensor(done).unsqueeze(1)
# --- Q関数の更新 ---
with torch.no_grad():
next_action, next_log_prob = self.actor.sample(next_state)
target_q1 = self.q1_target(next_state, next_action)
target_q2 = self.q2_target(next_state, next_action)
target_q = torch.min(target_q1, target_q2) - self.alpha * next_log_prob
target_value = reward + (1 - done) * self.gamma * target_q
q1_loss = F.mse_loss(self.q1(state, action), target_value)
q2_loss = F.mse_loss(self.q2(state, action), target_value)
self.q1_optim.zero_grad()
q1_loss.backward()
self.q1_optim.step()
self.q2_optim.zero_grad()
q2_loss.backward()
self.q2_optim.step()
# --- 方策の更新 ---
new_action, log_prob = self.actor.sample(state)
q1_new = self.q1(state, new_action)
q2_new = self.q2(state, new_action)
q_new = torch.min(q1_new, q2_new)
actor_loss = (self.alpha * log_prob - q_new).mean()
self.actor_optim.zero_grad()
actor_loss.backward()
self.actor_optim.step()
# --- Alpha(温度)の更新 ---
if self.auto_alpha:
alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()
self.alpha_optim.zero_grad()
alpha_loss.backward()
self.alpha_optim.step()
self.alpha = self.log_alpha.exp().item()
# --- ターゲットネットワークのソフト更新 ---
for param, target_param in zip(self.q1.parameters(), self.q1_target.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for param, target_param in zip(self.q2.parameters(), self.q2_target.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
# --- 訓練例 ---
if __name__ == "__main__":
import gymnasium as gym
print("SAC訓練: Pendulum-v1")
print("=" * 50)
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
agent = SAC(state_dim, action_dim, auto_alpha=True)
for episode in range(100):
state, _ = env.reset()
episode_reward = 0
for step in range(200):
action = agent.select_action(state)
action_scaled = action * 2.0 # Pendulumの行動範囲 [-2, 2]
next_state, reward, terminated, truncated, _ = env.step(action_scaled)
done = terminated or truncated
agent.buffer.append((state, action, reward, next_state, float(done)))
agent.update()
episode_reward += reward
state = next_state
if done:
break
if (episode + 1) % 10 == 0:
print(f"エピソード {episode+1}, 報酬: {episode_reward:.1f}, Alpha: {agent.alpha:.3f}")
print("\n訓練完了!")
SACのポイント: 再パラメータ化トリックにより、確率的サンプリングを通じてエンドツーエンドの勾配フローが可能になります。自動温度調整は探索を動的に調整します - 序盤は高いalphaで探索、後半は低いalphaで活用。SACは連続制御において最もサンプル効率の良いモデルフリーアルゴリズムの1つと考えられています。
5.2 RLHF: 人間のフィードバックからの強化学習
RLHFは、言語モデルをChatGPTやClaudeのような有用なAIアシスタントに変換した重要な技術です。このセクションでは、RLHFの仕組みとAIアラインメントにおける重要性を解説します。
5.2.1 RLHFが重要な理由
事前学習された言語モデル(GPTなど)は、次のトークンを予測するように訓練されています。しかし、この目的関数は以下を直接最適化しません:
- 有用性:有益で正確な回答を提供すること
- 無害性:有害、偏った、または毒性のある出力を避けること
- 誠実性:不確実性を認め、幻覚を起こさないこと
RLHFはこのギャップを埋めます。人間の嗜好を使って「良い」出力とは何かを定義し、RLを使ってモデルをその嗜好に向けて最適化します。
5.2.2 RLHFパイプライン
高品質な応答] --> SFT end subgraph "ステップ2: 報酬モデル訓練" SFT --> |応答を生成| RESP[応答ペア
プロンプトxに対するy1, y2] RESP --> |人間がランク付け| PREF[嗜好データ
y1 > y2] PREF --> |訓練| RM[報酬モデル
r = RM x,y] end subgraph "ステップ3: PPOによるRLファインチューニング" SFT --> |初期化| POLICY[方策モデル] RM --> |報酬を提供| PPO[PPO訓練] POLICY --> |生成| GEN[生成された応答] GEN --> RM PPO --> |更新| POLICY KL[KLペナルティ
SFTに近づく] --> PPO end POLICY --> FINAL[RLHF調整済みモデル
ChatGPT, Claude] style PT fill:#e3f2fd style SFT fill:#fff3e0 style RM fill:#f3e5f5 style POLICY fill:#e8f5e9 style FINAL fill:#ffcdd2
5.2.3 ステップ1: 教師ありファインチューニング(SFT)
事前学習済みLLMを高品質なデモンストレーションでファインチューニングします:
$$ \mathcal{L}_{\text{SFT}} = -\mathbb{E}_{(x, y) \sim \mathcal{D}_{\text{demo}}} \left[ \log \pi_\theta(y | x) \right] $$ここで $\mathcal{D}_{\text{demo}}$ はプロンプトに対する人間が書いた応答を含みます。
なぜ最初にSFTを行うのか
- RLの良い出発点を提供(事前学習モデルは任意のテキストを出力する)
- 有用な応答のフォーマットとスタイルをモデルに教える
- 報酬モデルの訓練を容易にする(応答がより妥当になる)
5.2.4 ステップ2: 報酬モデル訓練
比較データから人間の嗜好を予測する報酬モデルを訓練します。
データ収集
- 多様なデータセットからプロンプトをサンプリング
- SFTモデルを使って複数の応答を生成
- 人間のアノテーターが応答をランク付け(例:A > B)
Bradley-Terryモデル
応答 $y_1$ が $y_2$ より好まれる確率をモデル化:
$$ P(y_1 \succ y_2 | x) = \sigma(r(x, y_1) - r(x, y_2)) $$ここで $\sigma$ はシグモイド関数、$r(x, y)$ は報酬モデルです。
報酬モデルの損失関数
$$ \mathcal{L}_{\text{RM}} = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}} \left[ \log \sigma(r(x, y_w) - r(x, y_l)) \right] $$ここで $y_w$ は好まれる(勝ち)応答、$y_l$ は拒否された(負け)応答です。
# 報酬モデル訓練(概念的なコード)
import torch
import torch.nn as nn
class RewardModel(nn.Module):
"""
RLHF用報酬モデル
(プロンプト, 応答)を受け取り、スカラー報酬を出力
"""
def __init__(self, base_model):
super().__init__()
self.backbone = base_model # 事前学習済みLLM
self.reward_head = nn.Linear(base_model.hidden_size, 1)
def forward(self, input_ids, attention_mask):
# 最終隠れ状態を取得
outputs = self.backbone(input_ids, attention_mask=attention_mask)
last_hidden = outputs.last_hidden_state[:, -1, :] # [CLS]または最終トークン
reward = self.reward_head(last_hidden)
return reward.squeeze(-1)
def reward_model_loss(reward_model, chosen_ids, rejected_ids, attention_mask_c, attention_mask_r):
"""
嗜好学習のためのBradley-Terry損失
"""
r_chosen = reward_model(chosen_ids, attention_mask_c)
r_rejected = reward_model(rejected_ids, attention_mask_r)
# 損失: -log(sigmoid(r_chosen - r_rejected))
loss = -torch.log(torch.sigmoid(r_chosen - r_rejected)).mean()
return loss
5.2.5 ステップ3: PPOファインチューニング
PPOを使って方策(LLM)を最適化し、SFTモデルに近い状態を保ちながら報酬モデルのスコアを最大化します。
RLHF目的関数
$$ \mathcal{L}_{\text{RLHF}} = \mathbb{E}_{x \sim \mathcal{D}, y \sim \pi_\theta} \left[ r(x, y) - \beta \cdot \text{KL}(\pi_\theta(y|x) \| \pi_{\text{SFT}}(y|x)) \right] $$KLペナルティは方策が以下を防ぎます:
- 自然言語から大きく逸脱すること(報酬ハッキング)
- 報酬モデルの欠陥を悪用すること
- 高スコアだが無意味な退化した出力を生成すること
言語モデルのためのPPO
第4章のPPOクリップ目的関数をトークンレベルの行動に適用します:
$$ \mathcal{L}^{\text{CLIP}}(\theta) = \mathbb{E}_t \left[ \min(r_t(\theta) \hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) \hat{A}_t) \right] $$ここで $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\text{old}}(a_t|s_t)}$ は各トークンの確率比です。
# RLHF PPO訓練(簡略化した概念コード)
def compute_rlhf_reward(response_ids, prompt_ids, reward_model, ref_model, policy_model, beta=0.1):
"""
KLペナルティ付きRLHF報酬を計算
reward = r(x, y) - beta * KL(policy || reference)
"""
# 報酬モデルから報酬を取得
reward_score = reward_model(prompt_ids, response_ids)
# トークンごとのKLダイバージェンスを計算
with torch.no_grad():
ref_logprobs = ref_model.get_log_probs(prompt_ids, response_ids)
policy_logprobs = policy_model.get_log_probs(prompt_ids, response_ids)
kl_div = (policy_logprobs - ref_logprobs).sum(dim=-1)
# 合計報酬
total_reward = reward_score - beta * kl_div
return total_reward
def ppo_step(policy_model, old_logprobs, states, actions, advantages, epsilon=0.2):
"""
言語モデル方策のためのPPO更新
"""
new_logprobs = policy_model.get_log_probs(states, actions)
# 確率比
ratio = torch.exp(new_logprobs - old_logprobs)
# クリップ目的関数
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantages
loss = -torch.min(surr1, surr2).mean()
return loss
5.2.6 DPO: Direct Preference Optimization
DPO(Direct Preference Optimization)は、報酬モデルをスキップして嗜好を直接最適化する、RLHFのよりシンプルな代替手法です。
重要な洞察
RLHF目的関数の下での最適方策は閉形式で表現できます:
$$ \pi^*(y|x) = \frac{1}{Z(x)} \pi_{\text{ref}}(y|x) \exp\left(\frac{1}{\beta} r(x, y)\right) $$これは報酬を方策の観点から表現できることを意味します:
$$ r(x, y) = \beta \log \frac{\pi^*(y|x)}{\pi_{\text{ref}}(y|x)} + \beta \log Z(x) $$DPO損失関数
Bradley-Terry嗜好モデルに代入すると、DPO目的関数が得られます:
$$ \mathcal{L}_{\text{DPO}}(\theta) = -\mathbb{E}_{(x, y_w, y_l)} \left[ \log \sigma \left( \beta \log \frac{\pi_\theta(y_w|x)}{\pi_{\text{ref}}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{\text{ref}}(y_l|x)} \right) \right] $$DPO vs RLHF
| 観点 | RLHF | DPO |
|---|---|---|
| 報酬モデル | 必要(別途訓練) | 不要(暗黙的) |
| RL訓練 | オンラインサンプリングでPPO | 教師あり学習のみ |
| 複雑さ | 高い(3段階) | 低い(SFT後1段階) |
| 安定性 | 不安定になりやすい(RLの問題) | より安定(教師あり学習) |
| 柔軟性 | 新しいプロンプトで反復可能 | 嗜好データセットに固定 |
# DPO実装(簡略化)
def dpo_loss(policy_model, ref_model, chosen_ids, rejected_ids, beta=0.1):
"""
Direct Preference Optimization損失
報酬モデル不要 - 嗜好を直接最適化
"""
# 方策と参照モデルから対数確率を取得
policy_chosen_logps = policy_model.get_log_probs(chosen_ids)
policy_rejected_logps = policy_model.get_log_probs(rejected_ids)
with torch.no_grad():
ref_chosen_logps = ref_model.get_log_probs(chosen_ids)
ref_rejected_logps = ref_model.get_log_probs(rejected_ids)
# 対数比を計算
chosen_ratio = beta * (policy_chosen_logps - ref_chosen_logps)
rejected_ratio = beta * (policy_rejected_logps - ref_rejected_logps)
# DPO損失: -log sigmoid(chosen_ratio - rejected_ratio)
loss = -torch.log(torch.sigmoid(chosen_ratio - rejected_ratio)).mean()
return loss
AIアラインメントにおけるRLHFの重要性: RLHFは単にモデルをより有用にするだけでなく、AI安全性にとって重要な技術です。人間のフィードバックから学習することで、純粋な次トークン予測から生じる有害な行動からモデルを遠ざけることができます。RLHFの「H」は、AIシステムに組み込みたい人間の価値観を表しています。
5.3 モデルベース強化学習
モデルベースRLは環境ダイナミクスのモデルを学習し、それを計画に使用することで、モデルフリー手法よりもはるかに高いサンプル効率を実現します。
5.3.1 ワールドモデルの概念
ワールドモデルは環境が行動にどう反応するかを予測します:
$$ \hat{s}_{t+1}, \hat{r}_t = f_\theta(s_t, a_t) $$これにより、エージェントは実際に環境で行動を取らずに結果を「想像」できます。
モデルベースRLの利点
| 利点 | 説明 |
|---|---|
| サンプル効率 | 環境との相互作用を10-100分の1に削減して学習 |
| 想像力 | 訓練用に無制限の合成経験を生成 |
| 計画 | 先読みして行動シーケンスを評価 |
| 転移 | ワールドモデルは同じ環境の新しいタスクに転移可能 |
5.3.2 DreamerV3(2023-2025)
DreamerV3は、150以上の多様なタスクで優れた結果を達成した最先端のワールドモデルアルゴリズムで、Minecraftでダイヤモンドをゼロから収集することにも成功しています。
主要なイノベーション
- Symlog予測:異なるスケールの報酬と価値を処理
- 離散潜在状態:安定した学習のためのカテゴリカル分布を使用
- 固定ハイパーパラメータ:大きく異なるドメインでも同じ設定が機能
DreamerV3アーキテクチャ
離散潜在] H[再帰状態 h_t] --> PRIOR[事前分布 z_t] A[行動 a_t-1] --> PRIOR POST --> DEC[デコーダ] H --> DEC DEC --> PRED[予測 x_t, r_t] POST --> |更新| H H --> |次ステップ| H end subgraph "想像による訓練" H --> |展開| IMAG[想像された軌道] IMAG --> ACTOR[Actor損失] IMAG --> CRITIC[Critic損失] end style POST fill:#27ae60,color:#fff style IMAG fill:#e74c3c,color:#fff
想像の中での学習
DreamerV3は完全に想像の中で方策を訓練します:
- 実経験を収集:環境と相互作用し、バッファに保存
- ワールドモデルを訓練:観測と報酬を予測することを学習
- 軌道を想像:現在の方策でワールドモデルを展開
- Actor-Criticを訓練:想像されたリターンを使って方策を更新
Minecraftダイヤモンド達成
DreamerV3は、人間のデモンストレーションやカリキュラム学習なしでMinecraftでダイヤモンドを収集した最初のアルゴリズムです。これは以下を必要とするタスクです:
- 20分以上のエピソード
- オープンエンドの探索
- 階層的なサブゴール(木材 -> 石 -> 鉄 -> ダイヤモンド)
- 疎な報酬
5.3.3 MuZeroの原理
MuZero(DeepMind、2020)は、学習したダイナミクスモデルとモンテカルロ木探索(MCTS)を組み合わせ、ゲームのルールを知らずにAtari、チェス、将棋、囲碁で超人的なパフォーマンスを達成しました。
重要な洞察
MuZeroは3つのコンポーネントを学習します:
- 表現:$h(o_t) \rightarrow s_0$ - 観測を隠れ状態にエンコード
- ダイナミクス:$g(s^k, a^k) \rightarrow s^{k+1}, r^k$ - 次の状態と報酬を予測
- 予測:$f(s^k) \rightarrow p^k, v^k$ - 方策と価値を出力
MuZeroアーキテクチャ
価値 v_0] S0 --> |g with a_0| S1[状態 s_1
報酬 r_0] S1 --> |f| PV1[p_1, v_1] S1 --> |g with a_1| S2[状態 s_2
報酬 r_1] S2 --> |f| PV2[p_2, v_2] subgraph "MCTS計画" PV0 --> MCTS[木探索] PV1 --> MCTS PV2 --> MCTS MCTS --> ACT[行動選択] end style MCTS fill:#9b59b6,color:#fff
MuZeroがルールなしで機能する理由
- 学習されたダイナミクス:計画に必要なもの(価値関連の特徴)のみを予測
- 隠れ状態:観測の無関係な詳細を抽象化
- MCTS:学習されたモデルを使って計画し、モデル誤差を補正
- エンドツーエンド:すべてのコンポーネントが価値予測のために共同で訓練
モデルベース vs モデルフリーの比較
| 観点 | モデルフリー(DQN, PPO) | モデルベース(DreamerV3, MuZero) |
|---|---|---|
| サンプル効率 | 低い(数百万ステップ) | 高い(10-100分の1) |
| 計算量 | ステップあたり低い | 高い(計画のオーバーヘッド) |
| モデル誤差 | 誤りうるモデルがない | 誤差が蓄積する可能性 |
| 汎化 | タスク固有 | モデルは転移可能 |
5.4 オフラインRLとDecision Transformer
オフラインRL(バッチRLとも呼ばれる)は、環境との相互作用なしに固定されたデータセットから方策を学習します。実世界での相互作用が高価または危険な場合に重要です。
5.4.1 オフラインRLの課題
標準的なRLアルゴリズムは分布シフトのためにオフラインデータでは失敗します:
- 方策がデータセットで見たことのない行動を選択する可能性
- 見たことのない行動のQ値は信頼できない(外挿誤差)
- 探索によって間違いを修正する方法がない
オフラインRLを使う場面
- 医療:患者記録から治療方針を学習
- ロボティクス:損傷のリスクなしにデモンストレーションデータを使用
- 推薦システム:ログに記録されたユーザー相互作用から学習
- 自動運転:記録された人間の運転で訓練
5.4.2 Decision Transformer
Decision Transformer(2021)はRLをシーケンスモデリング問題として再定式化し、Transformerを使って望ましいリターンに条件付けて行動を生成します。
重要な洞察
価値を予測してリターンを最適化する代わりに、望ましいリターンに条件付けてそれを達成する行動を予測します。
シーケンス形式
Decision Transformerは軌道をシーケンスとしてモデル化:
$$ \tau = (\hat{R}_1, s_1, a_1, \hat{R}_2, s_2, a_2, \ldots, \hat{R}_T, s_T, a_T) $$ここで $\hat{R}_t = \sum_{t'=t}^{T} r_{t'}$ はreturn-to-go(時刻 $t$ からの残りリターン)です。
因果的注意機構] TF --> PRED[予測行動 a_2] style TF fill:#9b59b6,color:#fff style PRED fill:#27ae60,color:#fff
訓練目的
オフライン軌道での単純な教師あり学習:
$$ \mathcal{L} = \mathbb{E}_{\tau \sim \mathcal{D}} \left[ \sum_t \| a_t - \hat{a}_t \|^2 \right] $$ここで $\hat{a}_t$ はコンテキスト $(\hat{R}_1, s_1, a_1, \ldots, \hat{R}_t, s_t)$ が与えられたときの予測行動です。
推論:リターン条件付け
テスト時に望ましいリターンを指定すると、モデルはそれを達成する行動を生成します:
- $\hat{R}_1$ を望ましい総リターン(例:エキスパートレベルの性能)に設定
- $s_1$ を観測し、$a_1$ を予測
- $a_1$ を実行し、$r_1, s_2$ を観測
- $\hat{R}_2 = \hat{R}_1 - r_1$ に更新
- 繰り返し
# Decision Transformer(簡略化実装)
import torch
import torch.nn as nn
class DecisionTransformer(nn.Module):
"""
オフラインRL用Decision Transformer
return-to-goに条件付けて行動を生成
"""
def __init__(self, state_dim, action_dim, hidden_dim=128,
n_layers=3, n_heads=1, max_length=20):
super().__init__()
self.hidden_dim = hidden_dim
self.max_length = max_length
# 各モダリティの埋め込み
self.state_embed = nn.Linear(state_dim, hidden_dim)
self.action_embed = nn.Linear(action_dim, hidden_dim)
self.return_embed = nn.Linear(1, hidden_dim)
# 位置埋め込み
self.pos_embed = nn.Embedding(max_length * 3, hidden_dim)
# Transformer
encoder_layer = nn.TransformerEncoderLayer(
d_model=hidden_dim, nhead=n_heads,
dim_feedforward=hidden_dim * 4, batch_first=True
)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)
# 行動予測ヘッド
self.action_head = nn.Linear(hidden_dim, action_dim)
def forward(self, returns_to_go, states, actions, timesteps):
"""
順伝播: コンテキストから行動を予測
Args:
returns_to_go: (batch, seq_len, 1)
states: (batch, seq_len, state_dim)
actions: (batch, seq_len, action_dim)
timesteps: (batch, seq_len)
"""
batch_size, seq_len = states.shape[0], states.shape[1]
# 各モダリティを埋め込み
state_embeddings = self.state_embed(states)
action_embeddings = self.action_embed(actions)
return_embeddings = self.return_embed(returns_to_go)
# インターリーブ: [R_1, s_1, a_1, R_2, s_2, a_2, ...]
# 形状: (batch, seq_len * 3, hidden_dim)
stacked = torch.stack([return_embeddings, state_embeddings, action_embeddings], dim=2)
stacked = stacked.reshape(batch_size, seq_len * 3, self.hidden_dim)
# 位置埋め込みを追加
positions = torch.arange(seq_len * 3, device=states.device).unsqueeze(0)
stacked = stacked + self.pos_embed(positions)
# 因果マスク(過去のみ参照可能)
mask = nn.Transformer.generate_square_subsequent_mask(seq_len * 3).to(states.device)
# Transformer順伝播
output = self.transformer(stacked, mask=mask)
# 行動を予測(状態位置から: インデックス 1, 4, 7, ...)
state_positions = torch.arange(1, seq_len * 3, 3)
action_preds = self.action_head(output[:, state_positions, :])
return action_preds
def get_action(self, returns_to_go, states, actions, timesteps):
"""現在の状態に対する行動を取得(推論モード)"""
action_preds = self.forward(returns_to_go, states, actions, timesteps)
return action_preds[:, -1, :] # 最後の予測行動を返す
# 使用例
if __name__ == "__main__":
print("Decision Transformerの例")
print("=" * 50)
state_dim, action_dim = 4, 2
model = DecisionTransformer(state_dim, action_dim)
# オフラインデータをシミュレート
batch_size, seq_len = 8, 10
returns = torch.randn(batch_size, seq_len, 1)
states = torch.randn(batch_size, seq_len, state_dim)
actions = torch.randn(batch_size, seq_len, action_dim)
timesteps = torch.arange(seq_len).unsqueeze(0).expand(batch_size, -1)
# 順伝播
pred_actions = model(returns, states, actions, timesteps)
print(f"入力形状: states {states.shape}")
print(f"出力形状: predicted actions {pred_actions.shape}")
# 推論: 新しい状態に対する行動を取得
action = model.get_action(returns, states, actions, timesteps)
print(f"推論行動: {action[0].detach().numpy()}")
Decision Transformerの結果
- AtariとD4RLベンチマークで専門的なオフラインRLアルゴリズムに匹敵またはそれを上回る
- 新しいリターンターゲットへのゼロショット汎化
- 多くのオフラインRL手法より実装がシンプル(単なる教師あり学習)
- 注意機構を通じて長期的なクレジット割り当てを自然に処理
5.5 マルチエージェント強化学習
マルチエージェントRL(MARL)は、複数のエージェントが共有環境で学習し相互作用することを扱います。
5.5.1 CTDE: 集中訓練分散実行
MARLの主要なパラダイム:
- 訓練:グローバル状態とすべてのエージェントの観測にアクセス可能
- 実行:各エージェントはローカルな観測のみに基づいて行動
5.5.2 協調的 vs 競争的設定
| 設定 | 説明 | 例 |
|---|---|---|
| 協調的 | 共有報酬、チーム目標 | ロボット群、StarCraftマイクロマネジメント |
| 競争的 | ゼロサム、敵対的 | チェス、囲碁、ポーカー |
| 混合 | 協調と競争の両方 | 交通、交渉、社会的ジレンマ |
5.5.3 主要なアルゴリズム
- QMIX:分散Q関数を集中ミキシングネットワークで学習
- MAPPO:共有Criticを持つマルチエージェントPPO(シンプルだが効果的)
- MADDPG:連続行動用のマルチエージェントDDPG
5.6 安全な強化学習
実世界のRLアプリケーションでは、訓練と展開の両方で安全制約が必要です。
5.6.1 制約付きMDP
標準MDPをコスト制約で拡張:
$$ \max_\pi J(\pi) = \mathbb{E}\left[\sum_t \gamma^t r_t\right] \quad \text{s.t.} \quad C_i(\pi) = \mathbb{E}\left[\sum_t \gamma^t c_i(s_t, a_t)\right] \leq d_i $$ここで $c_i$ はコスト関数(例:危険な行動)、$d_i$ は閾値です。
5.6.2 解法
- ラグランジュ法:制約を学習された乗数付きペナルティ項に変換
- CPO(Constrained Policy Optimization):制約保証付き信頼領域
- 報酬シェーピング:危険な状態/行動にペナルティを追加
- シールドベース手法:危険な行動を安全なデフォルトでオーバーライド
5.6.3 実世界の安全要件
| ドメイン | 安全制約 |
|---|---|
| ロボティクス | 関節制限、衝突回避、力制限 |
| 自動運転 | 車線境界、最小距離、速度制限 |
| 医療 | 薬剤投与量制限、治療安全閾値 |
| 金融 | ポジション制限、ドローダウン制約、リスク予算 |
5.7 実世界への応用
5.7.1 ロボティクス:Sim-to-Real転移
シミュレーションでロボットを訓練し、実世界に転移します。
主要な技術
- ドメインランダム化:シミュレーションで物理、視覚、ノイズをランダム化
- システム同定:実ロボットのパラメータを推定
- プログレッシブ訓練:環境のリアリズムを徐々に増加
SLIMシステム(2024)
スタンフォードのSim-to-real Learning for Manipulation:
- Isaac Gymシミュレーションで訓練
- 実ロボットアームに転移
- 未知の物体や変形可能な材料を扱える
5.7.2 自動運転
RLは以下に使用されます:
- 意思決定:車線変更、交差点、合流
- 運動計画:軌道最適化
- シミュレーション:リアルな交通シナリオの生成
5.7.3 ゲームAI
| システム | ゲーム | 達成 |
|---|---|---|
| AlphaGo | 囲碁 | 世界チャンピオン李世ドルに勝利 |
| AlphaStar | StarCraft II | グランドマスターレベル、トッププロに勝利 |
| OpenAI Five | Dota 2 | 世界チャンピオンOGに勝利 |
| MuZero | Atari、チェス、将棋 | ルールを知らずに超人的性能 |
5.7.4 リソース最適化
- データセンター冷却:DeepMindがGoogleの冷却エネルギーを40%削減
- ネットワークルーティング:リアルタイムでトラフィックフローを最適化
- 電力グリッド:再生可能エネルギーで供給と需要をバランス
5.8 Stable-Baselines3の実践例
5.8.1 連続制御でのSAC
# Requirements:
# - Python 3.9+
# - stable-baselines3>=2.1.0
# - gymnasium>=0.29.0
"""
Stable-Baselines3 SAC例
最小限のコードでプロフェッショナルグレードのRL訓練
"""
import gymnasium as gym
from stable_baselines3 import SAC
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import numpy as np
def train_sac_pendulum():
"""モニタリングと評価付きでPendulum-v1でSACを訓練"""
print("Stable-Baselines3でSAC訓練")
print("=" * 50)
# モニタリング付き環境を作成
env = gym.make("Pendulum-v1")
env = Monitor(env)
# 評価用環境
eval_env = gym.make("Pendulum-v1")
eval_env = Monitor(eval_env)
# 最適化されたハイパーパラメータでSAC
model = SAC(
"MlpPolicy",
env,
learning_rate=3e-4,
buffer_size=100000,
learning_starts=1000,
batch_size=256,
tau=0.005, # ソフト更新係数
gamma=0.99,
train_freq=1,
gradient_steps=1,
ent_coef="auto", # 自動エントロピー調整
verbose=1,
tensorboard_log="./sac_pendulum_tensorboard/"
)
# モニタリング用コールバック
eval_callback = EvalCallback(
eval_env,
best_model_save_path="./logs/best_model/",
log_path="./logs/",
eval_freq=5000,
deterministic=True,
render=False
)
checkpoint_callback = CheckpointCallback(
save_freq=10000,
save_path="./logs/checkpoints/",
name_prefix="sac_pendulum"
)
# 訓練
print("\n訓練を開始...")
model.learn(
total_timesteps=50000,
callback=[eval_callback, checkpoint_callback],
progress_bar=True
)
# 最終性能を評価
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10)
print(f"\n最終評価: {mean_reward:.2f} +/- {std_reward:.2f}")
# 最終モデルを保存
model.save("sac_pendulum_final")
print("モデルを sac_pendulum_final.zip に保存しました")
return model
def train_ppo_lunarlander():
"""ベクトル化環境でLunarLanderでPPOを訓練"""
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
print("\nベクトル化環境でPPO訓練")
print("=" * 50)
# ベクトル化環境(4つの並列環境)
env = make_vec_env("LunarLander-v2", n_envs=4)
model = PPO(
"MlpPolicy",
env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
verbose=1
)
print("100kタイムステップで訓練中...")
model.learn(total_timesteps=100000, progress_bar=True)
# 評価
eval_env = gym.make("LunarLander-v2")
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10)
print(f"評価: {mean_reward:.2f} +/- {std_reward:.2f}")
return model
def hyperparameter_tuning_example():
"""Optunaによるハイパーパラメータチューニング例(オプション)"""
try:
import optuna
from stable_baselines3 import PPO
print("\nOptunaによるハイパーパラメータチューニング")
print("=" * 50)
def objective(trial):
"""Optunaの目的関数"""
lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
n_steps = trial.suggest_categorical("n_steps", [256, 512, 1024, 2048])
gamma = trial.suggest_float("gamma", 0.9, 0.9999)
env = gym.make("CartPole-v1")
model = PPO("MlpPolicy", env, learning_rate=lr, n_steps=n_steps, gamma=gamma, verbose=0)
model.learn(total_timesteps=10000)
mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)
return mean_reward
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=10, show_progress_bar=True)
print(f"\n最適ハイパーパラメータ: {study.best_params}")
print(f"最高報酬: {study.best_value:.2f}")
except ImportError:
print("Optunaがインストールされていません。実行: pip install optuna")
if __name__ == "__main__":
# 例1: PendulumでSAC
sac_model = train_sac_pendulum()
# 例2: LunarLanderでPPO
ppo_model = train_ppo_lunarlander()
# 例3: ハイパーパラメータチューニング(オプション)
# hyperparameter_tuning_example()
print("\n" + "=" * 50)
print("すべての訓練例が完了しました!")
print("TensorBoardログを表示: tensorboard --logdir ./sac_pendulum_tensorboard/")
まとめと今後の展望
学んだこと
- SAC:頑健でサンプル効率の良い連続制御のための最大エントロピーRL
- RLHF:ChatGPTやClaudeなどのLLMを人間の価値観と調整する重要な技術
- DPO:明示的な報酬モデルなしのRLHFのよりシンプルな代替手法
- モデルベースRL:サンプル効率の良い学習のためのDreamerV3とMuZero
- オフラインRL:静的データセットから学習するためのDecision Transformer
- マルチエージェントRL:協調的・競争的設定のためのCTDEパラダイム
- 安全なRL:実世界展開のための制約付き最適化
- Stable-Baselines3:プロダクションレディなRL実装
今後の方向性(2025年以降)
| 方向性 | 説明 |
|---|---|
| RL基盤モデル | タスクや環境を超えて転移する事前学習モデル |
| RLHFの改善 | Constitutional AI、議論、スケーラブルな監視 |
| 大規模Sim-to-Real | シミュレーションから多様な実ロボットへのより良い転移 |
| 階層的RL | 再利用可能なスキルと時間的抽象化の学習 |
| マルチモーダルRL | ロボティクスのためのビジョン・言語・行動モデル |
参考資料
- Stable-Baselines3ドキュメント
- OpenAI Spinning Up in Deep RL
- Sutton and Barto: Reinforcement Learning Book
- SAC論文(Haarnoja et al., 2018)
- InstructGPT / RLHF論文(Ouyang et al., 2022)
- DPO論文(Rafailov et al., 2023)
- DreamerV3論文(Hafner et al., 2023)
- Decision Transformer論文(Chen et al., 2021)
演習問題
演習5.1: SAC温度パラメータの分析
問題:固定alpha値(0.05、0.1、0.2、0.5)と自動調整をPendulum-v1で比較してください。
タスク:
- 異なるalpha設定で5つのSACエージェントを訓練
- 学習曲線と最終性能をプロット
- alphaが探索と活用にどう影響するか分析
- 訓練中のエントロピーを追跡
期待される洞察:自動調整は手動チューニングなしで良好な性能を達成するはずです。
演習5.2: RLHF報酬モデル
問題:テキスト分類の嗜好に対する簡単な報酬モデルを実装してください。
タスク:
- 合成嗜好データを作成(例:「有用」vs「無用」な応答)
- Bradley-Terry損失を使って報酬モデルを訓練
- ホールドアウト嗜好での報酬モデル精度を評価
- モデルが何を学んだか分析
ヒント:事前学習済みの文エンコーダ(例:sentence-transformers)をバックボーンとして使用してください。
演習5.3: Decision Transformer実装
問題:CartPoleオフラインデータでDecision Transformerを訓練してください。
タスク:
- 訓練済みPPOエージェントからオフラインデータを収集
- Decision Transformerアーキテクチャを実装
- オフライン軌道で訓練
- 異なるターゲットリターンで評価
ヒント:短いコンテキスト長(5-10ステップ)から始めると反復が速くなります。
演習5.4: マルチエージェント協調タスク
問題:エージェントが協力する必要がある簡単なマルチエージェント環境を実装してください。
タスク:
- 2+エージェントがゴールに到達する必要があるグリッドワールドを作成
- 共有報酬(協調設定)を実装
- 独立学習者 vs 集中Criticで訓練
- 創発的な協調を分析
ヒント:マルチエージェント環境にはPettingZooを使用してください。
演習5.5: 制約付き安全なRL
問題:標準RLタスクに安全制約を追加してください。
タスク:
- CartPoleを「危険ゾーン」(特定の角度が危険)を持つように修正
- 危険な状態のコスト関数を定義
- ラグランジュベースの制約付き最適化を実装
- 制約なし vs 制約付きエージェントの行動を比較
ヒント:完全なCPOの前に、シンプルな報酬ペナルティアプローチから始めてください。