🌐 JP | 🇬🇧 EN | Last sync: 2025-11-16

第5章:高度なRL手法と応用 (Advanced RL Methods and Applications)

最新アルゴリズムから実世界応用まで

📖 読了時間: 25-30分 📊 難易度: 上級 💻 コード例: 7個 📝 演習問題: 5問

学習目標

この章を読むことで、以下を習得できます:


5.1 A3C (Asynchronous Advantage Actor-Critic)

A3Cの概要

A3C (Asynchronous Advantage Actor-Critic)は、DeepMindが2016年に提案した並列学習アルゴリズムです。複数のワーカーが非同期的に環境と相互作用し、グローバルネットワークを更新することで、高速かつ安定した学習を実現します。

graph TB GN[Global Network
共有パラメータ θ] W1[Worker 1
環境コピー 1] W2[Worker 2
環境コピー 2] W3[Worker 3
環境コピー 3] Wn[Worker N
環境コピー N] W1 -->|勾配更新| GN W2 -->|勾配更新| GN W3 -->|勾配更新| GN Wn -->|勾配更新| GN GN -->|パラメータ同期| W1 GN -->|パラメータ同期| W2 GN -->|パラメータ同期| W3 GN -->|パラメータ同期| Wn style GN fill:#e3f2fd style W1 fill:#c8e6c9 style W2 fill:#c8e6c9 style W3 fill:#c8e6c9 style Wn fill:#c8e6c9

A3Cの主要コンポーネント

コンポーネント 説明 特徴
非同期更新 各ワーカーが独立して学習 Experience Replay不要、メモリ効率的
Advantage関数 $A(s, a) = Q(s, a) - V(s)$ 分散減少、安定した学習
エントロピー正則化 探索を促進 早期収束を防ぐ
並列実行 複数環境で同時学習 学習速度向上、多様なデータ

A3Cのアルゴリズム

各ワーカーは以下の手順を繰り返します:

  1. パラメータ同期: グローバルネットワークからパラメータをコピー $\theta' \leftarrow \theta$
  2. 経験収集: $t_{\text{max}}$ ステップまたは終端まで $(s_t, a_t, r_t)$ を収集
  3. リターン計算: $n$ステップリターン $R_t = \sum_{i=0}^{n-1} \gamma^i r_{t+i} + \gamma^n V(s_{t+n})$
  4. 勾配計算: アクターとクリティックの損失を計算
  5. 非同期更新: グローバルネットワークを更新

損失関数は以下の通りです:

$$ \mathcal{L}_{\text{actor}} = -\log \pi(a_t | s_t; \theta) A_t - \beta H(\pi(\cdot | s_t; \theta)) $$ $$ \mathcal{L}_{\text{critic}} = (R_t - V(s_t; \theta))^2 $$

ここで、$H(\pi)$はエントロピー、$\beta$はエントロピー正則化係数、$A_t = R_t - V(s_t)$はAdvantage推定値です。

A3Cの概念実装

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.distributions import Categorical
import gymnasium as gym
import numpy as np

class A3CNetwork(nn.Module):
    """
    A3C用のアクター・クリティック共有ネットワーク

    Architecture:
    - 共有層: 特徴抽出
    - アクター出力: 行動確率分布
    - クリティック出力: 状態価値関数
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        """
        Args:
            state_dim: 状態空間の次元
            action_dim: 行動空間の次元
            hidden_dim: 隠れ層の次元
        """
        super(A3CNetwork, self).__init__()

        # 共有特徴抽出層
        self.shared_fc1 = nn.Linear(state_dim, hidden_dim)
        self.shared_fc2 = nn.Linear(hidden_dim, hidden_dim)

        # アクター出力(行動確率)
        self.actor_head = nn.Linear(hidden_dim, action_dim)

        # クリティック出力(状態価値)
        self.critic_head = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        """
        前向き計算

        Args:
            state: 状態 (batch_size, state_dim)

        Returns:
            action_probs: 行動確率分布 (batch_size, action_dim)
            state_value: 状態価値 (batch_size, 1)
        """
        # 共有層
        x = F.relu(self.shared_fc1(state))
        x = F.relu(self.shared_fc2(x))

        # アクター出力
        action_logits = self.actor_head(x)
        action_probs = F.softmax(action_logits, dim=-1)

        # クリティック出力
        state_value = self.critic_head(x)

        return action_probs, state_value


class A3CWorker:
    """
    A3Cワーカー: 独立した環境で学習し、グローバルネットワークを更新

    Features:
    - 非同期パラメータ更新
    - n-step returns計算
    - エントロピー正則化
    """

    def __init__(self, worker_id, global_network, optimizer,
                 env_name='CartPole-v1', gamma=0.99,
                 max_steps=20, entropy_coef=0.01):
        """
        Args:
            worker_id: ワーカーID
            global_network: 共有グローバルネットワーク
            optimizer: 共有オプティマイザー
            env_name: 環境名
            gamma: 割引率
            max_steps: n-stepリターンのステップ数
            entropy_coef: エントロピー正則化係数
        """
        self.worker_id = worker_id
        self.env = gym.make(env_name)
        self.global_network = global_network
        self.optimizer = optimizer
        self.gamma = gamma
        self.max_steps = max_steps
        self.entropy_coef = entropy_coef

        # ローカルネットワーク(グローバルと同じ構造)
        state_dim = self.env.observation_space.shape[0]
        action_dim = self.env.action_space.n
        self.local_network = A3CNetwork(state_dim, action_dim)

    def compute_returns(self, rewards, next_value, dones):
        """
        n-stepリターンを計算

        Args:
            rewards: 報酬リスト
            next_value: 最後の状態の価値推定
            dones: 終端フラグリスト

        Returns:
            returns: 各ステップのリターン
        """
        returns = []
        R = next_value

        # 逆順で計算
        for r, done in zip(reversed(rewards), reversed(dones)):
            R = r + self.gamma * R * (1 - done)
            returns.insert(0, R)

        return returns

    def train_step(self):
        """
        1エピソード分のトレーニング

        Returns:
            total_reward: エピソード合計報酬
        """
        # グローバルネットワークからパラメータ同期
        self.local_network.load_state_dict(self.global_network.state_dict())

        state, _ = self.env.reset()
        done = False

        states, actions, rewards, dones, values = [], [], [], [], []
        episode_reward = 0

        while not done:
            # 行動選択
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            action_probs, value = self.local_network(state_tensor)

            dist = Categorical(action_probs)
            action = dist.sample()

            # 環境ステップ
            next_state, reward, terminated, truncated, _ = self.env.step(action.item())
            done = terminated or truncated

            # 経験を保存
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            dones.append(done)
            values.append(value)

            episode_reward += reward
            state = next_state

            # max_stepsごとまたは終端で更新
            if len(states) >= self.max_steps or done:
                # 次状態の価値推定
                if done:
                    next_value = 0
                else:
                    next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
                    _, next_value = self.local_network(next_state_tensor)
                    next_value = next_value.item()

                # リターン計算
                returns = self.compute_returns(rewards, next_value, dones)

                # 損失計算
                self._update_global_network(states, actions, returns, values)

                # バッファクリア
                states, actions, rewards, dones, values = [], [], [], [], []

        return episode_reward

    def _update_global_network(self, states, actions, returns, values):
        """
        グローバルネットワークを更新

        Args:
            states: 状態リスト
            actions: 行動リスト
            returns: リターンリスト
            values: 価値推定リスト
        """
        states_tensor = torch.FloatTensor(states)
        actions_tensor = torch.LongTensor(actions)
        returns_tensor = torch.FloatTensor(returns)

        # 再計算
        action_probs, state_values = self.local_network(states_tensor)
        state_values = state_values.squeeze()

        # Advantage計算
        advantages = returns_tensor - state_values.detach()

        # アクター損失(Policy Gradient + Entropy)
        dist = Categorical(action_probs)
        log_probs = dist.log_prob(actions_tensor)
        entropy = dist.entropy().mean()
        actor_loss = -(log_probs * advantages).mean() - self.entropy_coef * entropy

        # クリティック損失(MSE)
        critic_loss = F.mse_loss(state_values, returns_tensor)

        # 合計損失
        total_loss = actor_loss + critic_loss

        # グローバルネットワーク更新
        self.optimizer.zero_grad()
        total_loss.backward()

        # 勾配クリッピング
        torch.nn.utils.clip_grad_norm_(self.local_network.parameters(), 40)

        # グローバルネットワークに勾配を転送
        for local_param, global_param in zip(
            self.local_network.parameters(),
            self.global_network.parameters()
        ):
            if global_param.grad is not None:
                return  # 他のワーカーが更新中
            global_param._grad = local_param.grad

        self.optimizer.step()


def worker_process(worker_id, global_network, optimizer, num_episodes=100):
    """
    ワーカープロセス関数(並列実行用)

    Args:
        worker_id: ワーカーID
        global_network: グローバルネットワーク
        optimizer: 共有オプティマイザー
        num_episodes: エピソード数
    """
    worker = A3CWorker(worker_id, global_network, optimizer)

    for episode in range(num_episodes):
        reward = worker.train_step()
        if episode % 10 == 0:
            print(f"Worker {worker_id} - Episode {episode}, Reward: {reward:.2f}")


# A3C訓練例(シングルプロセス版 - 概念実証用)
def train_a3c_simple():
    """
    A3C訓練の簡易版(並列処理なし)
    実際のA3Cは multiprocessing を使用
    """
    env = gym.make('CartPole-v1')
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    # グローバルネットワーク
    global_network = A3CNetwork(state_dim, action_dim)
    global_network.share_memory()  # プロセス間共有用

    optimizer = torch.optim.Adam(global_network.parameters(), lr=0.0001)

    # 単一ワーカーでのトレーニング例
    worker = A3CWorker(0, global_network, optimizer)

    rewards = []
    for episode in range(100):
        reward = worker.train_step()
        rewards.append(reward)

        if episode % 10 == 0:
            avg_reward = np.mean(rewards[-10:])
            print(f"Episode {episode}, Avg Reward: {avg_reward:.2f}")

    return global_network, rewards


# 実行例
if __name__ == "__main__":
    print("A3C Training (Simple Version)")
    print("=" * 50)
    model, rewards = train_a3c_simple()
    print(f"Training completed. Final avg reward: {np.mean(rewards[-10:]):.2f}")

A3Cの実装ポイント: 完全な並列版はPythonのmultiprocessingを使用しますが、上記は概念を示す簡易版です。実際のA3Cでは、複数のワーカープロセスが同時にグローバルネットワークを更新します。エントロピー正則化により探索が促進され、勾配クリッピングにより学習が安定します。


5.2 SAC (Soft Actor-Critic)

SACの概要

SAC (Soft Actor-Critic)は、最大エントロピー強化学習フレームワークに基づくオフポリシーアルゴリズムです。報酬最大化と探索のバランスを自動的に調整し、連続行動空間で優れた性能を発揮します。

graph LR S[状態 s] --> A[Actor π
確率的方策] S --> Q1[Q-Network 1
Q₁s,a] S --> Q2[Q-Network 2
Q₂s,a] S --> V[Value Network
Vs] A --> |行動 a| E[環境] Q1 --> |最小値| MIN[min Q] Q2 --> |最小値| MIN E --> |報酬 + エントロピー| R[最大化目標] MIN --> R V --> R style A fill:#e3f2fd style Q1 fill:#fff9c4 style Q2 fill:#fff9c4 style V fill:#c8e6c9 style R fill:#ffccbc

SACの主要特徴

特徴 説明 利点
最大エントロピー目標 報酬 + エントロピーを最大化 自動探索、ロバストな方策
Double Q-Learning 2つのQ-Networkで過大推定を防ぐ 安定した学習
Off-Policy Experience Replayを使用 サンプル効率が高い
自動温度調整 エントロピー係数αを学習 ハイパーパラメータ調整不要

SACの目的関数

SACは以下の最大エントロピー目的を最適化します:

$$ J(\pi) = \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{\infty} \gamma^t (r(s_t, a_t) + \alpha \mathcal{H}(\pi(\cdot | s_t))) \right] $$

ここで、$\mathcal{H}(\pi)$は方策のエントロピー、$\alpha$は温度パラメータです。

アクター更新(方策改善):

$$ \mathcal{L}_{\pi}(\theta) = \mathbb{E}_{s_t \sim \mathcal{D}} \left[ \mathbb{E}_{a_t \sim \pi_\theta} [\alpha \log \pi_\theta(a_t | s_t) - Q(s_t, a_t)] \right] $$

クリティック更新(Bellman誤差最小化):

$$ \mathcal{L}_Q(\phi) = \mathbb{E}_{(s, a, r, s') \sim \mathcal{D}} \left[ (Q_\phi(s, a) - (r + \gamma V(s')))^2 \right] $$

ここで、$V(s') = \mathbb{E}_{a' \sim \pi}[Q(s', a') - \alpha \log \pi(a' | s')]$です。

SACの実装

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np
from collections import deque
import random

class GaussianPolicy(nn.Module):
    """
    SAC用のガウス方策ネットワーク

    Architecture:
    - 状態を入力
    - 平均μと標準偏差σを出力
    - Reparameterization Trickで微分可能な行動サンプリング
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256,
                 log_std_min=-20, log_std_max=2):
        """
        Args:
            state_dim: 状態空間の次元
            action_dim: 行動空間の次元
            hidden_dim: 隠れ層の次元
            log_std_min: log標準偏差の最小値
            log_std_max: log標準偏差の最大値
        """
        super(GaussianPolicy, self).__init__()

        self.log_std_min = log_std_min
        self.log_std_max = log_std_max

        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)

        self.mean = nn.Linear(hidden_dim, action_dim)
        self.log_std = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """
        前向き計算

        Args:
            state: 状態 (batch_size, state_dim)

        Returns:
            mean: 行動分布の平均
            log_std: 行動分布のlog標準偏差
        """
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))

        mean = self.mean(x)
        log_std = self.log_std(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)

        return mean, log_std

    def sample(self, state):
        """
        Reparameterization Trickによる行動サンプリング

        Args:
            state: 状態

        Returns:
            action: サンプルされた行動(tanh squashing適用後)
            log_prob: 行動の対数確率
        """
        mean, log_std = self.forward(state)
        std = log_std.exp()

        # ガウス分布からサンプリング
        normal = Normal(mean, std)
        x_t = normal.rsample()  # Reparameterization trick

        # tanh squashingで[-1, 1]に制限
        action = torch.tanh(x_t)

        # 対数確率(tanh変換の補正含む)
        log_prob = normal.log_prob(x_t)
        log_prob -= torch.log(1 - action.pow(2) + 1e-6)
        log_prob = log_prob.sum(1, keepdim=True)

        return action, log_prob


class QNetwork(nn.Module):
    """
    SAC用のQ-Network(状態-行動価値関数)
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(QNetwork, self).__init__()

        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        """
        Q値を計算

        Args:
            state: 状態 (batch_size, state_dim)
            action: 行動 (batch_size, action_dim)

        Returns:
            q_value: Q値 (batch_size, 1)
        """
        x = torch.cat([state, action], dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        q_value = self.fc3(x)
        return q_value


class ReplayBuffer:
    """Experience Replay Buffer"""

    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*batch)
        return (
            np.array(state),
            np.array(action),
            np.array(reward).reshape(-1, 1),
            np.array(next_state),
            np.array(done).reshape(-1, 1)
        )

    def __len__(self):
        return len(self.buffer)


class SAC:
    """
    Soft Actor-Critic Implementation

    Features:
    - Maximum entropy reinforcement learning
    - Double Q-learning for stability
    - Automatic temperature tuning
    - Off-policy learning with replay buffer
    """

    def __init__(self, state_dim, action_dim,
                 lr=3e-4, gamma=0.99, tau=0.005, alpha=0.2,
                 automatic_entropy_tuning=True):
        """
        Args:
            state_dim: 状態空間の次元
            action_dim: 行動空間の次元
            lr: 学習率
            gamma: 割引率
            tau: ターゲットネットワーク更新率
            alpha: エントロピー係数(automatic_entropy_tuning=Falseの場合)
            automatic_entropy_tuning: 自動温度調整を使用するか
        """
        self.gamma = gamma
        self.tau = tau
        self.alpha = alpha

        # ネットワーク初期化
        self.policy = GaussianPolicy(state_dim, action_dim)

        self.q_net1 = QNetwork(state_dim, action_dim)
        self.q_net2 = QNetwork(state_dim, action_dim)

        self.target_q_net1 = QNetwork(state_dim, action_dim)
        self.target_q_net2 = QNetwork(state_dim, action_dim)

        # ターゲットネットワークのパラメータをコピー
        self.target_q_net1.load_state_dict(self.q_net1.state_dict())
        self.target_q_net2.load_state_dict(self.q_net2.state_dict())

        # オプティマイザ
        self.policy_optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
        self.q1_optimizer = torch.optim.Adam(self.q_net1.parameters(), lr=lr)
        self.q2_optimizer = torch.optim.Adam(self.q_net2.parameters(), lr=lr)

        # 自動温度調整
        self.automatic_entropy_tuning = automatic_entropy_tuning
        if automatic_entropy_tuning:
            self.target_entropy = -action_dim
            self.log_alpha = torch.zeros(1, requires_grad=True)
            self.alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=lr)

        self.replay_buffer = ReplayBuffer()

    def select_action(self, state, evaluate=False):
        """
        行動選択

        Args:
            state: 状態
            evaluate: 評価モード(決定的行動)

        Returns:
            action: 選択された行動
        """
        state = torch.FloatTensor(state).unsqueeze(0)

        if evaluate:
            with torch.no_grad():
                mean, _ = self.policy(state)
                action = torch.tanh(mean)
        else:
            with torch.no_grad():
                action, _ = self.policy.sample(state)

        return action.cpu().numpy()[0]

    def update(self, batch_size=256):
        """
        SAC更新ステップ

        Args:
            batch_size: バッチサイズ
        """
        if len(self.replay_buffer) < batch_size:
            return

        # バッファからサンプリング
        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)

        state = torch.FloatTensor(state)
        action = torch.FloatTensor(action)
        reward = torch.FloatTensor(reward)
        next_state = torch.FloatTensor(next_state)
        done = torch.FloatTensor(done)

        # --- Q-Network更新 ---
        with torch.no_grad():
            next_action, next_log_prob = self.policy.sample(next_state)

            # Double Q-learning: 最小値を使用
            target_q1 = self.target_q_net1(next_state, next_action)
            target_q2 = self.target_q_net2(next_state, next_action)
            target_q = torch.min(target_q1, target_q2)

            # エントロピー項を含むターゲット
            target_value = reward + (1 - done) * self.gamma * (
                target_q - self.alpha * next_log_prob
            )

        # Q1損失
        q1_value = self.q_net1(state, action)
        q1_loss = F.mse_loss(q1_value, target_value)

        # Q2損失
        q2_value = self.q_net2(state, action)
        q2_loss = F.mse_loss(q2_value, target_value)

        # Q-Network更新
        self.q1_optimizer.zero_grad()
        q1_loss.backward()
        self.q1_optimizer.step()

        self.q2_optimizer.zero_grad()
        q2_loss.backward()
        self.q2_optimizer.step()

        # --- Policy更新 ---
        new_action, log_prob = self.policy.sample(state)

        q1_new = self.q_net1(state, new_action)
        q2_new = self.q_net2(state, new_action)
        q_new = torch.min(q1_new, q2_new)

        policy_loss = (self.alpha * log_prob - q_new).mean()

        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()

        # --- 温度パラメータ更新(自動調整) ---
        if self.automatic_entropy_tuning:
            alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()

            self.alpha_optimizer.zero_grad()
            alpha_loss.backward()
            self.alpha_optimizer.step()

            self.alpha = self.log_alpha.exp().item()

        # --- ターゲットネットワークのソフト更新 ---
        self._soft_update(self.q_net1, self.target_q_net1)
        self._soft_update(self.q_net2, self.target_q_net2)

    def _soft_update(self, source, target):
        """
        ターゲットネットワークのソフト更新
        θ_target = τ * θ_source + (1 - τ) * θ_target
        """
        for target_param, source_param in zip(target.parameters(), source.parameters()):
            target_param.data.copy_(
                self.tau * source_param.data + (1 - self.tau) * target_param.data
            )


# SAC訓練例
def train_sac():
    """SAC訓練の実行例(Pendulum環境)"""
    import gymnasium as gym

    env = gym.make('Pendulum-v1')
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]

    agent = SAC(state_dim, action_dim)

    num_episodes = 100
    max_steps = 200

    for episode in range(num_episodes):
        state, _ = env.reset()
        episode_reward = 0

        for step in range(max_steps):
            # 行動選択
            action = agent.select_action(state)

            # 環境ステップ
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # バッファに保存
            agent.replay_buffer.push(state, action, reward, next_state, done)

            # 更新
            agent.update()

            episode_reward += reward
            state = next_state

            if done:
                break

        if episode % 10 == 0:
            print(f"Episode {episode}, Reward: {episode_reward:.2f}, Alpha: {agent.alpha:.3f}")

    return agent


if __name__ == "__main__":
    print("SAC Training on Pendulum-v1")
    print("=" * 50)
    agent = train_sac()
    print("Training completed!")

SACの実装ポイント: Reparameterization Trickにより方策が微分可能になり、効率的な勾配ベース最適化が可能です。Double Q-learningで過大推定を防ぎ、自動温度調整により探索と活用のバランスが自動的に最適化されます。tanh squashingにより行動が有界範囲に制限されます。


5.3 マルチエージェント強化学習 (Multi-Agent RL)

マルチエージェント強化学習の基本

マルチエージェント強化学習 (MARL)では、複数のエージェントが同じ環境で同時に学習・行動します。エージェント間の相互作用により、シングルエージェントRLとは異なる課題が生じます。

graph TB ENV[環境 Environment] A1[Agent 1
方策 π₁] A2[Agent 2
方策 π₂] A3[Agent 3
方策 π₃] A1 --> |行動 a₁| ENV A2 --> |行動 a₂| ENV A3 --> |行動 a₃| ENV ENV --> |観測 o₁, 報酬 r₁| A1 ENV --> |観測 o₂, 報酬 r₂| A2 ENV --> |観測 o₃, 報酬 r₃| A3 A1 -.-> |観測・通信| A2 A2 -.-> |観測・通信| A3 A3 -.-> |観測・通信| A1 style ENV fill:#e3f2fd style A1 fill:#c8e6c9 style A2 fill:#fff9c4 style A3 fill:#ffccbc

MARLの主要パラダイム

パラダイム 説明 用途
Cooperative(協調) 全エージェントが共通目標を共有 チームスポーツ、協調ロボット
Competitive(競争) エージェント間でゼロサム ゲームAI、対戦型タスク
Mixed(混合) 協調と競争の両方が存在 経済シミュレーション、交渉

MARLの課題

マルチエージェント環境の実装

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
import gymnasium as gym
from gymnasium import spaces

class SimpleMultiAgentEnv(gym.Env):
    """
    シンプルなマルチエージェント環境

    Task: 複数エージェントが目標地点に到達
    - エージェントは2D空間を移動
    - 目標地点に近づくと正の報酬
    - エージェント同士が近すぎると負の報酬(衝突回避)
    - 協調的タスク(共通報酬)
    """

    def __init__(self, n_agents=3, grid_size=10, max_steps=50):
        """
        Args:
            n_agents: エージェント数
            grid_size: グリッドサイズ
            max_steps: 最大ステップ数
        """
        super(SimpleMultiAgentEnv, self).__init__()

        self.n_agents = n_agents
        self.grid_size = grid_size
        self.max_steps = max_steps

        # 行動空間: 上下左右の4方向
        self.action_space = spaces.Discrete(4)

        # 観測空間: [自分のx, y, 目標までのx距離, y距離, 他エージェントとの相対位置...]
        obs_dim = 2 + 2 + (n_agents - 1) * 2
        self.observation_space = spaces.Box(
            low=-grid_size, high=grid_size,
            shape=(obs_dim,), dtype=np.float32
        )

        self.agent_positions = None
        self.goal_position = None
        self.current_step = 0

    def reset(self, seed=None):
        """環境リセット"""
        super().reset(seed=seed)

        # エージェントをランダム配置
        self.agent_positions = np.random.rand(self.n_agents, 2) * self.grid_size

        # 目標をランダム配置
        self.goal_position = np.random.rand(2) * self.grid_size

        self.current_step = 0

        return self._get_observations(), {}

    def step(self, actions):
        """
        環境ステップ

        Args:
            actions: 各エージェントの行動リスト

        Returns:
            observations: 各エージェントの観測
            rewards: 各エージェントの報酬
            terminated: 終了フラグ
            truncated: 打ち切りフラグ
            info: 追加情報
        """
        # 行動を適用(上下左右移動)
        for i, action in enumerate(actions):
            if action == 0:  # 上
                self.agent_positions[i, 1] = min(self.grid_size, self.agent_positions[i, 1] + 0.5)
            elif action == 1:  # 下
                self.agent_positions[i, 1] = max(0, self.agent_positions[i, 1] - 0.5)
            elif action == 2:  # 右
                self.agent_positions[i, 0] = min(self.grid_size, self.agent_positions[i, 0] + 0.5)
            elif action == 3:  # 左
                self.agent_positions[i, 0] = max(0, self.agent_positions[i, 0] - 0.5)

        # 報酬計算
        rewards = self._compute_rewards()

        # 終了判定
        self.current_step += 1
        terminated = self._is_done()
        truncated = self.current_step >= self.max_steps

        observations = self._get_observations()

        return observations, rewards, terminated, truncated, {}

    def _get_observations(self):
        """各エージェントの観測を取得"""
        observations = []

        for i in range(self.n_agents):
            obs = []

            # 自分の位置
            obs.extend(self.agent_positions[i])

            # 目標までの距離
            obs.extend(self.goal_position - self.agent_positions[i])

            # 他エージェントとの相対位置
            for j in range(self.n_agents):
                if i != j:
                    obs.extend(self.agent_positions[j] - self.agent_positions[i])

            observations.append(np.array(obs, dtype=np.float32))

        return observations

    def _compute_rewards(self):
        """報酬計算"""
        rewards = []

        for i in range(self.n_agents):
            reward = 0

            # 目標への距離に基づく報酬
            dist_to_goal = np.linalg.norm(self.agent_positions[i] - self.goal_position)
            reward -= dist_to_goal * 0.1

            # 目標到達ボーナス
            if dist_to_goal < 0.5:
                reward += 10.0

            # 衝突回避ペナルティ
            for j in range(self.n_agents):
                if i != j:
                    dist_to_agent = np.linalg.norm(
                        self.agent_positions[i] - self.agent_positions[j]
                    )
                    if dist_to_agent < 1.0:
                        reward -= 2.0

            rewards.append(reward)

        return rewards

    def _is_done(self):
        """全エージェントが目標に到達したか"""
        for i in range(self.n_agents):
            dist = np.linalg.norm(self.agent_positions[i] - self.goal_position)
            if dist >= 0.5:
                return False
        return True

    def render(self):
        """環境の可視化"""
        plt.figure(figsize=(8, 8))
        plt.xlim(0, self.grid_size)
        plt.ylim(0, self.grid_size)

        # 目標を描画
        goal_circle = Circle(self.goal_position, 0.5, color='gold', alpha=0.6, label='Goal')
        plt.gca().add_patch(goal_circle)

        # エージェントを描画
        colors = ['blue', 'red', 'green', 'purple', 'orange']
        for i in range(self.n_agents):
            agent_circle = Circle(
                self.agent_positions[i], 0.3,
                color=colors[i % len(colors)],
                alpha=0.8,
                label=f'Agent {i+1}'
            )
            plt.gca().add_patch(agent_circle)

        plt.legend()
        plt.title(f'Multi-Agent Environment (Step {self.current_step})')
        plt.grid(True, alpha=0.3)
        plt.show()


class IndependentQLearning:
    """
    Independent Q-Learning for MARL
    各エージェントが独立してQ学習を実行
    """

    def __init__(self, n_agents, state_dim, n_actions,
                 lr=0.1, gamma=0.99, epsilon=0.1):
        """
        Args:
            n_agents: エージェント数
            state_dim: 状態空間の次元
            n_actions: 行動数
            lr: 学習率
            gamma: 割引率
            epsilon: ε-greedy探索率
        """
        self.n_agents = n_agents
        self.n_actions = n_actions
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon

        # 各エージェント用のQ-table(簡易版: 離散化)
        # 実際は関数近似(ニューラルネット)を使用
        self.q_tables = [
            np.zeros((100, n_actions)) for _ in range(n_agents)
        ]

    def select_actions(self, observations):
        """ε-greedy行動選択"""
        actions = []

        for i in range(self.n_agents):
            if np.random.rand() < self.epsilon:
                action = np.random.randint(self.n_actions)
            else:
                # 観測を離散化(簡易版)
                state_idx = self._discretize_state(observations[i])
                action = np.argmax(self.q_tables[i][state_idx])

            actions.append(action)

        return actions

    def update(self, observations, actions, rewards, next_observations, done):
        """Q値更新"""
        for i in range(self.n_agents):
            state_idx = self._discretize_state(observations[i])
            next_state_idx = self._discretize_state(next_observations[i])

            # Q学習更新
            target = rewards[i]
            if not done:
                target += self.gamma * np.max(self.q_tables[i][next_state_idx])

            self.q_tables[i][state_idx, actions[i]] += self.lr * (
                target - self.q_tables[i][state_idx, actions[i]]
            )

    def _discretize_state(self, observation):
        """観測を離散化(簡易版)"""
        # 実際は状態をハッシュ化または関数近似を使用
        return int(np.sum(np.abs(observation)) * 10) % 100


# MARL訓練例
def train_marl():
    """マルチエージェント環境での訓練"""
    env = SimpleMultiAgentEnv(n_agents=3, grid_size=10)
    agent_controller = IndependentQLearning(
        n_agents=3,
        state_dim=env.observation_space.shape[0],
        n_actions=4
    )

    num_episodes = 100

    for episode in range(num_episodes):
        observations, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            # 行動選択
            actions = agent_controller.select_actions(observations)

            # 環境ステップ
            next_observations, rewards, terminated, truncated, _ = env.step(actions)
            done = terminated or truncated

            # 更新
            agent_controller.update(observations, actions, rewards, next_observations, done)

            episode_reward += sum(rewards)
            observations = next_observations

        if episode % 10 == 0:
            print(f"Episode {episode}, Total Reward: {episode_reward:.2f}")

    # 最終エピソードの可視化
    observations, _ = env.reset()
    env.render()

    return agent_controller


if __name__ == "__main__":
    print("Multi-Agent RL Training")
    print("=" * 50)
    controller = train_marl()
    print("Training completed!")

MARLの実装ポイント: Independent Q-Learningは最もシンプルなMARLアプローチで、各エージェントが独立して学習します。より高度な手法にはQMIX(中央集権的訓練・分散実行)、MADDPG(Multi-Agent DDPG)などがあります。協調タスクでは報酬共有が有効で、通信機構を導入すると性能が向上します。


5.4 モデルベース強化学習 (Model-Based RL)

モデルベースRLの概要

モデルベース強化学習は、環境のダイナミクスモデル(遷移関数と報酬関数)を学習し、そのモデルを使って方策を最適化します。モデルフリー手法と比べてサンプル効率が高いのが特徴です。

graph LR ENV[実環境] --> |経験 s,a,r,s'| MD[モデル学習
P̂s'|s,a, R̂s,a] MD --> |学習済みモデル| PLAN[プランニング
シミュレーション] PLAN --> |方策改善| POL[方策 π] POL --> |行動 a| ENV PLAN -.-> |想像上の経験| MB[モデルベース更新] ENV -.-> |実経験| MF[モデルフリー更新] MB --> POL MF --> POL style ENV fill:#e3f2fd style MD fill:#fff9c4 style PLAN fill:#c8e6c9 style POL fill:#ffccbc

モデルベース vs モデルフリー

側面 モデルベース モデルフリー
サンプル効率 高い(モデルで補完) 低い(多くの経験が必要)
計算コスト 高い(モデル学習+プランニング) 低い(直接方策学習)
適用難易度 難しい(モデル誤差の影響) 容易(直接学習)
解釈性 高い(モデルで予測可能) 低い(ブラックボックス)

主要アプローチ

環境モデルは以下を学習します:

$$ \hat{P}(s' | s, a) \approx P(s' | s, a) $$ $$ \hat{R}(s, a) \approx R(s, a) $$

学習したモデルを使ってシミュレーションし、多くの仮想経験を生成します。

モデルベースRLのポイント: モデル誤差が累積すると性能が悪化するため、不確実性推定とモデルの適切な使用が重要です。実環境とモデル環境のデータをバランス良く使うことで、サンプル効率と性能を両立できます。


5.5 実世界応用

5.5.1 ロボティクス (Robotics)

強化学習はロボットの制御、操作、ナビゲーションに広く応用されています。

主要応用分野

5.5.2 ゲームAI (Game AI)

強化学習は複雑なゲームで人間レベル以上の性能を達成しています。

代表的な成功例

システム ゲーム 手法
AlphaGo 囲碁 MCTS + Deep RL
AlphaStar StarCraft II Multi-agent RL
OpenAI Five Dota 2 PPO + 大規模分散学習
MuZero チェス、将棋、Atari Model-based RL + MCTS

5.5.3 金融トレーディング (Trading)

強化学習は自動トレーディング、ポートフォリオ最適化に応用されています。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import deque

class TradingEnvironment:
    """
    株式トレーディング環境

    Features:
    - 過去の価格履歴から行動決定
    - 取引コストを考慮
    - 保有ポジション管理
    """

    def __init__(self, price_data, initial_balance=10000,
                 transaction_cost=0.001, window_size=20):
        """
        Args:
            price_data: 価格データ (DataFrame)
            initial_balance: 初期資金
            transaction_cost: 取引コスト(片道)
            window_size: 観測する過去ウィンドウサイズ
        """
        self.price_data = price_data
        self.initial_balance = initial_balance
        self.transaction_cost = transaction_cost
        self.window_size = window_size

        self.reset()

    def reset(self):
        """環境リセット"""
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.shares_held = 0
        self.net_worth = self.initial_balance
        self.max_net_worth = self.initial_balance

        return self._get_observation()

    def _get_observation(self):
        """
        観測取得

        Returns:
            observation: [価格履歴, 保有株数, 残高] の正規化版
        """
        # 過去window_sizeステップの価格変化率
        window_data = self.price_data.iloc[
            self.current_step - self.window_size:self.current_step
        ]['Close'].pct_change().fillna(0).values

        # ポートフォリオ状態
        portfolio_state = np.array([
            self.shares_held / 100,  # 正規化
            self.balance / self.initial_balance  # 正規化
        ])

        observation = np.concatenate([window_data, portfolio_state])
        return observation

    def step(self, action):
        """
        環境ステップ

        Args:
            action: 0=Hold, 1=Buy, 2=Sell

        Returns:
            observation: 次状態
            reward: 報酬
            done: 終了フラグ
            info: 追加情報
        """
        current_price = self.price_data.iloc[self.current_step]['Close']

        # 行動実行
        if action == 1:  # Buy
            shares_to_buy = self.balance // current_price
            cost = shares_to_buy * current_price * (1 + self.transaction_cost)

            if cost <= self.balance:
                self.shares_held += shares_to_buy
                self.balance -= cost

        elif action == 2:  # Sell
            if self.shares_held > 0:
                proceeds = self.shares_held * current_price * (1 - self.transaction_cost)
                self.balance += proceeds
                self.shares_held = 0

        # ステップ進行
        self.current_step += 1

        # 純資産計算
        self.net_worth = self.balance + self.shares_held * current_price
        self.max_net_worth = max(self.max_net_worth, self.net_worth)

        # 報酬: 純資産の変化率
        reward = (self.net_worth - self.initial_balance) / self.initial_balance

        # 終了判定
        done = self.current_step >= len(self.price_data) - 1

        observation = self._get_observation()
        info = {
            'net_worth': self.net_worth,
            'shares_held': self.shares_held,
            'balance': self.balance
        }

        return observation, reward, done, info


class DQNTrader:
    """
    DQNベースのトレーディングエージェント
    """

    def __init__(self, state_dim, n_actions=3, lr=0.001, gamma=0.95):
        import torch
        import torch.nn as nn

        self.state_dim = state_dim
        self.n_actions = n_actions
        self.gamma = gamma

        # Q-Network
        self.q_network = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions)
        )

        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
        self.memory = deque(maxlen=2000)
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

    def select_action(self, state, training=True):
        """ε-greedy行動選択"""
        import torch

        if training and np.random.rand() < self.epsilon:
            return np.random.randint(self.n_actions)

        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

    def train(self, batch_size=32):
        """DQN更新"""
        import torch
        import torch.nn.functional as F

        if len(self.memory) < batch_size:
            return

        # ミニバッチサンプリング
        batch = np.array(self.memory, dtype=object)
        indices = np.random.choice(len(batch), batch_size, replace=False)
        samples = batch[indices]

        states = torch.FloatTensor(np.vstack([s[0] for s in samples]))
        actions = torch.LongTensor([s[1] for s in samples])
        rewards = torch.FloatTensor([s[2] for s in samples])
        next_states = torch.FloatTensor(np.vstack([s[3] for s in samples]))
        dones = torch.FloatTensor([s[4] for s in samples])

        # Q値計算
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))

        with torch.no_grad():
            max_next_q = self.q_network(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * max_next_q

        # 損失計算と更新
        loss = F.mse_loss(current_q.squeeze(), target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # ε減衰
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)


# トレーディングボット訓練例
def train_trading_bot():
    """
    株式トレーディングボットの訓練
    (デモ用: ランダムウォーク価格データ使用)
    """
    # デモ用価格データ生成
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', periods=500)
    prices = 100 * np.exp(np.cumsum(np.random.randn(500) * 0.02))
    price_data = pd.DataFrame({'Close': prices}, index=dates)

    # 環境とエージェント初期化
    env = TradingEnvironment(price_data, window_size=20)
    obs = env.reset()
    agent = DQNTrader(state_dim=len(obs))

    num_episodes = 50

    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            # 行動選択
            action = agent.select_action(state, training=True)

            # 環境ステップ
            next_state, reward, done, info = env.step(action)

            # 経験保存
            agent.memory.append((state, action, reward, next_state, done))

            # 訓練
            agent.train(batch_size=32)

            total_reward += reward
            state = next_state

        if episode % 10 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward:.4f}, "
                  f"Final Net Worth: ${info['net_worth']:.2f}, "
                  f"Epsilon: {agent.epsilon:.3f}")

    # 最終評価
    state = env.reset()
    done = False
    actions_taken = []
    net_worths = []

    while not done:
        action = agent.select_action(state, training=False)
        actions_taken.append(action)
        state, reward, done, info = env.step(action)
        net_worths.append(info['net_worth'])

    # 可視化
    plt.figure(figsize=(14, 6))

    plt.subplot(1, 2, 1)
    plt.plot(price_data.index[-len(net_worths):],
             price_data['Close'].iloc[-len(net_worths):],
             label='Stock Price', alpha=0.7)
    plt.title('Stock Price Over Time')
    plt.xlabel('Date')
    plt.ylabel('Price ($)')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 2, 2)
    plt.plot(net_worths, label='Portfolio Net Worth', color='green')
    plt.axhline(y=env.initial_balance, color='r', linestyle='--',
                label='Initial Balance', alpha=0.7)
    plt.title('Portfolio Performance')
    plt.xlabel('Time Step')
    plt.ylabel('Net Worth ($)')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('trading_bot_performance.png', dpi=150, bbox_inches='tight')
    print("Performance chart saved as 'trading_bot_performance.png'")

    final_return = (net_worths[-1] - env.initial_balance) / env.initial_balance * 100
    print(f"\nFinal Return: {final_return:.2f}%")

    return agent, env


if __name__ == "__main__":
    print("RL Trading Bot Training")
    print("=" * 50)
    agent, env = train_trading_bot()
    print("Training completed!")

トレーディングへの応用ポイント: 取引コスト、スリッページ、市場インパクトを考慮することが重要です。過去データでの過学習を避けるため、複数の時期でバックテストを行います。実際の運用では、リスク管理(ポジションサイズ制限、ストップロス)を組み込む必要があります。


5.6 Stable-Baselines3による実践

Stable-Baselines3の概要

Stable-Baselines3 (SB3)は、信頼性の高いRL実装を提供するPythonライブラリです。最新アルゴリズムの実装が充実しており、実践的なRLプロジェクトに最適です。

SB3の主要アルゴリズム

アルゴリズム タイプ 適用場面
PPO On-policy, Actor-Critic 汎用性が高い、安定
A2C On-policy, Actor-Critic 高速学習、並列化
SAC Off-policy, Max-Entropy 連続行動、サンプル効率
TD3 Off-policy, DDPG改良 連続行動、安定性
DQN Off-policy, Value-based 離散行動

Stable-Baselines3の実践例

"""
Stable-Baselines3を使った実践的なRL訓練
"""

# インストール(必要に応じて)
# !pip install stable-baselines3[extra]

import gymnasium as gym
from stable_baselines3 import PPO, SAC, DQN
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import numpy as np
import matplotlib.pyplot as plt

# === Example 1: PPO for CartPole ===
def train_ppo_cartpole():
    """
    PPOでCartPole環境を訓練

    Features:
    - Vectorized environment for parallel training
    - Evaluation callback for monitoring
    - Model checkpointing
    """
    print("Training PPO on CartPole-v1")
    print("=" * 50)

    # ベクトル化環境(並列訓練)
    env = make_vec_env('CartPole-v1', n_envs=4)

    # 評価用環境
    eval_env = gym.make('CartPole-v1')
    eval_env = Monitor(eval_env)

    # PPOモデル初期化
    model = PPO(
        'MlpPolicy',           # Multi-Layer Perceptron policy
        env,
        learning_rate=3e-4,
        n_steps=2048,          # ステップ数/更新
        batch_size=64,
        n_epochs=10,           # 更新エポック数
        gamma=0.99,
        gae_lambda=0.95,       # GAE parameter
        clip_range=0.2,        # PPO clipping
        verbose=1,
        tensorboard_log="./ppo_cartpole_tensorboard/"
    )

    # コールバック設定
    eval_callback = EvalCallback(
        eval_env,
        best_model_save_path='./logs/best_model',
        log_path='./logs/',
        eval_freq=10000,
        deterministic=True,
        render=False
    )

    checkpoint_callback = CheckpointCallback(
        save_freq=10000,
        save_path='./logs/checkpoints/',
        name_prefix='ppo_cartpole'
    )

    # 訓練実行
    model.learn(
        total_timesteps=100000,
        callback=[eval_callback, checkpoint_callback]
    )

    # 評価
    mean_reward, std_reward = evaluate_policy(
        model, eval_env, n_eval_episodes=10
    )
    print(f"\nEvaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    # モデル保存
    model.save("ppo_cartpole_final")

    return model


# === Example 2: SAC for Continuous Control ===
def train_sac_pendulum():
    """
    SACでPendulum環境を訓練(連続行動空間)

    Features:
    - Maximum entropy RL
    - Off-policy learning
    - Automatic temperature tuning
    """
    print("\nTraining SAC on Pendulum-v1")
    print("=" * 50)

    # 環境作成
    env = gym.make('Pendulum-v1')

    # SACモデル
    model = SAC(
        'MlpPolicy',
        env,
        learning_rate=3e-4,
        buffer_size=100000,
        learning_starts=1000,
        batch_size=256,
        tau=0.005,             # Soft update coefficient
        gamma=0.99,
        train_freq=1,
        gradient_steps=1,
        ent_coef='auto',       # Automatic entropy tuning
        verbose=1,
        tensorboard_log="./sac_pendulum_tensorboard/"
    )

    # 訓練
    model.learn(total_timesteps=50000)

    # 評価
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
    print(f"Evaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    # モデル保存
    model.save("sac_pendulum_final")

    return model


# === Example 3: Custom Environment with SB3 ===
class CustomGridWorld(gym.Env):
    """
    カスタムグリッドワールド環境
    SB3互換のGym環境
    """

    def __init__(self, grid_size=5):
        super(CustomGridWorld, self).__init__()

        self.grid_size = grid_size
        self.agent_pos = [0, 0]
        self.goal_pos = [grid_size - 1, grid_size - 1]

        # 行動空間: 上下左右
        self.action_space = gym.spaces.Discrete(4)

        # 観測空間: エージェント位置(正規化)
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=(2,), dtype=np.float32
        )

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.agent_pos = [0, 0]
        return self._get_obs(), {}

    def _get_obs(self):
        return np.array(self.agent_pos, dtype=np.float32) / self.grid_size

    def step(self, action):
        # 行動実行
        if action == 0 and self.agent_pos[1] < self.grid_size - 1:  # Up
            self.agent_pos[1] += 1
        elif action == 1 and self.agent_pos[1] > 0:  # Down
            self.agent_pos[1] -= 1
        elif action == 2 and self.agent_pos[0] < self.grid_size - 1:  # Right
            self.agent_pos[0] += 1
        elif action == 3 and self.agent_pos[0] > 0:  # Left
            self.agent_pos[0] -= 1

        # 報酬計算
        if self.agent_pos == self.goal_pos:
            reward = 1.0
            done = True
        else:
            reward = -0.01
            done = False

        return self._get_obs(), reward, done, False, {}


def train_custom_env():
    """カスタム環境でDQNを訓練"""
    print("\nTraining DQN on Custom GridWorld")
    print("=" * 50)

    # カスタム環境
    env = CustomGridWorld(grid_size=5)

    # DQNモデル
    model = DQN(
        'MlpPolicy',
        env,
        learning_rate=1e-3,
        buffer_size=10000,
        learning_starts=1000,
        batch_size=32,
        gamma=0.99,
        exploration_fraction=0.1,
        exploration_final_eps=0.02,
        verbose=1
    )

    # 訓練
    model.learn(total_timesteps=50000)

    # 評価
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
    print(f"Evaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    return model


# === Example 4: Loading and Using Trained Model ===
def use_trained_model():
    """訓練済みモデルの読み込みと使用"""
    print("\nUsing Trained Model")
    print("=" * 50)

    # モデル読み込み
    model = PPO.load("ppo_cartpole_final")

    # 環境で実行
    env = gym.make('CartPole-v1', render_mode='rgb_array')

    obs, _ = env.reset()
    total_reward = 0

    for _ in range(500):
        # 決定的行動選択
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env.step(action)
        total_reward += reward

        if terminated or truncated:
            break

    print(f"Episode reward: {total_reward}")
    env.close()


# === Example 5: Hyperparameter Tuning with Optuna ===
def hyperparameter_tuning():
    """
    Optunaを使ったハイパーパラメータチューニング
    (オプション: optunaインストール必要)
    """
    try:
        from stable_baselines3.common.env_util import make_vec_env
        import optuna
        from optuna.pruners import MedianPruner
        from optuna.samplers import TPESampler

        def objective(trial):
            """Optuna目的関数"""
            # ハイパーパラメータ提案
            lr = trial.suggest_loguniform('lr', 1e-5, 1e-3)
            gamma = trial.suggest_uniform('gamma', 0.9, 0.9999)
            clip_range = trial.suggest_uniform('clip_range', 0.1, 0.4)

            # 環境とモデル
            env = make_vec_env('CartPole-v1', n_envs=4)
            model = PPO(
                'MlpPolicy', env,
                learning_rate=lr,
                gamma=gamma,
                clip_range=clip_range,
                verbose=0
            )

            # 訓練
            model.learn(total_timesteps=20000)

            # 評価
            eval_env = gym.make('CartPole-v1')
            mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=5)

            return mean_reward

        # Optuna study
        study = optuna.create_study(
            direction='maximize',
            sampler=TPESampler(),
            pruner=MedianPruner()
        )

        study.optimize(objective, n_trials=20, timeout=600)

        print("\nBest hyperparameters:")
        print(study.best_params)
        print(f"Best value: {study.best_value:.2f}")

    except ImportError:
        print("Optuna not installed. Skipping hyperparameter tuning.")
        print("Install with: pip install optuna")


# メイン実行
if __name__ == "__main__":
    print("Stable-Baselines3 Practical Examples")
    print("=" * 50)

    # Example 1: PPO
    ppo_model = train_ppo_cartpole()

    # Example 2: SAC
    sac_model = train_sac_pendulum()

    # Example 3: Custom Environment
    custom_model = train_custom_env()

    # Example 4: Using trained model
    use_trained_model()

    # Example 5: Hyperparameter tuning (optional)
    # hyperparameter_tuning()

    print("\n" + "=" * 50)
    print("All examples completed!")
    print("Tensorboard logs saved. View with:")
    print("  tensorboard --logdir ./ppo_cartpole_tensorboard/")

SB3の実践ポイント: ベクトル化環境により訓練が高速化され、コールバックにより訓練中の監視・評価が容易になります。TensorBoardログで学習曲線を可視化でき、ハイパーパラメータチューニングにはOptunaが有効です。カスタム環境はGym APIに準拠すれば簡単に統合できます。


5.7 実世界適用の課題と解決策

主要な課題

課題 説明 解決策
サンプル効率 実環境での学習は時間・コストがかかる シミュレーションでの事前学習、モデルベースRL、転移学習
安全性 学習中の失敗が危険 Safe RL、シミュレーションでの検証、人間の監視
Sim-to-Real Gap シミュレーションと実環境の差 Domain Randomization、現実性の高いシミュレータ
部分観測 完全な状態が観測できない LSTM/Transformer、信念状態の使用
報酬設計 適切な報酬関数の設計が難しい 逆強化学習、模倣学習、報酬シェーピング
汎化性能 訓練環境外での性能低下 多様な訓練データ、Meta-RL、ドメイン適応

ベストプラクティス

  1. 段階的アプローチ: シミュレーション → Sim-to-Real → 実環境
  2. モデル検証: 複数の評価指標、異なる環境設定でテスト
  3. 人間の知識活用: 模倣学習、事前学習、報酬シェーピング
  4. 安全性の確保: 制約付きRL、フェイルセーフ機構
  5. 継続的学習: オンライン学習、適応的方策

まとめ

この章では、以下の高度なRL手法と応用を学びました:

次のステップ

  1. 実践プロジェクト: Stable-Baselines3で独自の環境を作成
  2. 論文読解: 最新のRL論文を読み、実装してみる
  3. コンペティション: Kaggle RLコンペ、OpenAI Gym Leaderboard
  4. 応用分野探索: 自動運転、ヘルスケア、エネルギー管理など

参考リソース


演習問題

演習5.1: A3Cの並列化実装

問題: Pythonのmultiprocessingを使って、完全な並列A3Cを実装してください。

ヒント:

演習5.2: SACの温度パラメータ分析

問題: SACの温度パラメータ$\alpha$を固定値と自動調整で比較し、学習曲線と最終性能の違いを分析してください。

ヒント:

演習5.3: マルチエージェント協調タスク

問題: 3つのエージェントが協調して目標を運ぶタスクを実装してください。1つのエージェントだけでは運べない重い物体を、複数エージェントで協力して目標地点まで運びます。

ヒント:

演習5.4: トレーディングボットの改良

問題: 提供されたトレーディングボットに以下の機能を追加してください:

ヒント:

演習5.5: Stable-Baselines3でのカスタムコールバック

問題: Stable-Baselines3のカスタムコールバックを作成し、訓練中に以下を実行してください:

ヒント:


免責事項