This chapter covers advanced RL methods and their applications. You will learn the parallel learning mechanism of A3C, the actor-critic architecture of SAC, and the fundamentals of multi-agent reinforcement learning.
Learning Objectives
By reading this chapter, you will be able to:
- ✅ Understand the parallel learning mechanism of A3C and implement a conceptual version
- ✅ Implement the actor-critic architecture of SAC
- ✅ Understand the fundamentals of multi-agent reinforcement learning
- ✅ Understand the principles of model-based reinforcement learning
- ✅ Implement applications in robotics, game AI, and trading
- ✅ Build practical RL projects using Stable-Baselines3
- ✅ Understand challenges and solutions for real-world deployment
5.1 A3C (Asynchronous Advantage Actor-Critic)
Overview of A3C
A3C (Asynchronous Advantage Actor-Critic) is a parallel learning algorithm proposed by DeepMind in 2016. Multiple workers interact asynchronously with the environment and update a global network, achieving fast and stable learning.
(Figure) A3C architecture: a global network holds the shared parameters θ; Workers 1…N each interact with their own environment copy, push gradient updates to the global network, and periodically synchronize their parameters from it.
Key Components of A3C
| Component | Description | Features |
|---|---|---|
| Asynchronous Update | Each worker learns independently | No need for Experience Replay, memory efficient |
| Advantage Function | $A(s, a) = Q(s, a) - V(s)$ | Reduces variance, stable learning |
| Entropy Regularization | Promotes exploration | Prevents premature convergence |
| Parallel Execution | Simultaneous learning in multiple environments | Faster training, diverse data |
A3C Algorithm
Each worker repeats the following steps:
- Parameter Synchronization: Copy parameters from global network $\theta' \leftarrow \theta$
- Experience Collection: Collect transitions $(s_t, a_t, r_t)$ for $t_{\text{max}}$ steps or until episode termination
- Return Computation: $n$-step returns $R_t = \sum_{i=0}^{n-1} \gamma^i r_{t+i} + \gamma^n V(s_{t+n})$ (a worked example follows this list)
- Gradient Computation: Calculate actor and critic losses
- Asynchronous Update: Update global network
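To make the return computation concrete, here is a small worked example with illustrative numbers: let $\gamma = 0.9$, collected rewards $r_t = 1$, $r_{t+1} = 0$, $r_{t+2} = 2$, and bootstrap value $V(s_{t+3}) = 5$. Computing backwards, as the compute_returns method in the implementation below does:
$$ R_{t+2} = 2 + 0.9 \cdot 5 = 6.5, \qquad R_{t+1} = 0 + 0.9 \cdot 6.5 = 5.85, \qquad R_t = 1 + 0.9 \cdot 5.85 = 6.265 $$
which matches the direct formula $R_t = 1 + 0.9 \cdot 0 + 0.81 \cdot 2 + 0.729 \cdot 5 = 6.265$.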
The loss function is as follows:
$$ \mathcal{L}_{\text{actor}} = -\log \pi(a_t | s_t; \theta) A_t - \beta H(\pi(\cdot | s_t; \theta)) $$

$$ \mathcal{L}_{\text{critic}} = (R_t - V(s_t; \theta))^2 $$

where $H(\pi)$ is the entropy, $\beta$ is the entropy regularization coefficient, and $A_t = R_t - V(s_t)$ is the advantage estimate.
Conceptual Implementation of A3C
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.distributions import Categorical
import gymnasium as gym
import numpy as np
class A3CNetwork(nn.Module):
"""
Actor-Critic shared network for A3C
Architecture:
- Shared layers: Feature extraction
- Actor output: Action probability distribution
- Critic output: State value function
"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
"""
Args:
state_dim: Dimension of state space
action_dim: Dimension of action space
hidden_dim: Dimension of hidden layers
"""
super(A3CNetwork, self).__init__()
# Shared feature extraction layer
self.shared_fc1 = nn.Linear(state_dim, hidden_dim)
self.shared_fc2 = nn.Linear(hidden_dim, hidden_dim)
# Actor output (action probabilities)
self.actor_head = nn.Linear(hidden_dim, action_dim)
# Critic output (state value)
self.critic_head = nn.Linear(hidden_dim, 1)
def forward(self, state):
"""
Forward computation
Args:
state: State (batch_size, state_dim)
Returns:
action_probs: Action probability distribution (batch_size, action_dim)
state_value: State value (batch_size, 1)
"""
# Shared layers
x = F.relu(self.shared_fc1(state))
x = F.relu(self.shared_fc2(x))
# Actor output
action_logits = self.actor_head(x)
action_probs = F.softmax(action_logits, dim=-1)
# Critic output
state_value = self.critic_head(x)
return action_probs, state_value
class A3CWorker:
"""
A3C worker: Learns in an independent environment and updates the global network
Features:
- Asynchronous parameter updates
- n-step returns calculation
- Entropy Regularization
"""
def __init__(self, worker_id, global_network, optimizer,
env_name='CartPole-v1', gamma=0.99,
max_steps=20, entropy_coef=0.01):
"""
Args:
worker_id: Worker ID
global_network: Shared global network
optimizer: Shared optimizer
env_name: Environment name
gamma: Discount factor
max_steps: Number of steps for n-step returns
entropy_coef: Entropy regularization coefficient
"""
self.worker_id = worker_id
self.env = gym.make(env_name)
self.global_network = global_network
self.optimizer = optimizer
self.gamma = gamma
self.max_steps = max_steps
self.entropy_coef = entropy_coef
# Local network (same structure as global)
state_dim = self.env.observation_space.shape[0]
action_dim = self.env.action_space.n
self.local_network = A3CNetwork(state_dim, action_dim)
def compute_returns(self, rewards, next_value, dones):
"""
Compute n-step returns
Args:
rewards: List of rewards
next_value: Value estimate of the final state
dones: List of termination flags
Returns:
returns: Returns for each step
"""
returns = []
R = next_value
# Compute in reverse order
for r, done in zip(reversed(rewards), reversed(dones)):
R = r + self.gamma * R * (1 - done)
returns.insert(0, R)
return returns
def train_step(self):
"""
Training for one episode
Returns:
total_reward: Total episode reward
"""
        # Synchronize parameters from the global network
self.local_network.load_state_dict(self.global_network.state_dict())
state, _ = self.env.reset()
done = False
states, actions, rewards, dones, values = [], [], [], [], []
episode_reward = 0
while not done:
# Action selection
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs, value = self.local_network(state_tensor)
dist = Categorical(action_probs)
action = dist.sample()
# Environment step
next_state, reward, terminated, truncated, _ = self.env.step(action.item())
done = terminated or truncated
# Store experience
states.append(state)
            actions.append(action.item())
rewards.append(reward)
dones.append(done)
values.append(value)
episode_reward += reward
state = next_state
            # Update every max_steps steps or at the end of the episode
if len(states) >= self.max_steps or done:
# Value estimate of next state
if done:
next_value = 0
else:
next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
_, next_value = self.local_network(next_state_tensor)
next_value = next_value.item()
# Compute returns
returns = self.compute_returns(rewards, next_value, dones)
# Compute loss
self._update_global_network(states, actions, returns, values)
# Clear buffer
states, actions, rewards, dones, values = [], [], [], [], []
return episode_reward
def _update_global_network(self, states, actions, returns, values):
"""
Update global network
Args:
states: List of states
actions: List of actions
returns: List of returns
values: List of value estimates
"""
        states_tensor = torch.FloatTensor(np.array(states))
        actions_tensor = torch.LongTensor(actions)
        returns_tensor = torch.FloatTensor(returns)
# Re-compute
action_probs, state_values = self.local_network(states_tensor)
state_values = state_values.squeeze()
# Compute Advantage
advantages = returns_tensor - state_values.detach()
# Actor loss (Policy Gradient + Entropy)
dist = Categorical(action_probs)
log_probs = dist.log_prob(actions_tensor)
entropy = dist.entropy().mean()
actor_loss = -(log_probs * advantages).mean() - self.entropy_coef * entropy
# Critic loss (MSE)
critic_loss = F.mse_loss(state_values, returns_tensor)
# Total loss
total_loss = actor_loss + critic_loss
        # Update the global network
self.optimizer.zero_grad()
total_loss.backward()
# Gradient clipping
torch.nn.utils.clip_grad_norm_(self.local_network.parameters(), 40)
        # Transfer gradients to the global network
        # (the asynchronous multi-process version checks whether another worker's
        #  gradients are already present; in this single-process version we simply
        #  copy the local gradients on every update so the optimizer step is applied)
        for local_param, global_param in zip(
            self.local_network.parameters(),
            self.global_network.parameters()
        ):
            global_param._grad = local_param.grad
self.optimizer.step()
def worker_process(worker_id, global_network, optimizer, num_episodes=100):
"""
Worker process function (for parallel execution)
Args:
worker_id: Worker ID
global_network: Global network
optimizer: Shared optimizer
num_episodes: Number of episodes
"""
worker = A3CWorker(worker_id, global_network, optimizer)
for episode in range(num_episodes):
reward = worker.train_step()
if episode % 10 == 0:
print(f"Worker {worker_id} - Episode {episode}, Reward: {reward:.2f}")
# A3C training example (single-process version - for concept verification)
def train_a3c_simple():
"""
Simplified version of A3C training (no parallel processing)
Actual A3C uses multiprocessing
"""
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
# Global network
global_network = A3CNetwork(state_dim, action_dim)
global_network.share_memory() # For inter-process sharing
optimizer = torch.optim.Adam(global_network.parameters(), lr=0.0001)
# Single worker training example
worker = A3CWorker(0, global_network, optimizer)
rewards = []
for episode in range(100):
reward = worker.train_step()
rewards.append(reward)
if episode % 10 == 0:
avg_reward = np.mean(rewards[-10:])
print(f"Episode {episode}, Avg Reward: {avg_reward:.2f}")
return global_network, rewards
# Execution example
if __name__ == "__main__":
print("A3C Training (Simple Version)")
print("=" * 50)
model, rewards = train_a3c_simple()
print(f"Training completed. Final avg reward: {np.mean(rewards[-10:]):.2f}")
A3C Implementation Notes: The full parallel version uses Python's multiprocessing, but the code above is a simplified single-process version that demonstrates the concept. In actual A3C, multiple worker processes update the global network simultaneously. Entropy regularization promotes exploration, and gradient clipping stabilizes learning.
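As a rough sketch of how the parallel version could be launched (reusing the A3CNetwork and worker_process defined above; a production version would also share the optimizer state, e.g. via a SharedAdam-style implementation, and guard updates with locks):
# Minimal sketch of launching A3C workers with torch.multiprocessing
import gymnasium as gym
import torch
import torch.multiprocessing as mp

def train_a3c_parallel(num_workers=4, num_episodes=100):
    """Sketch: spawn several worker processes that update one shared global network."""
    env = gym.make('CartPole-v1')  # only used to read the state/action dimensions
    global_network = A3CNetwork(env.observation_space.shape[0], env.action_space.n)
    global_network.share_memory()  # place parameters in shared memory
    optimizer = torch.optim.Adam(global_network.parameters(), lr=1e-4)

    processes = []
    for worker_id in range(num_workers):
        p = mp.Process(target=worker_process,
                       args=(worker_id, global_network, optimizer, num_episodes))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    return global_network

# Depending on the platform, mp.set_start_method('spawn') and an
# if __name__ == "__main__": guard are required before spawning processes.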
5.2 SAC (Soft Actor-Critic)
Overview of SAC
SAC (Soft Actor-Critic) is an off-policy algorithm based on the maximum entropy reinforcement learning framework. It automatically balances reward maximization and exploration, demonstrating excellent performance in continuous action spaces.
(Figure) SAC architecture: a stochastic actor π(a|s), two Q-networks Q₁(s, a) and Q₂(s, a) whose minimum is used as the target, and a value estimate V(s); the agent acts in the environment and maximizes reward plus policy entropy.
Key Features of SAC
| Feature | Description | Benefits |
|---|---|---|
| Maximum Entropy Objective | Reward + Entropy maximization | Automatic exploration, robust policy |
| Double Q-Learning | Use two Q-Networks to prevent overestimation | Stable learning |
| Off-Policy | Uses Experience Replay | High sample efficiency |
| Automatic Temperature Tuning | Learns the entropy coefficient α | Removes manual tuning of α |
SAC Objective Function
SAC optimizes the following maximum entropy objective:
$$ J(\pi) = \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{\infty} \gamma^t (r(s_t, a_t) + \alpha \mathcal{H}(\pi(\cdot | s_t))) \right] $$

where $\mathcal{H}(\pi)$ is the policy entropy and $\alpha$ is the temperature parameter.

Actor Update (Policy Improvement):

$$ \mathcal{L}_{\pi}(\theta) = \mathbb{E}_{s_t \sim \mathcal{D}} \left[ \mathbb{E}_{a_t \sim \pi_\theta} [\alpha \log \pi_\theta(a_t | s_t) - Q(s_t, a_t)] \right] $$

Critic Update (Bellman error minimization):

$$ \mathcal{L}_Q(\phi) = \mathbb{E}_{(s, a, r, s') \sim \mathcal{D}} \left[ (Q_\phi(s, a) - (r + \gamma V(s')))^2 \right] $$

where $V(s') = \mathbb{E}_{a' \sim \pi}[Q(s', a') - \alpha \log \pi(a' | s')]$.
SAC Implementation
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np
from collections import deque
import random
class GaussianPolicy(nn.Module):
"""
Gaussian policy network for SAC
Architecture:
- State input
- Output: mean μ and standard deviation σ
- Differentiable action sampling via Reparameterization Trick
"""
def __init__(self, state_dim, action_dim, hidden_dim=256,
log_std_min=-20, log_std_max=2):
"""
Args:
state_dim: Dimension of state space
action_dim: Dimension of action space
hidden_dim: Dimension of hidden layers
log_std_min: Minimum value of log standard deviation
log_std_max: Maximum log standard deviation
"""
super(GaussianPolicy, self).__init__()
self.log_std_min = log_std_min
self.log_std_max = log_std_max
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.mean = nn.Linear(hidden_dim, action_dim)
self.log_std = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
"""
Forward computation
Args:
state: State (batch_size, state_dim)
Returns:
mean: Mean of action distribution
log_std: Log standard deviation of action distribution
"""
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
mean = self.mean(x)
log_std = self.log_std(x)
log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
return mean, log_std
def sample(self, state):
"""
Action sampling via Reparameterization Trick
Args:
state: State
Returns:
action: Sampled action (after tanh squashing)
log_prob: Log probability of the action
"""
mean, log_std = self.forward(state)
std = log_std.exp()
# Sample from Gaussian distribution
normal = Normal(mean, std)
x_t = normal.rsample() # Reparameterization trick
        # Squash the action into [-1, 1] with tanh
action = torch.tanh(x_t)
# Log probability (including tanh transformation correction)
log_prob = normal.log_prob(x_t)
log_prob -= torch.log(1 - action.pow(2) + 1e-6)
log_prob = log_prob.sum(1, keepdim=True)
return action, log_prob
class QNetwork(nn.Module):
"""
Q-Network for SAC (State-Action value function)
"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super(QNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, 1)
def forward(self, state, action):
"""
Compute Q-value
Args:
state: State (batch_size, state_dim)
action: Action (batch_size, action_dim)
Returns:
q_value: Q-value (batch_size, 1)
"""
x = torch.cat([state, action], dim=1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
q_value = self.fc3(x)
return q_value
class ReplayBuffer:
"""Experience Replay Buffer"""
def __init__(self, capacity=100000):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
state, action, reward, next_state, done = zip(*batch)
return (
np.array(state),
np.array(action),
np.array(reward).reshape(-1, 1),
np.array(next_state),
np.array(done).reshape(-1, 1)
)
def __len__(self):
return len(self.buffer)
class SAC:
"""
Soft Actor-Critic Implementation
Features:
- Maximum entropy reinforcement learning
- Double Q-learning for stability
- Automatic temperature tuning
- Off-policy learning with replay buffer
"""
def __init__(self, state_dim, action_dim,
lr=3e-4, gamma=0.99, tau=0.005, alpha=0.2,
automatic_entropy_tuning=True):
"""
Args:
state_dim: Dimension of state space
action_dim: Dimension of action space
lr: Learning rate
gamma: Discount factor
tau: Target network update rate
alpha: Entropy coefficient (when automatic_entropy_tuning=False)
            automatic_entropy_tuning: Whether to use automatic entropy (temperature) tuning
"""
self.gamma = gamma
self.tau = tau
self.alpha = alpha
# Initialize networks
self.policy = GaussianPolicy(state_dim, action_dim)
self.q_net1 = QNetwork(state_dim, action_dim)
self.q_net2 = QNetwork(state_dim, action_dim)
self.target_q_net1 = QNetwork(state_dim, action_dim)
self.target_q_net2 = QNetwork(state_dim, action_dim)
# Copy parameters to target networks
self.target_q_net1.load_state_dict(self.q_net1.state_dict())
self.target_q_net2.load_state_dict(self.q_net2.state_dict())
# Optimizers
self.policy_optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
self.q1_optimizer = torch.optim.Adam(self.q_net1.parameters(), lr=lr)
self.q2_optimizer = torch.optim.Adam(self.q_net2.parameters(), lr=lr)
# Automatic temperature tuning
self.automatic_entropy_tuning = automatic_entropy_tuning
if automatic_entropy_tuning:
self.target_entropy = -action_dim
self.log_alpha = torch.zeros(1, requires_grad=True)
self.alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=lr)
self.replay_buffer = ReplayBuffer()
def select_action(self, state, evaluate=False):
"""
Action selection
Args:
state: State
evaluate: Evaluation mode (deterministic action)
Returns:
action: Selected action
"""
state = torch.FloatTensor(state).unsqueeze(0)
if evaluate:
with torch.no_grad():
mean, _ = self.policy(state)
action = torch.tanh(mean)
else:
with torch.no_grad():
action, _ = self.policy.sample(state)
return action.cpu().numpy()[0]
def update(self, batch_size=256):
"""
SAC update step
Args:
batch_size: Batch size
"""
if len(self.replay_buffer) < batch_size:
return
# Sample from buffer
state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
state = torch.FloatTensor(state)
action = torch.FloatTensor(action)
reward = torch.FloatTensor(reward)
next_state = torch.FloatTensor(next_state)
done = torch.FloatTensor(done)
# --- Q-Network updates ---
with torch.no_grad():
next_action, next_log_prob = self.policy.sample(next_state)
            # Double Q-learning: use the minimum of the two target Q-values
target_q1 = self.target_q_net1(next_state, next_action)
target_q2 = self.target_q_net2(next_state, next_action)
target_q = torch.min(target_q1, target_q2)
# Target with entropy term
target_value = reward + (1 - done) * self.gamma * (
target_q - self.alpha * next_log_prob
)
# Q1 loss
q1_value = self.q_net1(state, action)
q1_loss = F.mse_loss(q1_value, target_value)
# Q2 loss
q2_value = self.q_net2(state, action)
q2_loss = F.mse_loss(q2_value, target_value)
# Q-Network updates
self.q1_optimizer.zero_grad()
q1_loss.backward()
self.q1_optimizer.step()
self.q2_optimizer.zero_grad()
q2_loss.backward()
self.q2_optimizer.step()
# --- Policy Update ---
new_action, log_prob = self.policy.sample(state)
q1_new = self.q_net1(state, new_action)
q2_new = self.q_net2(state, new_action)
q_new = torch.min(q1_new, q2_new)
policy_loss = (self.alpha * log_prob - q_new).mean()
self.policy_optimizer.zero_grad()
policy_loss.backward()
self.policy_optimizer.step()
# --- Temperature parameter update (automatic adjustment) ---
if self.automatic_entropy_tuning:
alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()
self.alpha_optimizer.zero_grad()
alpha_loss.backward()
self.alpha_optimizer.step()
self.alpha = self.log_alpha.exp().item()
# --- Soft update of target network ---
self._soft_update(self.q_net1, self.target_q_net1)
self._soft_update(self.q_net2, self.target_q_net2)
def _soft_update(self, source, target):
"""
Soft update of target network
θ_target = τ * θ_source + (1 - τ) * θ_target
"""
for target_param, source_param in zip(target.parameters(), source.parameters()):
target_param.data.copy_(
self.tau * source_param.data + (1 - self.tau) * target_param.data
)
# SAC training example
def train_sac():
"""SAC training execution example (Pendulum environment)"""
import gymnasium as gym
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
agent = SAC(state_dim, action_dim)
num_episodes = 100
max_steps = 200
for episode in range(num_episodes):
state, _ = env.reset()
episode_reward = 0
for step in range(max_steps):
# Action selection
action = agent.select_action(state)
# Environment step
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
# Save to buffer
agent.replay_buffer.push(state, action, reward, next_state, done)
            # Update the agent
agent.update()
episode_reward += reward
state = next_state
if done:
break
if episode % 10 == 0:
print(f"Episode {episode}, Reward: {episode_reward:.2f}, Alpha: {agent.alpha:.3f}")
return agent
if __name__ == "__main__":
print("SAC Training on Pendulum-v1")
print("=" * 50)
agent = train_sac()
print("Training completed!")
SAC Implementation Points: The reparameterization trick makes the policy differentiable, enabling efficient gradient-based optimization. Double Q-learning prevents overestimation, and automatic temperature tuning automatically optimizes the balance between exploration and exploitation. Tanh squashing ensures actions are bounded to a finite range.
5.3 Multi-Agent Reinforcement Learning (Multi-Agent RL)
Fundamentals of Multi-Agent Reinforcement Learning
Multi-Agent Reinforcement Learning (MARL) involves multiple agents simultaneously learning and acting in the same environment. The interactions between agents create challenges different from single-agent RL.
(Figure) Multi-agent setting: Agents 1–3, each with its own policy πᵢ, send actions aᵢ to a shared environment and receive individual observations oᵢ and rewards rᵢ; agents may additionally observe or communicate with one another.
Main MARL Paradigms
| Paradigm | Description | Applications |
|---|---|---|
| Cooperative | All agents share a common goal | Team sports, cooperative robots |
| Competitive | Zero-sum between agents | Game AI, adversarial tasks |
| Mixed | Both cooperation and competition exist | Economic simulation, negotiation |
MARL Challenges
- Non-stationarity: The environment changes dynamically due to other agents' learning
- Credit Assignment: Appropriately assigning rewards to each agent
- Scalability: Computational complexity increases with number of agents
- Communication: Effective information sharing between agents
Multi-Agent Environment Implementation
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
import gymnasium as gym
from gymnasium import spaces
class SimpleMultiAgentEnv(gym.Env):
"""
Simple multi-agent environment
Task: Multiple agents reach a goal position
- Agents move in 2D space
- Positive reward for getting closer to goal
- Negative reward when agents are too close (collision avoidance)
- Cooperative task (shared reward)
"""
def __init__(self, n_agents=3, grid_size=10, max_steps=50):
"""
Args:
n_agents: Number of agents
grid_size: Grid size
max_steps: Maximum number of steps
"""
super(SimpleMultiAgentEnv, self).__init__()
self.n_agents = n_agents
self.grid_size = grid_size
self.max_steps = max_steps
# Action space: 4 directions (up, down, left, right)
self.action_space = spaces.Discrete(4)
# Observation space: [own x, y, x distance to goal, y distance to goal, relative positions of other agents...]
obs_dim = 2 + 2 + (n_agents - 1) * 2
self.observation_space = spaces.Box(
low=-grid_size, high=grid_size,
shape=(obs_dim,), dtype=np.float32
)
self.agent_positions = None
self.goal_position = None
self.current_step = 0
def reset(self, seed=None):
"""Reset environment"""
super().reset(seed=seed)
# Randomly place agents
self.agent_positions = np.random.rand(self.n_agents, 2) * self.grid_size
# Randomly place goal
self.goal_position = np.random.rand(2) * self.grid_size
self.current_step = 0
return self._get_observations(), {}
def step(self, actions):
"""
Environment step
Args:
actions: List of actions for each agent
Returns:
observations: Observations for each agent
rewards: Rewards for each agent
terminated: Termination flag
truncated: Truncation flag
info: Additional information
"""
# Apply actions (move up, down, left, right)
for i, action in enumerate(actions):
if action == 0: # Up
self.agent_positions[i, 1] = min(self.grid_size, self.agent_positions[i, 1] + 0.5)
elif action == 1: # Down
self.agent_positions[i, 1] = max(0, self.agent_positions[i, 1] - 0.5)
elif action == 2: # Right
self.agent_positions[i, 0] = min(self.grid_size, self.agent_positions[i, 0] + 0.5)
elif action == 3: # Left
self.agent_positions[i, 0] = max(0, self.agent_positions[i, 0] - 0.5)
# Compute rewards
rewards = self._compute_rewards()
# Termination check
self.current_step += 1
terminated = self._is_done()
truncated = self.current_step >= self.max_steps
observations = self._get_observations()
return observations, rewards, terminated, truncated, {}
def _get_observations(self):
"""Get observations for each agent"""
observations = []
for i in range(self.n_agents):
obs = []
# Own position
obs.extend(self.agent_positions[i])
# Distance to goal
obs.extend(self.goal_position - self.agent_positions[i])
# Relative positions of other agents
for j in range(self.n_agents):
if i != j:
obs.extend(self.agent_positions[j] - self.agent_positions[i])
observations.append(np.array(obs, dtype=np.float32))
return observations
def _compute_rewards(self):
"""Compute rewards"""
rewards = []
for i in range(self.n_agents):
reward = 0
# Reward based on distance to goal
dist_to_goal = np.linalg.norm(self.agent_positions[i] - self.goal_position)
reward -= dist_to_goal * 0.1
# Goal reached bonus
if dist_to_goal < 0.5:
reward += 10.0
# Collision avoidance penalty
for j in range(self.n_agents):
if i != j:
dist_to_agent = np.linalg.norm(
self.agent_positions[i] - self.agent_positions[j]
)
if dist_to_agent < 1.0:
reward -= 2.0
rewards.append(reward)
return rewards
def _is_done(self):
"""Check if all agents have reached the goal"""
for i in range(self.n_agents):
dist = np.linalg.norm(self.agent_positions[i] - self.goal_position)
if dist >= 0.5:
return False
return True
def render(self):
"""Visualize environment"""
plt.figure(figsize=(8, 8))
plt.xlim(0, self.grid_size)
plt.ylim(0, self.grid_size)
# Draw goal
goal_circle = Circle(self.goal_position, 0.5, color='gold', alpha=0.6, label='Goal')
plt.gca().add_patch(goal_circle)
# Draw agents
colors = ['blue', 'red', 'green', 'purple', 'orange']
for i in range(self.n_agents):
agent_circle = Circle(
self.agent_positions[i], 0.3,
color=colors[i % len(colors)],
alpha=0.8,
label=f'Agent {i+1}'
)
plt.gca().add_patch(agent_circle)
plt.legend()
plt.title(f'Multi-Agent Environment (Step {self.current_step})')
plt.grid(True, alpha=0.3)
plt.show()
class IndependentQLearning:
"""
Independent Q-Learning for MARL
Each agent runs Q-learning independently
"""
def __init__(self, n_agents, state_dim, n_actions,
lr=0.1, gamma=0.99, epsilon=0.1):
"""
Args:
n_agents: Number of agents
state_dim: Dimension of state space
n_actions: Number of actions
lr: Learning rate
gamma: Discount factor
epsilon: ε-greedy exploration rate
"""
self.n_agents = n_agents
self.n_actions = n_actions
self.lr = lr
self.gamma = gamma
self.epsilon = epsilon
# Q-table for each agent (simplified version: discretized)
# In practice, function approximation (neural networks) is used
self.q_tables = [
np.zeros((100, n_actions)) for _ in range(n_agents)
]
def select_actions(self, observations):
"""ε-greedyAction selection"""
actions = []
for i in range(self.n_agents):
if np.random.rand() < self.epsilon:
action = np.random.randint(self.n_actions)
else:
# Discretize observation (simplified version)
state_idx = self._discretize_state(observations[i])
action = np.argmax(self.q_tables[i][state_idx])
actions.append(action)
return actions
def update(self, observations, actions, rewards, next_observations, done):
"""Q-valueupdate"""
for i in range(self.n_agents):
state_idx = self._discretize_state(observations[i])
next_state_idx = self._discretize_state(next_observations[i])
# Q-learning update
target = rewards[i]
if not done:
target += self.gamma * np.max(self.q_tables[i][next_state_idx])
self.q_tables[i][state_idx, actions[i]] += self.lr * (
target - self.q_tables[i][state_idx, actions[i]]
)
def _discretize_state(self, observation):
"""Discretize observation (simplified version)"""
# In practice, use state hashing or function approximation
return int(np.sum(np.abs(observation)) * 10) % 100
# MARL training example
def train_marl():
"""Multi-agent environment training"""
env = SimpleMultiAgentEnv(n_agents=3, grid_size=10)
agent_controller = IndependentQLearning(
n_agents=3,
state_dim=env.observation_space.shape[0],
n_actions=4
)
num_episodes = 100
for episode in range(num_episodes):
observations, _ = env.reset()
done = False
episode_reward = 0
while not done:
# Action selection
actions = agent_controller.select_actions(observations)
# Environment step
next_observations, rewards, terminated, truncated, _ = env.step(actions)
done = terminated or truncated
# update
agent_controller.update(observations, actions, rewards, next_observations, done)
episode_reward += sum(rewards)
observations = next_observations
if episode % 10 == 0:
print(f"Episode {episode}, Total Reward: {episode_reward:.2f}")
# Visualize final episode
observations, _ = env.reset()
env.render()
return agent_controller
if __name__ == "__main__":
print("Multi-Agent RL Training")
print("=" * 50)
controller = train_marl()
print("Training completed!")
MARL Implementation Notes: Independent Q-Learning is the simplest MARL approach where each agent learns independently. More advanced methods include QMIX (centralized training and decentralized execution), MADDPG (Multi-Agent DDPG), and others. For cooperative tasks, shared rewards are effective, and introducing communication mechanisms improves performance.
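To give a flavor of one such method, the sketch below shows the core idea of QMIX: individual agent Q-values are combined by a mixing network whose weights come from hypernetworks conditioned on the global state and are kept non-negative, so the joint value is monotonic in each agent's value. This is a conceptual sketch rather than a full QMIX implementation; the layer sizes and names are illustrative.
# Requirements:
# - Python 3.9+
# - torch>=2.0.0, <2.3.0
import torch
import torch.nn as nn
import torch.nn.functional as F

class QMixer(nn.Module):
    """Monotonic mixing network (conceptual sketch of the QMIX idea)."""
    def __init__(self, n_agents, state_dim, mixing_dim=32):
        super().__init__()
        self.n_agents = n_agents
        self.mixing_dim = mixing_dim
        # Hypernetworks: map the global state to mixing weights and biases
        self.hyper_w1 = nn.Linear(state_dim, n_agents * mixing_dim)
        self.hyper_b1 = nn.Linear(state_dim, mixing_dim)
        self.hyper_w2 = nn.Linear(state_dim, mixing_dim)
        self.hyper_b2 = nn.Sequential(
            nn.Linear(state_dim, mixing_dim), nn.ReLU(), nn.Linear(mixing_dim, 1)
        )

    def forward(self, agent_qs, state):
        """
        agent_qs: (batch, n_agents) per-agent Q-values
        state:    (batch, state_dim) global state
        Returns:  (batch, 1) joint Q-value, monotonic in each agent's Q-value
        """
        batch = agent_qs.size(0)
        # abs() keeps the mixing weights non-negative, which enforces monotonicity
        w1 = torch.abs(self.hyper_w1(state)).view(batch, self.n_agents, self.mixing_dim)
        b1 = self.hyper_b1(state).view(batch, 1, self.mixing_dim)
        hidden = F.elu(torch.bmm(agent_qs.unsqueeze(1), w1) + b1)      # (batch, 1, mixing_dim)
        w2 = torch.abs(self.hyper_w2(state)).view(batch, self.mixing_dim, 1)
        b2 = self.hyper_b2(state).view(batch, 1, 1)
        return (torch.bmm(hidden, w2) + b2).squeeze(-1)                # (batch, 1)

# Usage sketch: q_tot = QMixer(n_agents=3, state_dim=12)(per_agent_q, global_state)
# q_tot is trained end-to-end against a TD target; at execution time each agent
# still acts greedily on its own Q-values (centralized training, decentralized execution).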
5.4 Model-Based Reinforcement Learning (Model-Based RL)
Overview of Model-Based RL
Model-Based Reinforcement Learning learns a model of the environment's dynamics (transition function and reward function) and uses that model to optimize the policy. It features higher sample efficiency compared to model-free methods.
(Figure) Model-based RL loop: real experience is used to learn a dynamics and reward model P̂(s'|s, a), R̂(s, a); planning and simulation with the learned model produce imagined experience, and both model-based and model-free updates improve the policy π that acts in the real environment.
Model-Based vs Model-Free
| Aspect | Model-Based | Model-Free |
|---|---|---|
| Sample Efficiency | High (the model generates additional experience) | Low (requires much real experience) |
| Computational Cost | High (model learning + planning) | Low (direct policy learning) |
| Application Difficulty | Difficult (affected by model errors) | Easy (direct learning) |
| Interpretability | High (the learned model can be inspected) | Low (black box) |
Main Approaches
- Dyna-Q: Combines real experience and model experience
- PETS: Considers uncertainty with probabilistic ensembles
- MBPO: Model-based policy optimization
- MuZero: Integration of model learning and MCTS
The environment model is trained to approximate the true dynamics and reward function:

$$ \hat{P}(s' | s, a) \approx P(s' | s, a), \qquad \hat{R}(s, a) \approx R(s, a) $$

The learned model is then used to simulate the environment and generate many virtual ("imagined") experiences.
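The Dyna-Q approach listed above makes this concrete: real transitions drive ordinary Q-learning updates and are also stored in a simple tabular model, which is then sampled for additional planning updates. Below is a minimal tabular sketch under the assumption of deterministic transitions and hashable states; all names are illustrative.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
import random
import numpy as np
from collections import defaultdict

class DynaQAgent:
    """Tabular Dyna-Q: direct RL updates plus extra planning updates from a learned model."""
    def __init__(self, n_actions, lr=0.1, gamma=0.95, epsilon=0.1, planning_steps=10):
        self.q = defaultdict(lambda: np.zeros(n_actions))   # Q-table (states must be hashable)
        self.model = {}                                      # (state, action) -> (reward, next_state)
        self.n_actions = n_actions
        self.lr, self.gamma, self.epsilon = lr, gamma, epsilon
        self.planning_steps = planning_steps

    def select_action(self, state):
        """ε-greedy action selection"""
        if random.random() < self.epsilon:
            return random.randrange(self.n_actions)
        return int(np.argmax(self.q[state]))

    def _q_update(self, s, a, r, s_next):
        target = r + self.gamma * np.max(self.q[s_next])
        self.q[s][a] += self.lr * (target - self.q[s][a])

    def learn(self, s, a, r, s_next):
        # (1) Direct RL update from the real transition
        self._q_update(s, a, r, s_next)
        # (2) Model learning: memorize the observed transition (deterministic model)
        self.model[(s, a)] = (r, s_next)
        # (3) Planning: replay randomly chosen remembered transitions as imagined experience
        for _ in range(self.planning_steps):
            (ps, pa), (pr, ps_next) = random.choice(list(self.model.items()))
            self._q_update(ps, pa, pr, ps_next)

Increasing planning_steps trades computation for sample efficiency: each real transition is reused many times through the model, which is exactly the benefit highlighted in the comparison table above.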
Model-Based RL Key Points: Because model errors can accumulate and degrade performance, uncertainty estimation and appropriate use of the model are important. By balancing data from real and model environments well, both sample efficiency and performance can be achieved.
5.5 Real-World Applications
5.5.1 Robotics
Reinforcement learning is widely applied to robot control, manipulation, and navigation.
Main Application Areas
- Robot Arm Control: Object grasping, assembly tasks
- Walking Robots: Learning bipedal and quadrupedal locomotion
- Autonomous Navigation: Obstacle avoidance, path planning
- Sim-to-Real Transfer: Learn in simulation, then transfer to real robots (a domain randomization sketch follows this list)
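A common technique for narrowing the sim-to-real gap is domain randomization: physical parameters of the simulator are re-sampled every episode so the policy cannot overfit to one specific dynamics. Below is a minimal, hypothetical Gymnasium wrapper illustrating the idea; the randomized attributes (gravity, force_mag) exist in the classic-control CartPole implementation, but the list should be adapted to whatever simulator you actually use.
# Requirements:
# - Python 3.9+
# - gymnasium>=0.28
# - numpy>=1.24.0, <2.0.0
import gymnasium as gym
import numpy as np

class DomainRandomizationWrapper(gym.Wrapper):
    """On every reset, perturb selected physics parameters of the wrapped environment."""
    def __init__(self, env, scale=0.2, seed=None):
        super().__init__(env)
        self.scale = scale
        self.rng = np.random.default_rng(seed)
        # Remember the nominal values so each episode randomizes around them
        self.nominal = {
            'gravity': env.unwrapped.gravity,
            'force_mag': env.unwrapped.force_mag,
        }

    def reset(self, **kwargs):
        for name, nominal in self.nominal.items():
            factor = 1.0 + self.rng.uniform(-self.scale, self.scale)
            setattr(self.env.unwrapped, name, nominal * factor)
        return self.env.reset(**kwargs)

# Usage sketch: a policy trained on this wrapper sees a distribution of dynamics
# env = DomainRandomizationWrapper(gym.make('CartPole-v1'), scale=0.2)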
5.5.2 Game AI
Reinforcement learning has achieved human-level or superior performance in complex games.
Notable Success Stories
| System | Game | Method |
|---|---|---|
| AlphaGo | Go | MCTS + Deep RL |
| AlphaStar | StarCraft II | Multi-agent RL |
| OpenAI Five | Dota 2 | PPO + Large-scale distributed training |
| MuZero | Chess, Shogi, Atari | Model-based RL + MCTS |
5.5.3 Financial Trading
Reinforcement learning is applied to automated trading and portfolio optimization.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
# - torch>=2.0.0, <2.3.0
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import deque
class TradingEnvironment:
"""
Stock trading environment
Features:
- Decide actions based on past price history
- Consider transaction costs
- Manage held positions
"""
def __init__(self, price_data, initial_balance=10000,
transaction_cost=0.001, window_size=20):
"""
Args:
price_data: Price data (DataFrame)
initial_balance: Initial capital
transaction_cost: Transaction cost (one-way)
window_size: Historical window size to observe
"""
self.price_data = price_data
self.initial_balance = initial_balance
self.transaction_cost = transaction_cost
self.window_size = window_size
self.reset()
def reset(self):
"""Reset environment"""
self.current_step = self.window_size
self.balance = self.initial_balance
self.shares_held = 0
self.net_worth = self.initial_balance
self.max_net_worth = self.initial_balance
return self._get_observation()
def _get_observation(self):
"""
Get observation
Returns:
observation: Normalized version of [price history, shares held, cash balance]
"""
        # Price change rates over the past window_size steps
window_data = self.price_data.iloc[
self.current_step - self.window_size:self.current_step
]['Close'].pct_change().fillna(0).values
# Portfolio state
portfolio_state = np.array([
self.shares_held / 100, # Normalized
self.balance / self.initial_balance # Normalized
])
observation = np.concatenate([window_data, portfolio_state])
return observation
def step(self, action):
"""
Environment step
Args:
action: 0=Hold, 1=Buy, 2=Sell
Returns:
observation: Next state
            reward: Reward
done: Termination flag
info: Additional information
"""
current_price = self.price_data.iloc[self.current_step]['Close']
# Execute action
if action == 1: # Buy
shares_to_buy = self.balance // current_price
cost = shares_to_buy * current_price * (1 + self.transaction_cost)
if cost <= self.balance:
self.shares_held += shares_to_buy
self.balance -= cost
elif action == 2: # Sell
if self.shares_held > 0:
proceeds = self.shares_held * current_price * (1 - self.transaction_cost)
self.balance += proceeds
self.shares_held = 0
# Step forward
self.current_step += 1
# Calculate net worth
self.net_worth = self.balance + self.shares_held * current_price
self.max_net_worth = max(self.max_net_worth, self.net_worth)
        # Reward: net worth change relative to the initial balance
reward = (self.net_worth - self.initial_balance) / self.initial_balance
# Termination check
done = self.current_step >= len(self.price_data) - 1
observation = self._get_observation()
info = {
'net_worth': self.net_worth,
'shares_held': self.shares_held,
'balance': self.balance
}
return observation, reward, done, info
class DQNTrader:
"""
DQN-based trading agent
"""
def __init__(self, state_dim, n_actions=3, lr=0.001, gamma=0.95):
import torch
import torch.nn as nn
self.state_dim = state_dim
self.n_actions = n_actions
self.gamma = gamma
# Q-Network
self.q_network = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, n_actions)
)
self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
self.memory = deque(maxlen=2000)
self.epsilon = 1.0
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
def select_action(self, state, training=True):
"""ε-greedyAction selection"""
import torch
if training and np.random.rand() < self.epsilon:
return np.random.randint(self.n_actions)
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0)
q_values = self.q_network(state_tensor)
return q_values.argmax().item()
def train(self, batch_size=32):
"""DQN update"""
import torch
import torch.nn.functional as F
if len(self.memory) < batch_size:
return
# Mini-batch sampling
batch = np.array(self.memory, dtype=object)
indices = np.random.choice(len(batch), batch_size, replace=False)
samples = batch[indices]
states = torch.FloatTensor(np.vstack([s[0] for s in samples]))
actions = torch.LongTensor([s[1] for s in samples])
rewards = torch.FloatTensor([s[2] for s in samples])
next_states = torch.FloatTensor(np.vstack([s[3] for s in samples]))
dones = torch.FloatTensor([s[4] for s in samples])
        # Q-value computation
current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
with torch.no_grad():
max_next_q = self.q_network(next_states).max(1)[0]
target_q = rewards + (1 - dones) * self.gamma * max_next_q
        # Compute loss and update
loss = F.mse_loss(current_q.squeeze(), target_q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# ε decay
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
# Trading bot training example
def train_trading_bot():
"""
Stock trading bot training
(Demo: using random walk price data)
"""
# Generate demo price data
np.random.seed(42)
dates = pd.date_range('2020-01-01', periods=500)
prices = 100 * np.exp(np.cumsum(np.random.randn(500) * 0.02))
price_data = pd.DataFrame({'Close': prices}, index=dates)
# Initialize environment and agent
env = TradingEnvironment(price_data, window_size=20)
obs = env.reset()
agent = DQNTrader(state_dim=len(obs))
num_episodes = 50
for episode in range(num_episodes):
state = env.reset()
total_reward = 0
done = False
while not done:
# Action selection
action = agent.select_action(state, training=True)
# Environment step
next_state, reward, done, info = env.step(action)
# Save experience
agent.memory.append((state, action, reward, next_state, done))
# Training
agent.train(batch_size=32)
total_reward += reward
state = next_state
if episode % 10 == 0:
print(f"Episode {episode}, Total Reward: {total_reward:.4f}, "
f"Final Net Worth: ${info['net_worth']:.2f}, "
f"Epsilon: {agent.epsilon:.3f}")
# Final evaluation
state = env.reset()
done = False
actions_taken = []
net_worths = []
while not done:
action = agent.select_action(state, training=False)
actions_taken.append(action)
state, reward, done, info = env.step(action)
net_worths.append(info['net_worth'])
# Visualization
plt.figure(figsize=(14, 6))
plt.subplot(1, 2, 1)
plt.plot(price_data.index[-len(net_worths):],
price_data['Close'].iloc[-len(net_worths):],
label='Stock Price', alpha=0.7)
plt.title('Stock Price Over Time')
plt.xlabel('Date')
plt.ylabel('Price ($)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.plot(net_worths, label='Portfolio Net Worth', color='green')
plt.axhline(y=env.initial_balance, color='r', linestyle='--',
label='Initial Balance', alpha=0.7)
plt.title('Portfolio Performance')
plt.xlabel('Time Step')
plt.ylabel('Net Worth ($)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('trading_bot_performance.png', dpi=150, bbox_inches='tight')
print("Performance chart saved as 'trading_bot_performance.png'")
final_return = (net_worths[-1] - env.initial_balance) / env.initial_balance * 100
print(f"\nFinal Return: {final_return:.2f}%")
return agent, env
if __name__ == "__main__":
print("RL Trading Bot Training")
print("=" * 50)
agent, env = train_trading_bot()
print("Training completed!")
Trading Application Notes: It is important to consider transaction costs, slippage, and market impact. To avoid overfitting to past data, perform backtests across multiple periods. In actual operation, risk management (position size limits, stop loss) must be incorporated.
5.6 Practical Applications with Stable-Baselines3
Overview of Stable-Baselines3
Stable-Baselines3 (SB3) is a Python library providing reliable RL implementations. It has comprehensive implementations of the latest algorithms and is optimal for practical RL projects.
Key Algorithms in SB3
| Algorithm | Type | Use Cases |
|---|---|---|
| PPO | On-policy, Actor-Critic | High versatility, stable |
| A2C | On-policy, Actor-Critic | Fast learning, parallelization |
| SAC | Off-policy, Max-Entropy | Continuous actions, Sample Efficiency |
| TD3 | Off-policy, DDPG improved | Continuous actions, Stability |
| DQN | Off-policy, Value-based | Discrete actions |
Practical Examples with Stable-Baselines3
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - optuna>=3.2.0
"""
Practical RL training using Stable-Baselines3
"""
# Install (if needed)
# !pip install stable-baselines3[extra]
import gymnasium as gym
from stable_baselines3 import PPO, SAC, DQN
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import numpy as np
import matplotlib.pyplot as plt
# === Example 1: PPO for CartPole ===
def train_ppo_cartpole():
"""
    Train PPO on the CartPole-v1 environment
Features:
- Vectorized environment for parallel training
- Evaluation callback for monitoring
- Model checkpointing
"""
print("Training PPO on CartPole-v1")
print("=" * 50)
# Vectorized environment (parallel training)
env = make_vec_env('CartPole-v1', n_envs=4)
# Environment for evaluation
eval_env = gym.make('CartPole-v1')
eval_env = Monitor(eval_env)
# Initialize PPO model
model = PPO(
'MlpPolicy', # Multi-Layer Perceptron policy
env,
learning_rate=3e-4,
n_steps=2048, # Steps per update
batch_size=64,
n_epochs=10, # Update epochs
gamma=0.99,
gae_lambda=0.95, # GAE parameter
clip_range=0.2, # PPO clipping
verbose=1,
tensorboard_log="./ppo_cartpole_tensorboard/"
)
# Setup callbacks
eval_callback = EvalCallback(
eval_env,
best_model_save_path='./logs/best_model',
log_path='./logs/',
eval_freq=10000,
deterministic=True,
render=False
)
checkpoint_callback = CheckpointCallback(
save_freq=10000,
save_path='./logs/checkpoints/',
name_prefix='ppo_cartpole'
)
# Execute training
model.learn(
total_timesteps=100000,
callback=[eval_callback, checkpoint_callback]
)
# Evaluation
mean_reward, std_reward = evaluate_policy(
model, eval_env, n_eval_episodes=10
)
print(f"\nEvaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")
# Save model
model.save("ppo_cartpole_final")
return model
# === Example 2: SAC for Continuous Control ===
def train_sac_pendulum():
"""
SAC Pendulum environment training (continuous action space)
Features:
- Maximum entropy RL
- Off-policy learning
- Automatic temperature tuning
"""
print("\nTraining SAC on Pendulum-v1")
print("=" * 50)
# Create environment
env = gym.make('Pendulum-v1')
# SAC model
model = SAC(
'MlpPolicy',
env,
learning_rate=3e-4,
buffer_size=100000,
learning_starts=1000,
batch_size=256,
tau=0.005, # Soft update coefficient
gamma=0.99,
train_freq=1,
gradient_steps=1,
ent_coef='auto', # Automatic entropy tuning
verbose=1,
tensorboard_log="./sac_pendulum_tensorboard/"
)
# Training
model.learn(total_timesteps=50000)
# Evaluation
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"Evaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")
# Save model
model.save("sac_pendulum_final")
return model
# === Example 3: Custom Environment with SB3 ===
class CustomGridWorld(gym.Env):
"""
Custom grid world environment
SB3-compatible Gym environment
"""
def __init__(self, grid_size=5):
super(CustomGridWorld, self).__init__()
self.grid_size = grid_size
self.agent_pos = [0, 0]
self.goal_pos = [grid_size - 1, grid_size - 1]
# Action space: up, down, left, right
self.action_space = gym.spaces.Discrete(4)
# Observation space: agent position (normalized)
self.observation_space = gym.spaces.Box(
low=0, high=1, shape=(2,), dtype=np.float32
)
def reset(self, seed=None):
super().reset(seed=seed)
self.agent_pos = [0, 0]
return self._get_obs(), {}
def _get_obs(self):
return np.array(self.agent_pos, dtype=np.float32) / self.grid_size
def step(self, action):
# Execute action
if action == 0 and self.agent_pos[1] < self.grid_size - 1: # Up
self.agent_pos[1] += 1
elif action == 1 and self.agent_pos[1] > 0: # Down
self.agent_pos[1] -= 1
elif action == 2 and self.agent_pos[0] < self.grid_size - 1: # Right
self.agent_pos[0] += 1
elif action == 3 and self.agent_pos[0] > 0: # Left
self.agent_pos[0] -= 1
# Compute rewards
if self.agent_pos == self.goal_pos:
reward = 1.0
done = True
else:
reward = -0.01
done = False
return self._get_obs(), reward, done, False, {}
def train_custom_env():
"""Custom environment DQN training"""
print("\nTraining DQN on Custom GridWorld")
print("=" * 50)
# Custom environment
env = CustomGridWorld(grid_size=5)
# DQN model
model = DQN(
'MlpPolicy',
env,
learning_rate=1e-3,
buffer_size=10000,
learning_starts=1000,
batch_size=32,
gamma=0.99,
exploration_fraction=0.1,
exploration_final_eps=0.02,
verbose=1
)
# Training
model.learn(total_timesteps=50000)
# Evaluation
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"Evaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")
return model
# === Example 4: Loading and Using Trained Model ===
def use_trained_model():
"""Load and use trained model"""
print("\nUsing Trained Model")
print("=" * 50)
# Load model
model = PPO.load("ppo_cartpole_final")
# Execute environment
env = gym.make('CartPole-v1', render_mode='rgb_array')
obs, _ = env.reset()
total_reward = 0
for _ in range(500):
# Deterministic action selection
action, _states = model.predict(obs, deterministic=True)
obs, reward, terminated, truncated, info = env.step(action)
total_reward += reward
if terminated or truncated:
break
print(f"Episode reward: {total_reward}")
env.close()
# === Example 5: Hyperparameter Tuning with Optuna ===
def hyperparameter_tuning():
"""
Hyperparameter tuning using Optuna
(Optional: requires optuna installation)
"""
try:
from stable_baselines3.common.env_util import make_vec_env
import optuna
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler
def objective(trial):
"""Optuna objective function"""
# Suggest hyperparameters
            lr = trial.suggest_float('lr', 1e-5, 1e-3, log=True)
            gamma = trial.suggest_float('gamma', 0.9, 0.9999)
            clip_range = trial.suggest_float('clip_range', 0.1, 0.4)
            # Environment and model
env = make_vec_env('CartPole-v1', n_envs=4)
model = PPO(
'MlpPolicy', env,
learning_rate=lr,
gamma=gamma,
clip_range=clip_range,
verbose=0
)
# Training
model.learn(total_timesteps=20000)
# Evaluation
eval_env = gym.make('CartPole-v1')
mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=5)
return mean_reward
# Optuna study
study = optuna.create_study(
direction='maximize',
sampler=TPESampler(),
pruner=MedianPruner()
)
study.optimize(objective, n_trials=20, timeout=600)
print("\nBest hyperparameters:")
print(study.best_params)
print(f"Best value: {study.best_value:.2f}")
except ImportError:
print("Optuna not installed. Skipping hyperparameter tuning.")
print("Install with: pip install optuna")
# Main execution
if __name__ == "__main__":
print("Stable-Baselines3 Practical Examples")
print("=" * 50)
# Example 1: PPO
ppo_model = train_ppo_cartpole()
# Example 2: SAC
sac_model = train_sac_pendulum()
# Example 3: Custom Environment
custom_model = train_custom_env()
# Example 4: Using trained model
use_trained_model()
# Example 5: Hyperparameter tuning (optional)
# hyperparameter_tuning()
print("\n" + "=" * 50)
print("All examples completed!")
print("Tensorboard logs saved. View with:")
print(" tensorboard --logdir ./ppo_cartpole_tensorboard/")
SB3 Practical Notes: Vectorized environments accelerate training, and callbacks make monitoring and evaluation during training easy. TensorBoard logs allow visualization of learning curves, and hyperparameter tuning is effective with Optuna. Custom environments can be easily integrated if they comply with the Gym API.
5.7 Challenges and Solutions for Real-World Deployment
Major Challenges
| Challenge | Description | Solutions |
|---|---|---|
| Sample Efficiency | Real environment learning time/cost is high | Pre-training in simulation, Model-Based RL, Transfer learning |
| Safety | Failures during learning can be dangerous | Safe RL, validation in simulation, human supervision |
| Sim-to-Real Gap | Gap between simulation and real environment | Domain Randomization, high-fidelity simulators |
| Partial Observability | Cannot observe complete state | Use LSTM/Transformer, belief state |
| Reward Design | Difficult to design appropriate reward functions | Inverse RL, imitation learning, reward shaping |
| Generalization | Performance degradation outside training environment | Diverse training data, Meta-RL, Domain adaptation |
Best Practices
- Gradual Approach: simulation → sim-to-real transfer → real environment
- Model Validation: Multiple evaluation metrics, test with different environment settings
- Leverage Human Knowledge: Imitation learning, pre-training, reward shaping (a shaping sketch follows this list)
- Safety Assurance: Constrained RL, fail-safe mechanisms
- Continuous Learning: Online learning, adaptive policy
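As a concrete example of the reward shaping mentioned above, potential-based shaping adds $F(s, s') = \gamma \Phi(s') - \Phi(s)$ to the environment reward; with this particular form the optimal policy is provably unchanged (Ng et al., 1999). Below is a minimal wrapper sketch; the potential function is a user-supplied heuristic and the example potential is purely illustrative.
# Requirements:
# - Python 3.9+
# - gymnasium>=0.28
import gymnasium as gym

class PotentialShapingWrapper(gym.Wrapper):
    """Adds potential-based shaping F(s, s') = gamma * phi(s') - phi(s) to the reward."""
    def __init__(self, env, potential_fn, gamma=0.99):
        super().__init__(env)
        self.potential_fn = potential_fn  # heuristic potential, e.g. negative distance to the goal
        self.gamma = gamma
        self._last_potential = None

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self._last_potential = self.potential_fn(obs)
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        potential = self.potential_fn(obs)
        shaped_reward = reward + self.gamma * potential - self._last_potential
        self._last_potential = potential
        return obs, shaped_reward, terminated, truncated, info

# Usage sketch (illustrative potential: keep the cart near the track center):
# env = PotentialShapingWrapper(gym.make('CartPole-v1'), potential_fn=lambda o: -abs(o[0]))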
Summary
In this chapter, we learned the following advanced RL methods and applications:
- ✅ A3C: Acceleration through asynchronous parallel learning
- ✅ SAC: Stable continuous control through maximum entropy reinforcement learning
- ✅ Multi-Agent RL: Cooperation and competition among multiple agents
- ✅ Model-Based RL: Improved sample efficiency
- ✅ Real-World Applications: Robotics, Game AI, Financial trading
- ✅ Stable-Baselines3: Practical RL development tools
- ✅ Implementation Challenges: Safety, Generalization, Sim-to-Real Transfer
Next Steps
- Practical Projects: Creating custom environments with Stable-Baselines3
- Paper Reading: Read and implement the latest RL papers
- Competitions: Participate in Kaggle RL competitions, OpenAI Gym Leaderboard
- Explore Application Domains: Autonomous driving, healthcare, energy management, etc.
Reference Resources
- Stable-Baselines3 Documentation
- OpenAI Spinning Up in Deep RL
- Sutton & Barto: Reinforcement Learning Book
- A3C Paper (Mnih et al., 2016)
- SAC Paper (Haarnoja et al., 2018)
Exercises
Exercise 5.1: Parallel Implementation of A3C
Problem: Implement fully parallel A3C using Python's multiprocessing.
Hints:
- Use torch.multiprocessing
- Call share_memory() on the global network
- Run each worker in an independent process
- Be careful with synchronization using locks
Exercise 5.2: SAC Temperature Parameter Analysis
Problem: Train SAC with fixed values of the temperature parameter $\alpha$ and with automatic tuning, and analyze the differences in learning curves and final performance.
Hints:
- Experiment with $\alpha \in \{0.05, 0.1, 0.2, \text{auto}\}$
- Record entropy transitions
- Analyze exploration diversity
Exercise 5.3: Multi-Agent Cooperative Task
Problem: Implement a task where three agents cooperatively carry a heavy object. The object cannot be carried by a single agent and must be transported to a goal position by multiple agents working together.
Hints:
- Determine if transport is possible based on number of nearby agents
- Use shared rewards to encourage cooperation
- Introduce communication mechanism (optional)
Exercise 5.4: Trading Bot Enhancement
Problem: Add the following features to the provided trading bot:
- Multi-stock portfolio management
- Risk management (maximum drawdown limits)
- Add technical indicators (moving averages, RSI, etc.)
Hints:
- Add technical indicators to observation space
- Incorporate Sharpe ratio into reward function
- Expand action space (buy/sell multiple stocks)
Exercise 5.5: Custom Callback for Stable-Baselines3
Problem: Create a Stable-Baselines3 custom callback to perform the following during training:
- Log rewards per episode
- Automatically save the best performing model
- Early stopping of training (when target performance is achieved)
Hints:
- Inherit from BaseCallback
- Override _on_step()
- Access training information via self.locals