
Chapter 4: Policy Gradient Methods

Direct Policy Optimization: REINFORCE, Actor-Critic, A2C, and PPO with Practical Implementations

Reading Time: 30-35 minutes | Difficulty: Intermediate to Advanced | Code Examples: 8 | Exercises: 6

This chapter covers Policy Gradient Methods, the foundation of modern reinforcement learning. You will learn why direct policy optimization is powerful, master the mathematical foundations, and implement state-of-the-art algorithms including PPO with Stable-Baselines3.

Learning Objectives

By reading this chapter, you will be able to:

  1. Explain why direct policy optimization overcomes key limitations of value-based methods
  2. State and interpret the policy gradient theorem
  3. Implement REINFORCE and reduce its variance with a learned baseline
  4. Build Actor-Critic and A2C agents with n-step returns and entropy regularization
  5. Implement PPO with a clipped objective and Generalized Advantage Estimation
  6. Handle continuous action spaces with Gaussian policies

4.1 Why Policy Gradient?

4.1.1 Limitations of Value-Based Methods

In Chapters 2 and 3, we learned value-based methods (Q-learning, DQN). While powerful, they have inherent limitations:

| Limitation | Description | Impact |
|---|---|---|
| Discrete actions only | Requires the $\arg\max_a Q(s,a)$ operation | Cannot handle continuous control (robotics) |
| Deterministic policies | $\epsilon$-greedy is only a workaround | Cannot learn optimal stochastic policies |
| High-dimensional actions | Exponential action space | Combinatorial explosion |
| Small policy changes | Small Q-value changes can cause large policy shifts | Unstable learning |

"In rock-paper-scissors, the optimal policy is uniformly random. Value-based methods struggle to represent this naturally, while policy gradient methods handle it elegantly."
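A two-line sketch (illustrative numbers, not from the text) makes the contrast concrete: a softmax policy with equal logits represents the uniform optimum exactly, while a greedy argmax over Q-values commits to a single, exploitable action.

```python
import numpy as np

# Rock-paper-scissors: the optimal policy is uniform random.
# A softmax policy with equal logits represents it exactly.
logits = np.zeros(3)                       # one logit per action
policy = np.exp(logits) / np.exp(logits).sum()
print(policy)                              # [1/3, 1/3, 1/3]

# A greedy argmax over Q-values always commits to one action --
# a deterministic policy an opponent can exploit.
q_values = np.zeros(3)
greedy_action = int(np.argmax(q_values))   # ties resolve to index 0
print(greedy_action)                       # 0
```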

4.1.2 Direct Policy Optimization

Policy gradient methods directly optimize the policy $\pi_\theta(a|s)$ parameterized by $\theta$:

graph LR
    subgraph "Value-Based (DQN)"
        S1["State s"] --> Q["Q(s,a)"]
        Q --> AM["argmax"]
        AM --> A1["Action a"]
        style Q fill:#e74c3c,color:#fff
    end
    subgraph "Policy-Based"
        S2["State s"] --> P["Policy pi(a|s; theta)"]
        P --> A2["Action a (sampled)"]
        style P fill:#27ae60,color:#fff
    end

4.1.3 Advantages of Policy Gradient

  1. Continuous action spaces: Natural handling of robot joint angles, steering
  2. Stochastic policies: Built-in exploration, optimal for partial observability
  3. Smoother optimization: Small $\theta$ changes lead to small policy changes
  4. Convergence guarantees: Gradient ascent on well-defined objective

4.1.4 Policy Parameterization

For discrete actions, use softmax:

$$ \pi_\theta(a|s) = \frac{\exp(f_\theta(s, a))}{\sum_{a'} \exp(f_\theta(s, a'))} $$

For continuous actions, use Gaussian:

$$ \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)^2) $$
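Both parameterizations map directly onto `torch.distributions`; the logits, mean, and standard deviation below are made-up illustrative numbers:

```python
import torch

# Discrete: softmax over scores f_theta(s, a)
logits = torch.tensor([2.0, 0.5, -1.0])            # f_theta(s, .) for 3 actions
discrete_pi = torch.distributions.Categorical(logits=logits)
a = discrete_pi.sample()                           # index of a discrete action
print(discrete_pi.probs)                           # probabilities sum to 1

# Continuous: Gaussian with state-dependent mean and std
mu = torch.tensor([0.3, -0.1])                     # mu_theta(s), 2-dim action
std = torch.tensor([0.5, 0.2])                     # sigma_theta(s), positive
continuous_pi = torch.distributions.Normal(mu, std)
a_cont = continuous_pi.sample()                    # real-valued action vector
log_prob = continuous_pi.log_prob(a_cont).sum()    # sum over action dimensions
```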

4.2 Policy Gradient Theorem

4.2.1 Objective Function

We want to maximize the expected return:

$$ J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}[R(\tau)] = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \gamma^t r_t\right] $$

where $\tau = (s_0, a_0, r_0, s_1, a_1, \ldots)$ is a trajectory sampled under policy $\pi_\theta$.

4.2.2 The Policy Gradient Theorem

The key insight: we can compute the gradient $\nabla_\theta J(\theta)$ without knowing environment dynamics!

$$ \nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s) \cdot Q^{\pi_\theta}(s, a)\right] $$

Or equivalently, using trajectories:

$$ \nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) \cdot G_t\right] $$

where $G_t = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}$ is the return from time $t$.
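A quick numerical sanity check (a toy two-armed bandit, not from the text) shows the score-function estimator converging to the analytic gradient:

```python
import numpy as np

rng = np.random.default_rng(0)
theta = 0.4
rewards = np.array([1.0, 0.2])           # r(a) for a toy 2-armed bandit

# Softmax policy with logits [theta, 0]: pi(a=0) is the sigmoid of theta
p0 = 1.0 / (1.0 + np.exp(-theta))
probs = np.array([p0, 1.0 - p0])

# Analytic gradient of J(theta) = sum_a pi(a) r(a)
analytic = p0 * (1.0 - p0) * (rewards[0] - rewards[1])

# Score-function (policy gradient) estimate: E[grad_theta log pi(a) * r(a)]
actions = rng.choice(2, size=200_000, p=probs)
score = np.where(actions == 0, 1.0 - p0, -p0)    # d/dtheta log pi(a)
estimate = np.mean(score * rewards[actions])

print(analytic, estimate)    # the two agree to about two decimal places
```

Note that the estimator never touches the environment's transition probabilities; it only needs samples and $\nabla_\theta \log \pi_\theta$.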

4.2.3 Intuition Behind the Formula

graph TB
    subgraph "Policy Gradient Update"
        Action["Take action a"]
        Reward["Observe G_t"]
        GoodAction{{"G_t > 0?"}}
        Increase["Increase pi(a|s)"]
        Decrease["Decrease pi(a|s)"]
        Action --> Reward
        Reward --> GoodAction
        GoodAction -->|Yes| Increase
        GoodAction -->|No| Decrease
        style Increase fill:#27ae60,color:#fff
        style Decrease fill:#e74c3c,color:#fff
    end

4.3 REINFORCE Algorithm

4.3.1 Monte Carlo Policy Gradient

REINFORCE (Williams, 1992) estimates the policy gradient using complete episodes:

  1. Sample trajectory $\tau$ using $\pi_\theta$
  2. Compute returns $G_t$ for each timestep
  3. Update: $\theta \leftarrow \theta + \alpha \sum_{t} \nabla_\theta \log \pi_\theta(a_t|s_t) G_t$

4.3.2 High Variance Problem

REINFORCE suffers from high variance because $G_t$ varies significantly across episodes. The solution: baseline subtraction.

$$ \nabla_\theta J(\theta) = \mathbb{E}\left[\nabla_\theta \log \pi_\theta(a_t|s_t) (G_t - b(s_t))\right] $$

A natural (and nearly variance-optimal) baseline is the state value $b(s) = V^{\pi}(s)$; the resulting weighting term is the advantage:

$$ A^{\pi}(s, a) = Q^{\pi}(s, a) - V^{\pi}(s) $$
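A toy simulation (illustrative, not from the text) shows why the baseline helps: it leaves the gradient estimate's mean unchanged while collapsing its variance.

```python
import numpy as np

rng = np.random.default_rng(1)

# Stand-ins for a single state: noisy returns G_t and symmetric score terms
G = rng.normal(loc=10.0, scale=2.0, size=200_000)
grad_log_pi = rng.choice([-1.0, 1.0], size=G.size)

baseline = G.mean()                         # plays the role of V(s)

no_baseline = grad_log_pi * G
with_baseline = grad_log_pi * (G - baseline)

# The baseline adds no bias: both estimators have (nearly) the same mean ...
print(no_baseline.mean(), with_baseline.mean())
# ... but it removes the large common offset, slashing the variance.
print(no_baseline.var(), with_baseline.var())
```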

4.3.3 REINFORCE Implementation

# Requirements:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym
from collections import deque

print("=== REINFORCE Algorithm Implementation ===\n")

class PolicyNetwork(nn.Module):
    """
    Policy Network for REINFORCE

    Outputs action probabilities given a state.
    """
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        logits = self.fc3(x)
        return F.softmax(logits, dim=-1)


class ValueNetwork(nn.Module):
    """
    Value Network for baseline

    Estimates V(s) to reduce variance.
    """
    def __init__(self, state_dim, hidden_dim=128):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class REINFORCEWithBaseline:
    """REINFORCE with learned baseline (value function)"""

    def __init__(self, state_dim, action_dim, lr_policy=0.001, lr_value=0.001, gamma=0.99):
        self.gamma = gamma
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.value = ValueNetwork(state_dim)

        self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=lr_policy)
        self.value_optimizer = optim.Adam(self.value.parameters(), lr=lr_value)

        # Episode storage
        self.saved_log_probs = []
        self.saved_values = []
        self.rewards = []

    def select_action(self, state):
        """Select action and store log probability"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.policy(state_tensor)
        value = self.value(state_tensor)

        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()

        self.saved_log_probs.append(dist.log_prob(action))
        self.saved_values.append(value)

        return action.item()

    def update(self):
        """Update policy and value networks after episode"""
        R = 0
        returns = []

        # Compute returns (backwards)
        for r in reversed(self.rewards):
            R = r + self.gamma * R
            returns.insert(0, R)

        returns = torch.tensor(returns, dtype=torch.float32)

        # Normalize returns for stability
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # Compute losses
        policy_losses = []
        value_losses = []

        for log_prob, value, G in zip(self.saved_log_probs, self.saved_values, returns):
            advantage = G - value.squeeze().detach()
            policy_losses.append(-log_prob * advantage)
            value_losses.append(F.mse_loss(value.squeeze(), G))

        # Update policy
        self.policy_optimizer.zero_grad()
        policy_loss = torch.stack(policy_losses).sum()
        policy_loss.backward()
        self.policy_optimizer.step()

        # Update value function
        self.value_optimizer.zero_grad()
        value_loss = torch.stack(value_losses).sum()
        value_loss.backward()
        self.value_optimizer.step()

        # Clear episode data
        self.saved_log_probs = []
        self.saved_values = []
        self.rewards = []

        return policy_loss.item(), value_loss.item()


# Training
print("Environment: CartPole-v1")
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

agent = REINFORCEWithBaseline(state_dim, action_dim, lr_policy=0.01, lr_value=0.01)

print(f"  State dimension: {state_dim}")
print(f"  Action dimension: {action_dim}")
print(f"  Policy parameters: {sum(p.numel() for p in agent.policy.parameters()):,}")
print(f"  Value parameters: {sum(p.numel() for p in agent.value.parameters()):,}")

num_episodes = 500
episode_rewards = []
moving_avg = deque(maxlen=100)

print("\nTraining REINFORCE with baseline...")
for episode in range(num_episodes):
    state, _ = env.reset()
    episode_reward = 0

    for t in range(500):
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        agent.rewards.append(reward)
        episode_reward += reward
        state = next_state

        if done:
            break

    policy_loss, value_loss = agent.update()

    episode_rewards.append(episode_reward)
    moving_avg.append(episode_reward)

    if (episode + 1) % 100 == 0:
        avg = np.mean(moving_avg)
        print(f"Episode {episode+1:3d} | Avg Reward: {avg:.1f} | Policy Loss: {policy_loss:.3f}")

env.close()
print(f"\nFinal average (last 100): {np.mean(moving_avg):.1f}")
print("REINFORCE training complete!")

Expected Output:

=== REINFORCE Algorithm Implementation ===

Environment: CartPole-v1
  State dimension: 4
  Action dimension: 2
  Policy parameters: 17,410
  Value parameters: 17,281

Training REINFORCE with baseline...
Episode 100 | Avg Reward: 45.2 | Policy Loss: 12.345
Episode 200 | Avg Reward: 156.8 | Policy Loss: 5.678
Episode 300 | Avg Reward: 287.3 | Policy Loss: 2.345
Episode 400 | Avg Reward: 412.6 | Policy Loss: 1.234
Episode 500 | Avg Reward: 478.9 | Policy Loss: 0.567

Final average (last 100): 478.9
REINFORCE training complete!

4.4 Actor-Critic Methods

4.4.1 Combining Policy and Value Learning

Actor-Critic methods use two components:

  1. Actor: the policy $\pi_\theta(a|s)$, updated with the policy gradient
  2. Critic: the value function $V_\phi(s)$, which evaluates the actor's actions

Unlike REINFORCE (which waits for episode completion), Actor-Critic uses TD learning for online updates.

graph TB
    subgraph "Actor-Critic Architecture"
        State["State s_t"] --> Actor["Actor pi_theta(a|s)"]
        State --> Critic["Critic V_phi(s)"]
        Actor --> Action["Action a_t"]
        Critic --> Value["Value V(s_t)"]
        Action --> Env["Environment"]
        Env --> Reward["r_t, s_{t+1}"]
        Reward --> TDError["TD Error: delta = r + gamma*V(s') - V(s)"]
        Value --> TDError
        TDError --> ActorUpdate["Actor: theta += alpha * delta * grad log pi"]
        TDError --> CriticUpdate["Critic: phi -= beta * delta * grad V"]
        style Actor fill:#27ae60,color:#fff
        style Critic fill:#3498db,color:#fff
        style TDError fill:#f39c12,color:#fff
    end

4.4.2 TD Error as Advantage Estimate

The one-step TD error approximates the advantage: when the critic equals the true value function $V^{\pi}$, its expectation is exactly $A^{\pi}(s_t, a_t)$; with a learned critic it trades a small bias for much lower variance:

$$ \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) \approx A(s_t, a_t) $$
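With made-up numbers, the computation is one line: a positive $\delta$ means the transition went better than the critic predicted, so the action's probability should rise.

```python
gamma = 0.99
r_t = 1.0         # observed reward
v_s = 4.5         # critic's estimate V(s_t)
v_s_next = 4.0    # critic's estimate V(s_{t+1})

# One-step TD error doubles as the advantage estimate
delta = r_t + gamma * v_s_next - v_s
print(delta)      # approximately 0.46: positive, so increase pi(a_t|s_t)
```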

4.4.3 Actor-Critic Implementation

# Requirements:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym

print("=== Actor-Critic Implementation ===\n")

class ActorCriticNetwork(nn.Module):
    """
    Shared network with Actor and Critic heads

    Sharing early layers improves sample efficiency.
    """
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(ActorCriticNetwork, self).__init__()

        # Shared layers
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )

        # Actor head
        self.actor = nn.Linear(hidden_dim, action_dim)

        # Critic head
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)
        action_probs = F.softmax(self.actor(features), dim=-1)
        state_value = self.critic(features)
        return action_probs, state_value


class ActorCritic:
    """One-step Actor-Critic with TD learning"""

    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        self.gamma = gamma
        self.network = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs, value = self.network(state_tensor)

        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()

        return action.item(), dist.log_prob(action), value

    def update(self, log_prob, value, reward, next_state, done):
        """TD update at each step"""
        # Compute next state value
        if done:
            next_value = torch.tensor([0.0])
        else:
            next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
            with torch.no_grad():
                _, next_value = self.network(next_state_tensor)

        # TD error (advantage estimate)
        td_target = reward + self.gamma * next_value * (1 - float(done))
        td_error = td_target - value

        # Losses
        actor_loss = -log_prob * td_error.detach()  # Policy gradient
        critic_loss = td_error.pow(2)  # Value function MSE

        loss = actor_loss + 0.5 * critic_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()


# Training
print("Training Actor-Critic on CartPole-v1...")
env = gym.make('CartPole-v1')
agent = ActorCritic(state_dim=4, action_dim=2, lr=0.002)

num_episodes = 300
episode_rewards = []

for episode in range(num_episodes):
    state, _ = env.reset()
    episode_reward = 0

    for t in range(500):
        action, log_prob, value = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        agent.update(log_prob, value, reward, next_state, done)

        episode_reward += reward
        state = next_state

        if done:
            break

    episode_rewards.append(episode_reward)

    if (episode + 1) % 50 == 0:
        avg = np.mean(episode_rewards[-100:])
        print(f"Episode {episode+1:3d} | Avg Reward: {avg:.1f}")

env.close()
print(f"\nFinal average: {np.mean(episode_rewards[-100:]):.1f}")
print("\nActor-Critic advantages over REINFORCE:")
print("  - Updates at each step (not episode end)")
print("  - Lower variance (uses TD rather than MC)")
print("  - Works for continuing tasks")

4.5 Advantage Actor-Critic (A2C)

4.5.1 Improvements over Basic Actor-Critic

A2C (Advantage Actor-Critic) adds several enhancements over the basic actor-critic:

  1. N-step returns: bootstrap after $n$ steps instead of one, trading a little bias for lower variance
  2. Entropy regularization: an entropy bonus that keeps the policy exploratory
  3. Gradient clipping: bounding the gradient norm for stable updates

4.5.2 N-Step Returns

Instead of 1-step TD, use n-step returns:

$$ G_t^{(n)} = r_t + \gamma r_{t+1} + \cdots + \gamma^{n-1} r_{t+n-1} + \gamma^n V(s_{t+n}) $$
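The formula transcribes directly into code; the helper name and the toy numbers below are illustrative, not from the text:

```python
def n_step_return(rewards, v_boot, gamma, n):
    """G_t^(n) = r_t + gamma*r_{t+1} + ... + gamma^(n-1)*r_{t+n-1} + gamma^n * V(s_{t+n})"""
    assert len(rewards) == n
    discounted = sum(gamma**i * r for i, r in enumerate(rewards))
    return discounted + gamma**n * v_boot   # bootstrap with the critic

rewards = [1.0, 1.0, 1.0]   # three observed rewards
v_boot = 5.0                # critic estimate V(s_{t+3})
g3 = n_step_return(rewards, v_boot, gamma=0.9, n=3)
print(g3)                   # 1 + 0.9 + 0.81 + 0.729 * 5 = 6.355
```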

4.5.3 Entropy Regularization

Add entropy bonus to encourage exploration:

$$ L = -\mathbb{E}[\log \pi(a|s) A(s,a)] - \beta H(\pi(\cdot|s)) $$

where $H(\pi) = -\sum_a \pi(a|s) \log \pi(a|s)$ is the entropy.
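The entropy term is easy to inspect directly: the uniform policy attains the maximum $\log|\mathcal{A}|$, while a near-deterministic policy scores close to zero (the distributions below are illustrative):

```python
import torch

# H(pi) = -sum_a pi(a|s) log pi(a|s)
uniform = torch.distributions.Categorical(probs=torch.ones(4) / 4)
peaked = torch.distributions.Categorical(
    probs=torch.tensor([0.97, 0.01, 0.01, 0.01]))

print(uniform.entropy())   # log(4) ~ 1.386: maximal exploration
print(peaked.entropy())    # ~0.17: the policy has almost stopped exploring
```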

4.5.4 A2C Implementation

# Requirements:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym

print("=== A2C (Advantage Actor-Critic) Implementation ===\n")

class A2CNetwork(nn.Module):
    """A2C Network with larger capacity"""

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(A2CNetwork, self).__init__()

        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )

        self.actor = nn.Linear(hidden_dim, action_dim)
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)
        logits = self.actor(features)
        value = self.critic(features)
        return logits, value


class A2C:
    """
    Advantage Actor-Critic with n-step returns and entropy regularization
    """

    def __init__(self, state_dim, action_dim, lr=0.0007, gamma=0.99,
                 n_steps=5, entropy_coef=0.01, value_coef=0.5):
        self.gamma = gamma
        self.n_steps = n_steps
        self.entropy_coef = entropy_coef
        self.value_coef = value_coef

        self.network = A2CNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        logits, value = self.network(state_tensor)

        dist = torch.distributions.Categorical(logits=logits)
        action = dist.sample()

        return (action.item(), dist.log_prob(action),
                dist.entropy(), value)

    def compute_returns(self, rewards, values, dones, next_value):
        """Compute n-step returns and advantages"""
        returns = []
        R = next_value

        for step in reversed(range(len(rewards))):
            R = rewards[step] + self.gamma * R * (1 - dones[step])
            returns.insert(0, R)

        returns = torch.tensor(returns, dtype=torch.float32)
        values = torch.cat(values).squeeze(-1)  # squeeze(-1) keeps the batch dim for length-1 segments
        advantages = returns - values.detach()

        return returns, advantages

    def update(self, log_probs, entropies, values, returns, advantages):
        """A2C update with entropy regularization"""
        log_probs = torch.cat(log_probs)
        entropies = torch.cat(entropies)
        values = torch.cat(values).squeeze(-1)  # squeeze(-1) keeps the batch dim for length-1 segments

        # Actor loss with advantage
        actor_loss = -(log_probs * advantages.detach()).mean()

        # Critic loss
        critic_loss = F.mse_loss(values, returns)

        # Entropy bonus (negative because we maximize entropy)
        entropy_loss = -entropies.mean()

        # Combined loss
        loss = (actor_loss +
                self.value_coef * critic_loss +
                self.entropy_coef * entropy_loss)

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
        self.optimizer.step()

        return actor_loss.item(), critic_loss.item(), entropies.mean().item()


# Training
print("Training A2C on CartPole-v1...")
env = gym.make('CartPole-v1')
agent = A2C(state_dim=4, action_dim=2, n_steps=5, entropy_coef=0.01)

print(f"  n_steps: {agent.n_steps}")
print(f"  entropy_coef: {agent.entropy_coef}")
print(f"  value_coef: {agent.value_coef}")

num_episodes = 500
episode_rewards = []

for episode in range(num_episodes):
    state, _ = env.reset()
    episode_reward = 0

    log_probs, entropies, values, rewards, dones = [], [], [], [], []

    done = False
    while not done:
        action, log_prob, entropy, value = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        log_probs.append(log_prob)
        entropies.append(entropy)
        values.append(value)
        rewards.append(reward)
        dones.append(float(done))

        episode_reward += reward
        state = next_state

        # Update every n_steps or at episode end
        if len(rewards) >= agent.n_steps or done:
            if done:
                next_value = 0
            else:
                with torch.no_grad():
                    _, next_value = agent.network(
                        torch.FloatTensor(next_state).unsqueeze(0))
                    next_value = next_value.item()

            returns, advantages = agent.compute_returns(
                rewards, values, dones, next_value)
            actor_loss, critic_loss, entropy = agent.update(
                log_probs, entropies, values, returns, advantages)

            log_probs, entropies, values, rewards, dones = [], [], [], [], []

    episode_rewards.append(episode_reward)

    if (episode + 1) % 100 == 0:
        avg = np.mean(episode_rewards[-100:])
        print(f"Episode {episode+1:3d} | Avg: {avg:.1f} | "
              f"Actor Loss: {actor_loss:.3f} | Entropy: {entropy:.3f}")

env.close()
print(f"\nFinal average: {np.mean(episode_rewards[-100:]):.1f}")

4.6 Proximal Policy Optimization (PPO)

4.6.1 Why Trust Regions Matter

A fundamental problem in policy gradient: large updates can destroy good policies.

"If we take too large a step in policy space, performance can collapse catastrophically and never recover."

TRPO (Trust Region Policy Optimization) addressed this by constraining KL divergence:

$$ \max_\theta \mathbb{E}\left[\frac{\pi_\theta(a|s)}{\pi_{\theta_{old}}(a|s)} A(s,a)\right] \quad \text{s.t. } D_{KL}(\pi_{\theta_{old}} || \pi_\theta) \leq \delta $$

However, TRPO requires complex second-order optimization (conjugate gradients, Fisher information matrix).
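The quantity TRPO constrains is cheap to evaluate for categorical policies; a short sketch with made-up probabilities:

```python
import torch

old_pi = torch.distributions.Categorical(probs=torch.tensor([0.5, 0.3, 0.2]))

# A small policy change: KL stays tiny, so a trust region would accept it
near_pi = torch.distributions.Categorical(probs=torch.tensor([0.45, 0.35, 0.2]))
kl_near = torch.distributions.kl_divergence(old_pi, near_pi)
print(kl_near)

# A drastic change: KL blows up, and the constraint D_KL <= delta rejects it
far_pi = torch.distributions.Categorical(probs=torch.tensor([0.05, 0.05, 0.9]))
kl_far = torch.distributions.kl_divergence(old_pi, far_pi)
print(kl_far)
```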

4.6.2 PPO Clipped Objective

PPO achieves similar stability with a simpler clipped objective:

$$ L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) A_t\right)\right] $$

where the probability ratio is:

$$ r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)} $$

4.6.3 How Clipping Works

The clip function prevents the ratio from moving too far from 1:

graph TB
    subgraph "PPO Clipping Mechanism"
        Ratio["Probability Ratio r(theta)"]
        Advantage{{"A > 0?"}}
        GoodAction["Good Action: want to increase pi"]
        BadAction["Bad Action: want to decrease pi"]
        ClipHigh["Clip at 1+epsilon (prevent too much increase)"]
        ClipLow["Clip at 1-epsilon (prevent too much decrease)"]
        Min["Take minimum of clipped and unclipped"]
        Ratio --> Advantage
        Advantage -->|Yes| GoodAction
        Advantage -->|No| BadAction
        GoodAction --> ClipHigh
        BadAction --> ClipLow
        ClipHigh --> Min
        ClipLow --> Min
        style ClipHigh fill:#27ae60,color:#fff
        style ClipLow fill:#e74c3c,color:#fff
    end
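The mechanics can be checked in a few lines (a standalone sketch of the clipped surrogate, with illustrative ratios): for a positive advantage, once the ratio exceeds $1 + \epsilon$ the clipped term wins the min and the gradient vanishes, so the update stops pushing.

```python
import torch

def clipped_objective(ratio, adv, eps=0.2):
    """Per-sample PPO surrogate: min(r*A, clip(r, 1-eps, 1+eps)*A)."""
    unclipped = ratio * adv
    clipped = torch.clamp(ratio, 1 - eps, 1 + eps) * adv
    return torch.min(unclipped, clipped)

adv = torch.tensor(1.0)     # a good action: positive advantage
grads = []
for r in [0.7, 1.3, 2.0]:
    ratio = torch.tensor(r, requires_grad=True)
    obj = clipped_objective(ratio, adv)
    obj.backward()
    grads.append(ratio.grad.item())
    print(f"ratio={r:.1f}  L_CLIP={obj.item():.2f}  grad={ratio.grad.item():.1f}")
# ratio 0.7: gradient 1.0 -- still pushing the probability up
# ratio 1.3 and 2.0: objective capped at 1.2, gradient 0 -- update stops
```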

4.6.4 Generalized Advantage Estimation (GAE)

PPO typically uses GAE for advantage estimation, which balances bias and variance:

$$ \hat{A}_t^{GAE(\gamma, \lambda)} = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l} $$

where $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ is the TD error.
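Two limiting cases make GAE's bias-variance dial concrete, and a short trajectory of made-up numbers verifies both: $\lambda = 0$ recovers the one-step TD error, and $\lambda = 1$ recovers the Monte Carlo advantage $G_t - V(s_t)$.

```python
import numpy as np

def gae(rewards, values, next_value, gamma, lam):
    """Backward recursion: A_t = delta_t + gamma*lam*A_{t+1}."""
    vs = values + [next_value]
    adv, out = 0.0, []
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * vs[t + 1] - vs[t]
        adv = delta + gamma * lam * adv
        out.insert(0, adv)
    return np.array(out)

rewards = [1.0, 0.0, 2.0]
values = [1.5, 1.0, 2.5]          # critic estimates V(s_0..s_2)
next_value, gamma = 0.5, 0.99     # bootstrap V(s_3)

a0 = gae(rewards, values, next_value, gamma, lam=0.0)   # == TD errors
a1 = gae(rewards, values, next_value, gamma, lam=1.0)   # == G_t - V(s_t)
print(a0)
print(a1)
```

Intermediate $\lambda$ values (PPO's default is 0.95) interpolate between these two extremes.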

4.6.5 Complete PPO Implementation

# Requirements:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym

print("=== PPO (Proximal Policy Optimization) Implementation ===\n")

class PPONetwork(nn.Module):
    """PPO Actor-Critic Network"""

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(PPONetwork, self).__init__()

        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh()
        )

        self.actor = nn.Linear(hidden_dim, action_dim)
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)
        logits = self.actor(features)
        value = self.critic(features)
        return logits, value

    def get_action_and_value(self, state, action=None):
        logits, value = self.forward(state)
        dist = torch.distributions.Categorical(logits=logits)

        if action is None:
            action = dist.sample()

        return action, dist.log_prob(action), dist.entropy(), value


class PPO:
    """
    Proximal Policy Optimization with:
    - Clipped objective
    - GAE for advantage estimation
    - Multiple epochs per update
    """

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 gae_lambda=0.95, clip_epsilon=0.2, epochs=10,
                 batch_size=64, entropy_coef=0.01, value_coef=0.5):

        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.epochs = epochs
        self.batch_size = batch_size
        self.entropy_coef = entropy_coef
        self.value_coef = value_coef

        self.network = PPONetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        """Select action for data collection"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
            action, log_prob, _, value = self.network.get_action_and_value(
                state_tensor)

        return action.item(), log_prob.item(), value.item()

    def compute_gae(self, rewards, values, dones, next_value):
        """
        Compute Generalized Advantage Estimation

        GAE reduces variance while maintaining low bias.
        """
        advantages = []
        gae = 0

        values = values + [next_value]

        for step in reversed(range(len(rewards))):
            # TD error
            delta = (rewards[step] +
                    self.gamma * values[step + 1] * (1 - dones[step]) -
                    values[step])

            # GAE
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[step]) * gae
            advantages.insert(0, gae)

        advantages = torch.tensor(advantages, dtype=torch.float32)
        returns = advantages + torch.tensor(values[:-1], dtype=torch.float32)

        return advantages, returns

    def update(self, states, actions, old_log_probs, returns, advantages):
        """
        PPO update with clipped objective

        Performs multiple epochs over collected data.
        """
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        old_log_probs = torch.FloatTensor(old_log_probs)

        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        dataset_size = len(states)

        for epoch in range(self.epochs):
            # Shuffle and create mini-batches
            indices = np.random.permutation(dataset_size)

            for start in range(0, dataset_size, self.batch_size):
                end = start + self.batch_size
                batch_idx = indices[start:end]

                batch_states = states[batch_idx]
                batch_actions = actions[batch_idx]
                batch_old_log_probs = old_log_probs[batch_idx]
                batch_returns = returns[batch_idx]
                batch_advantages = advantages[batch_idx]

                # Get current policy values
                _, new_log_probs, entropy, values = \
                    self.network.get_action_and_value(batch_states, batch_actions)

                # Probability ratio
                ratio = torch.exp(new_log_probs - batch_old_log_probs)

                # Clipped surrogate objective
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio,
                                   1 - self.clip_epsilon,
                                   1 + self.clip_epsilon) * batch_advantages

                actor_loss = -torch.min(surr1, surr2).mean()

                # Value loss
                critic_loss = F.mse_loss(values.squeeze(), batch_returns)

                # Entropy bonus
                entropy_loss = -entropy.mean()

                # Total loss
                loss = (actor_loss +
                       self.value_coef * critic_loss +
                       self.entropy_coef * entropy_loss)

                self.optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
                self.optimizer.step()

        return actor_loss.item(), critic_loss.item()


# Training
print("Training PPO on CartPole-v1\n")
env = gym.make('CartPole-v1')
agent = PPO(state_dim=4, action_dim=2, lr=3e-4, epochs=10)

print(f"Hyperparameters:")
print(f"  clip_epsilon: {agent.clip_epsilon}")
print(f"  gae_lambda: {agent.gae_lambda}")
print(f"  epochs per update: {agent.epochs}")
print(f"  batch_size: {agent.batch_size}")

num_iterations = 100
rollout_steps = 2048
episode_rewards = []
all_rewards = []

print("\nTraining...")
total_steps = 0

for iteration in range(num_iterations):
    # Collect rollout data
    states, actions, log_probs, rewards, values, dones = [], [], [], [], [], []

    state, _ = env.reset()
    episode_reward = 0

    for _ in range(rollout_steps):
        action, log_prob, value = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        states.append(state)
        actions.append(action)
        log_probs.append(log_prob)
        rewards.append(reward)
        values.append(value)
        dones.append(float(done))

        episode_reward += reward
        total_steps += 1

        state = next_state

        if done:
            all_rewards.append(episode_reward)
            episode_reward = 0
            state, _ = env.reset()

    # Compute GAE and returns
    _, _, next_value = agent.select_action(state)
    advantages, returns = agent.compute_gae(rewards, values, dones, next_value)

    # PPO update
    actor_loss, critic_loss = agent.update(
        states, actions, log_probs, returns, advantages)

    if (iteration + 1) % 10 == 0:
        avg_reward = np.mean(all_rewards[-100:]) if all_rewards else 0
        print(f"Iter {iteration+1:3d} | Steps: {total_steps:6d} | "
              f"Avg Reward: {avg_reward:.1f} | "
              f"Actor Loss: {actor_loss:.4f}")

env.close()
print(f"\nFinal average: {np.mean(all_rewards[-100:]):.1f}")
print("\nPPO Key Features:")
print("  - Clipped objective prevents destructive updates")
print("  - Multiple epochs improve sample efficiency")
print("  - GAE balances bias-variance in advantage estimation")
print("  - Simple to implement yet highly effective")

Expected Output:

=== PPO (Proximal Policy Optimization) Implementation ===

Training PPO on CartPole-v1

Hyperparameters:
  clip_epsilon: 0.2
  gae_lambda: 0.95
  epochs per update: 10
  batch_size: 64

Training...
Iter  10 | Steps:  20480 | Avg Reward: 156.3 | Actor Loss: 0.0234
Iter  20 | Steps:  40960 | Avg Reward: 287.5 | Actor Loss: 0.0156
Iter  30 | Steps:  61440 | Avg Reward: 398.2 | Actor Loss: 0.0089
Iter  40 | Steps:  81920 | Avg Reward: 456.7 | Actor Loss: 0.0045
Iter  50 | Steps: 102400 | Avg Reward: 482.1 | Actor Loss: 0.0023
...
Iter 100 | Steps: 204800 | Avg Reward: 498.7 | Actor Loss: 0.0012

Final average: 498.7

PPO Key Features:
  - Clipped objective prevents destructive updates
  - Multiple epochs improve sample efficiency
  - GAE balances bias-variance in advantage estimation
  - Simple to implement yet highly effective

4.6.6 Why PPO is So Popular

| Aspect | PPO Advantage |
|---|---|
| Stability | Clipping prevents catastrophic updates |
| Simplicity | First-order optimization only (no Fisher matrix) |
| Sample efficiency | Multiple epochs reuse collected data |
| Generality | Works across diverse tasks without tuning |
| Scalability | Easy to parallelize across environments |

Notable PPO applications:

  1. OpenAI Five: Dota 2 agents trained at massive scale
  2. RLHF: the policy-optimization step behind InstructGPT and ChatGPT
  3. Robotics: dexterous manipulation and locomotion controllers


4.7 Continuous Action Spaces

4.7.1 Gaussian Policy

For continuous actions (e.g., robot joint torques), we use a Gaussian distribution:

$$ \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)^2) $$

The network outputs:

  1. Mean $\mu_\theta(s)$: the center of the action distribution
  2. Log standard deviation $\log \sigma_\theta(s)$: exponentiated to guarantee positivity, and clamped for numerical stability

4.7.2 Gaussian Policy Implementation

# Requirements:
# - Python 3.9+
# - gymnasium>=0.29.0
# - torch>=2.0.0
# - numpy>=1.24.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gymnasium as gym

print("=== Continuous Action Space with Gaussian Policy ===\n")

class ContinuousPolicyNetwork(nn.Module):
    """
    Policy network for continuous actions

    Outputs mean and log_std for Gaussian distribution.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(ContinuousPolicyNetwork, self).__init__()

        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )

        # Mean head
        self.mu = nn.Linear(hidden_dim, action_dim)

        # Log std (learned parameter or state-dependent)
        self.log_std = nn.Linear(hidden_dim, action_dim)

        # Value head
        self.value = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)

        mu = self.mu(features)
        log_std = self.log_std(features)
        log_std = torch.clamp(log_std, -20, 2)  # Numerical stability
        std = torch.exp(log_std)

        value = self.value(features)

        return mu, std, value


class ContinuousPPO:
    """PPO for continuous action spaces"""

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99):
        self.gamma = gamma
        self.action_dim = action_dim

        self.network = ContinuousPolicyNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=lr)

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
            mu, std, value = self.network(state_tensor)

        # Sample from Gaussian
        dist = torch.distributions.Normal(mu, std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)  # Sum over action dims

        # squeeze(0) drops only the batch dim, keeping shape (action_dim,)
        return action.squeeze(0).numpy(), log_prob.item(), value.item()

    def evaluate_actions(self, states, actions):
        mu, std, values = self.network(states)

        dist = torch.distributions.Normal(mu, std)
        log_probs = dist.log_prob(actions).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)

        return log_probs, entropy, values


# Demonstration with Pendulum environment
print("Testing on Pendulum-v1 (continuous control)\n")

env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]

print(f"Environment: Pendulum-v1")
print(f"  State space: {env.observation_space}")
print(f"  Action space: {env.action_space}")

agent = ContinuousPPO(state_dim, action_dim)

# Test episode
state, _ = env.reset()
episode_reward = 0

print("\nSampling actions from Gaussian policy:")
for step in range(5):
    action, log_prob, value = agent.select_action(state)

    # Clip action to Pendulum's valid torque range [-2, 2]
    action_clipped = np.clip(action, -2.0, 2.0)

    print(f"  Step {step}: action={action_clipped[0]:.3f}, "
          f"log_prob={log_prob:.3f}, value={value:.3f}")

    next_state, reward, terminated, truncated, _ = env.step(action_clipped)
    episode_reward += reward
    state = next_state

env.close()
print(f"\nTest episode reward: {episode_reward:.1f}")
print("\nGaussian Policy Features:")
print("  - Natural for continuous actions (joint torques, steering)")
print("  - Exploration via sampling (std controls exploration)")
print("  - Reparameterization trick enables gradient flow")

4.8 Stable-Baselines3 Practical Examples

4.8.1 Introduction to Stable-Baselines3

Stable-Baselines3 (SB3) is a set of reliable, well-tested PyTorch implementations of RL algorithms. It provides production-ready code that follows best practices.

4.8.2 PPO with Stable-Baselines3

# Requirements:
# - Python 3.9+
# - gymnasium[box2d]>=0.29.0  (LunarLander requires the box2d extra)
# - stable-baselines3>=2.1.0
# - tensorboard>=2.14.0 (optional, for logging)

import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy

print("=== Stable-Baselines3 PPO Example ===\n")

# Create environment
env_id = "LunarLander-v2"
print(f"Environment: {env_id}")

# Single environment for simple training
env = gym.make(env_id)
print(f"  Observation space: {env.observation_space}")
print(f"  Action space: {env.action_space}")
env.close()

# Create vectorized environment for better performance
n_envs = 4
vec_env = make_vec_env(env_id, n_envs=n_envs)
print(f"  Parallel environments: {n_envs}")

# Create PPO model with custom hyperparameters
model = PPO(
    "MlpPolicy",           # Policy network type
    vec_env,
    learning_rate=3e-4,    # Learning rate
    n_steps=2048,          # Steps per environment per update
    batch_size=64,         # Mini-batch size
    n_epochs=10,           # Epochs per update
    gamma=0.99,            # Discount factor
    gae_lambda=0.95,       # GAE lambda
    clip_range=0.2,        # PPO clip range
    ent_coef=0.01,         # Entropy coefficient
    vf_coef=0.5,           # Value function coefficient
    verbose=1,
    tensorboard_log="./ppo_lunarlander_tensorboard/"
)

print("\nPPO Model Configuration:")
print(f"  Policy: {model.policy.__class__.__name__}")
print(f"  Learning rate: {model.learning_rate}")
print(f"  Clip range: {model.clip_range}")
print(f"  GAE lambda: {model.gae_lambda}")

# Training
print("\nTraining PPO...")
total_timesteps = 100000

model.learn(
    total_timesteps=total_timesteps,
    progress_bar=True  # Requires tqdm
)

print("\nTraining complete!")

# Evaluation
print("\nEvaluating trained model...")
eval_env = gym.make(env_id)
mean_reward, std_reward = evaluate_policy(
    model,
    eval_env,
    n_eval_episodes=10,
    deterministic=True
)
print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
print(f"(Solved threshold for LunarLander: 200)")

# Save model
model.save("ppo_lunarlander")
print("\nModel saved to 'ppo_lunarlander.zip'")

# Load and test
print("\nLoading saved model and testing...")
loaded_model = PPO.load("ppo_lunarlander")

# Test episode with rendering info
state, _ = eval_env.reset()
episode_reward = 0
done = False

while not done:
    action, _ = loaded_model.predict(state, deterministic=True)
    state, reward, terminated, truncated, _ = eval_env.step(action)
    done = terminated or truncated
    episode_reward += reward

print(f"Test episode reward: {episode_reward:.2f}")

vec_env.close()
eval_env.close()

print("\nSB3 PPO Features:")
print("  - Production-ready implementation")
print("  - Vectorized environments for parallelism")
print("  - TensorBoard logging built-in")
print("  - Easy save/load functionality")
print("  - Extensive documentation and community")

Expected Output:

=== Stable-Baselines3 PPO Example ===

Environment: LunarLander-v2
  Observation space: Box([-inf, ...], [inf, ...], (8,), float32)
  Action space: Discrete(4)
  Parallel environments: 4

PPO Model Configuration:
  Policy: ActorCriticPolicy
  Learning rate: 0.0003
  Clip range: 0.2
  GAE lambda: 0.95

Training PPO...
| rollout/           |          |
|    ep_len_mean     | 89.3     |
|    ep_rew_mean     | -156     |
| time/              |          |
|    fps             | 1245     |
|    iterations      | 1        |
...
| rollout/           |          |
|    ep_len_mean     | 287      |
|    ep_rew_mean     | 234      |

Training complete!

Evaluating trained model...
Mean reward: 256.34 +/- 23.12
(Solved threshold for LunarLander: 200)

Model saved to 'ppo_lunarlander.zip'

Loading saved model and testing...
Test episode reward: 267.45

4.8.3 TensorBoard Monitoring

# To view TensorBoard logs, run in terminal:
# tensorboard --logdir ./ppo_lunarlander_tensorboard/

# Key metrics to monitor:
# - rollout/ep_rew_mean: Average episode reward
# - rollout/ep_len_mean: Average episode length
# - train/policy_loss: Actor loss
# - train/value_loss: Critic loss
# - train/entropy_loss: Entropy for exploration
# - train/approx_kl: KL divergence (should stay small)
# - train/clip_fraction: How often clipping activates

print("TensorBoard Metrics Guide:")
print("  ep_rew_mean: Should increase over time")
print("  approx_kl: If too high (>0.02), reduce learning rate")
print("  clip_fraction: ~10-20% is typical")
print("  entropy: Should decrease as policy becomes deterministic")

4.9 Summary and Exercises

Chapter Summary

| Topic | Key Points |
|---|---|
| Policy Gradient | Direct optimization of $\pi_\theta$; handles continuous actions |
| REINFORCE | Monte Carlo PG; high variance; use baseline for stability |
| Actor-Critic | Combines policy and value learning; TD-based updates |
| A2C | N-step returns; entropy regularization; synchronous training |
| PPO | Clipped objective; GAE; multiple epochs; industry standard |
| Continuous Control | Gaussian policy with learned mean and std |

Algorithm Comparison

| Algorithm | Update Type | Variance | Sample Efficiency | Complexity |
|---|---|---|---|---|
| REINFORCE | Episode end | High | Low | Simple |
| Actor-Critic | Each step | Medium | Medium | Medium |
| A2C | N-steps | Medium | Medium | Medium |
| PPO | Batch (multi-epoch) | Low | High | Medium |

Exercises

Exercise 4.1: Baseline Comparison

Task: Compare REINFORCE with and without baseline on CartPole.

Steps:

Exercise 4.2: PPO Hyperparameter Study

Task: Investigate PPO hyperparameter sensitivity on LunarLander.

Parameters to vary:

Exercise 4.3: Continuous Control

Task: Train PPO on Pendulum-v1 using Gaussian policy.

Requirements:

Exercise 4.4: Entropy Scheduling

Task: Implement entropy coefficient annealing.

Approach:
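One possible starting point is a linear schedule from a high coefficient to a low one (the function name and values below are hypothetical):

```python
def entropy_coef_schedule(step, total_steps, start=0.01, end=0.001):
    """Linearly anneal the entropy coefficient from start to end."""
    frac = min(step / total_steps, 1.0)
    return start + frac * (end - start)

# Early training gets a strong exploration bonus, late training almost none
for step in [0, 50_000, 100_000]:
    print(f"step {step}: ent_coef = {entropy_coef_schedule(step, 100_000):.4f}")
```

Plug the scheduled value in wherever the entropy bonus is added to the loss, recomputing it once per update.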

Exercise 4.5: SB3 Custom Environment

Task: Train PPO on a custom Gymnasium environment.

Steps:

Exercise 4.6: Multi-Environment Training

Task: Compare single vs. parallel environment training.

Metrics:


Next Chapter Preview

In Chapter 5, we will explore Advanced Applications and Frontiers of RL:

Coming Topics:
- Model-based RL and world models
- Multi-agent reinforcement learning
- Offline RL and imitation learning
- RL from human feedback (RLHF)
- Real-world deployment challenges
- Current research directions

Disclaimer