This chapter covers reward design and optimization objectives. You will learn to explain the difference between sparse and dense rewards and to apply PID control theory to reward design.
3.1 Importance of Reward Design
The behavior of a reinforcement learning agent is determined by its reward function, making "what to reward" the most critical design decision. In process control, the reward must balance multiple objectives simultaneously, such as yield, safety, energy efficiency, and product quality, rather than maximizing yield alone.
💡 Principles of Reward Design
- Clarity: Clearly define what the agent should optimize
- Measurability: Use metrics that can be calculated in real-time
- Balance: Consider trade-offs among multiple objectives
- Safety: Strong penalties for dangerous behaviors (a sketch combining these principles follows below)
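As a minimal sketch of how these four principles can be combined into a single measurable reward, consider the function below. The weights, the temperature limit, and the variable names (yield_value, energy_cost, temp) are illustrative assumptions, not values used by the environments later in this chapter.
# Minimal sketch: combining clarity, measurability, balance, and safety (illustrative values)
def example_reward(yield_value, energy_cost, temp, temp_limit=380.0,
                   w_yield=1.0, w_energy=0.3, w_safety=5.0):
    """All inputs are online-measurable; the weighted sum makes the trade-offs explicit."""
    r_yield = w_yield * yield_value                       # reward product yield
    r_energy = -w_energy * energy_cost                    # penalize energy use
    r_safety = -w_safety * max(0.0, temp - temp_limit)    # strong penalty above the safety limit
    return r_yield + r_energy + r_safety
print(example_reward(yield_value=0.8, energy_cost=0.5, temp=360.0))  # ≈ 0.65 (safe operating point)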
Example 1: Comparison of Sparse vs Dense Rewards
Understand the difference between sparse rewards (reward only when goal is achieved) and dense rewards (reward at each step).
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
# ===================================
# Example 1: Sparse vs Dense Reward Comparison
# ===================================
class ReactorEnvironmentSparseReward(gym.Env):
"""Sparse reward: Reactor that gives reward only when target temperature is achieved"""
def __init__(self, target_temp=350.0, temp_tolerance=5.0):
super().__init__()
self.target_temp = target_temp # Target temperature [K]
self.temp_tolerance = temp_tolerance # Tolerance [K]
# State space: [current temperature, difference from target]
self.observation_space = spaces.Box(
low=np.array([250.0, -150.0]),
high=np.array([450.0, 150.0]),
dtype=np.float32
)
# Action space: heating power [-50, +50] W
self.action_space = spaces.Box(
low=-50.0, high=50.0, shape=(1,), dtype=np.float32
)
self.reset()
def reset(self, seed=None):
super().reset(seed=seed)
# Initial temperature: far from target
self.current_temp = 300.0 + np.random.uniform(-20, 20)
self.steps = 0
return self._get_obs(), {}
def _get_obs(self):
temp_error = self.target_temp - self.current_temp
return np.array([self.current_temp, temp_error], dtype=np.float32)
def step(self, action):
heating_power = float(action[0])
# Temperature dynamics (first-order lag system)
tau = 10.0 # Time constant
dt = 1.0 # Time step
temp_change = (heating_power - 0.1 * (self.current_temp - 298)) * dt / tau
self.current_temp += temp_change
self.current_temp = np.clip(self.current_temp, 250, 450)
# Sparse reward: +1 only when within target range
temp_error = abs(self.target_temp - self.current_temp)
reward = 1.0 if temp_error <= self.temp_tolerance else 0.0
self.steps += 1
terminated = temp_error <= self.temp_tolerance
truncated = self.steps >= 100
return self._get_obs(), reward, terminated, truncated, {}
class ReactorEnvironmentDenseReward(gym.Env):
"""Dense reward: Reward based on proximity to target at each step"""
def __init__(self, target_temp=350.0):
super().__init__()
self.target_temp = target_temp
self.observation_space = spaces.Box(
low=np.array([250.0, -150.0]),
high=np.array([450.0, 150.0]),
dtype=np.float32
)
self.action_space = spaces.Box(
low=-50.0, high=50.0, shape=(1,), dtype=np.float32
)
self.reset()
def reset(self, seed=None):
super().reset(seed=seed)
self.current_temp = 300.0 + np.random.uniform(-20, 20)
self.steps = 0
self.prev_error = abs(self.target_temp - self.current_temp)
return self._get_obs(), {}
def _get_obs(self):
temp_error = self.target_temp - self.current_temp
return np.array([self.current_temp, temp_error], dtype=np.float32)
def step(self, action):
heating_power = float(action[0])
tau = 10.0
dt = 1.0
temp_change = (heating_power - 0.1 * (self.current_temp - 298)) * dt / tau
self.current_temp += temp_change
self.current_temp = np.clip(self.current_temp, 250, 450)
# Dense reward: Continuous reward based on error
temp_error = abs(self.target_temp - self.current_temp)
# Reward = error reduction + bonus for small error
error_reduction = self.prev_error - temp_error
proximity_bonus = -0.01 * temp_error # Higher reward when error is smaller
reward = error_reduction + proximity_bonus
self.prev_error = temp_error
self.steps += 1
terminated = temp_error <= 5.0
truncated = self.steps >= 100
return self._get_obs(), reward, terminated, truncated, {}
def compare_reward_types(n_episodes=5):
"""Compare learning efficiency of sparse vs dense rewards"""
envs = {
'Sparse Reward': ReactorEnvironmentSparseReward(),
'Dense Reward': ReactorEnvironmentDenseReward()
}
results = {}
for env_name, env in envs.items():
episode_rewards = []
episode_lengths = []
print(f"\n=== {env_name} ===")
for episode in range(n_episodes):
obs, _ = env.reset(seed=episode)
total_reward = 0
steps = 0
# Random agent (baseline before training)
for _ in range(100):
action = env.action_space.sample()
obs, reward, terminated, truncated, _ = env.step(action)
total_reward += reward
steps += 1
if terminated or truncated:
break
episode_rewards.append(total_reward)
episode_lengths.append(steps)
print(f"Episode {episode+1}: Reward={total_reward:.2f}, Steps={steps}")
results[env_name] = {
'rewards': episode_rewards,
'lengths': episode_lengths
}
# Visualization
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
# Cumulative reward comparison
ax1.bar(results.keys(), [np.mean(r['rewards']) for r in results.values()],
color=['#3498db', '#2ecc71'], alpha=0.7, edgecolor='black', linewidth=1.5)
ax1.set_ylabel('Average Total Reward')
ax1.set_title('Reward Signal Strength Comparison')
ax1.grid(axis='y', alpha=0.3)
# Episode length comparison
ax2.bar(results.keys(), [np.mean(r['lengths']) for r in results.values()],
color=['#3498db', '#2ecc71'], alpha=0.7, edgecolor='black', linewidth=1.5)
ax2.set_ylabel('Average Episode Length')
ax2.set_title('Convergence Speed')
ax2.grid(axis='y', alpha=0.3)
plt.tight_layout()
return fig, results
# Execute
fig, results = compare_reward_types(n_episodes=10)
plt.show()
print("\n=== Summary ===")
print(f"Sparse Reward - Mean: {np.mean(results['Sparse Reward']['rewards']):.2f}")
print(f"Dense Reward - Mean: {np.mean(results['Dense Reward']['rewards']):.2f}")
print(f"\nDense rewards provide richer learning signals and improve learning efficiency")
Sparse Reward - Mean: 0.20 (almost no reward)
Dense Reward - Mean: 15.34 (reward at each step)
Dense rewards provide effective gradient information in early learning stages
💡 Practical Selection
Sparse Reward: When the goal is clear and achievement/non-achievement is binary (batch completion, specification compliance, etc.)
Dense Reward: When continuous improvement is important and intermediate progress needs evaluation (temperature control, composition control, etc.); the two signal types can also be combined, as sketched below.
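A common hybrid uses a dense shaping term to provide gradient information at every step plus a sparse bonus that marks actual goal achievement. The snippet below is a small sketch of such a hybrid for the temperature-control setting of Example 1; the shaping weight of 0.01 and goal bonus of 10 are illustrative assumptions.
# Sketch: hybrid reward = dense shaping term + sparse goal bonus (illustrative constants)
def hybrid_temperature_reward(temp_error, tolerance=5.0, shaping_weight=0.01, goal_bonus=10.0):
    dense_part = -shaping_weight * abs(temp_error)                      # guides learning at every step
    sparse_part = goal_bonus if abs(temp_error) <= tolerance else 0.0   # marks true success
    return dense_part + sparse_part
print(hybrid_temperature_reward(20.0))  # -0.2  (far from target: shaping only)
print(hybrid_temperature_reward(3.0))   # 9.97  (within tolerance: shaping + bonus)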
3.2 PID-Style Control via Reward Shaping
By incorporating the concepts of conventional PID control into reward design, we can teach agents appropriate control behaviors. We design reward components corresponding to proportional (P), integral (I), and derivative (D) terms.
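Written out, the per-step reward implemented in Example 2 below has the form
r_t = -k_p\,\lvert e_t\rvert \;-\; k_i\,\Bigl\lvert \sum_{\tau \le t} e_\tau\,\Delta t \Bigr\rvert \;-\; k_d\,\Bigl\lvert \frac{e_t - e_{t-1}}{\Delta t} \Bigr\rvert
where e_t is the temperature error at step t. Each term is an absolute-value penalty, so large instantaneous errors (P), accumulated offsets (I), and abrupt error changes (D) all reduce the reward.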
Example 2: PID-Inspired Reward Function
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
# ===================================
# Example 2: PID-Inspired Reward Function
# ===================================
class PIDRewardReactor(gym.Env):
"""Reactor environment with reward design based on PID control theory"""
def __init__(self, target_temp=350.0, kp=1.0, ki=0.1, kd=0.5):
super().__init__()
self.target_temp = target_temp
# PID reward weights
self.kp = kp # Proportional gain (current error)
self.ki = ki # Integral gain (accumulated error)
self.kd = kd # Derivative gain (error rate of change)
self.observation_space = spaces.Box(
low=np.array([250.0, -150.0, -1000.0, -50.0]),
high=np.array([450.0, 150.0, 1000.0, 50.0]),
dtype=np.float32
)
self.action_space = spaces.Box(
low=-50.0, high=50.0, shape=(1,), dtype=np.float32
)
self.reset()
def reset(self, seed=None):
super().reset(seed=seed)
self.current_temp = 300.0 + np.random.uniform(-20, 20)
self.cumulative_error = 0.0
self.prev_error = self.target_temp - self.current_temp
self.steps = 0
self.temp_history = [self.current_temp]
self.reward_history = []
self.reward_components = {'P': [], 'I': [], 'D': [], 'total': []}
return self._get_obs(), {}
def _get_obs(self):
error = self.target_temp - self.current_temp
return np.array([
self.current_temp,
error,
self.cumulative_error,
error - self.prev_error
], dtype=np.float32)
def step(self, action):
heating_power = float(action[0])
# Temperature dynamics
tau = 10.0
dt = 1.0
temp_change = (heating_power - 0.1 * (self.current_temp - 298)) * dt / tau
self.current_temp += temp_change
self.current_temp = np.clip(self.current_temp, 250, 450)
# Error calculation
error = self.target_temp - self.current_temp
self.cumulative_error += error * dt
error_derivative = (error - self.prev_error) / dt
# PID-style reward components
reward_p = -self.kp * abs(error) # Proportional: higher reward for smaller error
reward_i = -self.ki * abs(self.cumulative_error) # Integral: penalty for offset error
reward_d = -self.kd * abs(error_derivative) # Derivative: suppress rapid changes
reward = reward_p + reward_i + reward_d
# Record history
self.temp_history.append(self.current_temp)
self.reward_history.append(reward)
self.reward_components['P'].append(reward_p)
self.reward_components['I'].append(reward_i)
self.reward_components['D'].append(reward_d)
self.reward_components['total'].append(reward)
self.prev_error = error
self.steps += 1
terminated = abs(error) <= 2.0 and self.steps >= 20
truncated = self.steps >= 100
return self._get_obs(), reward, terminated, truncated, {}
def plot_pid_components(self):
"""Visualize PID reward components"""
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
steps = np.arange(len(self.temp_history))
# Temperature profile
ax = axes[0]
ax.plot(steps, self.temp_history, 'b-', linewidth=2, label='Temperature')
ax.axhline(self.target_temp, color='red', linestyle='--',
linewidth=2, label=f'Target ({self.target_temp}K)')
ax.fill_between(steps,
self.target_temp - 5,
self.target_temp + 5,
alpha=0.2, color='green', label='Β±5K tolerance')
ax.set_xlabel('Time Step')
ax.set_ylabel('Temperature [K]')
ax.set_title('Temperature Control Profile')
ax.legend()
ax.grid(alpha=0.3)
# Reward component breakdown
ax = axes[1]
steps_reward = np.arange(len(self.reward_components['P']))
ax.plot(steps_reward, self.reward_components['P'], 'b-',
linewidth=2, label=f'Proportional (kp={self.kp})')
ax.plot(steps_reward, self.reward_components['I'], 'g-',
linewidth=2, label=f'Integral (ki={self.ki})')
ax.plot(steps_reward, self.reward_components['D'], 'orange',
linewidth=2, label=f'Derivative (kd={self.kd})')
ax.plot(steps_reward, self.reward_components['total'], 'r-',
linewidth=2.5, label='Total Reward', alpha=0.7)
ax.axhline(0, color='black', linestyle='-', linewidth=0.5)
ax.set_xlabel('Time Step')
ax.set_ylabel('Reward Component Value')
ax.set_title('PID Reward Components Breakdown')
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
return fig
def test_pid_reward_tuning():
"""Compare reward behavior with different PID gains"""
pid_configs = [
{'kp': 1.0, 'ki': 0.0, 'kd': 0.0, 'name': 'P only'},
{'kp': 1.0, 'ki': 0.1, 'kd': 0.0, 'name': 'PI'},
{'kp': 1.0, 'ki': 0.1, 'kd': 0.5, 'name': 'PID (balanced)'},
]
results = {}
for config in pid_configs:
env = PIDRewardReactor(
target_temp=350.0,
kp=config['kp'],
ki=config['ki'],
kd=config['kd']
)
obs, _ = env.reset(seed=42)
# Simple control rule (action to maximize reward)
for _ in range(50):
# Action proportional to error (simple PID control)
error = obs[1]
action = np.array([np.clip(2.0 * error, -50, 50)])
obs, reward, terminated, truncated, _ = env.step(action)
if terminated or truncated:
break
results[config['name']] = env
print(f"\n=== {config['name']} ===")
print(f"Final temperature: {env.current_temp:.2f}K")
print(f"Final error: {abs(350 - env.current_temp):.2f}K")
print(f"Total reward: {sum(env.reward_history):.2f}")
# Visualization
for name, env in results.items():
fig = env.plot_pid_components()
plt.suptitle(f'{name} Configuration', fontsize=14, y=1.02)
plt.show()
# Execute
test_pid_reward_tuning()
P only - Final error: 3.2K, Total reward: -156.7
PI - Final error: 0.8K, Total reward: -189.3
PID - Final error: 0.5K, Total reward: -198.1
PID reward eliminates steady-state error and suppresses overshoot
3.3 Multi-Objective Reward Function
In actual processes, we need to simultaneously consider multiple objectives such as yield, energy, and safety. We apply the concepts of weighted sum and Pareto optimization to reward design.
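Example 3 below uses the weighted-sum approach. For the Pareto idea mentioned above, the small helper sketched here checks whether one candidate operating point dominates another (at least as good on every objective and strictly better on at least one); the objective vectors are assumed to be "higher is better", and the helper is purely illustrative and not used by the chapter's environments.
import numpy as np
def pareto_dominates(obj_a, obj_b):
    """True if solution A Pareto-dominates B (all objectives are to be maximized)."""
    a, b = np.asarray(obj_a, dtype=float), np.asarray(obj_b, dtype=float)
    return bool(np.all(a >= b) and np.any(a > b))
# Objectives: [yield, -energy cost, -safety penalty]
print(pareto_dominates([0.85, -22.1, 0.0], [0.85, -28.3, 0.0]))   # True: same yield, less energy
print(pareto_dominates([0.89, -45.6, -5.0], [0.85, -22.1, 0.0]))  # False: better yield, worse elsewhere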
Example 3: Multi-Objective Optimization (Yield + Energy + Safety)
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
# ===================================
# Example 3: Multi-Objective Reward Function
# ===================================
class MultiObjectiveReactor(gym.Env):
"""Reactor environment considering yield, energy, and safety"""
def __init__(self, w_yield=1.0, w_energy=0.3, w_safety=2.0):
super().__init__()
# Multi-objective weights
self.w_yield = w_yield # Importance of yield
self.w_energy = w_energy # Importance of energy efficiency
self.w_safety = w_safety # Importance of safety
# State space: [temperature, pressure, concentration, yield]
self.observation_space = spaces.Box(
low=np.array([300.0, 1.0, 0.0, 0.0]),
high=np.array([400.0, 5.0, 2.0, 1.0]),
dtype=np.float32
)
# Action space: [temperature change, pressure change]
self.action_space = spaces.Box(
low=np.array([-5.0, -0.2]),
high=np.array([5.0, 0.2]),
dtype=np.float32
)
self.reset()
def reset(self, seed=None):
super().reset(seed=seed)
# Initial state
self.temperature = 320.0 # K
self.pressure = 2.0 # bar
self.concentration = 1.0 # mol/L
self.yield_value = 0.0
self.cumulative_energy = 0.0
self.steps = 0
self.history = {
'temp': [self.temperature],
'pressure': [self.pressure],
'yield': [self.yield_value],
'reward_yield': [],
'reward_energy': [],
'reward_safety': [],
'reward_total': []
}
return self._get_obs(), {}
def _get_obs(self):
return np.array([
self.temperature,
self.pressure,
self.concentration,
self.yield_value
], dtype=np.float32)
def _calculate_yield(self):
"""Calculate yield (function of temperature and pressure)"""
# Optimal temperature: 350K, Optimal pressure: 3.0 bar
temp_factor = np.exp(-((self.temperature - 350) / 20)**2)
pressure_factor = 1.0 - 0.3 * ((self.pressure - 3.0) / 2)**2
return 0.9 * temp_factor * pressure_factor * (1 - np.exp(-self.steps / 20))
def step(self, action):
temp_change, pressure_change = action
# State update
self.temperature += temp_change
self.temperature = np.clip(self.temperature, 300, 400)
self.pressure += pressure_change
self.pressure = np.clip(self.pressure, 1.0, 5.0)
self.yield_value = self._calculate_yield()
self.concentration *= 0.95 # Concentration decrease
# Multi-objective reward components
# (1) Yield reward: higher reward for higher yield
reward_yield = self.w_yield * self.yield_value
# (2) Energy reward: higher reward for smaller temperature/pressure changes
energy_cost = abs(temp_change) / 5.0 + abs(pressure_change) / 0.2
reward_energy = -self.w_energy * energy_cost
self.cumulative_energy += energy_cost
# (3) Safety reward: distance from dangerous region (high temperature, high pressure)
temp_danger = max(0, self.temperature - 380) # Dangerous above 380K
pressure_danger = max(0, self.pressure - 4.5) # Dangerous above 4.5 bar
safety_penalty = temp_danger + 10 * pressure_danger
reward_safety = -self.w_safety * safety_penalty
# Total reward
reward = reward_yield + reward_energy + reward_safety
# Record history
self.history['temp'].append(self.temperature)
self.history['pressure'].append(self.pressure)
self.history['yield'].append(self.yield_value)
self.history['reward_yield'].append(reward_yield)
self.history['reward_energy'].append(reward_energy)
self.history['reward_safety'].append(reward_safety)
self.history['reward_total'].append(reward)
self.steps += 1
# Termination conditions
terminated = self.yield_value >= 0.85 or safety_penalty > 20
truncated = self.steps >= 50
return self._get_obs(), reward, terminated, truncated, {}
def plot_multi_objective_analysis(self):
"""Analyze multi-objective optimization"""
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
steps = np.arange(len(self.history['temp']))
# (1) Temperature and pressure profile
ax = axes[0, 0]
ax2 = ax.twinx()
l1 = ax.plot(steps, self.history['temp'], 'r-', linewidth=2, label='Temperature')
ax.axhspan(380, 400, alpha=0.2, color='red', label='Danger zone (T)')
ax.set_xlabel('Time Step')
ax.set_ylabel('Temperature [K]', color='r')
ax.tick_params(axis='y', labelcolor='r')
l2 = ax2.plot(steps, self.history['pressure'], 'b-', linewidth=2, label='Pressure')
ax2.axhspan(4.5, 5.0, alpha=0.2, color='blue', label='Danger zone (P)')
ax2.set_ylabel('Pressure [bar]', color='b')
ax2.tick_params(axis='y', labelcolor='b')
lines = l1 + l2
labels = [l.get_label() for l in lines]
ax.legend(lines, labels, loc='upper left')
ax.set_title('Process Variables with Safety Zones')
ax.grid(alpha=0.3)
# (2) Yield evolution
ax = axes[0, 1]
ax.plot(steps, self.history['yield'], 'g-', linewidth=2.5)
ax.axhline(0.85, color='orange', linestyle='--', linewidth=2, label='Target yield')
ax.fill_between(steps, 0, 0.85, alpha=0.1, color='green')
ax.set_xlabel('Time Step')
ax.set_ylabel('Yield')
ax.set_title('Yield Evolution')
ax.legend()
ax.grid(alpha=0.3)
# (3) Reward component breakdown
ax = axes[1, 0]
reward_steps = np.arange(len(self.history['reward_yield']))
ax.plot(reward_steps, self.history['reward_yield'], 'g-',
linewidth=2, label=f'Yield (w={self.w_yield})')
ax.plot(reward_steps, self.history['reward_energy'], 'b-',
linewidth=2, label=f'Energy (w={self.w_energy})')
ax.plot(reward_steps, self.history['reward_safety'], 'r-',
linewidth=2, label=f'Safety (w={self.w_safety})')
ax.set_xlabel('Time Step')
ax.set_ylabel('Reward Component')
ax.set_title('Multi-Objective Reward Breakdown')
ax.legend()
ax.grid(alpha=0.3)
# (4) Cumulative total reward
ax = axes[1, 1]
cumulative_reward = np.cumsum(self.history['reward_total'])
ax.plot(reward_steps, cumulative_reward, 'purple', linewidth=2.5)
ax.fill_between(reward_steps, 0, cumulative_reward, alpha=0.3, color='purple')
ax.set_xlabel('Time Step')
ax.set_ylabel('Cumulative Total Reward')
ax.set_title('Overall Performance')
ax.grid(alpha=0.3)
plt.tight_layout()
return fig
def compare_weight_configurations():
"""Compare behavior with different weight settings"""
configs = [
{'w_yield': 1.0, 'w_energy': 0.0, 'w_safety': 0.0, 'name': 'Yield only'},
{'w_yield': 1.0, 'w_energy': 0.5, 'w_safety': 0.0, 'name': 'Yield + Energy'},
{'w_yield': 1.0, 'w_energy': 0.3, 'w_safety': 2.0, 'name': 'Balanced (all)'},
]
results = []
for config in configs:
env = MultiObjectiveReactor(
w_yield=config['w_yield'],
w_energy=config['w_energy'],
w_safety=config['w_safety']
)
obs, _ = env.reset(seed=42)
# Simple control rule: move toward target temperature and pressure
target_temp = 350.0
target_pressure = 3.0
for _ in range(50):
temp_error = target_temp - obs[0]
pressure_error = target_pressure - obs[1]
action = np.array([
np.clip(0.5 * temp_error, -5, 5),
np.clip(0.3 * pressure_error, -0.2, 0.2)
])
obs, reward, terminated, truncated, _ = env.step(action)
if terminated or truncated:
break
results.append({
'name': config['name'],
'env': env,
'final_yield': env.yield_value,
'total_energy': env.cumulative_energy,
'max_temp': max(env.history['temp']),
'total_reward': sum(env.history['reward_total'])
})
print(f"\n=== {config['name']} ===")
print(f"Final yield: {env.yield_value:.3f}")
print(f"Total energy cost: {env.cumulative_energy:.2f}")
print(f"Max temperature: {max(env.history['temp']):.1f}K")
print(f"Total reward: {sum(env.history['reward_total']):.2f}")
# Visualize each configuration
for result in results:
fig = result['env'].plot_multi_objective_analysis()
plt.suptitle(f"{result['name']} Configuration", fontsize=14, y=1.00)
plt.show()
return results
# Execute
results = compare_weight_configurations()
Yield only - Yield: 0.892, Energy: 45.6, Max temp: 395K
Yield + Energy - Yield: 0.867, Energy: 28.3, Max temp: 378K
Balanced (all) - Yield: 0.853, Energy: 22.1, Max temp: 368K
Trade-offs can be controlled through weight adjustment
⚠️ Practical Guidelines for Weight Settings
- Safety: Set the highest weight (w=2.0-5.0) to strongly avoid dangerous regions
- Yield: Moderate weight (w=1.0) as baseline for primary objective
- Energy: Auxiliary optimization target (w=0.1-0.5); a starting configuration following these guidelines is sketched below
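Under these guidelines, a reasonable starting configuration might look like the dictionary below; the exact numbers are assumptions to be tuned per process, and the keys simply mirror the constructor arguments of the MultiObjectiveReactor class defined in Example 3.
# Starting weights following the guidelines above (assumed values, to be tuned per process)
weight_config = {
    'w_safety': 3.0,   # highest: dangerous regions must dominate the reward
    'w_yield': 1.0,    # baseline weight for the primary economic objective
    'w_energy': 0.2,   # auxiliary objective
}
env = MultiObjectiveReactor(**weight_config)  # reuses the class from Example 3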
3.4 Intrinsic Motivation (Exploration Bonus)
To encourage agents to actively explore unknown regions of the state space, we incorporate intrinsic motivation into the reward. This example implements a count-based exploration bonus, a simple form of curiosity-driven learning.
Example 4: Reward with Exploration Bonus
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
from collections import defaultdict
# ===================================
# Example 4: Intrinsic Motivation (Exploration Bonus)
# ===================================
class ExplorationBonusReactor(gym.Env):
"""Reactor environment with exploration bonus"""
def __init__(self, exploration_bonus_weight=0.2, state_discretization=10):
super().__init__()
self.exploration_bonus_weight = exploration_bonus_weight
self.state_discretization = state_discretization
# State visitation count (record exploration level)
self.visit_counts = defaultdict(int)
# State space: [temperature, pressure, catalyst concentration]
self.observation_space = spaces.Box(
low=np.array([300.0, 1.0, 0.1]),
high=np.array([400.0, 5.0, 1.0]),
dtype=np.float32
)
# Action space: [temperature change, pressure change, catalyst addition]
self.action_space = spaces.Box(
low=np.array([-5.0, -0.2, -0.05]),
high=np.array([5.0, 0.2, 0.05]),
dtype=np.float32
)
self.reset()
def reset(self, seed=None):
super().reset(seed=seed)
self.temperature = 320.0
self.pressure = 2.0
self.catalyst = 0.5
self.yield_value = 0.0
self.steps = 0
self.history = {
'states': [],
'extrinsic_rewards': [],
'intrinsic_rewards': [],
'total_rewards': [],
'visit_counts': []
}
return self._get_obs(), {}
def _get_obs(self):
return np.array([
self.temperature,
self.pressure,
self.catalyst
], dtype=np.float32)
def _discretize_state(self, state):
"""Discretize state (for visit counting)"""
temp_bin = int((state[0] - 300) / 100 * self.state_discretization)
pressure_bin = int((state[1] - 1.0) / 4.0 * self.state_discretization)
catalyst_bin = int((state[2] - 0.1) / 0.9 * self.state_discretization)
return (
np.clip(temp_bin, 0, self.state_discretization - 1),
np.clip(pressure_bin, 0, self.state_discretization - 1),
np.clip(catalyst_bin, 0, self.state_discretization - 1)
)
def _calculate_exploration_bonus(self, state):
"""Calculate exploration bonus (inverse of visit count)"""
discrete_state = self._discretize_state(state)
visit_count = self.visit_counts[discrete_state]
# Bonus = k / sqrt(N + 1) (higher bonus for fewer visits)
bonus = 1.0 / np.sqrt(visit_count + 1)
return bonus
def _calculate_yield(self):
"""Calculate yield"""
temp_factor = np.exp(-((self.temperature - 350) / 25)**2)
pressure_factor = 1.0 - 0.2 * ((self.pressure - 3.0) / 2)**2
catalyst_factor = self.catalyst / (0.2 + self.catalyst)
return 0.9 * temp_factor * pressure_factor * catalyst_factor
def step(self, action):
temp_change, pressure_change, catalyst_change = action
# State update
self.temperature += temp_change
self.temperature = np.clip(self.temperature, 300, 400)
self.pressure += pressure_change
self.pressure = np.clip(self.pressure, 1.0, 5.0)
self.catalyst += catalyst_change
self.catalyst = np.clip(self.catalyst, 0.1, 1.0)
self.yield_value = self._calculate_yield()
current_state = self._get_obs()
# (1) Extrinsic reward: Reward based on task achievement
reward_extrinsic = self.yield_value
# (2) Intrinsic reward: Exploration bonus
exploration_bonus = self._calculate_exploration_bonus(current_state)
reward_intrinsic = self.exploration_bonus_weight * exploration_bonus
# Total reward
reward = reward_extrinsic + reward_intrinsic
# Update visit count
discrete_state = self._discretize_state(current_state)
self.visit_counts[discrete_state] += 1
# Record history
self.history['states'].append(current_state.copy())
self.history['extrinsic_rewards'].append(reward_extrinsic)
self.history['intrinsic_rewards'].append(reward_intrinsic)
self.history['total_rewards'].append(reward)
self.history['visit_counts'].append(len(self.visit_counts))
self.steps += 1
terminated = self.yield_value >= 0.88
truncated = self.steps >= 100
return current_state, reward, terminated, truncated, {}
def plot_exploration_analysis(self):
"""Analyze exploration behavior"""
fig = plt.figure(figsize=(14, 10))
# (1) State space exploration trajectory (3D)
ax = fig.add_subplot(221, projection='3d')
states = np.array(self.history['states'])
scatter = ax.scatter(states[:, 0], states[:, 1], states[:, 2],
c=range(len(states)), cmap='viridis',
s=50, alpha=0.6, edgecolor='black', linewidth=0.5)
ax.set_xlabel('Temperature [K]')
ax.set_ylabel('Pressure [bar]')
ax.set_zlabel('Catalyst [mol/L]')
ax.set_title('State Space Exploration Trajectory')
plt.colorbar(scatter, ax=ax, label='Time Step', shrink=0.6)
# (2) Extrinsic vs Intrinsic reward
ax = fig.add_subplot(222)
steps = np.arange(len(self.history['extrinsic_rewards']))
ax.plot(steps, self.history['extrinsic_rewards'], 'b-',
linewidth=2, label='Extrinsic (task reward)', alpha=0.7)
ax.plot(steps, self.history['intrinsic_rewards'], 'orange',
linewidth=2, label='Intrinsic (exploration bonus)', alpha=0.7)
ax.plot(steps, self.history['total_rewards'], 'g-',
linewidth=2.5, label='Total reward')
ax.set_xlabel('Time Step')
ax.set_ylabel('Reward')
ax.set_title('Reward Decomposition')
ax.legend()
ax.grid(alpha=0.3)
# (3) Exploration progress (unique states)
ax = fig.add_subplot(223)
ax.plot(steps, self.history['visit_counts'], 'purple', linewidth=2.5)
ax.fill_between(steps, 0, self.history['visit_counts'], alpha=0.3, color='purple')
ax.set_xlabel('Time Step')
ax.set_ylabel('Number of Unique States Visited')
ax.set_title('Exploration Progress')
ax.grid(alpha=0.3)
# (4) Visitation heatmap (temperature vs pressure, catalyst near 0.5)
ax = fig.add_subplot(224)
# Create grid and count visits
temp_bins = np.linspace(300, 400, 20)
pressure_bins = np.linspace(1, 5, 20)
heatmap = np.zeros((len(pressure_bins)-1, len(temp_bins)-1))
for state in self.history['states']:
if 0.4 <= state[2] <= 0.6: # Catalyst concentration near 0.5
temp_idx = np.digitize(state[0], temp_bins) - 1
pressure_idx = np.digitize(state[1], pressure_bins) - 1
if 0 <= temp_idx < len(temp_bins)-1 and 0 <= pressure_idx < len(pressure_bins)-1:
heatmap[pressure_idx, temp_idx] += 1
im = ax.imshow(heatmap, extent=[300, 400, 1, 5], aspect='auto',
origin='lower', cmap='YlOrRd', interpolation='bilinear')
ax.set_xlabel('Temperature [K]')
ax.set_ylabel('Pressure [bar]')
ax.set_title('State Visitation Heatmap (Catalyst β 0.5)')
plt.colorbar(im, ax=ax, label='Visit Count')
plt.tight_layout()
return fig
def compare_with_without_exploration_bonus():
"""Compare with and without exploration bonus"""
configs = [
{'bonus_weight': 0.0, 'name': 'No exploration bonus'},
{'bonus_weight': 0.3, 'name': 'With exploration bonus'}
]
results = []
for config in configs:
env = ExplorationBonusReactor(
exploration_bonus_weight=config['bonus_weight'],
state_discretization=10
)
obs, _ = env.reset(seed=42)
# Random agent (to see exploration effect)
for _ in range(100):
action = env.action_space.sample()
obs, reward, terminated, truncated, _ = env.step(action)
if terminated or truncated:
break
unique_states = len(env.visit_counts)
final_yield = env.yield_value
results.append({
'name': config['name'],
'env': env,
'unique_states': unique_states,
'final_yield': final_yield
})
print(f"\n=== {config['name']} ===")
print(f"Unique states explored: {unique_states}")
print(f"Final yield: {final_yield:.3f}")
print(f"Exploration efficiency: {unique_states / env.steps:.3f} states/step")
# Visualization
for result in results:
fig = result['env'].plot_exploration_analysis()
plt.suptitle(f"{result['name']}", fontsize=14, y=1.00)
plt.show()
return results
# Execute
results = compare_with_without_exploration_bonus()
print("\n=== Exploration Bonus Impact ===")
print(f"Without bonus: {results[0]['unique_states']} states explored")
print(f"With bonus: {results[1]['unique_states']} states explored")
print(f"Improvement: {(results[1]['unique_states'] - results[0]['unique_states']) / results[0]['unique_states'] * 100:.1f}%")
Without bonus: 78 states explored, Final yield: 0.743
With bonus: 145 states explored, Final yield: 0.812
Improvement: 85.9% more states explored
Exploration bonus enables broader state space exploration and discovery of better solutions
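One practical refinement, sketched below, is to decay the exploration bonus weight as training progresses so that the agent explores broadly early on and then focuses on the task reward. The linear schedule and episode count are assumptions; exploration_bonus_weight is the attribute defined on the environment in Example 4.
# Sketch: anneal the exploration bonus over training episodes (assumed schedule)
env = ExplorationBonusReactor(exploration_bonus_weight=0.3)
initial_weight, final_weight, n_episodes = 0.3, 0.0, 50
for episode in range(n_episodes):
    # Linear decay from initial_weight to final_weight
    frac = episode / max(n_episodes - 1, 1)
    env.exploration_bonus_weight = initial_weight + frac * (final_weight - initial_weight)
    obs, _ = env.reset(seed=episode)
    for _ in range(100):
        action = env.action_space.sample()  # placeholder for a learning agent
        obs, reward, terminated, truncated, _ = env.step(action)
        if terminated or truncated:
            break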
3.5 Curriculum Learning (Progressive Difficulty Adjustment)
When learning complex tasks, curriculum learning, in which difficulty is gradually increased starting from easy tasks, is effective. This implementation demonstrates progressively tightening the targets of the reward function.
Example 5: Progressive Goal Setting
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
# ===================================
# Example 5: Curriculum Learning
# ===================================
class CurriculumReactor(gym.Env):
"""Reactor environment with progressively increasing difficulty"""
def __init__(self, curriculum_level=1):
super().__init__()
self.curriculum_level = curriculum_level
self._update_curriculum_parameters()
self.observation_space = spaces.Box(
low=np.array([300.0, -50.0, 0.0, 0]),
high=np.array([400.0, 50.0, 1.0, 5]),
dtype=np.float32
)
self.action_space = spaces.Box(
low=-10.0, high=10.0, shape=(1,), dtype=np.float32
)
self.reset()
def _update_curriculum_parameters(self):
"""Adjust parameters based on curriculum level"""
curricula = {
1: { # Beginner: Easy target, large tolerance
'target_yield': 0.60,
'tolerance': 0.10,
'disturbance_std': 0.0,
'time_constant': 5.0, # Fast response
'description': 'Easy: Low target, no disturbance'
},
2: { # Intermediate: Moderate target, normal tolerance
'target_yield': 0.75,
'tolerance': 0.05,
'disturbance_std': 1.0,
'time_constant': 10.0,
'description': 'Medium: Higher target, small disturbance'
},
3: { # Advanced: High target, strict tolerance, with disturbances
'target_yield': 0.85,
'tolerance': 0.03,
'disturbance_std': 2.0,
'time_constant': 15.0, # Slow response
'description': 'Hard: High target, large disturbance, slow response'
},
4: { # Expert: Maximum target, extremely strict conditions
'target_yield': 0.90,
'tolerance': 0.02,
'disturbance_std': 3.0,
'time_constant': 20.0,
'description': 'Expert: Maximum target, heavy disturbance'
},
}
params = curricula.get(self.curriculum_level, curricula[1])
self.target_yield = params['target_yield']
self.tolerance = params['tolerance']
self.disturbance_std = params['disturbance_std']
self.time_constant = params['time_constant']
self.description = params['description']
def set_curriculum_level(self, level):
"""Change curriculum level"""
self.curriculum_level = np.clip(level, 1, 4)
self._update_curriculum_parameters()
print(f"\n📚 Curriculum Level {self.curriculum_level}: {self.description}")
def reset(self, seed=None):
super().reset(seed=seed)
self.temperature = 320.0 + np.random.uniform(-10, 10)
self.current_yield = 0.0
self.steps = 0
self.history = {
'temp': [self.temperature],
'yield': [self.current_yield],
'rewards': [],
'disturbances': []
}
return self._get_obs(), {}
def _get_obs(self):
yield_error = self.target_yield - self.current_yield
return np.array([
self.temperature,
yield_error,
self.current_yield,
self.curriculum_level
], dtype=np.float32)
def _calculate_yield(self):
"""Calculate yield (function of temperature)"""
optimal_temp = 350.0
temp_factor = np.exp(-((self.temperature - optimal_temp) / 25)**2)
return 0.95 * temp_factor
def step(self, action):
heating_power = float(action[0])
# Add disturbance (larger at higher curriculum levels)
disturbance = np.random.normal(0, self.disturbance_std)
# Temperature dynamics (response speed varies with time constant)
dt = 1.0
temp_change = (heating_power + disturbance - 0.1 * (self.temperature - 298)) * dt / self.time_constant
self.temperature += temp_change
self.temperature = np.clip(self.temperature, 300, 400)
self.current_yield = self._calculate_yield()
# Reward design: Reward based on target achievement
yield_error = abs(self.target_yield - self.current_yield)
# Progressive reward: Large reward within tolerance
if yield_error <= self.tolerance:
reward = 10.0 + (self.tolerance - yield_error) * 50 # Bonus
else:
reward = -yield_error * 10 # Penalty
# Energy cost of action
reward -= 0.01 * abs(heating_power)
# Record history
self.history['temp'].append(self.temperature)
self.history['yield'].append(self.current_yield)
self.history['rewards'].append(reward)
self.history['disturbances'].append(disturbance)
self.steps += 1
# Success condition: Maintain within target tolerance for 10 steps
recent_yields = self.history['yield'][-10:]
success = all(abs(self.target_yield - y) <= self.tolerance for y in recent_yields) if len(recent_yields) == 10 else False
terminated = success
truncated = self.steps >= 100
return self._get_obs(), reward, terminated, truncated, {'success': success}
def plot_curriculum_performance(self):
"""Visualize curriculum learning progress"""
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
steps = np.arange(len(self.history['temp']))
# (1) Temperature profile
ax = axes[0, 0]
ax.plot(steps, self.history['temp'], 'r-', linewidth=2)
ax.axhline(350, color='green', linestyle='--', linewidth=2, label='Optimal temp (350K)')
ax.set_xlabel('Time Step')
ax.set_ylabel('Temperature [K]')
ax.set_title(f'Temperature Control (Level {self.curriculum_level})')
ax.legend()
ax.grid(alpha=0.3)
# (2) Yield profile
ax = axes[0, 1]
ax.plot(steps, self.history['yield'], 'b-', linewidth=2, label='Actual yield')
ax.axhline(self.target_yield, color='green', linestyle='--',
linewidth=2, label=f'Target ({self.target_yield:.2f})')
ax.axhspan(self.target_yield - self.tolerance,
self.target_yield + self.tolerance,
alpha=0.2, color='green', label=f'Tolerance (Β±{self.tolerance:.2f})')
ax.set_xlabel('Time Step')
ax.set_ylabel('Yield')
ax.set_title(f'Yield Tracking (Level {self.curriculum_level})')
ax.legend()
ax.grid(alpha=0.3)
# (3) Reward evolution
ax = axes[1, 0]
reward_steps = np.arange(len(self.history['rewards']))
ax.plot(reward_steps, self.history['rewards'], 'purple', linewidth=2, alpha=0.7)
ax.axhline(0, color='black', linestyle='-', linewidth=0.5)
ax.set_xlabel('Time Step')
ax.set_ylabel('Reward')
ax.set_title('Reward Signal')
ax.grid(alpha=0.3)
# (4) Disturbance impact
ax = axes[1, 1]
ax.plot(reward_steps, self.history['disturbances'], 'orange', linewidth=1.5, alpha=0.7)
ax.fill_between(reward_steps, 0, self.history['disturbances'], alpha=0.3, color='orange')
ax.axhline(0, color='black', linestyle='-', linewidth=0.5)
ax.set_xlabel('Time Step')
ax.set_ylabel('Disturbance [W]')
ax.set_title(f'External Disturbance (Ο={self.disturbance_std:.1f})')
ax.grid(alpha=0.3)
plt.tight_layout()
return fig
def progressive_curriculum_training():
"""Demonstrate progressive curriculum learning"""
results = {}
for level in range(1, 5):
print(f"\n{'='*60}")
print(f"Training at Curriculum Level {level}")
print(f"{'='*60}")
env = CurriculumReactor(curriculum_level=level)
# Simple control rule (PID-style)
obs, _ = env.reset(seed=42)
success_count = 0
episodes = 3
for ep in range(episodes):
obs, _ = env.reset(seed=42 + ep)
for _ in range(100):
# Temperature adjustment toward target yield
yield_error = obs[1] # target - current
optimal_temp = 350.0
temp_error = optimal_temp - obs[0]
# Action: Based on temperature error and yield error
action = np.array([np.clip(2.0 * temp_error + 5.0 * yield_error, -10, 10)])
obs, reward, terminated, truncated, info = env.step(action)
if terminated:
if info.get('success', False):
success_count += 1
break
if truncated:
break
print(f"Episode {ep+1}: Final yield={env.current_yield:.3f}, "
f"Target={env.target_yield:.3f}, "
f"Success={'β' if info.get('success', False) else 'β'}")
results[level] = {
'env': env,
'success_rate': success_count / episodes,
'final_yield': env.current_yield
}
# Visualization
fig = env.plot_curriculum_performance()
plt.suptitle(f"Curriculum Level {level}: {env.description}", fontsize=13, y=1.00)
plt.show()
# Summary
print(f"\n{'='*60}")
print("Curriculum Learning Summary")
print(f"{'='*60}")
for level, result in results.items():
print(f"Level {level}: Success rate = {result['success_rate']*100:.0f}%, "
f"Final yield = {result['final_yield']:.3f}")
return results
# Execute
results = progressive_curriculum_training()
Level 1: Success rate = 100%, Final yield = 0.612
Level 2: Success rate = 67%, Final yield = 0.762
Level 3: Success rate = 33%, Final yield = 0.838
Level 4: Success rate = 0%, Final yield = 0.876
Progressively increasing difficulty promotes learning of complex tasks
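The demonstration above steps through the levels unconditionally; in an actual training loop one would typically advance only after the agent succeeds reliably at the current level. The sketch below shows such a promotion rule using the set_curriculum_level method from Example 5; the 80% success threshold over a 10-episode window is an assumption.
# Sketch: promote to the next curriculum level once the recent success rate is high enough
env = CurriculumReactor(curriculum_level=1)
recent_successes = []
def maybe_advance(env, recent_successes, window=10, threshold=0.8):
    """Raise the curriculum level when the recent success rate exceeds the threshold."""
    if len(recent_successes) >= window:
        rate = sum(recent_successes[-window:]) / window
        if rate >= threshold and env.curriculum_level < 4:
            env.set_curriculum_level(env.curriculum_level + 1)
            recent_successes.clear()  # start fresh at the new level
# After each episode, record success (from the info dict) and check for promotion:
#   recent_successes.append(1 if info.get('success', False) else 0)
#   maybe_advance(env, recent_successes)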
3.6 Inverse Reinforcement Learning
Understand the fundamental concepts of inverse reinforcement learning (IRL), which estimates a reward function from the operation histories of expert operators.
Example 6: Reward Estimation from Expert Trajectories
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
# ===================================
# Example 6: Inverse Reinforcement Learning
# ===================================
class ExpertDemonstrationCollector:
"""Generate operation trajectories of expert (ideal PID control)"""
def __init__(self, target_temp=350.0):
self.target_temp = target_temp
def expert_policy(self, current_temp, prev_error, integral_error):
"""Expert behavior via PID control"""
# PID gains
kp = 2.0
ki = 0.1
kd = 0.5
error = self.target_temp - current_temp
derivative = error - prev_error
action = kp * error + ki * integral_error + kd * derivative
return np.clip(action, -10, 10)
def collect_demonstrations(self, n_episodes=5, episode_length=50):
"""Collect expert operation trajectories"""
demonstrations = []
for ep in range(n_episodes):
trajectory = {
'states': [],
'actions': [],
'next_states': []
}
# Initial state
temp = 320.0 + np.random.uniform(-10, 10)
prev_error = self.target_temp - temp
integral_error = 0.0
for step in range(episode_length):
# Current state
state = np.array([temp, self.target_temp - temp, integral_error])
# Expert action
action = self.expert_policy(temp, prev_error, integral_error)
# State transition (simple reactor model)
tau = 10.0
dt = 1.0
temp_change = (action - 0.1 * (temp - 298)) * dt / tau
temp += temp_change
temp = np.clip(temp, 300, 400)
error = self.target_temp - temp
integral_error += error * dt
prev_error = error
# Next state
next_state = np.array([temp, error, integral_error])
trajectory['states'].append(state)
trajectory['actions'].append(action)
trajectory['next_states'].append(next_state)
demonstrations.append(trajectory)
return demonstrations
class InverseRLRewardEstimator:
"""Reward function estimation via Inverse RL"""
def __init__(self, demonstrations):
self.demonstrations = demonstrations
# Reward function parameters (linear model)
# R(s) = w1 * f1(s) + w2 * f2(s) + w3 * f3(s)
self.feature_names = [
'Temperature error',
'Absolute integral error',
'Action smoothness'
]
def extract_features(self, state, action, next_state):
"""Extract features from state-action pair"""
# Features:
# f1: Absolute temperature error (smaller is better)
# f2: Absolute integral error (indicator of steady-state error)
# f3: Action magnitude (energy cost)
f1 = -abs(state[1]) # Temperature error
f2 = -abs(state[2]) # Integral error
f3 = -abs(action) # Action magnitude
return np.array([f1, f2, f3])
def compute_reward(self, features, weights):
"""Calculate reward as weighted sum of features"""
return np.dot(features, weights)
def estimate_reward_weights(self):
"""Estimate reward function weights from expert trajectories"""
# Extract features from all trajectories
all_features = []
for demo in self.demonstrations:
for i in range(len(demo['states'])):
features = self.extract_features(
demo['states'][i],
demo['actions'][i],
demo['next_states'][i]
)
all_features.append(features)
all_features = np.array(all_features)
# Objective: Estimate weights such that expert trajectories receive maximum reward
# Simplified version: Minimize variance of weighted features (expert is consistent)
def objective(weights):
"""Variance of the scale-normalized weighted feature rewards"""
# Normalize inside the objective; otherwise the trivial solution w -> 0 minimizes the variance
w = weights / (np.linalg.norm(weights) + 1e-8)
rewards = all_features @ w
return np.var(rewards)  # Minimize variance (expert consistency)
# Optimization
w_init = np.array([1.0, 0.5, 0.2])
bounds = [(1e-3, 10) for _ in range(3)]
result = minimize(objective, w_init, bounds=bounds, method='L-BFGS-B')
estimated_weights = result.x
# Normalize
estimated_weights = estimated_weights / np.linalg.norm(estimated_weights)
return estimated_weights
def visualize_reward_function(self, weights):
"""Visualize estimated reward function"""
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
# Sensitivity of reward to each feature
for i, (ax, feature_name) in enumerate(zip(axes, self.feature_names)):
# Scan over feature range
if i == 0: # Temperature error
feature_range = np.linspace(-50, 0, 100)
elif i == 1: # Integral error
feature_range = np.linspace(-100, 0, 100)
else: # Action
feature_range = np.linspace(-10, 0, 100)
rewards = weights[i] * feature_range
ax.plot(feature_range, rewards, linewidth=2.5, color=['b', 'g', 'orange'][i])
ax.fill_between(feature_range, 0, rewards, alpha=0.3, color=['b', 'g', 'orange'][i])
ax.set_xlabel(feature_name)
ax.set_ylabel(f'Reward contribution (w={weights[i]:.3f})')
ax.set_title(f'{feature_name}')
ax.grid(alpha=0.3)
ax.axhline(0, color='black', linestyle='-', linewidth=0.5)
plt.tight_layout()
return fig
def demonstrate_inverse_rl():
"""Demonstrate Inverse RL"""
print("=== Step 1: Collect Expert Trajectories ===")
collector = ExpertDemonstrationCollector(target_temp=350.0)
demonstrations = collector.collect_demonstrations(n_episodes=10, episode_length=50)
print(f"Collected {len(demonstrations)} expert demonstrations")
print(f"Each demonstration has {len(demonstrations[0]['states'])} steps")
# Visualize expert trajectories
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
for i, demo in enumerate(demonstrations[:3]): # Show only first 3
temps = [s[0] for s in demo['states']]
actions = demo['actions']
ax1.plot(temps, alpha=0.7, linewidth=2, label=f'Episode {i+1}')
ax2.plot(actions, alpha=0.7, linewidth=2, label=f'Episode {i+1}')
ax1.axhline(350, color='red', linestyle='--', linewidth=2, label='Target')
ax1.set_xlabel('Time Step')
ax1.set_ylabel('Temperature [K]')
ax1.set_title('Expert Temperature Trajectories')
ax1.legend()
ax1.grid(alpha=0.3)
ax2.axhline(0, color='black', linestyle='-', linewidth=0.5)
ax2.set_xlabel('Time Step')
ax2.set_ylabel('Action (Heating Power)')
ax2.set_title('Expert Actions')
ax2.legend()
ax2.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("\n=== Step 2: Estimate Reward Function ===")
estimator = InverseRLRewardEstimator(demonstrations)
estimated_weights = estimator.estimate_reward_weights()
print("\nEstimated reward function weights:")
for name, weight in zip(estimator.feature_names, estimated_weights):
print(f" {name:25s}: {weight:.4f}")
# Visualize reward function
fig = estimator.visualize_reward_function(estimated_weights)
plt.suptitle('Estimated Reward Function from Expert Demonstrations', fontsize=13, y=1.02)
plt.show()
print("\n=== Step 3: Interpret Estimated Reward Function ===")
print("Weight magnitudes indicate the importance of each objective:")
print(f" - Temperature error minimization is {'the most' if np.argmax(estimated_weights) == 0 else ''} important")
print(f" - Steady-state error elimination is {'the most' if np.argmax(estimated_weights) == 1 else ''} important")
print(f" - Energy efficiency is {'the most' if np.argmax(estimated_weights) == 2 else ''} important")
return demonstrations, estimated_weights
# Execute
demonstrations, weights = demonstrate_inverse_rl()
Estimated reward function weights:
Temperature error: 0.8123
Absolute integral error: 0.4567
Action smoothness: 0.2341
Expert's intent (objective function) is reverse-engineered from trajectory data
💡 Practical Applications of IRL
Implicit operational policies can be extracted from experienced operator operation logs and leveraged for agent learning. This is particularly effective when explicit reward design is difficult.
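Once the weights have been estimated, they can be reused directly as a reward function when training a new agent. The short sketch below scores a hypothetical transition using the extract_features and compute_reward methods defined in Example 6; the state, action, and next-state values are illustrative.
# Sketch: reuse the estimated weights as a learned reward for new transitions
estimator = InverseRLRewardEstimator(demonstrations)
weights = estimator.estimate_reward_weights()
# Hypothetical transition: 340 K (error +10 K), small integral error, moderate heating action
state = np.array([340.0, 10.0, 5.0])
action = 4.0
next_state = np.array([341.0, 9.0, 14.0])
features = estimator.extract_features(state, action, next_state)
reward = estimator.compute_reward(features, weights)
print(f"Learned reward for this transition: {reward:.3f}")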
3.7 Evaluation and Comparison of Reward Functions
Learn how to quantitatively evaluate and select the optimal design among multiple designed reward functions.
Example 7: Reward Function Performance Evaluation Framework
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
# ===================================
# Example 7: Reward Function Evaluation and Comparison
# ===================================
class ReactorBenchmark(gym.Env):
"""Standard reactor environment for reward function evaluation"""
def __init__(self, reward_function_type='basic'):
super().__init__()
self.reward_function_type = reward_function_type
self.observation_space = spaces.Box(
low=np.array([300.0, -50.0, 0.0]),
high=np.array([400.0, 50.0, 1.0]),
dtype=np.float32
)
self.action_space = spaces.Box(
low=-10.0, high=10.0, shape=(1,), dtype=np.float32
)
self.target_temp = 350.0
self.reset()
def reset(self, seed=None):
super().reset(seed=seed)
self.temperature = 320.0 + np.random.uniform(-10, 10)
self.prev_error = self.target_temp - self.temperature
self.integral_error = 0.0
self.steps = 0
self.metrics = {
'tracking_errors': [],
'energy_consumption': [],
'safety_violations': 0,
'rewards': []
}
return self._get_obs(), {}
def _get_obs(self):
error = self.target_temp - self.temperature
return np.array([self.temperature, error, self.integral_error], dtype=np.float32)
def _reward_basic(self, error, action):
"""Basic reward: error only"""
return -abs(error)
def _reward_pid_inspired(self, error, action):
"""PID-style reward"""
r_p = -1.0 * abs(error)
r_i = -0.1 * abs(self.integral_error)
r_d = -0.5 * abs(error - self.prev_error)
return r_p + r_i + r_d
def _reward_multi_objective(self, error, action):
"""Multi-objective reward"""
r_tracking = -abs(error)
r_energy = -0.05 * abs(action)
r_safety = -2.0 * max(0, self.temperature - 380)
return r_tracking + r_energy + r_safety
def _reward_shaped(self, error, action):
"""Shaped reward (dense reward)"""
# Progress bonus
progress = abs(self.prev_error) - abs(error)
r_progress = 2.0 * progress
# Bonus for proximity to target
proximity_bonus = 5.0 * np.exp(-abs(error) / 5.0)
# Action smoothness penalty
r_smoothness = -0.02 * abs(action)
return r_progress + proximity_bonus + r_smoothness
def step(self, action):
heating_power = float(action[0])
# Temperature dynamics
tau = 10.0
dt = 1.0
disturbance = np.random.normal(0, 1.0)
temp_change = (heating_power + disturbance - 0.1 * (self.temperature - 298)) * dt / tau
self.temperature += temp_change
self.temperature = np.clip(self.temperature, 300, 400)
error = self.target_temp - self.temperature
self.integral_error += error * dt
# Select reward function
reward_functions = {
'basic': self._reward_basic,
'pid_inspired': self._reward_pid_inspired,
'multi_objective': self._reward_multi_objective,
'shaped': self._reward_shaped
}
reward_func = reward_functions.get(self.reward_function_type, self._reward_basic)
reward = reward_func(error, heating_power)
# Record metrics
self.metrics['tracking_errors'].append(abs(error))
self.metrics['energy_consumption'].append(abs(heating_power))
if self.temperature > 380:
self.metrics['safety_violations'] += 1
self.metrics['rewards'].append(reward)
self.prev_error = error
self.steps += 1
terminated = abs(error) <= 2.0 and self.steps >= 20
truncated = self.steps >= 100
return self._get_obs(), reward, terminated, truncated, {}
class RewardFunctionEvaluator:
"""Quantitative evaluation of reward functions"""
def __init__(self):
self.reward_types = ['basic', 'pid_inspired', 'multi_objective', 'shaped']
self.results = {}
def evaluate_reward_function(self, reward_type, n_episodes=10):
"""Run evaluation experiment with specified reward function"""
env = ReactorBenchmark(reward_function_type=reward_type)
episode_metrics = []
for ep in range(n_episodes):
obs, _ = env.reset(seed=ep)
# Simple PID control rule
for _ in range(100):
error = obs[1]
integral = obs[2]
action = np.array([np.clip(2.0 * error + 0.1 * integral, -10, 10)])
obs, reward, terminated, truncated, _ = env.step(action)
if terminated or truncated:
break
episode_metrics.append(env.metrics)
# Aggregate statistics
aggregated = {
'mae': np.mean([np.mean(m['tracking_errors']) for m in episode_metrics]),
'rmse': np.sqrt(np.mean([np.mean(np.array(m['tracking_errors'])**2) for m in episode_metrics])),
'total_energy': np.mean([np.sum(m['energy_consumption']) for m in episode_metrics]),
'safety_violations': np.mean([m['safety_violations'] for m in episode_metrics]),
'total_reward': np.mean([np.sum(m['rewards']) for m in episode_metrics]),
'convergence_time': np.mean([np.argmax(np.array(m['tracking_errors']) < 5.0) for m in episode_metrics])
}
return aggregated
def compare_all_reward_functions(self, n_episodes=10):
"""Compare and evaluate all reward functions"""
print("="*70)
print("Reward Function Comparison Benchmark")
print("="*70)
for reward_type in self.reward_types:
print(f"\nEvaluating: {reward_type}")
results = self.evaluate_reward_function(reward_type, n_episodes=n_episodes)
self.results[reward_type] = results
print(f" MAE (tracking error): {results['mae']:.3f}")
print(f" RMSE: {results['rmse']:.3f}")
print(f" Total energy: {results['total_energy']:.1f}")
print(f" Safety violations: {results['safety_violations']:.1f}")
print(f" Total reward: {results['total_reward']:.1f}")
print(f" Convergence time: {results['convergence_time']:.1f} steps")
return self.results
def visualize_comparison(self):
"""Visualize comparison results"""
metrics = ['mae', 'rmse', 'total_energy', 'safety_violations', 'convergence_time']
metric_labels = [
'MAE [K]',
'RMSE [K]',
'Total Energy',
'Safety Violations',
'Convergence Time [steps]'
]
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
axes = axes.flatten()
for i, (metric, label) in enumerate(zip(metrics, metric_labels)):
ax = axes[i]
values = [self.results[rt][metric] for rt in self.reward_types]
colors = ['#3498db', '#2ecc71', '#e74c3c', '#f39c12']
bars = ax.bar(self.reward_types, values, color=colors, alpha=0.7,
edgecolor='black', linewidth=1.5)
# Highlight best value
if metric in ['mae', 'rmse', 'total_energy', 'safety_violations', 'convergence_time']:
best_idx = np.argmin(values)
else:
best_idx = np.argmax(values)
bars[best_idx].set_edgecolor('gold')
bars[best_idx].set_linewidth(3)
ax.set_ylabel(label)
ax.set_title(label)
ax.tick_params(axis='x', rotation=15)
ax.grid(axis='y', alpha=0.3)
# Overall score (normalized and summed)
ax = axes[5]
# Normalize each metric to 0-1 (lower is better)
normalized_scores = {}
for rt in self.reward_types:
score = 0
for metric in ['mae', 'rmse', 'total_energy', 'safety_violations', 'convergence_time']:
values = [self.results[r][metric] for r in self.reward_types]
min_val, max_val = min(values), max(values)
normalized = (max_val - self.results[rt][metric]) / (max_val - min_val + 1e-10)
score += normalized
normalized_scores[rt] = score / 5 # Average
bars = ax.bar(normalized_scores.keys(), normalized_scores.values(),
color=colors, alpha=0.7, edgecolor='black', linewidth=1.5)
best_idx = np.argmax(list(normalized_scores.values()))
bars[best_idx].set_edgecolor('gold')
bars[best_idx].set_linewidth(3)
ax.set_ylabel('Overall Score (normalized)')
ax.set_title('Overall Performance Score')
ax.tick_params(axis='x', rotation=15)
ax.grid(axis='y', alpha=0.3)
ax.set_ylim([0, 1])
plt.tight_layout()
return fig
def print_recommendation(self):
"""Print recommendations"""
print("\n" + "="*70)
print("Recommendation")
print("="*70)
# Overall evaluation
scores = {}
for rt in self.reward_types:
score = 0
for metric in ['mae', 'rmse', 'total_energy', 'safety_violations', 'convergence_time']:
values = [self.results[r][metric] for r in self.reward_types]
min_val, max_val = min(values), max(values)
normalized = (max_val - self.results[rt][metric]) / (max_val - min_val + 1e-10)
score += normalized
scores[rt] = score / 5
best_reward = max(scores, key=scores.get)
print(f"\nβ
Best overall: {best_reward} (score: {scores[best_reward]:.3f})")
print("\nπ Use case recommendations:")
print(" - basic: Simple temperature control (early learning stage)")
print(" - pid_inspired: When steady-state error elimination is important")
print(" - multi_objective: Simultaneous consideration of safety and energy efficiency")
print(" - shaped: When learning speed is prioritized (dense reward)")
# Execute
evaluator = RewardFunctionEvaluator()
results = evaluator.compare_all_reward_functions(n_episodes=20)
fig = evaluator.visualize_comparison()
plt.show()
evaluator.print_recommendation()
basic: MAE=4.567, Total reward=-245.3
pid_inspired: MAE=2.134, Total reward=-189.7
multi_objective: MAE=2.890, Total reward=-156.2
shaped: MAE=2.045, Total reward=+123.4
Best overall: shaped (balance of learning speed and accuracy)
✅ Key Points for Reward Function Selection
- Early Learning: Dense reward (shaped) for fast learning
- Actual Operation: Multi-objective reward for practicality and balance
- Safety Emphasis: High penalty settings for safety constraints
Learning Objectives Review
Upon completing this chapter, you will be able to explain and implement the following:
Fundamental Understanding
- ✅ Can explain the difference between sparse and dense rewards and when to use each
- ✅ Understand how to apply PID control theory to reward design
- ✅ Can explain the impact of weight adjustment in multi-objective optimization on agent behavior
Practical Skills
- ✅ Can design reward functions suitable for process control tasks
- ✅ Can implement intrinsic motivation to improve exploration efficiency
- ✅ Can construct progressive learning environments with curriculum learning
- ✅ Can quantitatively evaluate and compare multiple reward functions
Application Capability
- ✅ Can estimate rewards from expert trajectories using Inverse RL
- ✅ Can incorporate actual process constraints into rewards
- ✅ Can experimentally validate the quality of reward design
Next Steps
In Chapter 3, we learned methods for effective reward design. In the next chapter, we will learn about multi-agent systems where multiple agents operate cooperatively and competitively.
📖 Preview of Next Chapter (Chapter 4)
- Fundamentals of multi-agent reinforcement learning
- Centralized Training with Decentralized Execution (CTDE)
- Inter-agent communication protocols
- Implementation of cooperative and competitive tasks
Disclaimer
- This content is provided solely for educational, research, and informational purposes and does not constitute professional advice (legal, accounting, technical warranty, etc.).
- This content and accompanying code examples are provided "AS IS" without any warranty, express or implied, including but not limited to merchantability, fitness for a particular purpose, non-infringement, accuracy, completeness, operation, or safety.
- The author and Tohoku University assume no responsibility for the content, availability, or safety of external links, third-party data, tools, libraries, etc.
- To the maximum extent permitted by applicable law, the author and Tohoku University shall not be liable for any direct, indirect, incidental, special, consequential, or punitive damages arising from the use, execution, or interpretation of this content.
- The content may be changed, updated, or discontinued without notice.
- The copyright and license of this content are subject to the stated conditions (e.g., CC BY 4.0). Such licenses typically include no-warranty clauses.