
Chapter 5: Advanced RL Methods and Applications

From State-of-the-Art Algorithms to Real-World Applications

📖 Reading Time: 25-30 minutes 📊 Difficulty: Advanced 💻 Code Examples: 7 📝 Exercises: 5

This chapter covers advanced reinforcement learning methods and their applications. You will learn the parallel learning mechanism of A3C, the maximum-entropy actor-critic approach of SAC, and the fundamentals of multi-agent reinforcement learning.

Learning Objectives

By reading this chapter, you will be able to:

  1. Explain the parallel learning mechanism of A3C and how asynchronous workers update a shared global network
  2. Describe SAC's maximum entropy objective, double Q-learning, and automatic temperature tuning
  3. Outline the main paradigms and challenges of multi-agent reinforcement learning
  4. Compare model-based and model-free RL in terms of sample efficiency and computational cost
  5. Train and evaluate agents with Stable-Baselines3 and apply RL to domains such as robotics, games, and trading

5.1 A3C (Asynchronous Advantage Actor-Critic)

Overview of A3C

A3C (Asynchronous Advantage Actor-Critic) is a parallel learning algorithm proposed by DeepMind in 2016. Multiple workers interact asynchronously with the environment and update a global network, achieving fast and stable learning.

graph TB
    GN["Global Network<br/>Shared Parameters θ"]
    W1["Worker 1<br/>Environment Copy 1"]
    W2["Worker 2<br/>Environment Copy 2"]
    W3["Worker 3<br/>Environment Copy 3"]
    Wn["Worker N<br/>Environment Copy N"]
    W1 -->|Gradient Update| GN
    W2 -->|Gradient Update| GN
    W3 -->|Gradient Update| GN
    Wn -->|Gradient Update| GN
    GN -->|Parameter Synchronization| W1
    GN -->|Parameter Synchronization| W2
    GN -->|Parameter Synchronization| W3
    GN -->|Parameter Synchronization| Wn
    style GN fill:#e3f2fd
    style W1 fill:#c8e6c9
    style W2 fill:#c8e6c9
    style W3 fill:#c8e6c9
    style Wn fill:#c8e6c9

Key Components of A3C

| Component | Description | Features |
|---|---|---|
| Asynchronous Update | Each worker learns independently | No experience replay needed, memory efficient |
| Advantage Function | $A(s, a) = Q(s, a) - V(s)$ | Reduces variance, stable learning |
| Entropy Regularization | Promotes exploration | Prevents premature convergence |
| Parallel Execution | Simultaneous learning in multiple environments | Faster training, diverse data |

A3C Algorithm

Each worker repeats the following steps:

  1. Parameter Synchronization: Copy parameters from global network $\theta' \leftarrow \theta$
  2. Experience Collection: Collect transitions $(s_t, a_t, r_t)$ for up to $t_{\text{max}}$ steps or until episode termination
  3. Compute returns: $n$-step returns $R_t = \sum_{i=0}^{n-1} \gamma^i r_{t+i} + \gamma^n V(s_{t+n})$
  4. Gradient Computation: Calculate actor and critic losses
  5. Asynchronous Update: Update global network

The loss function is as follows:

$$ \mathcal{L}_{\text{actor}} = -\log \pi(a_t | s_t; \theta) A_t - \beta H(\pi(\cdot | s_t; \theta)) $$

$$ \mathcal{L}_{\text{critic}} = (R_t - V(s_t; \theta))^2 $$

where $H(\pi)$ is the entropy, $\beta$ is the entropy regularization coefficient, and $A_t = R_t - V(s_t)$ is the Advantage estimate.
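
To make the $n$-step return concrete, here is a tiny standalone sketch that computes $R_t$ exactly as in step 3 above; the reward sequence and bootstrap value are made-up numbers for illustration:

# Minimal illustration of the n-step return
# R_t = sum_i gamma^i * r_{t+i} + gamma^n * V(s_{t+n})  (values here are made up)
gamma = 0.99
rewards = [1.0, 0.0, 1.0]          # r_t, r_{t+1}, r_{t+2}  (n = 3)
bootstrap_value = 2.5              # V(s_{t+n}) from the critic

R = bootstrap_value
for r in reversed(rewards):        # same backward recursion used by compute_returns below
    R = r + gamma * R
print(f"3-step return R_t = {R:.3f}")   # 1.0 + 0.99*0.0 + 0.99^2*1.0 + 0.99^3*2.5 ≈ 4.406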

Conceptual Implementation of A3C

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.distributions import Categorical
import gymnasium as gym
import numpy as np

class A3CNetwork(nn.Module):
    """
    Actor-Critic shared network for A3C

    Architecture:
    - Shared layers: Feature extraction
    - Actor output: Action probability distribution
    - Critic output: State value function
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        """
        Args:
            state_dim: Dimension of state space
            action_dim: Dimension of action space
            hidden_dim: Dimension of hidden layers
        """
        super(A3CNetwork, self).__init__()

        # Shared feature extraction layer
        self.shared_fc1 = nn.Linear(state_dim, hidden_dim)
        self.shared_fc2 = nn.Linear(hidden_dim, hidden_dim)

        # Actor output (action probabilities)
        self.actor_head = nn.Linear(hidden_dim, action_dim)

        # Critic output (state value)
        self.critic_head = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        """
        Forward computation

        Args:
            state: State (batch_size, state_dim)

        Returns:
            action_probs: Action probability distribution (batch_size, action_dim)
            state_value: State value (batch_size, 1)
        """
        # Shared layers
        x = F.relu(self.shared_fc1(state))
        x = F.relu(self.shared_fc2(x))

        # Actor output
        action_logits = self.actor_head(x)
        action_probs = F.softmax(action_logits, dim=-1)

        # Critic output
        state_value = self.critic_head(x)

        return action_probs, state_value


class A3CWorker:
    """
    A3C worker: Learns in an independent environment and updates the global network

    Features:
    - Asynchronous parameter updates
    - n-step returns calculation
    - Entropy Regularization
    """

    def __init__(self, worker_id, global_network, optimizer,
                 env_name='CartPole-v1', gamma=0.99,
                 max_steps=20, entropy_coef=0.01):
        """
        Args:
            worker_id: Worker ID
            global_network: Shared global network
            optimizer: Shared optimizer
            env_name: Environment name
            gamma: Discount factor
            max_steps: Number of steps for n-step returns
            entropy_coef: Entropy regularization coefficient
        """
        self.worker_id = worker_id
        self.env = gym.make(env_name)
        self.global_network = global_network
        self.optimizer = optimizer
        self.gamma = gamma
        self.max_steps = max_steps
        self.entropy_coef = entropy_coef

        # Local network (same structure as global)
        state_dim = self.env.observation_space.shape[0]
        action_dim = self.env.action_space.n
        self.local_network = A3CNetwork(state_dim, action_dim)

    def compute_returns(self, rewards, next_value, dones):
        """
        Compute n-step returns

        Args:
            rewards: List of rewards
            next_value: Value estimate of the final state
            dones: List of termination flags

        Returns:
            returns: Returns for each step
        """
        returns = []
        R = next_value

        # Compute in reverse order
        for r, done in zip(reversed(rewards), reversed(dones)):
            R = r + self.gamma * R * (1 - done)
            returns.insert(0, R)

        return returns

    def train_step(self):
        """
        Training for one episode

        Returns:
            total_reward: Total episode reward
        """
        # Synchronize parameters from the global network
        self.local_network.load_state_dict(self.global_network.state_dict())

        state, _ = self.env.reset()
        done = False

        states, actions, rewards, dones, values = [], [], [], [], []
        episode_reward = 0

        while not done:
            # Action selection
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            action_probs, value = self.local_network(state_tensor)

            dist = Categorical(action_probs)
            action = dist.sample()

            # Environment step
            next_state, reward, terminated, truncated, _ = self.env.step(action.item())
            done = terminated or truncated

            # Store experience
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            dones.append(done)
            values.append(value)

            episode_reward += reward
            state = next_state

            # Update every max_steps steps or at episode end
            if len(states) >= self.max_steps or done:
                # Value estimate of next state
                if done:
                    next_value = 0
                else:
                    next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
                    _, next_value = self.local_network(next_state_tensor)
                    next_value = next_value.item()

                # Compute returns
                returns = self.compute_returns(rewards, next_value, dones)

                # Compute loss
                self._update_global_network(states, actions, returns, values)

                # Clear buffer
                states, actions, rewards, dones, values = [], [], [], [], []

        return episode_reward

    def _update_global_network(self, states, actions, returns, values):
        """
        Update global network

        Args:
            states: List of states
            actions: List of actions
            returns: List of returns
            values: List of value estimates
        """
        states_tensor = torch.FloatTensor(np.array(states))
        actions_tensor = torch.LongTensor([a.item() for a in actions])
        returns_tensor = torch.FloatTensor(returns)

        # Re-compute
        action_probs, state_values = self.local_network(states_tensor)
        state_values = state_values.squeeze()

        # Compute Advantage
        advantages = returns_tensor - state_values.detach()

        # Actor loss (Policy Gradient + Entropy)
        dist = Categorical(action_probs)
        log_probs = dist.log_prob(actions_tensor)
        entropy = dist.entropy().mean()
        actor_loss = -(log_probs * advantages).mean() - self.entropy_coef * entropy

        # Critic loss (MSE)
        critic_loss = F.mse_loss(state_values, returns_tensor)

        # Total loss
        total_loss = actor_loss + critic_loss

        # Update the global network
        self.optimizer.zero_grad()
        total_loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(self.local_network.parameters(), 40)

        # Transfer gradients to global network
        for local_param, global_param in zip(
            self.local_network.parameters(),
            self.global_network.parameters()
        ):
            if global_param.grad is not None:
                return  # Gradients already populated by another worker; skip this update
            global_param._grad = local_param.grad

        self.optimizer.step()


def worker_process(worker_id, global_network, optimizer, num_episodes=100):
    """
    Worker process function (for parallel execution)

    Args:
        worker_id: Worker ID
        global_network: Global network
        optimizer: Shared optimizer
        num_episodes: Number of episodes
    """
    worker = A3CWorker(worker_id, global_network, optimizer)

    for episode in range(num_episodes):
        reward = worker.train_step()
        if episode % 10 == 0:
            print(f"Worker {worker_id} - Episode {episode}, Reward: {reward:.2f}")


# A3C training example (single-process version - for concept verification)
def train_a3c_simple():
    """
    Simplified version of A3C training (no parallel processing)
    Actual A3C uses multiprocessing
    """
    env = gym.make('CartPole-v1')
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    # Global network
    global_network = A3CNetwork(state_dim, action_dim)
    global_network.share_memory()  # For inter-process sharing

    optimizer = torch.optim.Adam(global_network.parameters(), lr=0.0001)

    # Single worker training example
    worker = A3CWorker(0, global_network, optimizer)

    rewards = []
    for episode in range(100):
        reward = worker.train_step()
        rewards.append(reward)

        if episode % 10 == 0:
            avg_reward = np.mean(rewards[-10:])
            print(f"Episode {episode}, Avg Reward: {avg_reward:.2f}")

    return global_network, rewards


# Execution example
if __name__ == "__main__":
    print("A3C Training (Simple Version)")
    print("=" * 50)
    model, rewards = train_a3c_simple()
    print(f"Training completed. Final avg reward: {np.mean(rewards[-10:]):.2f}")

A3C Implementation Notes: The full parallel version uses Python's multiprocessing, but the above is a simplified version demonstrating the concept. In actual A3C, multiple worker processes update the global network simultaneously. Entropy regularization promotes exploration, and gradient clipping stabilizes learning.
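
For reference, here is a minimal sketch of how the parallel version might be launched with torch.multiprocessing, reusing the A3CNetwork and worker_process defined above. The process count and the use of a plain Adam optimizer are assumptions for illustration, not part of the original code:

# Minimal sketch of launching parallel A3C workers (assumes the classes above are in scope)
import torch.multiprocessing as mp

def train_a3c_parallel(num_workers=4, num_episodes=100):
    env = gym.make('CartPole-v1')
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    global_network = A3CNetwork(state_dim, action_dim)
    global_network.share_memory()  # Parameters live in shared memory across processes

    # Note: a plain Adam optimizer keeps its statistics per process; a "shared Adam"
    # implementation is commonly used in full A3C but is omitted here for brevity.
    optimizer = torch.optim.Adam(global_network.parameters(), lr=1e-4)

    processes = []
    for worker_id in range(num_workers):
        p = mp.Process(target=worker_process,
                       args=(worker_id, global_network, optimizer, num_episodes))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    return global_network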


5.2 SAC (Soft Actor-Critic)

Overview of SAC

SAC (Soft Actor-Critic) is an off-policy algorithm based on the maximum entropy reinforcement learning framework. It automatically balances reward maximization and exploration, demonstrating excellent performance in continuous action spaces.

graph LR
    S[State s] --> A["Actor π<br/>Stochastic Policy"]
    S --> Q1["Q-Network 1<br/>Q₁(s,a)"]
    S --> Q2["Q-Network 2<br/>Q₂(s,a)"]
    S --> V["Value Network<br/>V(s)"]
    A -->|Action a| E[Environment]
    Q1 -->|Min Value| MIN[min Q]
    Q2 -->|Min Value| MIN
    E -->|Reward + Entropy| R[Maximization Target]
    MIN --> R
    V --> R
    style A fill:#e3f2fd
    style Q1 fill:#fff9c4
    style Q2 fill:#fff9c4
    style V fill:#c8e6c9
    style R fill:#ffccbc

Key Features of SAC

| Feature | Description | Benefits |
|---|---|---|
| Maximum Entropy Objective | Maximizes reward plus policy entropy | Automatic exploration, robust policies |
| Double Q-Learning | Uses two Q-networks to prevent overestimation | Stable learning |
| Off-Policy | Uses experience replay | High sample efficiency |
| Automatic Temperature Tuning | Learns the entropy coefficient α | Removes manual tuning of the exploration trade-off |

SAC Objective Function

SAC optimizes the following maximum entropy objective:

$$ J(\pi) = \mathbb{E}_{\tau \sim \pi} \left[ \sum_{t=0}^{\infty} \gamma^t (r(s_t, a_t) + \alpha \mathcal{H}(\pi(\cdot | s_t))) \right] $$

where $\mathcal{H}(\pi)$ is the policy entropy and $\alpha$ is the temperature parameter.

Actor Update (Policy Improvement):

$$ \mathcal{L}_{\pi}(\theta) = \mathbb{E}_{s_t \sim \mathcal{D}} \left[ \mathbb{E}_{a_t \sim \pi_\theta} [\alpha \log \pi_\theta(a_t | s_t) - Q(s_t, a_t)] \right] $$

Critic Update (Bellman error minimization):

$$ \mathcal{L}_Q(\phi) = \mathbb{E}_{(s, a, r, s') \sim \mathcal{D}} \left[ (Q_\phi(s, a) - (r + \gamma V(s')))^2 \right] $$

where $V(s') = \mathbb{E}_{a' \sim \pi}[Q(s', a') - \alpha \log \pi(a' | s')]$.

SAC Implementation

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np
from collections import deque
import random

class GaussianPolicy(nn.Module):
    """
    Gaussian policy network for SAC

    Architecture:
    - State input
    - Output: mean μ and standard deviation σ
    - Differentiable action sampling via Reparameterization Trick
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256,
                 log_std_min=-20, log_std_max=2):
        """
        Args:
            state_dim: Dimension of state space
            action_dim: Dimension of action space
            hidden_dim: Dimension of hidden layers
            log_std_min: Minimum value of log standard deviation
            log_std_max: Maximum log standard deviation
        """
        super(GaussianPolicy, self).__init__()

        self.log_std_min = log_std_min
        self.log_std_max = log_std_max

        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)

        self.mean = nn.Linear(hidden_dim, action_dim)
        self.log_std = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """
        Forward computation

        Args:
            state: State (batch_size, state_dim)

        Returns:
            mean: Mean of action distribution
            log_std: Log standard deviation of action distribution
        """
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))

        mean = self.mean(x)
        log_std = self.log_std(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)

        return mean, log_std

    def sample(self, state):
        """
        Action sampling via Reparameterization Trick

        Args:
            state: State

        Returns:
            action: Sampled action (after tanh squashing)
            log_prob: Log probability of the action
        """
        mean, log_std = self.forward(state)
        std = log_std.exp()

        # Sample from Gaussian distribution
        normal = Normal(mean, std)
        x_t = normal.rsample()  # Reparameterization trick

        # Squash the sampled action into [-1, 1] with tanh
        action = torch.tanh(x_t)

        # Log probability (including tanh transformation correction)
        log_prob = normal.log_prob(x_t)
        log_prob -= torch.log(1 - action.pow(2) + 1e-6)
        log_prob = log_prob.sum(1, keepdim=True)

        return action, log_prob


class QNetwork(nn.Module):
    """
    Q-Network for SAC (State-Action value function)
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(QNetwork, self).__init__()

        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        """
        Compute Q-value

        Args:
            state: State (batch_size, state_dim)
            action: Action (batch_size, action_dim)

        Returns:
            q_value: Q-value (batch_size, 1)
        """
        x = torch.cat([state, action], dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        q_value = self.fc3(x)
        return q_value


class ReplayBuffer:
    """Experience Replay Buffer"""

    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*batch)
        return (
            np.array(state),
            np.array(action),
            np.array(reward).reshape(-1, 1),
            np.array(next_state),
            np.array(done).reshape(-1, 1)
        )

    def __len__(self):
        return len(self.buffer)


class SAC:
    """
    Soft Actor-Critic Implementation

    Features:
    - Maximum entropy reinforcement learning
    - Double Q-learning for stability
    - Automatic temperature tuning
    - Off-policy learning with replay buffer
    """

    def __init__(self, state_dim, action_dim,
                 lr=3e-4, gamma=0.99, tau=0.005, alpha=0.2,
                 automatic_entropy_tuning=True):
        """
        Args:
            state_dim: Dimension of state space
            action_dim: Dimension of action space
            lr: Learning rate
            gamma: Discount factor
            tau: Target network update rate
            alpha: Entropy coefficient (when automatic_entropy_tuning=False)
            automatic_entropy_tuning: Whether to use automatic temperature tuning
        """
        self.gamma = gamma
        self.tau = tau
        self.alpha = alpha

        # Initialize networks
        self.policy = GaussianPolicy(state_dim, action_dim)

        self.q_net1 = QNetwork(state_dim, action_dim)
        self.q_net2 = QNetwork(state_dim, action_dim)

        self.target_q_net1 = QNetwork(state_dim, action_dim)
        self.target_q_net2 = QNetwork(state_dim, action_dim)

        # Copy parameters to target networks
        self.target_q_net1.load_state_dict(self.q_net1.state_dict())
        self.target_q_net2.load_state_dict(self.q_net2.state_dict())

        # Optimizers
        self.policy_optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
        self.q1_optimizer = torch.optim.Adam(self.q_net1.parameters(), lr=lr)
        self.q2_optimizer = torch.optim.Adam(self.q_net2.parameters(), lr=lr)

        # Automatic temperature tuning
        self.automatic_entropy_tuning = automatic_entropy_tuning
        if automatic_entropy_tuning:
            self.target_entropy = -action_dim
            self.log_alpha = torch.zeros(1, requires_grad=True)
            self.alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=lr)

        self.replay_buffer = ReplayBuffer()

    def select_action(self, state, evaluate=False):
        """
        Action selection

        Args:
            state: State
            evaluate: Evaluation mode (deterministic action)

        Returns:
            action: Selected action
        """
        state = torch.FloatTensor(state).unsqueeze(0)

        if evaluate:
            with torch.no_grad():
                mean, _ = self.policy(state)
                action = torch.tanh(mean)
        else:
            with torch.no_grad():
                action, _ = self.policy.sample(state)

        return action.cpu().numpy()[0]

    def update(self, batch_size=256):
        """
        SAC update step

        Args:
            batch_size: Batch size
        """
        if len(self.replay_buffer) < batch_size:
            return

        # Sample from buffer
        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)

        state = torch.FloatTensor(state)
        action = torch.FloatTensor(action)
        reward = torch.FloatTensor(reward)
        next_state = torch.FloatTensor(next_state)
        done = torch.FloatTensor(done)

        # --- Q-Network updates ---
        with torch.no_grad():
            next_action, next_log_prob = self.policy.sample(next_state)

            # Double Q-learning: use the minimum of the two target Q-values
            target_q1 = self.target_q_net1(next_state, next_action)
            target_q2 = self.target_q_net2(next_state, next_action)
            target_q = torch.min(target_q1, target_q2)

            # Target with entropy term
            target_value = reward + (1 - done) * self.gamma * (
                target_q - self.alpha * next_log_prob
            )

        # Q1 loss
        q1_value = self.q_net1(state, action)
        q1_loss = F.mse_loss(q1_value, target_value)

        # Q2 loss
        q2_value = self.q_net2(state, action)
        q2_loss = F.mse_loss(q2_value, target_value)

        # Q-Network updates
        self.q1_optimizer.zero_grad()
        q1_loss.backward()
        self.q1_optimizer.step()

        self.q2_optimizer.zero_grad()
        q2_loss.backward()
        self.q2_optimizer.step()

        # --- Policy Update ---
        new_action, log_prob = self.policy.sample(state)

        q1_new = self.q_net1(state, new_action)
        q2_new = self.q_net2(state, new_action)
        q_new = torch.min(q1_new, q2_new)

        policy_loss = (self.alpha * log_prob - q_new).mean()

        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()

        # --- Temperature parameter update (automatic adjustment) ---
        if self.automatic_entropy_tuning:
            alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()

            self.alpha_optimizer.zero_grad()
            alpha_loss.backward()
            self.alpha_optimizer.step()

            self.alpha = self.log_alpha.exp().item()

        # --- Soft update of target network ---
        self._soft_update(self.q_net1, self.target_q_net1)
        self._soft_update(self.q_net2, self.target_q_net2)

    def _soft_update(self, source, target):
        """
        Soft update of target network
        θ_target = τ * θ_source + (1 - τ) * θ_target
        """
        for target_param, source_param in zip(target.parameters(), source.parameters()):
            target_param.data.copy_(
                self.tau * source_param.data + (1 - self.tau) * target_param.data
            )


# SAC training example
def train_sac():
    """SAC training execution example (Pendulum environment)"""
    import gymnasium as gym

    env = gym.make('Pendulum-v1')
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]

    agent = SAC(state_dim, action_dim)

    num_episodes = 100
    max_steps = 200

    for episode in range(num_episodes):
        state, _ = env.reset()
        episode_reward = 0

        for step in range(max_steps):
            # Action selection
            action = agent.select_action(state)

            # Environment step
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Save to buffer
            agent.replay_buffer.push(state, action, reward, next_state, done)

            # update
            agent.update()

            episode_reward += reward
            state = next_state

            if done:
                break

        if episode % 10 == 0:
            print(f"Episode {episode}, Reward: {episode_reward:.2f}, Alpha: {agent.alpha:.3f}")

    return agent


if __name__ == "__main__":
    print("SAC Training on Pendulum-v1")
    print("=" * 50)
    agent = train_sac()
    print("Training completed!")

SAC Implementation Points: The reparameterization trick makes the policy differentiable, enabling efficient gradient-based optimization. Double Q-learning prevents overestimation, and automatic temperature tuning automatically optimizes the balance between exploration and exploitation. Tanh squashing ensures actions are bounded to a finite range.
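
As a quick standalone illustration of why `rsample()` matters (independent of the SAC class above): gradients flow through a reparameterized sample, but not through an ordinary one.

# Minimal demonstration of the reparameterization trick with torch.distributions.Normal
import torch
from torch.distributions import Normal

mean = torch.tensor([0.0], requires_grad=True)
std = torch.tensor([1.0], requires_grad=True)

# rsample() draws x = mean + std * eps with eps ~ N(0, 1), keeping the graph intact
x = Normal(mean, std).rsample()
torch.tanh(x).sum().backward()
print(mean.grad, std.grad)   # both non-None: the policy parameters receive gradients

# sample() would return a value detached from the graph, so backward() through it fails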


5.3 Multi-Agent Reinforcement Learning (Multi-Agent RL)

Fundamentals of Multi-Agent Reinforcement Learning

Multi-Agent Reinforcement Learning (MARL) involves multiple agents simultaneously learning and acting in the same environment. The interactions between agents create challenges different from single-agent RL.

graph TB
    ENV[Environment]
    A1["Agent 1<br/>Policy π₁"]
    A2["Agent 2<br/>Policy π₂"]
    A3["Agent 3<br/>Policy π₃"]
    A1 -->|Action a₁| ENV
    A2 -->|Action a₂| ENV
    A3 -->|Action a₃| ENV
    ENV -->|Observation o₁, Reward r₁| A1
    ENV -->|Observation o₂, Reward r₂| A2
    ENV -->|Observation o₃, Reward r₃| A3
    A1 -.->|Observation/Communication| A2
    A2 -.->|Observation/Communication| A3
    A3 -.->|Observation/Communication| A1
    style ENV fill:#e3f2fd
    style A1 fill:#c8e6c9
    style A2 fill:#fff9c4
    style A3 fill:#ffccbc

Main MARL Paradigms

| Paradigm | Description | Applications |
|---|---|---|
| Cooperative | All agents share a common goal | Team sports, cooperative robots |
| Competitive | Zero-sum relationship between agents | Game AI, adversarial tasks |
| Mixed | Both cooperation and competition exist | Economic simulation, negotiation |

MARL Challenges

Compared with single-agent RL, MARL introduces several additional difficulties: the environment is non-stationary from each agent's point of view because the other agents are learning at the same time; the joint action space grows exponentially with the number of agents; credit assignment becomes harder when rewards are shared; and each agent typically has only a partial observation of the global state.

Multi-Agent Environment Implementation

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
import gymnasium as gym
from gymnasium import spaces

class SimpleMultiAgentEnv(gym.Env):
    """
    Simple multi-agent environment

    Task: Multiple agents reach a goal position
    - Agents move in 2D space
    - Positive reward for getting closer to goal
    - Negative reward when agents are too close (collision avoidance)
    - Cooperative task (shared reward)
    """

    def __init__(self, n_agents=3, grid_size=10, max_steps=50):
        """
        Args:
            n_agents: Number of agents
            grid_size: Grid size
            max_steps: Maximum number of steps
        """
        super(SimpleMultiAgentEnv, self).__init__()

        self.n_agents = n_agents
        self.grid_size = grid_size
        self.max_steps = max_steps

        # Action space: 4 directions (up, down, left, right)
        self.action_space = spaces.Discrete(4)

        # Observation space: [own x, y, x distance to goal, y distance to goal, relative positions of other agents...]
        obs_dim = 2 + 2 + (n_agents - 1) * 2
        self.observation_space = spaces.Box(
            low=-grid_size, high=grid_size,
            shape=(obs_dim,), dtype=np.float32
        )

        self.agent_positions = None
        self.goal_position = None
        self.current_step = 0

    def reset(self, seed=None):
        """Reset environment"""
        super().reset(seed=seed)

        # Randomly place agents
        self.agent_positions = np.random.rand(self.n_agents, 2) * self.grid_size

        # Randomly place goal
        self.goal_position = np.random.rand(2) * self.grid_size

        self.current_step = 0

        return self._get_observations(), {}

    def step(self, actions):
        """
        Environment step

        Args:
            actions: List of actions for each agent

        Returns:
            observations: Observations for each agent
            rewards: Rewards for each agent
            terminated: Termination flag
            truncated: Truncation flag
            info: Additional information
        """
        # Apply actions (move up, down, left, right)
        for i, action in enumerate(actions):
            if action == 0:  # Up
                self.agent_positions[i, 1] = min(self.grid_size, self.agent_positions[i, 1] + 0.5)
            elif action == 1:  # Down
                self.agent_positions[i, 1] = max(0, self.agent_positions[i, 1] - 0.5)
            elif action == 2:  # Right
                self.agent_positions[i, 0] = min(self.grid_size, self.agent_positions[i, 0] + 0.5)
            elif action == 3:  # Left
                self.agent_positions[i, 0] = max(0, self.agent_positions[i, 0] - 0.5)

        # Compute rewards
        rewards = self._compute_rewards()

        # Termination check
        self.current_step += 1
        terminated = self._is_done()
        truncated = self.current_step >= self.max_steps

        observations = self._get_observations()

        return observations, rewards, terminated, truncated, {}

    def _get_observations(self):
        """Get observations for each agent"""
        observations = []

        for i in range(self.n_agents):
            obs = []

            # Own position
            obs.extend(self.agent_positions[i])

            # Distance to goal
            obs.extend(self.goal_position - self.agent_positions[i])

            # Relative positions of other agents
            for j in range(self.n_agents):
                if i != j:
                    obs.extend(self.agent_positions[j] - self.agent_positions[i])

            observations.append(np.array(obs, dtype=np.float32))

        return observations

    def _compute_rewards(self):
        """Compute rewards"""
        rewards = []

        for i in range(self.n_agents):
            reward = 0

            # Reward based on distance to goal
            dist_to_goal = np.linalg.norm(self.agent_positions[i] - self.goal_position)
            reward -= dist_to_goal * 0.1

            # Goal reached bonus
            if dist_to_goal < 0.5:
                reward += 10.0

            # Collision avoidance penalty
            for j in range(self.n_agents):
                if i != j:
                    dist_to_agent = np.linalg.norm(
                        self.agent_positions[i] - self.agent_positions[j]
                    )
                    if dist_to_agent < 1.0:
                        reward -= 2.0

            rewards.append(reward)

        return rewards

    def _is_done(self):
        """Check if all agents have reached the goal"""
        for i in range(self.n_agents):
            dist = np.linalg.norm(self.agent_positions[i] - self.goal_position)
            if dist >= 0.5:
                return False
        return True

    def render(self):
        """Visualize environment"""
        plt.figure(figsize=(8, 8))
        plt.xlim(0, self.grid_size)
        plt.ylim(0, self.grid_size)

        # Draw goal
        goal_circle = Circle(self.goal_position, 0.5, color='gold', alpha=0.6, label='Goal')
        plt.gca().add_patch(goal_circle)

        # Draw agents
        colors = ['blue', 'red', 'green', 'purple', 'orange']
        for i in range(self.n_agents):
            agent_circle = Circle(
                self.agent_positions[i], 0.3,
                color=colors[i % len(colors)],
                alpha=0.8,
                label=f'Agent {i+1}'
            )
            plt.gca().add_patch(agent_circle)

        plt.legend()
        plt.title(f'Multi-Agent Environment (Step {self.current_step})')
        plt.grid(True, alpha=0.3)
        plt.show()


class IndependentQLearning:
    """
    Independent Q-Learning for MARL
    Each agent runs Q-learning independently
    """

    def __init__(self, n_agents, state_dim, n_actions,
                 lr=0.1, gamma=0.99, epsilon=0.1):
        """
        Args:
            n_agents: Number of agents
            state_dim: Dimension of state space
            n_actions: Number of actions
            lr: Learning rate
            gamma: Discount factor
            epsilon: ε-greedy exploration rate
        """
        self.n_agents = n_agents
        self.n_actions = n_actions
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon

        # Q-table for each agent (simplified version: discretized)
        # In practice, function approximation (neural networks) is used
        self.q_tables = [
            np.zeros((100, n_actions)) for _ in range(n_agents)
        ]

    def select_actions(self, observations):
        """ε-greedyAction selection"""
        actions = []

        for i in range(self.n_agents):
            if np.random.rand() < self.epsilon:
                action = np.random.randint(self.n_actions)
            else:
                # Discretize observation (simplified version)
                state_idx = self._discretize_state(observations[i])
                action = np.argmax(self.q_tables[i][state_idx])

            actions.append(action)

        return actions

    def update(self, observations, actions, rewards, next_observations, done):
        """Q-valueupdate"""
        for i in range(self.n_agents):
            state_idx = self._discretize_state(observations[i])
            next_state_idx = self._discretize_state(next_observations[i])

            # Q-learning update
            target = rewards[i]
            if not done:
                target += self.gamma * np.max(self.q_tables[i][next_state_idx])

            self.q_tables[i][state_idx, actions[i]] += self.lr * (
                target - self.q_tables[i][state_idx, actions[i]]
            )

    def _discretize_state(self, observation):
        """Discretize observation (simplified version)"""
        # In practice, use state hashing or function approximation
        return int(np.sum(np.abs(observation)) * 10) % 100


# MARL training example
def train_marl():
    """Multi-agent environment training"""
    env = SimpleMultiAgentEnv(n_agents=3, grid_size=10)
    agent_controller = IndependentQLearning(
        n_agents=3,
        state_dim=env.observation_space.shape[0],
        n_actions=4
    )

    num_episodes = 100

    for episode in range(num_episodes):
        observations, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            # Action selection
            actions = agent_controller.select_actions(observations)

            # Environment step
            next_observations, rewards, terminated, truncated, _ = env.step(actions)
            done = terminated or truncated

            # update
            agent_controller.update(observations, actions, rewards, next_observations, done)

            episode_reward += sum(rewards)
            observations = next_observations

        if episode % 10 == 0:
            print(f"Episode {episode}, Total Reward: {episode_reward:.2f}")

    # Visualize the environment after training (fresh reset)
    observations, _ = env.reset()
    env.render()

    return agent_controller


if __name__ == "__main__":
    print("Multi-Agent RL Training")
    print("=" * 50)
    controller = train_marl()
    print("Training completed!")

MARL Implementation Notes: Independent Q-Learning is the simplest MARL approach where each agent learns independently. More advanced methods include QMIX (centralized training and decentralized execution), MADDPG (Multi-Agent DDPG), and others. For cooperative tasks, shared rewards are effective, and introducing communication mechanisms improves performance.
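
To make the "centralized training, decentralized execution" idea mentioned above concrete, here is a minimal, hypothetical sketch of the network structure used by MADDPG-style methods: each actor sees only its own observation, while a centralized critic (used only during training) sees all observations and actions. The layer sizes and dimensions are illustrative assumptions.

# Hypothetical sketch: decentralized actors + one centralized critic (MADDPG-style)
import torch
import torch.nn as nn

class DecentralizedActor(nn.Module):
    """Maps a single agent's local observation to its action (used at execution time)."""
    def __init__(self, obs_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim), nn.Tanh()   # continuous action in [-1, 1]
        )

    def forward(self, obs):
        return self.net(obs)

class CentralizedCritic(nn.Module):
    """Scores the joint state-action; only needed during training."""
    def __init__(self, n_agents, obs_dim, action_dim, hidden_dim=128):
        super().__init__()
        joint_dim = n_agents * (obs_dim + action_dim)
        self.net = nn.Sequential(
            nn.Linear(joint_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, all_obs, all_actions):
        # all_obs: (batch, n_agents * obs_dim); all_actions: (batch, n_agents * action_dim)
        return self.net(torch.cat([all_obs, all_actions], dim=-1))

During training, each actor's gradient is computed through the centralized critic's Q-value; at execution time only the actors are deployed, so agents act on local observations alone.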


5.4 Model-Based Reinforcement Learning (Model-Based RL)

Overview of Model-Based RL

Model-Based Reinforcement Learning learns a model of the environment's dynamics (transition function and reward function) and uses that model to optimize the policy. It features higher sample efficiency compared to model-free methods.

graph LR
    ENV[Real Environment] -->|Experience s,a,r,s'| MD["Model Learning<br/>P̂(s'|s,a), R̂(s,a)"]
    MD -->|Learned Model| PLAN["Planning<br/>Simulation"]
    PLAN -->|Policy Improvement| POL[Policy π]
    POL -->|Action a| ENV
    PLAN -.->|Imagined Experience| MB[Model-Based Update]
    ENV -.->|Real Experience| MF[Model-Free Update]
    MB --> POL
    MF --> POL
    style ENV fill:#e3f2fd
    style MD fill:#fff9c4
    style PLAN fill:#c8e6c9
    style POL fill:#ffccbc

Model-Based vs Model-Free

| Aspect | Model-Based | Model-Free |
|---|---|---|
| Sample Efficiency | High (real data supplemented by the model) | Low (requires much real experience) |
| Computational Cost | High (model learning + planning) | Low (direct policy learning) |
| Application Difficulty | Harder (affected by model errors) | Easier (direct learning) |
| Interpretability | High (model predictions can be inspected) | Low (black box) |

Main Approaches

The environment model is learned to approximate the true transition and reward functions:

$$ \hat{P}(s' | s, a) \approx P(s' | s, a) $$

$$ \hat{R}(s, a) \approx R(s, a) $$

The learned model is then used for simulation, generating many virtual experiences that supplement real data (as in the Dyna architecture).

Model-Based RL Key Points: Because model errors can accumulate and degrade performance, uncertainty estimation and appropriate use of the model are important. By balancing data from real and model environments well, both sample efficiency and performance can be achieved.
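
The sketch below illustrates this idea in its simplest tabular form, a Dyna-Q-style loop on a toy MDP: real transitions update both the Q-table and a learned model, and extra "imagined" updates are drawn from the model. The toy environment and all hyperparameters are assumptions for illustration only.

# Minimal tabular Dyna-Q sketch on a toy 1-D chain environment (all details assumed)
import numpy as np

n_states, n_actions = 5, 2            # chain of 5 states; actions: 0 = left, 1 = right
gamma, alpha, epsilon = 0.95, 0.1, 0.1
planning_steps = 10                   # imagined updates per real step

Q = np.zeros((n_states, n_actions))
model = {}                            # (s, a) -> (r, s'): a deterministic learned model

def env_step(s, a):
    """Toy dynamics: reaching the right end of the chain gives reward 1."""
    s_next = min(s + 1, n_states - 1) if a == 1 else max(s - 1, 0)
    r = 1.0 if s_next == n_states - 1 else 0.0
    return r, s_next

rng = np.random.default_rng(0)
s = 0
for step in range(2000):
    a = rng.integers(n_actions) if rng.random() < epsilon else int(np.argmax(Q[s]))
    r, s_next = env_step(s, a)

    # (1) Direct RL update from the real transition
    Q[s, a] += alpha * (r + gamma * Q[s_next].max() - Q[s, a])

    # (2) Model learning: memorize the observed transition
    model[(s, a)] = (r, s_next)

    # (3) Planning: replay imagined transitions sampled from the learned model
    for _ in range(planning_steps):
        (ps, pa), (pr, ps_next) = list(model.items())[rng.integers(len(model))]
        Q[ps, pa] += alpha * (pr + gamma * Q[ps_next].max() - Q[ps, pa])

    s = 0 if s_next == n_states - 1 else s_next   # restart the episode at the goal

print(np.argmax(Q, axis=1))   # expected to prefer action 1 (move right) in most states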


5.5 Real-World Applications

5.5.1 Robotics

Reinforcement learning is widely applied to robot control, manipulation, and navigation.

Main Application Areas

5.5.2 Game AI

Reinforcement learning has achieved human-level or superior performance in complex games.

Notable Success Stories

| System | Game | Method |
|---|---|---|
| AlphaGo | Go | MCTS + Deep RL |
| AlphaStar | StarCraft II | Multi-agent RL |
| OpenAI Five | Dota 2 | PPO + large-scale distributed training |
| MuZero | Chess, Shogi, Atari | Model-based RL + MCTS |

5.5.3 Financial Trading

Reinforcement learning is applied to automated trading and portfolio optimization.

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
# - torch>=2.0.0, <2.3.0

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import deque

class TradingEnvironment:
    """
    Stock trading environment

    Features:
    - Decide actions based on past price history
    - Consider transaction costs
    - Manage held positions
    """

    def __init__(self, price_data, initial_balance=10000,
                 transaction_cost=0.001, window_size=20):
        """
        Args:
            price_data: Price data (DataFrame)
            initial_balance: Initial capital
            transaction_cost: Transaction cost (one-way)
            window_size: Historical window size to observe
        """
        self.price_data = price_data
        self.initial_balance = initial_balance
        self.transaction_cost = transaction_cost
        self.window_size = window_size

        self.reset()

    def reset(self):
        """Reset environment"""
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.shares_held = 0
        self.net_worth = self.initial_balance
        self.max_net_worth = self.initial_balance

        return self._get_observation()

    def _get_observation(self):
        """
        Get observation

        Returns:
            observation: Normalized version of [price history, shares held, cash balance]
        """
        # Price change rates over the past window_size steps
        window_data = self.price_data.iloc[
            self.current_step - self.window_size:self.current_step
        ]['Close'].pct_change().fillna(0).values

        # Portfolio state
        portfolio_state = np.array([
            self.shares_held / 100,  # Normalized
            self.balance / self.initial_balance  # Normalized
        ])

        observation = np.concatenate([window_data, portfolio_state])
        return observation

    def step(self, action):
        """
        Environment step

        Args:
            action: 0=Hold, 1=Buy, 2=Sell

        Returns:
            observation: Next state
            reward: Reward for this step
            done: Termination flag
            info: Additional information
        """
        current_price = self.price_data.iloc[self.current_step]['Close']

        # Execute action
        if action == 1:  # Buy
            shares_to_buy = self.balance // current_price
            cost = shares_to_buy * current_price * (1 + self.transaction_cost)

            if cost <= self.balance:
                self.shares_held += shares_to_buy
                self.balance -= cost

        elif action == 2:  # Sell
            if self.shares_held > 0:
                proceeds = self.shares_held * current_price * (1 - self.transaction_cost)
                self.balance += proceeds
                self.shares_held = 0

        # Step forward
        self.current_step += 1

        # Calculate net worth
        self.net_worth = self.balance + self.shares_held * current_price
        self.max_net_worth = max(self.max_net_worth, self.net_worth)

        # Reward: net worth change relative to the initial balance
        reward = (self.net_worth - self.initial_balance) / self.initial_balance

        # Termination check
        done = self.current_step >= len(self.price_data) - 1

        observation = self._get_observation()
        info = {
            'net_worth': self.net_worth,
            'shares_held': self.shares_held,
            'balance': self.balance
        }

        return observation, reward, done, info


class DQNTrader:
    """
    DQN-based trading agent
    """

    def __init__(self, state_dim, n_actions=3, lr=0.001, gamma=0.95):
        import torch
        import torch.nn as nn

        self.state_dim = state_dim
        self.n_actions = n_actions
        self.gamma = gamma

        # Q-Network
        self.q_network = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions)
        )

        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
        self.memory = deque(maxlen=2000)
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

    def select_action(self, state, training=True):
        """ε-greedyAction selection"""
        import torch

        if training and np.random.rand() < self.epsilon:
            return np.random.randint(self.n_actions)

        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

    def train(self, batch_size=32):
        """DQN update"""
        import torch
        import torch.nn.functional as F

        if len(self.memory) < batch_size:
            return

        # Mini-batch sampling
        batch = np.array(self.memory, dtype=object)
        indices = np.random.choice(len(batch), batch_size, replace=False)
        samples = batch[indices]

        states = torch.FloatTensor(np.vstack([s[0] for s in samples]))
        actions = torch.LongTensor([s[1] for s in samples])
        rewards = torch.FloatTensor([s[2] for s in samples])
        next_states = torch.FloatTensor(np.vstack([s[3] for s in samples]))
        dones = torch.FloatTensor([s[4] for s in samples])

        # Q-value computation
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))

        with torch.no_grad():
            max_next_q = self.q_network(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * max_next_q

        # Compute loss and update
        loss = F.mse_loss(current_q.squeeze(), target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # ε decay
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)


# Trading bot training example
def train_trading_bot():
    """
    Stock trading bot training
    (Demo: using random walk price data)
    """
    # Generate demo price data
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', periods=500)
    prices = 100 * np.exp(np.cumsum(np.random.randn(500) * 0.02))
    price_data = pd.DataFrame({'Close': prices}, index=dates)

    # Initialize environment and agent
    env = TradingEnvironment(price_data, window_size=20)
    obs = env.reset()
    agent = DQNTrader(state_dim=len(obs))

    num_episodes = 50

    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            # Action selection
            action = agent.select_action(state, training=True)

            # Environment step
            next_state, reward, done, info = env.step(action)

            # Save experience
            agent.memory.append((state, action, reward, next_state, done))

            # Training
            agent.train(batch_size=32)

            total_reward += reward
            state = next_state

        if episode % 10 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward:.4f}, "
                  f"Final Net Worth: ${info['net_worth']:.2f}, "
                  f"Epsilon: {agent.epsilon:.3f}")

    # Final evaluation
    state = env.reset()
    done = False
    actions_taken = []
    net_worths = []

    while not done:
        action = agent.select_action(state, training=False)
        actions_taken.append(action)
        state, reward, done, info = env.step(action)
        net_worths.append(info['net_worth'])

    # Visualization
    plt.figure(figsize=(14, 6))

    plt.subplot(1, 2, 1)
    plt.plot(price_data.index[-len(net_worths):],
             price_data['Close'].iloc[-len(net_worths):],
             label='Stock Price', alpha=0.7)
    plt.title('Stock Price Over Time')
    plt.xlabel('Date')
    plt.ylabel('Price ($)')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 2, 2)
    plt.plot(net_worths, label='Portfolio Net Worth', color='green')
    plt.axhline(y=env.initial_balance, color='r', linestyle='--',
                label='Initial Balance', alpha=0.7)
    plt.title('Portfolio Performance')
    plt.xlabel('Time Step')
    plt.ylabel('Net Worth ($)')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('trading_bot_performance.png', dpi=150, bbox_inches='tight')
    print("Performance chart saved as 'trading_bot_performance.png'")

    final_return = (net_worths[-1] - env.initial_balance) / env.initial_balance * 100
    print(f"\nFinal Return: {final_return:.2f}%")

    return agent, env


if __name__ == "__main__":
    print("RL Trading Bot Training")
    print("=" * 50)
    agent, env = train_trading_bot()
    print("Training completed!")

Trading Application Notes: It is important to consider transaction costs, slippage, and market impact. To avoid overfitting to past data, perform backtests across multiple periods. In actual operation, risk management (position size limits, stop loss) must be incorporated.
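
As a concrete example of the risk-management point, the following hypothetical helper (names and thresholds are illustrative, not part of the environment above) could be called before executing a trade to enforce a position-size cap and a simple stop-loss:

# Hypothetical risk-management check for the trading agent (illustrative thresholds)
def apply_risk_rules(action, net_worth, initial_balance, shares_held,
                     max_shares=100, max_drawdown=0.15):
    """Override the agent's action when simple risk limits are breached.

    action: 0 = Hold, 1 = Buy, 2 = Sell (same convention as TradingEnvironment)
    """
    drawdown = (initial_balance - net_worth) / initial_balance

    # Stop-loss: liquidate and stop buying once losses exceed max_drawdown
    if drawdown >= max_drawdown:
        return 2 if shares_held > 0 else 0

    # Position-size limit: refuse further buys beyond max_shares
    if action == 1 and shares_held >= max_shares:
        return 0

    return action

# Usage sketch inside the evaluation loop:
# action = agent.select_action(state, training=False)
# action = apply_risk_rules(action, env.net_worth, env.initial_balance, env.shares_held)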


5.6 Practical Applications with Stable-Baselines3

Overview of Stable-Baselines3

Stable-Baselines3 (SB3) is a Python library providing reliable RL implementations. It has comprehensive implementations of the latest algorithms and is optimal for practical RL projects.

Key Algorithms in SB3

| Algorithm | Type | Use Cases |
|---|---|---|
| PPO | On-policy, Actor-Critic | High versatility, stable |
| A2C | On-policy, Actor-Critic | Fast learning, easy parallelization |
| SAC | Off-policy, Maximum Entropy | Continuous actions, sample efficiency |
| TD3 | Off-policy, improved DDPG | Continuous actions, stability |
| DQN | Off-policy, Value-based | Discrete actions |

Practical Examples with Stable-Baselines3

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - optuna>=3.2.0

"""
Practical RL training using Stable-Baselines3
"""

# Install (if needed)
# !pip install stable-baselines3[extra]

import gymnasium as gym
from stable_baselines3 import PPO, SAC, DQN
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import numpy as np
import matplotlib.pyplot as plt

# === Example 1: PPO for CartPole ===
def train_ppo_cartpole():
    """
    Train PPO on the CartPole-v1 environment

    Features:
    - Vectorized environment for parallel training
    - Evaluation callback for monitoring
    - Model checkpointing
    """
    print("Training PPO on CartPole-v1")
    print("=" * 50)

    # Vectorized environment (parallel training)
    env = make_vec_env('CartPole-v1', n_envs=4)

    # Environment for evaluation
    eval_env = gym.make('CartPole-v1')
    eval_env = Monitor(eval_env)

    # Initialize PPO model
    model = PPO(
        'MlpPolicy',           # Multi-Layer Perceptron policy
        env,
        learning_rate=3e-4,
        n_steps=2048,          # Steps per update
        batch_size=64,
        n_epochs=10,           # Update epochs
        gamma=0.99,
        gae_lambda=0.95,       # GAE parameter
        clip_range=0.2,        # PPO clipping
        verbose=1,
        tensorboard_log="./ppo_cartpole_tensorboard/"
    )

    # Setup callbacks
    eval_callback = EvalCallback(
        eval_env,
        best_model_save_path='./logs/best_model',
        log_path='./logs/',
        eval_freq=10000,
        deterministic=True,
        render=False
    )

    checkpoint_callback = CheckpointCallback(
        save_freq=10000,
        save_path='./logs/checkpoints/',
        name_prefix='ppo_cartpole'
    )

    # Execute training
    model.learn(
        total_timesteps=100000,
        callback=[eval_callback, checkpoint_callback]
    )

    # Evaluation
    mean_reward, std_reward = evaluate_policy(
        model, eval_env, n_eval_episodes=10
    )
    print(f"\nEvaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    # Save model
    model.save("ppo_cartpole_final")

    return model


# === Example 2: SAC for Continuous Control ===
def train_sac_pendulum():
    """
    SAC Pendulum environment training (continuous action space)

    Features:
    - Maximum entropy RL
    - Off-policy learning
    - Automatic temperature tuning
    """
    print("\nTraining SAC on Pendulum-v1")
    print("=" * 50)

    # Create environment
    env = gym.make('Pendulum-v1')

    # SAC model
    model = SAC(
        'MlpPolicy',
        env,
        learning_rate=3e-4,
        buffer_size=100000,
        learning_starts=1000,
        batch_size=256,
        tau=0.005,             # Soft update coefficient
        gamma=0.99,
        train_freq=1,
        gradient_steps=1,
        ent_coef='auto',       # Automatic entropy tuning
        verbose=1,
        tensorboard_log="./sac_pendulum_tensorboard/"
    )

    # Training
    model.learn(total_timesteps=50000)

    # Evaluation
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
    print(f"Evaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    # Save model
    model.save("sac_pendulum_final")

    return model


# === Example 3: Custom Environment with SB3 ===
class CustomGridWorld(gym.Env):
    """
    Custom grid world environment
    SB3-compatible Gym environment
    """

    def __init__(self, grid_size=5):
        super(CustomGridWorld, self).__init__()

        self.grid_size = grid_size
        self.agent_pos = [0, 0]
        self.goal_pos = [grid_size - 1, grid_size - 1]

        # Action space: up, down, left, right
        self.action_space = gym.spaces.Discrete(4)

        # Observation space: agent position (normalized)
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=(2,), dtype=np.float32
        )

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.agent_pos = [0, 0]
        return self._get_obs(), {}

    def _get_obs(self):
        return np.array(self.agent_pos, dtype=np.float32) / self.grid_size

    def step(self, action):
        # Execute action
        if action == 0 and self.agent_pos[1] < self.grid_size - 1:  # Up
            self.agent_pos[1] += 1
        elif action == 1 and self.agent_pos[1] > 0:  # Down
            self.agent_pos[1] -= 1
        elif action == 2 and self.agent_pos[0] < self.grid_size - 1:  # Right
            self.agent_pos[0] += 1
        elif action == 3 and self.agent_pos[0] > 0:  # Left
            self.agent_pos[0] -= 1

        # Compute rewards
        if self.agent_pos == self.goal_pos:
            reward = 1.0
            done = True
        else:
            reward = -0.01
            done = False

        return self._get_obs(), reward, done, False, {}


def train_custom_env():
    """Custom environment DQN training"""
    print("\nTraining DQN on Custom GridWorld")
    print("=" * 50)

    # Custom environment
    env = CustomGridWorld(grid_size=5)

    # DQN model
    model = DQN(
        'MlpPolicy',
        env,
        learning_rate=1e-3,
        buffer_size=10000,
        learning_starts=1000,
        batch_size=32,
        gamma=0.99,
        exploration_fraction=0.1,
        exploration_final_eps=0.02,
        verbose=1
    )

    # Training
    model.learn(total_timesteps=50000)

    # Evaluation
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
    print(f"Evaluation: Mean Reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    return model


# === Example 4: Loading and Using Trained Model ===
def use_trained_model():
    """Load and use trained model"""
    print("\nUsing Trained Model")
    print("=" * 50)

    # Load model
    model = PPO.load("ppo_cartpole_final")

    # Execute environment
    env = gym.make('CartPole-v1', render_mode='rgb_array')

    obs, _ = env.reset()
    total_reward = 0

    for _ in range(500):
        # Deterministic action selection
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env.step(action)
        total_reward += reward

        if terminated or truncated:
            break

    print(f"Episode reward: {total_reward}")
    env.close()


# === Example 5: Hyperparameter Tuning with Optuna ===
def hyperparameter_tuning():
    """
    Hyperparameter tuning using Optuna
    (Optional: requires optuna installation)
    """
    try:
        from stable_baselines3.common.env_util import make_vec_env
        import optuna
        from optuna.pruners import MedianPruner
        from optuna.samplers import TPESampler

        def objective(trial):
            """Optuna objective function"""
            # Suggest hyperparameters
            lr = trial.suggest_float('lr', 1e-5, 1e-3, log=True)
            gamma = trial.suggest_float('gamma', 0.9, 0.9999)
            clip_range = trial.suggest_float('clip_range', 0.1, 0.4)

            # Environment and model
            env = make_vec_env('CartPole-v1', n_envs=4)
            model = PPO(
                'MlpPolicy', env,
                learning_rate=lr,
                gamma=gamma,
                clip_range=clip_range,
                verbose=0
            )

            # Training
            model.learn(total_timesteps=20000)

            # Evaluation
            eval_env = gym.make('CartPole-v1')
            mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=5)

            return mean_reward

        # Optuna study
        study = optuna.create_study(
            direction='maximize',
            sampler=TPESampler(),
            pruner=MedianPruner()
        )

        study.optimize(objective, n_trials=20, timeout=600)

        print("\nBest hyperparameters:")
        print(study.best_params)
        print(f"Best value: {study.best_value:.2f}")

    except ImportError:
        print("Optuna not installed. Skipping hyperparameter tuning.")
        print("Install with: pip install optuna")


# Main execution
if __name__ == "__main__":
    print("Stable-Baselines3 Practical Examples")
    print("=" * 50)

    # Example 1: PPO
    ppo_model = train_ppo_cartpole()

    # Example 2: SAC
    sac_model = train_sac_pendulum()

    # Example 3: Custom Environment
    custom_model = train_custom_env()

    # Example 4: Using trained model
    use_trained_model()

    # Example 5: Hyperparameter tuning (optional)
    # hyperparameter_tuning()

    print("\n" + "=" * 50)
    print("All examples completed!")
    print("Tensorboard logs saved. View with:")
    print("  tensorboard --logdir ./ppo_cartpole_tensorboard/")

SB3 Practical Notes: Vectorized environments accelerate training, and callbacks make monitoring and evaluation during training easy. TensorBoard logs allow visualization of learning curves, and hyperparameter tuning is effective with Optuna. Custom environments can be easily integrated if they comply with the Gym API.
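
For example, a minimal custom callback can be written by subclassing `BaseCallback` and overriding `_on_step`; the logging interval below is an arbitrary choice, and this sketch is also a useful starting point for Exercise 5.5:

# Minimal custom callback sketch for Stable-Baselines3
from stable_baselines3.common.callbacks import BaseCallback

class RewardLoggerCallback(BaseCallback):
    """Print the mean recent episode reward at a fixed timestep interval."""

    def __init__(self, log_interval=10000, verbose=0):
        super().__init__(verbose)
        self.log_interval = log_interval

    def _on_step(self) -> bool:
        if self.num_timesteps % self.log_interval == 0:
            # ep_info_buffer holds recent episode statistics collected by SB3
            rewards = [ep['r'] for ep in self.model.ep_info_buffer]
            if rewards:
                print(f"Step {self.num_timesteps}: mean episode reward = "
                      f"{sum(rewards) / len(rewards):.2f}")
        return True   # returning False would stop training early

# Usage sketch: model.learn(total_timesteps=100000, callback=RewardLoggerCallback())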


5.7 Challenges and Solutions for Real-World Deployment

Major Challenges

| Challenge | Description | Solutions |
|---|---|---|
| Sample Efficiency | Learning in the real environment is slow and costly | Pre-training in simulation, model-based RL, transfer learning |
| Safety | Failures during learning can be dangerous | Safe RL, validation in simulation, human supervision |
| Sim-to-Real Gap | Mismatch between simulation and the real environment | Domain randomization, high-fidelity simulators |
| Partial Observability | The complete state cannot be observed | LSTM/Transformer policies, belief states |
| Reward Design | Designing appropriate reward functions is difficult | Inverse RL, imitation learning, reward shaping |
| Generalization | Performance degrades outside the training environment | Diverse training data, meta-RL, domain adaptation |
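
To illustrate the domain randomization idea from the table, here is a small, hypothetical Gymnasium wrapper that perturbs a physics parameter on every reset. It targets the `g` (gravity) attribute of Pendulum-v1; the attribute name and the sampling range are assumptions that may differ across environment versions.

# Hypothetical domain-randomization wrapper (attribute name and ranges are assumptions)
import gymnasium as gym
import numpy as np

class GravityRandomizationWrapper(gym.Wrapper):
    """Resample the pendulum's gravity at the start of every episode."""

    def __init__(self, env, g_range=(8.0, 12.0)):
        super().__init__(env)
        self.g_range = g_range

    def reset(self, **kwargs):
        # Pendulum-v1 stores gravity as `g` on the unwrapped environment
        self.env.unwrapped.g = np.random.uniform(*self.g_range)
        return self.env.reset(**kwargs)

# Usage sketch: train on randomized dynamics so the policy generalizes across them
# env = GravityRandomizationWrapper(gym.make('Pendulum-v1'))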

Best Practices

  1. Gradual Approach: simulation → sim-to-real transfer → real environment
  2. Model Validation: Multiple evaluation metrics, test with different environment settings
  3. Leverage Human Knowledge: Imitation learning, pre-training, reward shaping
  4. Safety Assurance: Constrained RL, fail-safe mechanisms
  5. Continuous Learning: Online learning, adaptive policy

Summary

In this chapter, we learned the following advanced RL methods and applications:

  1. A3C: asynchronous parallel workers updating a shared global actor-critic network
  2. SAC: maximum entropy RL with double Q-learning and automatic temperature tuning
  3. Multi-agent RL: cooperative, competitive, and mixed settings, and independent Q-learning
  4. Model-based RL: learning a dynamics model to improve sample efficiency
  5. Real-world applications: robotics, game AI, and financial trading
  6. Stable-Baselines3: practical training, evaluation, callbacks, and hyperparameter tuning

Next Steps

  1. Practical Projects: Creating custom environments with Stable-Baselines3
  2. Paper Reading: Read and implement the latest RL papers
  3. Competitions: Participate in Kaggle RL competitions, OpenAI Gym Leaderboard
  4. Explore Application Domains: Autonomous driving, healthcare, energy management, etc.

Reference Resources


Exercises

Exercise 5.1: Parallel Implementation of A3C

Problem: Use Python's multiprocessing to implement a fully parallel version of A3C.

Hints:

Exercise 5.2: SAC Temperature Parameter Analysis

Problem: Compare SAC runs that use fixed values of the temperature parameter $\alpha$ with runs that use automatic tuning, and analyze the differences in learning curves and final performance.

Hints:

Exercise 5.3: Multi-Agent Cooperative Task

Problem: Implement a task where three agents cooperatively carry a heavy object. The object cannot be carried by a single agent and must be transported to a goal position by multiple agents working together.

Hints:

Exercise 5.4: Trading Bot Enhancement

Problem: Add the following features to the provided trading bot:

Hints:

Exercise 5.5: Custom Callback for Stable-Baselines3

Problem: Create a Stable-Baselines3 custom callback to perform the following during training:

Hints:

