学習目標
この章を読むことで、以下を習得できます:
- ✅ A3Cの並列学習メカニズムを理解し、概念実装ができる
- ✅ SACのアクター・クリティック構造を実装できる
- ✅ マルチエージェント強化学習の基本を理解できる
- ✅ モデルベース強化学習の原理を理解できる
- ✅ ロボティクス、ゲームAI、トレーディングへの応用を実装できる
- ✅ Stable-Baselines3を使った実践的なRLプロジェクトを構築できる
- ✅ 実世界への適用における課題と解決策を理解できる
5.1 A3C (Asynchronous Advantage Actor-Critic)
A3Cの概要
A3C (Asynchronous Advantage Actor-Critic)は、DeepMindが2016年に提案した並列学習アルゴリズムです。複数のワーカーが非同期的に環境と相互作用し、グローバルネットワークを更新することで、高速かつ安定した学習を実現します。
共有パラメータ θ] W1[Worker 1
環境コピー 1] W2[Worker 2
環境コピー 2] W3[Worker 3
環境コピー 3] Wn[Worker N
環境コピー N] W1 -->|勾配更新| GN W2 -->|勾配更新| GN W3 -->|勾配更新| GN Wn -->|勾配更新| GN GN -->|パラメータ同期| W1 GN -->|パラメータ同期| W2 GN -->|パラメータ同期| W3 GN -->|パラメータ同期| Wn style GN fill:#e3f2fd style W1 fill:#c8e6c9 style W2 fill:#c8e6c9 style W3 fill:#c8e6c9 style Wn fill:#c8e6c9
A3Cの主要コンポーネント
| コンポーネント | 説明 | 特徴 |
|---|---|---|
| 非同期更新 | 各ワーカーが独立して学習 | Experience Replay不要、メモリ効率的 |
| Advantage関数 | $A(s, a) = Q(s, a) - V(s)$ | 分散減少、安定した学習 |
| エントロピー正則化 | 探索を促進 | 早期収束を防ぐ |
| 並列実行 | 複数環境で同時学習 | 学習速度向上、多様なデータ |
A3Cのアルゴリズム
各ワーカーは以下の手順を繰り返します:
- パラメータ同期: グローバルネットワークからパラメータをコピー $\theta' \leftarrow \theta$
- 経験収集: $t_{\text{max}}$ ステップまたは終端まで $(s_t, a_t, r_t)$ を収集
- リターン計算: $n$ステップリターン $R_t = \sum_{i=0}^{n-1} \gamma^i r_{t+i} + \gamma^n V(s_{t+n})$
- 勾配計算: アクターとクリティックの損失を計算
- 非同期更新: グローバルネットワークを更新
損失関数は以下の通りです:
$$ \mathcal{L}_{\text{actor}} = -\log \pi(a_t | s_t; \theta) A_t - \beta H(\pi(\cdot | s_t; \theta)) $$ $$ \mathcal{L}_{\text{critic}} = (R_t - V(s_t; \theta))^2 $$ここで、$H(\pi)$はエントロピー、$\beta$はエントロピー正則化係数、$A_t = R_t - V(s_t)$はAdvantage推定値です。
A3Cの概念実装
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.distributions import Categorical
import gymnasium as gym
import numpy as np
class A3CNetwork(nn.Module):
"""
A3C用のアクター・クリティック共有ネットワーク
Architecture:
- 共有層: 特徴抽出
- アクター出力: 行動確率分布
- クリティック出力: 状態価値関数
"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
"""
Args:
state_dim: 状態空間の次元
action_dim: 行動空間の次元
hidden_dim: 隠れ層の次元
"""
super(A3CNetwork, self).__init__()
# 共有特徴抽出層
self.shared_fc1 = nn.Linear(state_dim, hidden_dim)
self.shared_fc2 = nn.Linear(hidden_dim, hidden_dim)
# アクター出力(行動確率)
self.actor_head = nn.Linear(hidden_dim, action_dim)
# クリティック出力(状態価値)
self.critic_head = nn.Linear(hidden_dim, 1)
def forward(self, state):
"""
前向き計算
Args:
state: 状態 (batch_size, state_dim)
Returns:
action_probs: 行動確率分布 (batch_size, action_dim)
state_value: 状態価値 (batch_size, 1)
"""
# 共有層
x = F.relu(self.shared_fc1(state))
x = F.relu(self.shared_fc2(x))
# アクター出力
action_logits = self.actor_head(x)
action_probs = F.softmax(action_logits, dim=-1)
# クリティック出力
state_value = self.critic_head(x)
return action_probs, state_value
class A3CWorker:
"""
A3Cワーカー: 独立した環境で学習し、グローバルネットワークを更新
Features:
- 非同期パラメータ更新
- n-step returns計算
- エントロピー正則化
"""
def __init__(self, worker_id, global_network, optimizer,
env_name='CartPole-v1', gamma=0.99,
max_steps=20, entropy_coef=0.01):
"""
Args:
worker_id: ワーカーID
global_network: 共有グローバルネットワーク
optimizer: 共有オプティマイザー
env_name: 環境名
gamma: 割引率
max_steps: n-stepリターンのステップ数
entropy_coef: エントロピー正則化係数
"""
self.worker_id = worker_id
self.env = gym.make(env_name)
self.global_network = global_network
self.optimizer = optimizer
self.gamma = gamma
self.max_steps = max_steps
self.entropy_coef = entropy_coef
# ローカルネットワーク(グローバルと同じ構造)
state_dim = self.env.observation_space.shape[0]
action_dim = self.env.action_space.n
self.local_network = A3CNetwork(state_dim, action_dim)
def compute_returns(self, rewards, next_value, dones):
"""
n-stepリターンを計算
Args:
rewards: 報酬リスト
next_value: 最後の状態の価値推定
dones: 終端フラグリスト
Returns:
returns: 各ステップのリターン
"""
returns = []
R = next_value
# 逆順で計算
for r, done in zip(reversed(rewards), reversed(dones)):
R = r + self.gamma * R * (1 - done)
returns.insert(0, R)
return returns
def train_step(self):
"""
1エピソード分のトレーニング
Returns:
total_reward: エピソード合計報酬
"""
# グローバルネットワークからパラメータ同期
self.local_network.load_state_dict(self.global_network.state_dict())
state, _ = self.env.reset()
done = False
states, actions, rewards, dones, values = [], [], [], [], []
episode_reward = 0
while not done:
# 行動選択
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs, value = self.local_network(state_tensor)
dist = Categorical(action_probs)
action = dist.sample()
# 環境ステップ
next_state, reward, terminated, truncated, _ = self.env.step(action.item())
done = terminated or truncated
# 経験を保存
states.append(state)
actions.append(action)
rewards.append(reward)
dones.append(done)
values.append(value)
episode_reward += reward
state = next_state
# max_stepsごとまたは終端で更新
if len(states) >= self.max_steps or done:
# 次状態の価値推定
if done:
next_value = 0
else:
next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
_, next_value = self.local_network(next_state_tensor)
next_value = next_value.item()
# リターン計算
returns = self.compute_returns(rewards, next_value, dones)
# 損失計算
self._update_global_network(states, actions, returns, values)
# バッファクリア
states, actions, rewards, dones, values = [], [], [], [], []
return episode_reward
def _update_global_network(self, states, actions, returns, values):
"""
グローバルネットワークを更新
Args:
states: 状態リスト
actions: 行動リスト
returns: リターンリスト
values: 価値推定リスト
"""
states_tensor = torch.FloatTensor(states)
actions_tensor = torch.LongTensor(actions)
returns_tensor = torch.FloatTensor(returns)
# 再計算
action_probs, state_values = self.local_network(states_tensor)
state_values = state_values.squeeze()
# Advantage計算
advantages = returns_tensor - state_values.detach()
# アクター損失(Policy Gradient + Entropy)
dist = Categorical(action_probs)
log_probs = dist.log_prob(actions_tensor)
entropy = dist.entropy().mean()
actor_loss = -(log_probs * advantages).mean() - self.entropy_coef * entropy
# クリティック損失(MSE)
critic_loss = F.mse_loss(state_values, returns_tensor)
# 合計損失
total_loss = actor_loss + critic_loss
# グローバルネットワーク更新
self.optimizer.zero_grad()
total_loss.backward()
# 勾配クリッピング
torch.nn.utils.clip_grad_norm_(self.local_network.parameters(), 40)
# グローバルネットワークに勾配を転送
for local_param, global_param in zip(
self.local_network.parameters(),
self.global_network.parameters()
):
if global_param.grad is not None:
return # 他のワーカーが更新中
global_param._grad = local_param.grad
self.optimizer.step()
def worker_process(worker_id, global_network, optimizer, num_episodes=100):
"""
ワーカープロセス関数(並列実行用)
Args:
worker_id: ワーカーID
global_network: グローバルネットワーク
optimizer: 共有オプティマイザー
num_episodes: エピソード数
"""
worker = A3CWorker(worker_id, global_network, optimizer)
for episode in range(num_episodes):
reward = worker.train_step()
if episode % 10 == 0:
print(f"Worker {worker_id} - Episode {episode}, Reward: {reward:.2f}")
# A3C訓練例(シングルプロセス版 - 概念実証用)
def train_a3c_simple():
"""
A3C訓練の簡易版(並列処理なし)
実際のA3Cは multiprocessing を使用
"""
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
# グローバルネットワーク
global_network = A3CNetwork(state_dim, action_dim)
global_network.share_memory() # プロセス間共有用
optimizer = torch.optim.Adam(global_network.parameters(), lr=0.0001)
# 単一ワーカーでのトレーニング例
worker = A3CWorker(0, global_network, optimizer)
rewards = []
for episode in range(100):
reward = worker.train_step()
rewards.append(reward)
if episode % 10 == 0:
avg_reward = np.mean(rewards[-10:])
print(f"Episode {episode}, Avg Reward: {avg_reward:.2f}")
return global_network, rewards
# 実行例
if __name__ == "__main__":
print("A3C Training (Simple Version)")
print("=" * 50)
model, rewards = train_a3c_simple()
print(f"Training completed. Final avg reward: {np.mean(rewards[-10:]):.2f}")
A3Cの実装ポイント: 完全な並列版はPythonの
multiprocessingを使用しますが、上記は概念を示す簡易版です。実際のA3Cでは、複数のワーカープロセスが同時にグローバルネットワークを更新します。エントロピー正則化により探索が促進され、勾配クリッピングにより学習が安定します。
5.2 SAC (Soft Actor-Critic)
SACの概要
SAC (Soft Actor-Critic)は、最大エントロピー強化学習フレームワークに基づくオフポリシーアルゴリズムです。報酬最大化と探索のバランスを自動的に調整し、連続行動空間で優れた性能を発揮します。
確率的方策] S --> Q1[Q-Network 1
Q₁s,a] S --> Q2[Q-Network 2
Q₂s,a] S --> V[Value Network
Vs] A --> |行動 a| E[環境] Q1 --> |最小値| MIN[min Q] Q2 --> |最小値| MIN E --> |報酬 + エントロピー| R[最大化目標] MIN --> R V --> R style A fill:#e3f2fd style Q1 fill:#fff9c4 style Q2 fill:#fff9c4 style V fill:#c8e6c9 style R fill:#ffccbc
SACの主要特徴
| 特徴 | 説明 | 利点 |
|---|---|---|
| 最大エントロピー目標 | 報酬 + エントロピーを最大化 | 自動探索、ロバストな方策 |
| Double Q-Learning | 2つのQ-Networkで過大推定を防ぐ | 安定した学習 |
| Off-Policy | Experience Replayを使用 | サンプル効率が高い |
| 自動温度調整 | エントロピー係数αを学習 | ハイパーパラメータ調整不要 |
SACの目的関数
SACは以下の最大エントロピー目的を最適化します:
$$ J(\pi) = \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{\infty} \gamma^t (r(s_t, a_t) + \alpha \mathcal{H}(\pi(\cdot | s_t))) \right] $$ここで、$\mathcal{H}(\pi)$は方策のエントロピー、$\alpha$は温度パラメータです。
アクター更新(方策改善):
$$ \mathcal{L}_{\pi}(\theta) = \mathbb{E}_{s_t \sim \mathcal{D}} \left[ \mathbb{E}_{a_t \sim \pi_\theta} [\alpha \log \pi_\theta(a_t | s_t) - Q(s_t, a_t)] \right] $$クリティック更新(Bellman誤差最小化):
$$ \mathcal{L}_Q(\phi) = \mathbb{E}_{(s, a, r, s') \sim \mathcal{D}} \left[ (Q_\phi(s, a) - (r + \gamma V(s')))^2 \right] $$ここで、$V(s') = \mathbb{E}_{a' \sim \pi}[Q(s', a') - \alpha \log \pi(a' | s')]$です。
SACの実装
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np
from collections import deque
import random
class GaussianPolicy(nn.Module):
"""
SAC用のガウス方策ネットワーク
Architecture:
- 状態を入力
- 平均μと標準偏差σを出力
- Reparameterization Trickで微分可能な行動サンプリング
"""
def __init__(self, state_dim, action_dim, hidden_dim=256,
log_std_min=-20, log_std_max=2):
"""
Args:
state_dim: 状態空間の次元
action_dim: 行動空間の次元
hidden_dim: 隠れ層の次元
log_std_min: log標準偏差の最小値
log_std_max: log標準偏差の最大値
"""
super(GaussianPolicy, self).__init__()
self.log_std_min = log_std_min
self.log_std_max = log_std_max
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.mean = nn.Linear(hidden_dim, action_dim)
self.log_std = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
"""
前向き計算
Args:
state: 状態 (batch_size, state_dim)
Returns:
mean: 行動分布の平均
log_std: 行動分布のlog標準偏差
"""
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
mean = self.mean(x)
log_std = self.log_std(x)
log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
return mean, log_std
def sample(self, state):
"""
Reparameterization Trickによる行動サンプリング
Args:
state: 状態
Returns:
action: サンプルされた行動(tanh squashing適用後)
log_prob: 行動の対数確率
"""
mean, log_std = self.forward(state)
std = log_std.exp()
# ガウス分布からサンプリング
normal = Normal(mean, std)
x_t = normal.rsample() # Reparameterization trick
# tanh squashingで[-1, 1]に制限
action = torch.tanh(x_t)
# 対数確率(tanh変換の補正含む)
log_prob = normal.log_prob(x_t)
log_prob -= torch.log(1 - action.pow(2) + 1e-6)
log_prob = log_prob.sum(1, keepdim=True)
return action, log_prob
class QNetwork(nn.Module):
"""
SAC用のQ-Network(状態-行動価値関数)
"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super(QNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, 1)
def forward(self, state, action):
"""
Q値を計算
Args:
state: 状態 (batch_size, state_dim)
action: 行動 (batch_size, action_dim)
Returns:
q_value: Q値 (batch_size, 1)
"""
x = torch.cat([state, action], dim=1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
q_value = self.fc3(x)
return q_value
class ReplayBuffer:
"""Experience Replay Buffer"""
def __init__(self, capacity=100000):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
state, action, reward, next_state, done = zip(*batch)
return (
np.array(state),
np.array(action),
np.array(reward).reshape(-1, 1),
np.array(next_state),
np.array(done).reshape(-1, 1)
)
def __len__(self):
return len(self.buffer)
class SAC:
"""
Soft Actor-Critic Implementation
Features:
- Maximum entropy reinforcement learning
- Double Q-learning for stability
- Automatic temperature tuning
- Off-policy learning with replay buffer
"""
def __init__(self, state_dim, action_dim,
lr=3e-4, gamma=0.99, tau=0.005, alpha=0.2,
automatic_entropy_tuning=True):
"""
Args:
state_dim: 状態空間の次元
action_dim: 行動空間の次元
lr: 学習率
gamma: 割引率
tau: ターゲットネットワーク更新率
alpha: エントロピー係数(automatic_entropy_tuning=Falseの場合)
automatic_entropy_tuning: 自動温度調整を使用するか
"""
self.gamma = gamma
self.tau = tau
self.alpha = alpha
# ネットワーク初期化
self.policy = GaussianPolicy(state_dim, action_dim)
self.q_net1 = QNetwork(state_dim, action_dim)
self.q_net2 = QNetwork(state_dim, action_dim)
self.target_q_net1 = QNetwork(state_dim, action_dim)
self.target_q_net2 = QNetwork(state_dim, action_dim)
# ターゲットネットワークのパラメータをコピー
self.target_q_net1.load_state_dict(self.q_net1.state_dict())
self.target_q_net2.load_state_dict(self.q_net2.state_dict())
# オプティマイザ
self.policy_optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
self.q1_optimizer = torch.optim.Adam(self.q_net1.parameters(), lr=lr)
self.q2_optimizer = torch.optim.Adam(self.q_net2.parameters(), lr=lr)
# 自動温度調整
self.automatic_entropy_tuning = automatic_entropy_tuning
if automatic_entropy_tuning:
self.target_entropy = -action_dim
self.log_alpha = torch.zeros(1, requires_grad=True)
self.alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=lr)
self.replay_buffer = ReplayBuffer()
def select_action(self, state, evaluate=False):
"""
行動選択
Args:
state: 状態
evaluate: 評価モード(決定的行動)
Returns:
action: 選択された行動
"""
state = torch.FloatTensor(state).unsqueeze(0)
if evaluate:
with torch.no_grad():
mean, _ = self.policy(state)
action = torch.tanh(mean)
else:
with torch.no_grad():
action, _ = self.policy.sample(state)
return action.cpu().numpy()[0]
def update(self, batch_size=256):
"""
SAC更新ステップ
Args:
batch_size: バッチサイズ
"""
if len(self.replay_buffer) < batch_size:
return
# バッファからサンプリング
state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
state = torch.FloatTensor(state)
action = torch.FloatTensor(action)
reward = torch.FloatTensor(reward)
next_state = torch.FloatTensor(next_state)
done = torch.FloatTensor(done)
# --- Q-Network更新 ---
with torch.no_grad():
next_action, next_log_prob = self.policy.sample(next_state)
# Double Q-learning: 最小値を使用
target_q1 = self.target_q_net1(next_state, next_action)
target_q2 = self.target_q_net2(next_state, next_action)
target_q = torch.min(target_q1, target_q2)
# エントロピー項を含むターゲット
target_value = reward + (1 - done) * self.gamma * (
target_q - self.alpha * next_log_prob
)
# Q1損失
q1_value = self.q_net1(state, action)
q1_loss = F.mse_loss(q1_value, target_value)
# Q2損失
q2_value = self.q_net2(state, action)
q2_loss = F.mse_loss(q2_value, target_value)
# Q-Network更新
self.q1_optimizer.zero_grad()
q1_loss.backward()
self.q1_optimizer.step()
self.q2_optimizer.zero_grad()
q2_loss.backward()
self.q2_optimizer.step()
# --- Policy更新 ---
new_action, log_prob = self.policy.sample(state)
q1_new = self.q_net1(state, new_action)
q2_new = self.q_net2(state, new_action)
q_new = torch.min(q1_new, q2_new)
policy_loss = (self.alpha * log_prob - q_new).mean()
self.policy_optimizer.zero_grad()
policy_loss.backward()
self.policy_optimizer.step()
# --- 温度パラメータ更新(自動調整) ---
if self.automatic_entropy_tuning:
alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()
self.alpha_optimizer.zero_grad()
alpha_loss.backward()
self.alpha_optimizer.step()
self.alpha = self.log_alpha.exp().item()
# --- ターゲットネットワークのソフト更新 ---
self._soft_update(self.q_net1, self.target_q_net1)
self._soft_update(self.q_net2, self.target_q_net2)
def _soft_update(self, source, target):
"""
ターゲットネットワークのソフト更新
θ_target = τ * θ_source + (1 - τ) * θ_target
"""
for target_param, source_param in zip(target.parameters(), source.parameters()):
target_param.data.copy_(
self.tau * source_param.data + (1 - self.tau) * target_param.data
)
# SAC訓練例
def train_sac():
"""SAC訓練の実行例(Pendulum環境)"""
import gymnasium as gym
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
agent = SAC(state_dim, action_dim)
num_episodes = 100
max_steps = 200
for episode in range(num_episodes):
state, _ = env.reset()
episode_reward = 0
for step in range(max_steps):
# 行動選択
action = agent.select_action(state)
# 環境ステップ
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
# バッファに保存
agent.replay_buffer.push(state, action, reward, next_state, done)
# 更新
agent.update()
episode_reward += reward
state = next_state
if done:
break
if episode % 10 == 0:
print(f"Episode {episode}, Reward: {episode_reward:.2f}, Alpha: {agent.alpha:.3f}")
return agent
if __name__ == "__main__":
print("SAC Training on Pendulum-v1")
print("=" * 50)
agent = train_sac()
print("Training completed!")
SACの実装ポイント: Reparameterization Trickにより方策が微分可能になり、効率的な勾配ベース最適化が可能です。Double Q-learningで過大推定を防ぎ、自動温度調整により探索と活用のバランスが自動的に最適化されます。tanh squashingにより行動が有界範囲に制限されます。
5.3 マルチエージェント強化学習 (Multi-Agent RL)
マルチエージェント強化学習の基本
マルチエージェント強化学習 (MARL)では、複数のエージェントが同じ環境で同時に学習・行動します。エージェント間の相互作用により、シングルエージェントRLとは異なる課題が生じます。
方策 π₁] A2[Agent 2
方策 π₂] A3[Agent 3
方策 π₃] A1 --> |行動 a₁| ENV A2 --> |行動 a₂| ENV A3 --> |行動 a₃| ENV ENV --> |観測 o₁, 報酬 r₁| A1 ENV --> |観測 o₂, 報酬 r₂| A2 ENV --> |観測 o₃, 報酬 r₃| A3 A1 -.-> |観測・通信| A2 A2 -.-> |観測・通信| A3 A3 -.-> |観測・通信| A1 style ENV fill:#e3f2fd style A1 fill:#c8e6c9 style A2 fill:#fff9c4 style A3 fill:#ffccbc
MARLの主要パラダイム
| パラダイム | 説明 | 用途 |
|---|---|---|
| Cooperative(協調) | 全エージェントが共通目標を共有 | チームスポーツ、協調ロボット |
| Competitive(競争) | エージェント間でゼロサム | ゲームAI、対戦型タスク |
| Mixed(混合) | 協調と競争の両方が存在 | 経済シミュレーション、交渉 |
MARLの課題
- 非定常性: 他エージェントの学習により環境が動的に変化
- 信用割当: 報酬を各エージェントに適切に割り当てる
- スケーラビリティ: エージェント数の増加に伴う計算量増大
- 通信: エージェント間の効果的な情報共有
マルチエージェント環境の実装
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
import gymnasium as gym
from gymnasium import spaces
class SimpleMultiAgentEnv(gym.Env):
"""
シンプルなマルチエージェント環境
Task: 複数エージェントが目標地点に到達
- エージェントは2D空間を移動
- 目標地点に近づくと正の報酬
- エージェント同士が近すぎると負の報酬(衝突回避)
- 協調的タスク(共通報酬)
"""
def __init__(self, n_agents=3, grid_size=10, max_steps=50):
"""
Args:
n_agents: エージェント数
grid_size: グリッドサイズ
max_steps: 最大ステップ数
"""
super(SimpleMultiAgentEnv, self).__init__()
self.n_agents = n_agents
self.grid_size = grid_size
self.max_steps = max_steps
# 行動空間: 上下左右の4方向
self.action_space = spaces.Discrete(4)
# 観測空間: [自分のx, y, 目標までのx距離, y距離, 他エージェントとの相対位置...]
obs_dim = 2 + 2 + (n_agents - 1) * 2
self.observation_space = spaces.Box(
low=-grid_size, high=grid_size,
shape=(obs_dim,), dtype=np.float32
)
self.agent_positions = None
self.goal_position = None
self.current_step = 0
def reset(self, seed=None):
"""環境リセット"""
super().reset(seed=seed)
# エージェントをランダム配置
self.agent_positions = np.random.rand(self.n_agents, 2) * self.grid_size
# 目標をランダム配置
self.goal_position = np.random.rand(2) * self.grid_size
self.current_step = 0
return self._get_observations(), {}
def step(self, actions):
"""
環境ステップ
Args:
actions: 各エージェントの行動リスト
Returns:
observations: 各エージェントの観測
rewards: 各エージェントの報酬
terminated: 終了フラグ
truncated: 打ち切りフラグ
info: 追加情報
"""
# 行動を適用(上下左右移動)
for i, action in enumerate(actions):
if action == 0: # 上
self.agent_positions[i, 1] = min(self.grid_size, self.agent_positions[i, 1] + 0.5)
elif action == 1: # 下
self.agent_positions[i, 1] = max(0, self.agent_positions[i, 1] - 0.5)
elif action == 2: # 右
self.agent_positions[i, 0] = min(self.grid_size, self.agent_positions[i, 0] + 0.5)
elif action == 3: # 左
self.agent_positions[i, 0] = max(0, self.agent_positions[i, 0] - 0.5)
# 報酬計算
rewards = self._compute_rewards()
# 終了判定
self.current_step += 1
terminated = self._is_done()
truncated = self.current_step >= self.max_steps
observations = self._get_observations()
return observations, rewards, terminated, truncated, {}
def _get_observations(self):
"""各エージェントの観測を取得"""
observations = []
for i in range(self.n_agents):
obs = []
# 自分の位置
obs.extend(self.agent_positions[i])
# 目標までの距離
obs.extend(self.goal_position - self.agent_positions[i])
# 他エージェントとの相対位置
for j in range(self.n_agents):
if i != j:
obs.extend(self.agent_positions[j] - self.agent_positions[i])
observations.append(np.array(obs, dtype=np.float32))
return observations
def _compute_rewards(self):
"""報酬計算"""
rewards = []
for i in range(self.n_agents):
reward = 0
# 目標への距離に基づく報酬
dist_to_goal = np.linalg.norm(self.agent_positions[i] - self.goal_position)
reward -= dist_to_goal * 0.1
# 目標到達ボーナス
if dist_to_goal < 0.5:
reward += 10.0
# 衝突回避ペナルティ
for j in range(self.n_agents):
if i != j:
dist_to_agent = np.linalg.norm(
self.agent_positions[i] - self.agent_positions[j]
)
if dist_to_agent < 1.0:
reward -= 2.0
rewards.append(reward)
return rewards
def _is_done(self):
"""全エージェントが目標に到達したか"""
for i in range(self.n_agents):
dist = np.linalg.norm(self.agent_positions[i] - self.goal_position)
if dist >= 0.5:
return False
return True
def render(self):
"""環境の可視化"""
plt.figure(figsize=(8, 8))
plt.xlim(0, self.grid_size)
plt.ylim(0, self.grid_size)
# 目標を描画
goal_circle = Circle(self.goal_position, 0.5, color='gold', alpha=0.6, label='Goal')
plt.gca().add_patch(goal_circle)
# エージェントを描画
colors = ['blue', 'red', 'green', 'purple', 'orange']
for i in range(self.n_agents):
agent_circle = Circle(
self.agent_positions[i], 0.3,
color=colors[i % len(colors)],
alpha=0.8,
label=f'Agent {i+1}'
)
plt.gca().add_patch(agent_circle)
plt.legend()
plt.title(f'Multi-Agent Environment (Step {self.current_step})')
plt.grid(True, alpha=0.3)
plt.show()
class IndependentQLearning:
"""
Independent Q-Learning for MARL
各エージェントが独立してQ学習を実行
"""
def __init__(self, n_agents, state_dim, n_actions,
lr=0.1, gamma=0.99, epsilon=0.1):
"""
Args:
n_agents: エージェント数
state_dim: 状態空間の次元
n_actions: 行動数
lr: 学習率
gamma: 割引率
epsilon: ε-greedy探索率
"""
self.n_agents = n_agents
self.n_actions = n_actions
self.lr = lr
self.gamma = gamma
self.epsilon = epsilon
# 各エージェント用のQ-table(簡易版: 離散化)
# 実際は関数近似(ニューラルネット)を使用
self.q_tables = [
np.zeros((100, n_actions)) for _ in range(n_agents)
]
def select_actions(self, observations):
"""ε-greedy行動選択"""
actions = []
for i in range(self.n_agents):
if np.random.rand() < self.epsilon:
action = np.random.randint(self.n_actions)
else:
# 観測を離散化(簡易版)
state_idx = self._discretize_state(observations[i])
action = np.argmax(self.q_tables[i][state_idx])
actions.append(action)
return actions
def update(self, observations, actions, rewards, next_observations, done):
"""Q値更新"""
for i in range(self.n_agents):
state_idx = self._discretize_state(observations[i])
next_state_idx = self._discretize_state(next_observations[i])
# Q学習更新
target = rewards[i]
if not done:
target += self.gamma * np.max(self.q_tables[i][next_state_idx])
self.q_tables[i][state_idx, actions[i]] += self.lr * (
target - self.q_tables[i][state_idx, actions[i]]
)
def _discretize_state(self, observation):
"""観測を離散化(簡易版)"""
# 実際は状態をハッシュ化または関数近似を使用
return int(np.sum(np.abs(observation)) * 10) % 100
# MARL訓練例
def train_marl():
"""マルチエージェント環境での訓練"""
env = SimpleMultiAgentEnv(n_agents=3, grid_size=10)
agent_controller = IndependentQLearning(
n_agents=3,
state_dim=env.observation_space.shape[0],
n_actions=4
)
num_episodes = 100
for episode in range(num_episodes):
observations, _ = env.reset()
done = False
episode_reward = 0
while not done:
# 行動選択
actions = agent_controller.select_actions(observations)
# 環境ステップ
next_observations, rewards, terminated, truncated, _ = env.step(actions)
done = terminated or truncated
# 更新
agent_controller.update(observations, actions, rewards, next_observations, done)
episode_reward += sum(rewards)
observations = next_observations
if episode % 10 == 0:
print(f"Episode {episode}, Total Reward: {episode_reward:.2f}")
# 最終エピソードの可視化
observations, _ = env.reset()
env.render()
return agent_controller
if __name__ == "__main__":
print("Multi-Agent RL Training")
print("=" * 50)
controller = train_marl()
print("Training completed!")
MARLの実装ポイント: Independent Q-Learningは最もシンプルなMARLアプローチで、各エージェントが独立して学習します。より高度な手法にはQMIX(中央集権的訓練・分散実行)、MADDPG(Multi-Agent DDPG)などがあります。協調タスクでは報酬共有が有効で、通信機構を導入すると性能が向上します。
5.4 モデルベース強化学習 (Model-Based RL)
モデルベースRLの概要
モデルベース強化学習は、環境のダイナミクスモデル(遷移関数と報酬関数)を学習し、そのモデルを使って方策を最適化します。モデルフリー手法と比べてサンプル効率が高いのが特徴です。
P̂s'|s,a, R̂s,a] MD --> |学習済みモデル| PLAN[プランニング
シミュレーション] PLAN --> |方策改善| POL[方策 π] POL --> |行動 a| ENV PLAN -.-> |想像上の経験| MB[モデルベース更新] ENV -.-> |実経験| MF[モデルフリー更新] MB --> POL MF --> POL style ENV fill:#e3f2fd style MD fill:#fff9c4 style PLAN fill:#c8e6c9 style POL fill:#ffccbc
モデルベース vs モデルフリー
| 側面 | モデルベース | モデルフリー |
|---|---|---|
| サンプル効率 | 高い(モデルで補完) | 低い(多くの経験が必要) |
| 計算コスト | 高い(モデル学習+プランニング) | 低い(直接方策学習) |
| 適用難易度 | 難しい(モデル誤差の影響) | 容易(直接学習) |
| 解釈性 | 高い(モデルで予測可能) | 低い(ブラックボックス) |
主要アプローチ
- Dyna-Q: 実経験とモデル経験を組み合わせ
- PETS: 確率的アンサンブルで不確実性を考慮
- MBPO: モデルベース方策最適化
- MuZero: モデル学習とMCTSの統合
環境モデルは以下を学習します:
$$ \hat{P}(s' | s, a) \approx P(s' | s, a) $$ $$ \hat{R}(s, a) \approx R(s, a) $$学習したモデルを使ってシミュレーションし、多くの仮想経験を生成します。
モデルベースRLのポイント: モデル誤差が累積すると性能が悪化するため、不確実性推定とモデルの適切な使用が重要です。実環境とモデル環境のデータをバランス良く使うことで、サンプル効率と性能を両立できます。
5.5 実世界応用
5.5.1 ロボティクス (Robotics)
強化学習はロボットの制御、操作、ナビゲーションに広く応用されています。
主要応用分野
- ロボットアーム制御: 物体把持、組み立て作業
- 歩行ロボット: 二足歩行、四足歩行の学習
- 自律ナビゲーション: 障害物回避、経路計画
- Sim-to-Real転移: シミュレーションで学習→実機へ転移
5.5.2 ゲームAI (Game AI)
強化学習は複雑なゲームで人間レベル以上の性能を達成しています。
代表的な成功例
| システム | ゲーム | 手法 |
|---|---|---|
| AlphaGo | 囲碁 | MCTS + Deep RL |
| AlphaStar | StarCraft II | Multi-agent RL |
| OpenAI Five | Dota 2 | PPO + 大規模分散学習 |
| MuZero | チェス、将棋、Atari | Model-based RL + MCTS |
5.5.3 金融トレーディング (Trading)
強化学習は自動トレーディング、ポートフォリオ最適化に応用されています。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import deque
class TradingEnvironment:
"""
株式トレーディング環境
Features:
- 過去の価格履歴から行動決定
- 取引コストを考慮
- 保有ポジション管理
"""
def __init__(self, price_data, initial_balance=10000,
transaction_cost=0.001, window_size=20):
"""
Args:
price_data: 価格データ (DataFrame)
initial_balance: 初期資金
transaction_cost: 取引コスト(片道)
window_size: 観測する過去ウィンドウサイズ
"""
self.price_data = price_data
self.initial_balance = initial_balance
self.transaction_cost = transaction_cost
self.window_size = window_size
self.reset()
def reset(self):
"""環境リセット"""
self.current_step = self.window_size
self.balance = self.initial_balance
self.shares_held = 0
self.net_worth = self.initial_balance
self.max_net_worth = self.initial_balance
return self._get_observation()
def _get_observation(self):
"""
観測取得
Returns:
observation: [価格履歴, 保有株数, 残高] の正規化版
"""
# 過去window_sizeステップの価格変化率
window_data = self.price_data.iloc[
self.current_step - self.window_size:self.current_step
]['Close'].pct_change().fillna(0).values
# ポートフォリオ状態
portfolio_state = np.array([
self.shares_held / 100, # 正規化
self.balance / self.initial_balance # 正規化
])
observation = np.concatenate([window_data, portfolio_state])
return observation
def step(self, action):
"""
環境ステップ
Args:
action: 0=Hold, 1=Buy, 2=Sell
Returns:
observation: 次状態
reward: 報酬
done: 終了フラグ
info: 追加情報
"""
current_price = self.price_data.iloc[self.current_step]['Close']
# 行動実行
if action == 1: # Buy
shares_to_buy = self.balance // current_price
cost = shares_to_buy * current_price * (1 + self.transaction_cost)
if cost <= self.balance:
self.shares_held += shares_to_buy
self.balance -= cost
elif action == 2: # Sell
if self.shares_held > 0:
proceeds = self.shares_held * current_price * (1 - self.transaction_cost)
self.balance += proceeds
self.shares_held = 0
# ステップ進行
self.current_step += 1
# 純資産計算
self.net_worth = self.balance + self.shares_held * current_price
self.max_net_worth = max(self.max_net_worth, self.net_worth)
# 報酬: 純資産の変化率
reward = (self.net_worth - self.initial_balance) / self.initial_balance
# 終了判定
done = self.current_step >= len(self.price_data) - 1
observation = self._get_observation()
info = {
'net_worth': self.net_worth,
'shares_held': self.shares_held,
'balance': self.balance
}
return observation, reward, done, info
class DQNTrader:
"""
DQNベースのトレーディングエージェント
"""
def __init__(self, state_dim, n_actions=3, lr=0.001, gamma=0.95):
import torch
import torch.nn as nn
self.state_dim = state_dim
self.n_actions = n_actions
self.gamma = gamma
# Q-Network
self.q_network = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, n_actions)
)
self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
self.memory = deque(maxlen=2000)
self.epsilon = 1.0
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
def select_action(self, state, training=True):
"""ε-greedy行動選択"""
import torch
if training and np.random.rand() < self.epsilon:
return np.random.randint(self.n_actions)
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0)
q_values = self.q_network(state_tensor)
return q_values.argmax().item()
def train(self, batch_size=32):
"""DQN更新"""
import torch
import torch.nn.functional as F
if len(self.memory) < batch_size:
return
# ミニバッチサンプリング
batch = np.array(self.memory, dtype=object)
indices = np.random.choice(len(batch), batch_size, replace=False)
samples = batch[indices]
states = torch.FloatTensor(np.vstack([s[0] for s in samples]))
actions = torch.LongTensor([s[1] for s in samples])
rewards = torch.FloatTensor([s[2] for s in samples])
next_states = torch.FloatTensor(np.vstack([s[3] for s in samples]))
dones = torch.FloatTensor([s[4] for s in samples])
# Q値計算
current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
with torch.no_grad():
max_next_q = self.q_network(next_states).max(1)[0]
target_q = rewards + (1 - dones) * self.gamma * max_next_q
# 損失計算と更新
loss = F.mse_loss(current_q.squeeze(), target_q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# ε減衰
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
# トレーディングボット訓練例
def train_trading_bot():
"""
株式トレーディングボットの訓練
(デモ用: ランダムウォーク価格データ使用)
"""
# デモ用価格データ生成
np.random.seed(42)
dates = pd.date_range('2020-01-01', periods=500)
prices = 100 * np.exp(np.cumsum(np.random.randn(500) * 0.02))
price_data = pd.DataFrame({'Close': prices}, index=dates)
# 環境とエージェント初期化
env = TradingEnvironment(price_data, window_size=20)
obs = env.reset()
agent = DQNTrader(state_dim=len(obs))
num_episodes = 50
for episode in range(num_episodes):
state = env.reset()
total_reward = 0
done = False
while not done:
# 行動選択
action = agent.select_action(state, training=True)
# 環境ステップ
next_state, reward, done, info = env.step(action)
# 経験保存
agent.memory.append((state, action, reward, next_state, done))
# 訓練
agent.train(batch_size=32)
total_reward += reward
state = next_state
if episode % 10 == 0:
print(f"Episode {episode}, Total Reward: {total_reward:.4f}, "
f"Final Net Worth: ${info['net_worth']:.2f}, "
f"Epsilon: {agent.epsilon:.3f}")
# 最終評価
state = env.reset()
done = False
actions_taken = []
net_worths = []
while not done:
action = agent.select_action(state, training=False)
actions_taken.append(action)
state, reward, done, info = env.step(action)
net_worths.append(info['net_worth'])
# 可視化
plt.figure(figsize=(14, 6))
plt.subplot(1, 2, 1)
plt.plot(price_data.index[-len(net_worths):],
price_data['Close'].iloc[-len(net_worths):],
label='Stock Price', alpha=0.7)
plt.title('Stock Price Over Time')
plt.xlabel('Date')
plt.ylabel('Price ($)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.plot(net_worths, label='Portfolio Net Worth', color='green')
plt.axhline(y=env.initial_balance, color='r', linestyle='--',
label='Initial Balance', alpha=0.7)
plt.title('Portfolio Performance')
plt.xlabel('Time Step')
plt.ylabel('Net Worth ($)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('trading_bot_performance.png', dpi=150, bbox_inches='tight')
print("Performance chart saved as 'trading_bot_performance.png'")
final_return = (net_worths[-1] - env.initial_balance) / env.initial_balance * 100
print(f"\nFinal Return: {final_return:.2f}%")
return agent, env
if __name__ == "__main__":
print("RL Trading Bot Training")
print("=" * 50)
agent, env = train_trading_bot()
print("Training completed!")
トレーディングへの応用ポイント: 取引コスト、スリッページ、市場インパクトを考慮することが重要です。過去データでの過学習を避けるため、複数の時期でバックテストを行います。実際の運用では、リスク管理(ポジションサイズ制限、ストップロス)を組み込む必要があります。
5.6 Stable-Baselines3による実践
Stable-Baselines3の概要
Stable-Baselines3 (SB3)は、信頼性の高いRL実装を提供するPythonライブラリです。最新アルゴリズムの実装が充実しており、実践的なRLプロジェクトに最適です。
SB3の主要アルゴリズム
| アルゴリズム | タイプ | 適用場面 |
|---|---|---|
| PPO | On-policy, Actor-Critic | 汎用性が高い、安定 |
| A2C | On-policy, Actor-Critic | 高速学習、並列化 |
| SAC | Off-policy, Max-Entropy | 連続行動、サンプル効率 |
| TD3 | Off-policy, DDPG改良 | 連続行動、安定性 |
| DQN | Off-policy, Value-based | 離散行動 |
Stable-Baselines3の実践例
"""
Stable-Baselines3を使った実践的なRL訓練
"""
# インストール(必要に応じて)
# !pip install stable-baselines3[extra]
import gymnasium as gym
from stable_baselines3 import PPO, SAC, DQN
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import numpy as np
import matplotlib.pyplot as plt
# === Example 1: PPO for CartPole ===
def train_ppo_cartpole():
"""
PPOでCartPole環境を訓練
Features:
- Vectorized environment for parallel training
- Evaluation callback for monitoring
- Model checkpointing
"""
print("Training PPO on CartPole-v1")
print("=" * 50)
# ベクトル化環境(並列訓練)
env = make_vec_env('CartPole-v1', n_envs=4)
# 評価用環境
eval_env = gym.make('CartPole-v1')
eval_env = Monitor(eval_env)
# PPOモデル初期化
model = PPO(
'MlpPolicy', # Multi-Layer Perceptron policy
env,
learning_rate=3e-4,
n_steps=2048, # ステップ数/更新
batch_size=64,
n_epochs=10, # 更新エポック数
gamma=0.99,
gae_lambda=0.95, # GAE parameter
clip_range=0.2, # PPO clipping
verbose=1,
tensorboard_log="./ppo_cartpole_tensorboard/"
)
# コールバック設定
eval_callback = EvalCallback(
eval_env,
best_model_save_path='./logs/best_model',
log_path='./logs/',
eval_freq=10000,
deterministic=True,
render=False
)
checkpoint_callback = CheckpointCallback(
save_freq=10000,
save_path='./logs/checkpoints/',
name_prefix='ppo_cartpole'
)
# 訓練実行
model.learn(
total_timesteps=100000,
callback=[eval_callback, checkpoint_callback]
)
# 評価
mean_reward, std_reward = evaluate_policy(
model, eval_env, n_eval_episodes=10
)
print(f"\nEvaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")
# モデル保存
model.save("ppo_cartpole_final")
return model
# === Example 2: SAC for Continuous Control ===
def train_sac_pendulum():
"""
SACでPendulum環境を訓練(連続行動空間)
Features:
- Maximum entropy RL
- Off-policy learning
- Automatic temperature tuning
"""
print("\nTraining SAC on Pendulum-v1")
print("=" * 50)
# 環境作成
env = gym.make('Pendulum-v1')
# SACモデル
model = SAC(
'MlpPolicy',
env,
learning_rate=3e-4,
buffer_size=100000,
learning_starts=1000,
batch_size=256,
tau=0.005, # Soft update coefficient
gamma=0.99,
train_freq=1,
gradient_steps=1,
ent_coef='auto', # Automatic entropy tuning
verbose=1,
tensorboard_log="./sac_pendulum_tensorboard/"
)
# 訓練
model.learn(total_timesteps=50000)
# 評価
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"Evaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")
# モデル保存
model.save("sac_pendulum_final")
return model
# === Example 3: Custom Environment with SB3 ===
class CustomGridWorld(gym.Env):
"""
カスタムグリッドワールド環境
SB3互換のGym環境
"""
def __init__(self, grid_size=5):
super(CustomGridWorld, self).__init__()
self.grid_size = grid_size
self.agent_pos = [0, 0]
self.goal_pos = [grid_size - 1, grid_size - 1]
# 行動空間: 上下左右
self.action_space = gym.spaces.Discrete(4)
# 観測空間: エージェント位置(正規化)
self.observation_space = gym.spaces.Box(
low=0, high=1, shape=(2,), dtype=np.float32
)
def reset(self, seed=None):
super().reset(seed=seed)
self.agent_pos = [0, 0]
return self._get_obs(), {}
def _get_obs(self):
return np.array(self.agent_pos, dtype=np.float32) / self.grid_size
def step(self, action):
# 行動実行
if action == 0 and self.agent_pos[1] < self.grid_size - 1: # Up
self.agent_pos[1] += 1
elif action == 1 and self.agent_pos[1] > 0: # Down
self.agent_pos[1] -= 1
elif action == 2 and self.agent_pos[0] < self.grid_size - 1: # Right
self.agent_pos[0] += 1
elif action == 3 and self.agent_pos[0] > 0: # Left
self.agent_pos[0] -= 1
# 報酬計算
if self.agent_pos == self.goal_pos:
reward = 1.0
done = True
else:
reward = -0.01
done = False
return self._get_obs(), reward, done, False, {}
def train_custom_env():
"""カスタム環境でDQNを訓練"""
print("\nTraining DQN on Custom GridWorld")
print("=" * 50)
# カスタム環境
env = CustomGridWorld(grid_size=5)
# DQNモデル
model = DQN(
'MlpPolicy',
env,
learning_rate=1e-3,
buffer_size=10000,
learning_starts=1000,
batch_size=32,
gamma=0.99,
exploration_fraction=0.1,
exploration_final_eps=0.02,
verbose=1
)
# 訓練
model.learn(total_timesteps=50000)
# 評価
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"Evaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")
return model
# === Example 4: Loading and Using Trained Model ===
def use_trained_model():
"""訓練済みモデルの読み込みと使用"""
print("\nUsing Trained Model")
print("=" * 50)
# モデル読み込み
model = PPO.load("ppo_cartpole_final")
# 環境で実行
env = gym.make('CartPole-v1', render_mode='rgb_array')
obs, _ = env.reset()
total_reward = 0
for _ in range(500):
# 決定的行動選択
action, _states = model.predict(obs, deterministic=True)
obs, reward, terminated, truncated, info = env.step(action)
total_reward += reward
if terminated or truncated:
break
print(f"Episode reward: {total_reward}")
env.close()
# === Example 5: Hyperparameter Tuning with Optuna ===
def hyperparameter_tuning():
"""
Optunaを使ったハイパーパラメータチューニング
(オプション: optunaインストール必要)
"""
try:
from stable_baselines3.common.env_util import make_vec_env
import optuna
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler
def objective(trial):
"""Optuna目的関数"""
# ハイパーパラメータ提案
lr = trial.suggest_loguniform('lr', 1e-5, 1e-3)
gamma = trial.suggest_uniform('gamma', 0.9, 0.9999)
clip_range = trial.suggest_uniform('clip_range', 0.1, 0.4)
# 環境とモデル
env = make_vec_env('CartPole-v1', n_envs=4)
model = PPO(
'MlpPolicy', env,
learning_rate=lr,
gamma=gamma,
clip_range=clip_range,
verbose=0
)
# 訓練
model.learn(total_timesteps=20000)
# 評価
eval_env = gym.make('CartPole-v1')
mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=5)
return mean_reward
# Optuna study
study = optuna.create_study(
direction='maximize',
sampler=TPESampler(),
pruner=MedianPruner()
)
study.optimize(objective, n_trials=20, timeout=600)
print("\nBest hyperparameters:")
print(study.best_params)
print(f"Best value: {study.best_value:.2f}")
except ImportError:
print("Optuna not installed. Skipping hyperparameter tuning.")
print("Install with: pip install optuna")
# メイン実行
if __name__ == "__main__":
print("Stable-Baselines3 Practical Examples")
print("=" * 50)
# Example 1: PPO
ppo_model = train_ppo_cartpole()
# Example 2: SAC
sac_model = train_sac_pendulum()
# Example 3: Custom Environment
custom_model = train_custom_env()
# Example 4: Using trained model
use_trained_model()
# Example 5: Hyperparameter tuning (optional)
# hyperparameter_tuning()
print("\n" + "=" * 50)
print("All examples completed!")
print("Tensorboard logs saved. View with:")
print(" tensorboard --logdir ./ppo_cartpole_tensorboard/")
SB3の実践ポイント: ベクトル化環境により訓練が高速化され、コールバックにより訓練中の監視・評価が容易になります。TensorBoardログで学習曲線を可視化でき、ハイパーパラメータチューニングにはOptunaが有効です。カスタム環境はGym APIに準拠すれば簡単に統合できます。
5.7 実世界適用の課題と解決策
主要な課題
| 課題 | 説明 | 解決策 |
|---|---|---|
| サンプル効率 | 実環境での学習は時間・コストがかかる | シミュレーションでの事前学習、モデルベースRL、転移学習 |
| 安全性 | 学習中の失敗が危険 | Safe RL、シミュレーションでの検証、人間の監視 |
| Sim-to-Real Gap | シミュレーションと実環境の差 | Domain Randomization、現実性の高いシミュレータ |
| 部分観測 | 完全な状態が観測できない | LSTM/Transformer、信念状態の使用 |
| 報酬設計 | 適切な報酬関数の設計が難しい | 逆強化学習、模倣学習、報酬シェーピング |
| 汎化性能 | 訓練環境外での性能低下 | 多様な訓練データ、Meta-RL、ドメイン適応 |
ベストプラクティス
- 段階的アプローチ: シミュレーション → Sim-to-Real → 実環境
- モデル検証: 複数の評価指標、異なる環境設定でテスト
- 人間の知識活用: 模倣学習、事前学習、報酬シェーピング
- 安全性の確保: 制約付きRL、フェイルセーフ機構
- 継続的学習: オンライン学習、適応的方策
まとめ
この章では、以下の高度なRL手法と応用を学びました:
- ✅ A3C: 非同期並列学習による高速化
- ✅ SAC: 最大エントロピー強化学習による安定した連続制御
- ✅ マルチエージェントRL: 複数エージェントの協調・競争
- ✅ モデルベースRL: サンプル効率の向上
- ✅ 実世界応用: ロボティクス、ゲームAI、金融トレーディング
- ✅ Stable-Baselines3: 実践的なRL開発ツール
- ✅ 実装課題: 安全性、汎化性能、Sim-to-Real転移
次のステップ
- 実践プロジェクト: Stable-Baselines3で独自の環境を作成
- 論文読解: 最新のRL論文を読み、実装してみる
- コンペティション: Kaggle RLコンペ、OpenAI Gym Leaderboard
- 応用分野探索: 自動運転、ヘルスケア、エネルギー管理など
参考リソース
- Stable-Baselines3 Documentation
- OpenAI Spinning Up in Deep RL
- Sutton & Barto: Reinforcement Learning Book
- A3C Paper (Mnih et al., 2016)
- SAC Paper (Haarnoja et al., 2018)
演習問題
演習5.1: A3Cの並列化実装
問題: Pythonのmultiprocessingを使って、完全な並列A3Cを実装してください。
ヒント:
torch.multiprocessingを使用- グローバルネットワークは
share_memory()で共有 - 各ワーカーは独立したプロセスで実行
- ロックを使った同期に注意
演習5.2: SACの温度パラメータ分析
問題: SACの温度パラメータ$\alpha$を固定値と自動調整で比較し、学習曲線と最終性能の違いを分析してください。
ヒント:
- $\alpha \in \{0.05, 0.1, 0.2, \text{auto}\}$で実験
- エントロピーの推移を記録
- 探索の多様性を分析
演習5.3: マルチエージェント協調タスク
問題: 3つのエージェントが協調して目標を運ぶタスクを実装してください。1つのエージェントだけでは運べない重い物体を、複数エージェントで協力して目標地点まで運びます。
ヒント:
- 近接するエージェント数に応じて運搬可能か判定
- 共有報酬で協調を促進
- 通信機構を導入(オプション)
演習5.4: トレーディングボットの改良
問題: 提供されたトレーディングボットに以下の機能を追加してください:
- 複数銘柄のポートフォリオ管理
- リスク管理(最大ドローダウン制限)
- テクニカル指標(移動平均、RSIなど)の追加
ヒント:
- 観測空間にテクニカル指標を追加
- 報酬関数にシャープレシオを組み込む
- 行動空間を拡張(複数銘柄の売買)
演習5.5: Stable-Baselines3でのカスタムコールバック
問題: Stable-Baselines3のカスタムコールバックを作成し、訓練中に以下を実行してください:
- エピソードごとの報酬をログ
- 最高性能モデルを自動保存
- 訓練の早期停止(目標性能達成時)
ヒント:
BaseCallbackを継承_on_step()メソッドをオーバーライドself.localsで訓練情報にアクセス