Chapter 4: Multi-Agent Cooperative Control
This chapter covers multi-agent cooperative control, from the essential concepts to practical implementation techniques.
Chapter 4 Overview
In complex process plants, multiple reactors and distillation columns operate while mutually influencing each other. In such distributed systems, rather than using a single AI agent, having multiple agents cooperate for control enables more efficient and flexible operation.
This chapter covers multi-agent reinforcement learning (MARL) from the fundamentals to practical process control applications, with seven implementation examples.
What You Will Learn in This Chapter
- CTDE (Centralized Training with Decentralized Execution): Centralized during training, distributed during execution
- Independent Q-Learning: Each agent learns independently
- QMIX: Credit assignment through value function decomposition and mixing
- Inter-agent Communication: Cooperation through message passing
- Cooperative Tasks: Synchronized control of multiple reactors
- Competitive Tasks: Allocation of limited resources
- Mixed Tasks: Realistic scenarios with both cooperative and competitive aspects
Fundamentals of Multi-Agent Reinforcement Learning
Formulation
Multi-agent systems are formulated as partially observable stochastic games (also called Markov games):
\[ \mathcal{G} = \langle \mathcal{N}, \mathcal{S}, \{\mathcal{A}^i\}_{i \in \mathcal{N}}, \mathcal{T}, \{R^i\}_{i \in \mathcal{N}}, \{\mathcal{O}^i\}_{i \in \mathcal{N}}, \gamma \rangle \]
- \(\mathcal{N} = \{1, 2, \ldots, n\}\): Agent set
- \(\mathcal{S}\): Global state space
- \(\mathcal{A}^i\): Action space of agent \(i\)
- \(\mathcal{T}: \mathcal{S} \times \mathcal{A}^1 \times \cdots \times \mathcal{A}^n \to \Delta(\mathcal{S})\): State transition function
- \(R^i: \mathcal{S} \times \mathcal{A}^1 \times \cdots \times \mathcal{A}^n \to \mathbb{R}\): Reward function for agent \(i\)
- \(\mathcal{O}^i\): Observation space of agent \(i\)
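To connect this formalism to the implementations that follow, the tuple maps naturally onto a multi-agent environment interface. Below is a minimal illustrative sketch; the class and method names are hypothetical, not a standard API:
# Illustrative mapping of the Markov game tuple onto an environment API
# (hypothetical names; compare with the concrete environments later in this chapter)
from typing import List, Tuple
import numpy as np

class MarkovGameEnv:
    n_agents: int  # |N|, the number of agents

    def reset(self) -> List[np.ndarray]:
        """Return the initial per-agent observations o^i drawn from O^i."""
        ...

    def step(self, actions: List[int]) -> Tuple[List[np.ndarray], np.ndarray, List[bool]]:
        """Apply the joint action (a^1, ..., a^n); T samples the next state.
        Returns per-agent observations, per-agent rewards R^i, and done flags."""
        ...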
Classification: Cooperative, Competitive, Mixed
Tasks are classified by their reward structure:
- Fully Cooperative: all agents share a single reward, \(R^1 = R^2 = \cdots = R^n\)
- Fully Competitive (zero-sum): rewards cancel out, \(\sum_i R^i = 0\)
- Mixed (general game): cooperation and competition coexist
In process control, the following situations arise (a toy sketch of the corresponding reward structures follows the list):
- Cooperative Tasks: Coordinating multiple reactors to maximize overall productivity
- Competitive Tasks: Plants competing for limited utilities (steam, cooling water)
- Mixed Tasks: Each plant achieving its own production targets while maintaining overall stability
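The three regimes differ only in how rewards are structured. Here is a toy sketch for two agents (illustrative payoffs only, not part of any of the environments below):
# Toy reward structures distinguishing the three task classes (n = 2 agents)
def cooperative_rewards(r_team):
    # Fully cooperative: every agent receives the same team reward
    return [r_team, r_team]

def zero_sum_rewards(r1):
    # Fully competitive: rewards sum to zero
    return [r1, -r1]

def mixed_rewards(r1, r2, r_team, w=0.3):
    # Mixed: individual terms plus a shared cooperative term
    return [r1 + w * r_team, r2 + w * r_team]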
CTDE (Centralized Training with Decentralized Execution)
CTDE is a framework that trains agents centrally, using information from all agents, while at execution time each agent acts in a decentralized manner using only its own observations. This combines training efficiency with execution-time scalability.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# CTDE basic framework implementation
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
class Actor(nn.Module):
"""Actor for each agent (executable in distributed manner)"""
def __init__(self, obs_dim, action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim, 64), nn.ReLU(),
nn.Linear(64, 64), nn.ReLU(),
nn.Linear(64, action_dim), nn.Tanh()
)
def forward(self, obs):
return self.net(obs)
class CentralizedCritic(nn.Module):
"""Central Critic (used only during training)"""
def __init__(self, state_dim, n_agents, action_dim):
super().__init__()
total_action_dim = n_agents * action_dim
self.net = nn.Sequential(
nn.Linear(state_dim + total_action_dim, 128), nn.ReLU(),
nn.Linear(128, 128), nn.ReLU(),
nn.Linear(128, 1)
)
def forward(self, state, actions):
# actions: [n_agents, action_dim] -> flatten
x = torch.cat([state, actions.flatten()], dim=-1)
return self.net(x)
class CTDEAgent:
"""CTDE learning agent"""
def __init__(self, n_agents, obs_dim, action_dim, state_dim):
self.n_agents = n_agents
self.actors = [Actor(obs_dim, action_dim) for _ in range(n_agents)]
self.critic = CentralizedCritic(state_dim, n_agents, action_dim)
self.actor_opts = [optim.Adam(a.parameters(), lr=3e-4) for a in self.actors]
self.critic_opt = optim.Adam(self.critic.parameters(), lr=1e-3)
def select_actions(self, observations):
"""Execution time: Each agent selects actions independently"""
actions = []
for i, obs in enumerate(observations):
with torch.no_grad():
obs_t = torch.FloatTensor(obs)
action = self.actors[i](obs_t).numpy()
actions.append(action)
return np.array(actions)
    def train_step(self, batch):
        """Training time: update using the centralized critic.

        For clarity, ``batch`` is assumed to hold a single unbatched transition.
        """
        states, obs, actions, rewards, next_states, next_obs, dones = batch
# Critic update (TD error)
with torch.no_grad():
next_actions = torch.stack([
self.actors[i](torch.FloatTensor(next_obs[i]))
for i in range(self.n_agents)
])
target_q = rewards + 0.99 * self.critic(
torch.FloatTensor(next_states), next_actions
) * (1 - dones)
current_q = self.critic(torch.FloatTensor(states), torch.FloatTensor(actions))
critic_loss = nn.MSELoss()(current_q, target_q)
self.critic_opt.zero_grad()
critic_loss.backward()
self.critic_opt.step()
# Actor update (policy gradient)
for i in range(self.n_agents):
new_actions = [
self.actors[j](torch.FloatTensor(obs[j])) if j == i
else torch.FloatTensor(actions[j])
for j in range(self.n_agents)
]
new_actions = torch.stack(new_actions)
actor_loss = -self.critic(torch.FloatTensor(states), new_actions).mean()
self.actor_opts[i].zero_grad()
actor_loss.backward()
self.actor_opts[i].step()
# Usage example: Cooperative control of 3 CSTRs
n_agents = 3
agent = CTDEAgent(n_agents=n_agents, obs_dim=4, action_dim=2, state_dim=12)
# During execution, distributed (each agent uses only its own observation)
observations = [np.random.randn(4) for _ in range(n_agents)]
actions = agent.select_actions(observations) # Distributed execution
print(f"Actions in distributed execution: {actions.shape}") # (3, 2)
Independent Q-Learning (IQL)
The simplest multi-agent learning approach is for each agent to run Q-learning independently, treating the other agents as part of the environment. This causes non-stationarity, since the environment appears to change as the other agents learn, but the implementation is simple and works well in practice in many cases.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Independent Q-Learning implementation
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
class QNetwork(nn.Module):
"""Independent Q-function for each agent"""
def __init__(self, obs_dim, action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim, 64), nn.ReLU(),
nn.Linear(64, 64), nn.ReLU(),
nn.Linear(64, action_dim)
)
def forward(self, obs):
return self.net(obs)
class IQLAgent:
"""Independent Q-Learning agent"""
def __init__(self, obs_dim, action_dim, agent_id):
self.agent_id = agent_id
self.action_dim = action_dim
self.q_net = QNetwork(obs_dim, action_dim)
self.target_net = QNetwork(obs_dim, action_dim)
self.target_net.load_state_dict(self.q_net.state_dict())
self.optimizer = optim.Adam(self.q_net.parameters(), lr=1e-3)
self.memory = deque(maxlen=10000)
self.epsilon = 1.0
def select_action(self, obs):
"""ε-greedy action selection"""
if random.random() < self.epsilon:
return np.random.randint(self.action_dim)
else:
with torch.no_grad():
q_values = self.q_net(torch.FloatTensor(obs))
return q_values.argmax().item()
def store_transition(self, obs, action, reward, next_obs, done):
self.memory.append((obs, action, reward, next_obs, done))
def train_step(self, batch_size=32):
if len(self.memory) < batch_size:
return 0.0
batch = random.sample(self.memory, batch_size)
obs, actions, rewards, next_obs, dones = zip(*batch)
obs_t = torch.FloatTensor(obs)
actions_t = torch.LongTensor(actions)
rewards_t = torch.FloatTensor(rewards)
next_obs_t = torch.FloatTensor(next_obs)
dones_t = torch.FloatTensor(dones)
# Current Q-values
q_values = self.q_net(obs_t).gather(1, actions_t.unsqueeze(1)).squeeze()
# Target Q-values
with torch.no_grad():
next_q_values = self.target_net(next_obs_t).max(1)[0]
target_q = rewards_t + 0.99 * next_q_values * (1 - dones_t)
# Update
loss = nn.MSELoss()(q_values, target_q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# ε decay
self.epsilon = max(0.01, self.epsilon * 0.995)
return loss.item()
def update_target(self):
self.target_net.load_state_dict(self.q_net.state_dict())
# Multi-agent environment simulation
class MultiCSTREnv:
"""Environment with 3 CSTRs connected in series"""
def __init__(self):
self.n_agents = 3
self.reset()
def reset(self):
# State of each reactor [temperature, concentration]
self.states = np.array([[350.0, 0.5], [340.0, 0.3], [330.0, 0.1]])
return self.states
def step(self, actions):
# actions: [0=cooling, 1=heating, 2=maintain] for each agent
rewards = []
for i in range(self.n_agents):
T, C = self.states[i]
# Temperature change from action
if actions[i] == 0: # cooling
T -= 5
elif actions[i] == 1: # heating
T += 5
# Reaction progress (simple model)
k = 0.1 * np.exp((T - 350) / 10)
C = C * (1 - k * 0.1)
# Inflow to next reactor
if i < self.n_agents - 1:
self.states[i+1, 1] += C * 0.3
self.states[i] = [T, C]
# Reward: deviation from target temperature and productivity
temp_penalty = -abs(T - 350)
production = k * C
rewards.append(temp_penalty * 0.1 + production * 10)
done = False
return self.states.copy(), np.array(rewards), [done]*self.n_agents
# Training loop
env = MultiCSTREnv()
agents = [IQLAgent(obs_dim=2, action_dim=3, agent_id=i) for i in range(3)]
for episode in range(500):
obs = env.reset()
episode_rewards = [0] * 3
for step in range(100):
actions = [agents[i].select_action(obs[i]) for i in range(3)]
next_obs, rewards, dones = env.step(actions)
# Each agent learns independently
for i in range(3):
agents[i].store_transition(obs[i], actions[i], rewards[i],
next_obs[i], dones[i])
agents[i].train_step()
episode_rewards[i] += rewards[i]
obs = next_obs
# Periodic target update
if episode % 10 == 0:
for agent in agents:
agent.update_target()
if episode % 50 == 0:
print(f"Episode {episode}, Total Rewards: {sum(episode_rewards):.2f}")
QMIX
QMIX combines the individual Q-values of the agents through a mixing network, achieving credit assignment while enforcing a monotonicity constraint that guarantees the Individual-Global-Max (IGM) property.
QMIX Value Decomposition:
\[ Q_{tot}(\boldsymbol{\tau}, \mathbf{u}) = f_{mix}(Q_1(\tau^1, u^1), \ldots, Q_n(\tau^n, u^n); s) \]
Monotonicity constraint:
\[ \frac{\partial Q_{tot}}{\partial Q_i} \geq 0, \quad \forall i \]
# Requirements:
# - Python 3.9+
# - torch>=2.0.0, <2.3.0
# QMIX implementation
import torch
import torch.nn as nn
import torch.optim as optim
class AgentQNetwork(nn.Module):
"""Q-function for each agent"""
def __init__(self, obs_dim, action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim, 64), nn.ReLU(),
nn.Linear(64, action_dim)
)
def forward(self, obs):
return self.net(obs)
class QMixerNetwork(nn.Module):
"""Mixing network that guarantees monotonicity"""
def __init__(self, n_agents, state_dim):
super().__init__()
self.n_agents = n_agents
# Hypernetwork (generates weights from state)
self.hyper_w1 = nn.Linear(state_dim, n_agents * 32)
self.hyper_b1 = nn.Linear(state_dim, 32)
self.hyper_w2 = nn.Linear(state_dim, 32)
self.hyper_b2 = nn.Sequential(
nn.Linear(state_dim, 32), nn.ReLU(),
nn.Linear(32, 1)
)
def forward(self, agent_qs, state):
"""
agent_qs: [batch, n_agents]
state: [batch, state_dim]
"""
batch_size = agent_qs.size(0)
agent_qs = agent_qs.view(batch_size, 1, self.n_agents)
# First layer weights (monotonicity guaranteed by absolute value)
w1 = torch.abs(self.hyper_w1(state))
w1 = w1.view(batch_size, self.n_agents, 32)
b1 = self.hyper_b1(state).view(batch_size, 1, 32)
# First layer output
hidden = torch.bmm(agent_qs, w1) + b1
hidden = torch.relu(hidden)
# Second layer weights (monotonicity guaranteed by absolute value)
w2 = torch.abs(self.hyper_w2(state))
w2 = w2.view(batch_size, 32, 1)
b2 = self.hyper_b2(state).view(batch_size, 1, 1)
# Final output
q_tot = torch.bmm(hidden, w2) + b2
return q_tot.view(batch_size)
class QMIX:
"""QMIX learning algorithm"""
    def __init__(self, n_agents, obs_dim, action_dim, state_dim):
        self.n_agents = n_agents
        self.action_dim = action_dim
self.agent_networks = [AgentQNetwork(obs_dim, action_dim)
for _ in range(n_agents)]
self.mixer = QMixerNetwork(n_agents, state_dim)
self.target_networks = [AgentQNetwork(obs_dim, action_dim)
for _ in range(n_agents)]
self.target_mixer = QMixerNetwork(n_agents, state_dim)
# Target initialization
for i in range(n_agents):
self.target_networks[i].load_state_dict(
self.agent_networks[i].state_dict())
self.target_mixer.load_state_dict(self.mixer.state_dict())
# Optimizer
params = list(self.mixer.parameters())
for net in self.agent_networks:
params += list(net.parameters())
self.optimizer = optim.Adam(params, lr=5e-4)
def select_actions(self, observations, epsilon=0.05):
"""Action selection for each agent"""
actions = []
for i, obs in enumerate(observations):
            if torch.rand(1).item() < epsilon:
                actions.append(torch.randint(0, self.action_dim, (1,)).item())
else:
with torch.no_grad():
q_vals = self.agent_networks[i](torch.FloatTensor(obs))
actions.append(q_vals.argmax().item())
return actions
def train_step(self, batch):
"""QMIX update"""
states, obs_list, actions, rewards, next_states, next_obs_list, dones = batch
# Current Q-values (each agent)
agent_qs = []
for i in range(self.n_agents):
q_vals = self.agent_networks[i](torch.FloatTensor(obs_list[i]))
q = q_vals.gather(1, torch.LongTensor(actions[:, i]).unsqueeze(1))
agent_qs.append(q)
agent_qs = torch.cat(agent_qs, dim=1)
# Mixed Q_tot
q_tot = self.mixer(agent_qs, torch.FloatTensor(states))
# Target Q-values
with torch.no_grad():
target_agent_qs = []
for i in range(self.n_agents):
target_q = self.target_networks[i](
torch.FloatTensor(next_obs_list[i])).max(1)[0]
target_agent_qs.append(target_q.unsqueeze(1))
target_agent_qs = torch.cat(target_agent_qs, dim=1)
target_q_tot = self.target_mixer(target_agent_qs,
torch.FloatTensor(next_states))
target = rewards + 0.99 * target_q_tot * (1 - dones)
# Loss
loss = nn.MSELoss()(q_tot, target)
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.mixer.parameters(), 10)
self.optimizer.step()
return loss.item()
# Usage example
qmix = QMIX(n_agents=3, obs_dim=4, action_dim=5, state_dim=12)
observations = [torch.randn(4) for _ in range(3)]
actions = qmix.select_actions(observations)
print(f"QMIX actions: {actions}")
Inter-agent Communication
Exchanging information between agents enables more effective cooperation. CommNet implements message passing by mean-pooling hidden states, while TarMAC weights messages with an attention mechanism.
Each agent \(i\) sends a message \(msg_i\) to a shared communication channel, which returns the aggregated message to every agent.
# Requirements:
# - Python 3.9+
# - torch>=2.0.0, <2.3.0
# Message passing mechanism implementation
import torch
import torch.nn as nn
class AttentionCommModule(nn.Module):
"""Attention-based communication module"""
def __init__(self, hidden_dim, n_agents):
super().__init__()
self.n_agents = n_agents
self.hidden_dim = hidden_dim
# Message generation
self.msg_encoder = nn.Linear(hidden_dim, hidden_dim)
# Attention mechanism
self.query = nn.Linear(hidden_dim, hidden_dim)
self.key = nn.Linear(hidden_dim, hidden_dim)
self.value = nn.Linear(hidden_dim, hidden_dim)
def forward(self, hidden_states):
"""
hidden_states: [n_agents, hidden_dim]
returns: [n_agents, hidden_dim] (hidden states after communication)
"""
# Message generation
messages = self.msg_encoder(hidden_states) # [n_agents, hidden_dim]
# Attention score calculation
Q = self.query(hidden_states) # [n_agents, hidden_dim]
K = self.key(messages) # [n_agents, hidden_dim]
V = self.value(messages) # [n_agents, hidden_dim]
# Scaled dot-product attention
attn_scores = torch.matmul(Q, K.T) / (self.hidden_dim ** 0.5)
attn_weights = torch.softmax(attn_scores, dim=-1)
# Message aggregation
aggregated = torch.matmul(attn_weights, V)
return hidden_states + aggregated # Residual connection
class CommunicativeAgent(nn.Module):
"""Agent capable of communication"""
def __init__(self, obs_dim, action_dim, hidden_dim, n_agents):
super().__init__()
self.obs_encoder = nn.Sequential(
nn.Linear(obs_dim, hidden_dim), nn.ReLU()
)
self.comm_module = AttentionCommModule(hidden_dim, n_agents)
self.policy = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
nn.Linear(hidden_dim, action_dim)
)
def forward(self, observations, agent_idx):
"""
observations: [n_agents, obs_dim]
agent_idx: Index of this agent
"""
# Encode each agent's observation
hidden_states = self.obs_encoder(observations)
# Communication phase
comm_hidden = self.comm_module(hidden_states)
# Action selection with own hidden state
my_hidden = comm_hidden[agent_idx]
action_logits = self.policy(my_hidden)
return action_logits, comm_hidden
# Multi-round communication
class MultiRoundCommAgent(nn.Module):
"""Agent performing multiple rounds of communication"""
def __init__(self, obs_dim, action_dim, hidden_dim, n_agents, n_comm_rounds=2):
super().__init__()
self.n_comm_rounds = n_comm_rounds
self.obs_encoder = nn.Linear(obs_dim, hidden_dim)
self.comm_modules = nn.ModuleList([
AttentionCommModule(hidden_dim, n_agents)
for _ in range(n_comm_rounds)
])
self.policy = nn.Linear(hidden_dim, action_dim)
def forward(self, observations):
"""
observations: [n_agents, obs_dim]
returns: [n_agents, action_dim]
"""
hidden = torch.relu(self.obs_encoder(observations))
# Multiple rounds of communication
for comm_module in self.comm_modules:
hidden = comm_module(hidden)
# Each agent selects action
actions = self.policy(hidden)
return actions
# Usage example: Cooperative control of 3 reactors
n_agents = 3
obs_dim = 4 # [temperature, concentration, flow rate, pressure]
action_dim = 5 # Discrete actions
hidden_dim = 64
agent = MultiRoundCommAgent(obs_dim, action_dim, hidden_dim, n_agents, n_comm_rounds=2)
# Simulation
observations = torch.randn(n_agents, obs_dim)
actions = agent(observations)
print(f"Actions after communication: {actions.shape}") # [3, 5]
# Using individual agent
single_agent = CommunicativeAgent(obs_dim, action_dim, hidden_dim, n_agents)
action_logits, comm_hidden = single_agent(observations, agent_idx=0)
print(f"Agent 0 action: {torch.softmax(action_logits, dim=-1)}")
print(f"Hidden state after communication: {comm_hidden.shape}") # [3, 64]
Cooperative Task: Synchronized Control of Three CSTRs
This example implements a cooperative task in which three continuous stirred-tank reactors (CSTRs) connected in series maximize overall productivity while each reactor keeps its temperature under control.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Cooperative task: Synchronized control of 3 CSTRs
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
class CooperativeCSTREnv:
"""Cooperative environment with 3 CSTRs connected in series"""
def __init__(self):
self.n_agents = 3
self.dt = 0.1 # Time step [min]
self.reset()
def reset(self):
# Each CSTR: [temperature T(K), concentration CA(mol/L), flow rate F(L/min)]
self.states = np.array([
[350.0, 2.0, 100.0], # CSTR1
[340.0, 1.5, 100.0], # CSTR2
[330.0, 1.0, 100.0] # CSTR3
])
self.time = 0
return self._get_observations()
def _get_observations(self):
"""Observation for each agent (local information + neighboring information)"""
obs = []
for i in range(self.n_agents):
local = self.states[i].copy()
# Upstream information (for cooperation)
prev = self.states[i-1] if i > 0 else np.zeros(3)
# Downstream information
next_ = self.states[i+1] if i < self.n_agents-1 else np.zeros(3)
obs.append(np.concatenate([local, prev, next_]))
return obs
def step(self, actions):
"""
actions: [n_agents, 2] = [[Q1, Tin1], [Q2, Tin2], [Q3, Tin3]]
Q: Cooling flow rate [L/min] (0-50)
Tin: Inlet temperature [K] (300-400)
"""
rewards = []
for i in range(self.n_agents):
T, CA, F = self.states[i]
Q = actions[i][0] * 50 # Denormalization
Tin = actions[i][1] * 100 + 300
# Reaction rate constant (Arrhenius equation)
Ea = 50000 # [J/mol]
R = 8.314 # [J/(mol·K)]
k = 1e10 * np.exp(-Ea / (R * T))
# CSTR model
V = 1000 # Reactor volume [L]
rho = 1000 # Density [g/L]
Cp = 4.18 # Specific heat [J/(g·K)]
dHr = -50000 # Heat of reaction [J/mol]
# Inlet concentration (inflow from upstream)
CA_in = self.states[i-1, 1] if i > 0 else 2.5
# Mass balance
dCA = (F / V) * (CA_in - CA) - k * CA
CA_new = CA + dCA * self.dt
# Energy balance
Q_reaction = -dHr * k * CA * V
Q_cooling = Q * rho * Cp * (T - Tin)
dT = (Q_reaction - Q_cooling) / (V * rho * Cp)
T_new = T + dT * self.dt
self.states[i] = [T_new, max(0, CA_new), F]
# Cooperative reward: Overall productivity and stability
production = k * CA # Reaction rate
temp_penalty = -abs(T_new - 350) ** 2 # Deviation from target temperature
flow_continuity = -abs(F - 100) ** 2 if i > 0 else 0
reward = production * 100 + temp_penalty * 0.1 + flow_continuity * 0.01
rewards.append(reward)
# Common reward (cooperative task)
total_production = sum([self.states[i, 1] for i in range(self.n_agents)])
common_reward = total_production * 10
rewards = [r + common_reward for r in rewards]
self.time += self.dt
done = self.time >= 10 # 10-minute episode
return self._get_observations(), np.array(rewards), [done]*self.n_agents
# Simplified QMIX-style controller (the mixing/TD update is omitted for brevity;
# see the full QMIX class above for the training step)
class SimpleQMIX:
def __init__(self, n_agents, obs_dim, action_dim):
self.n_agents = n_agents
self.q_nets = [nn.Sequential(
nn.Linear(obs_dim, 64), nn.ReLU(),
nn.Linear(64, action_dim)
) for _ in range(n_agents)]
self.mixer = nn.Sequential(
nn.Linear(n_agents, 32), nn.ReLU(),
nn.Linear(32, 1)
)
params = list(self.mixer.parameters())
for net in self.q_nets:
params += list(net.parameters())
self.optimizer = optim.Adam(params, lr=1e-3)
    def select_actions(self, observations, epsilon=0.3):
        # Discretize the continuous controls: 10 candidate (Q, Tin) pairs,
        # 5 cooling levels x 2 inlet temperatures, all normalized to [0, 1]
        grid = np.array([[q, t] for q in np.linspace(0, 1, 5)
                         for t in (0.25, 0.75)])
        actions = []
        for i, obs in enumerate(observations):
            if np.random.rand() < epsilon:
                idx = np.random.randint(len(grid))  # exploration
            else:
                with torch.no_grad():
                    q_vals = self.q_nets[i](torch.FloatTensor(obs))
                    idx = q_vals.argmax().item()
            actions.append(grid[idx])
        return np.array(actions)
# Training execution
env = CooperativeCSTREnv()
agent = SimpleQMIX(n_agents=3, obs_dim=9, action_dim=10)
for episode in range(100):
obs = env.reset()
episode_reward = 0
for step in range(100):
actions = agent.select_actions(obs)
next_obs, rewards, dones = env.step(actions)
episode_reward += sum(rewards)
obs = next_obs
if dones[0]:
break
if episode % 10 == 0:
print(f"Episode {episode}, Total Reward: {episode_reward:.2f}")
print(f" Final Temps: {[f'{s[0]:.1f}K' for s in env.states]}")
print(f" Final Concs: {[f'{s[1]:.3f}mol/L' for s in env.states]}")
Competitive Task: Utility Allocation
This is a competitive scenario in which multiple plants compete for limited utilities such as steam and cooling water. Each agent must maximize its own production while operating under shared resource constraints.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Competitive task: Utility allocation problem
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
class CompetitiveUtilityEnv:
"""Competition for limited utilities among multiple plants"""
def __init__(self, n_plants=3):
self.n_plants = n_plants
self.total_steam = 500.0 # Total steam supply [kg/h]
self.total_cooling = 1000.0 # Total cooling water [L/min]
self.reset()
def reset(self):
# Each plant: [production rate, temperature, steam usage, cooling water usage]
self.states = np.array([
[50.0, 350.0, 150.0, 300.0] for _ in range(self.n_plants)
])
return self._get_observations()
def _get_observations(self):
"""Observation for each plant"""
obs = []
for i in range(self.n_plants):
# Own state + resource availability information
steam_used = sum(self.states[:, 2])
cooling_used = sum(self.states[:, 3])
steam_avail = max(0, self.total_steam - steam_used)
cooling_avail = max(0, self.total_cooling - cooling_used)
obs_i = np.concatenate([
self.states[i],
[steam_avail, cooling_avail]
])
obs.append(obs_i)
return obs
def step(self, actions):
"""
actions: [n_plants, 2] = [[steam_request, cooling_request], ...]
Each value is normalized to 0-1
"""
# Convert requests to real values
steam_requests = actions[:, 0] * 200 # 0-200 kg/h
cooling_requests = actions[:, 1] * 400 # 0-400 L/min
# Resource allocation (proportional allocation)
total_steam_req = sum(steam_requests)
total_cooling_req = sum(cooling_requests)
if total_steam_req > self.total_steam:
steam_allocated = steam_requests * (self.total_steam / total_steam_req)
else:
steam_allocated = steam_requests
if total_cooling_req > self.total_cooling:
cooling_allocated = cooling_requests * (self.total_cooling / total_cooling_req)
else:
cooling_allocated = cooling_requests
rewards = []
for i in range(self.n_plants):
# Production rate depends on resources
steam_factor = steam_allocated[i] / 200
cooling_factor = cooling_allocated[i] / 400
production = 100 * steam_factor * cooling_factor
# Temperature management (penalty for insufficient cooling)
temp_change = (steam_allocated[i] * 0.5 - cooling_allocated[i] * 0.3)
temp_new = self.states[i, 1] + temp_change
temp_penalty = -abs(temp_new - 350) if temp_new > 380 else 0
self.states[i] = [production, temp_new,
steam_allocated[i], cooling_allocated[i]]
# Reward: productivity - resource shortage penalty
shortage_penalty = 0
if steam_allocated[i] < steam_requests[i]:
shortage_penalty += (steam_requests[i] - steam_allocated[i]) * 0.5
if cooling_allocated[i] < cooling_requests[i]:
shortage_penalty += (cooling_requests[i] - cooling_allocated[i]) * 0.3
reward = production - shortage_penalty + temp_penalty
rewards.append(reward)
done = False
return self._get_observations(), np.array(rewards), [done]*self.n_plants
# Nash Q-Learning (for competitive tasks)
class NashQLearningAgent:
"""Agent learning Nash equilibrium"""
def __init__(self, obs_dim, action_dim, agent_id):
self.agent_id = agent_id
self.q_net = nn.Sequential(
nn.Linear(obs_dim, 64), nn.ReLU(),
nn.Linear(64, 64), nn.ReLU(),
nn.Linear(64, action_dim)
)
self.optimizer = optim.Adam(self.q_net.parameters(), lr=1e-3)
self.epsilon = 0.3
    def select_action(self, obs):
        """ε-greedy action selection"""
        if np.random.rand() < self.epsilon:
            return np.random.rand(2)  # random (steam, cooling) requests
        else:
            with torch.no_grad():
                # Simplification: the network acts as a deterministic policy
                # head mapping the observation to normalized requests in (0, 1)
                return torch.sigmoid(self.q_net(torch.FloatTensor(obs))).numpy()
    def update_policy(self, obs, action, reward, next_obs):
        """Placeholder: a full Nash-Q update would estimate the stage game over
        joint actions and regress Q toward its Nash equilibrium value; it is
        omitted here to keep the focus on the competitive environment."""
        pass
# Simulation
env = CompetitiveUtilityEnv(n_plants=3)
agents = [NashQLearningAgent(obs_dim=6, action_dim=2, agent_id=i)
for i in range(3)]
for episode in range(50):
obs = env.reset()
episode_rewards = [0] * 3
for step in range(50):
actions = np.array([agents[i].select_action(obs[i]) for i in range(3)])
next_obs, rewards, dones = env.step(actions)
for i in range(3):
episode_rewards[i] += rewards[i]
obs = next_obs
if episode % 10 == 0:
print(f"\nEpisode {episode}")
print(f" Individual Rewards: {[f'{r:.1f}' for r in episode_rewards]}")
print(f" Productions: {[f'{s[0]:.1f}' for s in env.states]}")
print(f" Steam Usage: {[f'{s[2]:.1f}' for s in env.states]} / {env.total_steam}")
print(f" Cooling Usage: {[f'{s[3]:.1f}' for s in env.states]} / {env.total_cooling}")
Mixed Task: Cooperation and Competition Combined
Real plants exhibit both cooperative and competitive aspects: each plant must achieve its own production targets (competition) while maintaining overall stability and efficiency (cooperation).
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Mixed task: Production system with coexisting cooperation and competition
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
class MixedCoopCompEnv:
"""Complex environment with mixed cooperation and competition"""
def __init__(self, n_plants=3):
self.n_plants = n_plants
self.total_energy = 1000.0 # Total energy [kW]
self.production_targets = [100, 120, 80] # Production target for each plant
self.reset()
def reset(self):
# Each plant: [production, temperature, energy usage, quality]
self.states = np.array([
[50.0, 350.0, 300.0, 0.9] for _ in range(self.n_plants)
])
self.time = 0
return self._get_observations()
def _get_observations(self):
obs = []
for i in range(self.n_plants):
# Own state + target + other plants' production (for cooperation)
others_production = [self.states[j, 0] for j in range(self.n_plants) if j != i]
total_energy_used = sum(self.states[:, 2])
obs_i = np.concatenate([
self.states[i],
[self.production_targets[i]],
others_production,
[total_energy_used, self.total_energy]
])
obs.append(obs_i)
return obs
def step(self, actions):
"""
actions: [n_plants, 3] = [[energy_req, temp_setpoint, quality_target], ...]
"""
# Energy allocation (competitive element)
energy_requests = actions[:, 0] * 500
total_energy_req = sum(energy_requests)
if total_energy_req > self.total_energy:
# When insufficient, allocate based on priority (prioritize plants below target)
priorities = [
max(0, self.production_targets[i] - self.states[i, 0])
for i in range(self.n_plants)
]
total_priority = sum(priorities) + 1e-6
energy_allocated = [
self.total_energy * (priorities[i] / total_priority)
for i in range(self.n_plants)
]
else:
energy_allocated = energy_requests
rewards = []
for i in range(self.n_plants):
temp_setpoint = actions[i, 1] * 100 + 300 # 300-400K
quality_target = actions[i, 2] # 0-1
# Production model
energy_factor = energy_allocated[i] / 500
temp_factor = 1.0 - abs(temp_setpoint - 350) / 100
production = self.production_targets[i] * energy_factor * temp_factor
# Quality model
quality = 0.5 + 0.5 * quality_target * temp_factor
# Temperature update
temp = self.states[i, 1] + (temp_setpoint - self.states[i, 1]) * 0.3
self.states[i] = [production, temp, energy_allocated[i], quality]
# Reward design (mixed)
# 1. Individual target achievement (competitive element)
target_achievement = -abs(production - self.production_targets[i])
# 2. Quality penalty
quality_reward = quality * 10
# 3. Energy efficiency (cooperative element)
energy_efficiency = production / (energy_allocated[i] + 1)
reward = target_achievement + quality_reward + energy_efficiency * 5
rewards.append(reward)
# Cooperation bonus: Overall stability
total_production = sum(self.states[:, 0])
stability = -np.std(self.states[:, 1]) # Temperature standard deviation
cooperation_bonus = (total_production / sum(self.production_targets)) * 50 + stability
# Final reward = Individual reward + cooperation bonus
rewards = [r + cooperation_bonus * 0.3 for r in rewards]
self.time += 1
done = self.time >= 100
return self._get_observations(), np.array(rewards), [done]*self.n_plants
# COMA (Counterfactual Multi-Agent Policy Gradient)
class COMAAgent:
    """Simplified COMA-style agent for mixed tasks.

    COMA proper assumes discrete actions and a counterfactual baseline; this
    sketch keeps only the actor / centralized-critic architecture (a sketch of
    the counterfactual advantage follows the training loop below).
    """
def __init__(self, n_agents, obs_dim, action_dim, state_dim):
self.n_agents = n_agents
# Actor for each agent
self.actors = [nn.Sequential(
nn.Linear(obs_dim, 64), nn.ReLU(),
nn.Linear(64, action_dim), nn.Tanh()
) for _ in range(n_agents)]
# Central Critic (counterfactual baseline)
self.critic = nn.Sequential(
nn.Linear(state_dim + n_agents * action_dim, 128), nn.ReLU(),
nn.Linear(128, n_agents) # Q-value for each agent
)
self.actor_opts = [optim.Adam(a.parameters(), lr=3e-4) for a in self.actors]
self.critic_opt = optim.Adam(self.critic.parameters(), lr=1e-3)
def select_actions(self, observations):
actions = []
for i, obs in enumerate(observations):
with torch.no_grad():
action = self.actors[i](torch.FloatTensor(obs)).numpy()
actions.append(action)
return np.array(actions)
# Training loop
env = MixedCoopCompEnv(n_plants=3)
agent = COMAAgent(n_agents=3, obs_dim=9, action_dim=3, state_dim=12)
for episode in range(100):
obs = env.reset()
episode_rewards = [0] * 3
for step in range(100):
actions = agent.select_actions(obs)
next_obs, rewards, dones = env.step(actions)
for i in range(3):
episode_rewards[i] += rewards[i]
obs = next_obs
if dones[0]:
break
if episode % 20 == 0:
print(f"\nEpisode {episode}")
print(f" Individual Rewards: {[f'{r:.1f}' for r in episode_rewards]}")
print(f" Productions: {[f'{s[0]:.1f}' for s in env.states]}")
print(f" Targets: {env.production_targets}")
print(f" Qualities: {[f'{s[3]:.2f}' for s in env.states]}")
print(f" Total Energy Used: {sum(env.states[:, 2]):.1f} / {env.total_energy}")
Chapter 4 Summary
What We Learned
- CTDE: Efficient framework with centralized training and distributed execution
- Independent Q-Learning: Simplest approach but has non-stationarity issues
- QMIX: Credit assignment through value decomposition and monotonicity constraints
- Communication Mechanisms: Strengthening cooperation through attention-based message passing
- Cooperative Tasks: Pursuing global optimum with common rewards
- Competitive Tasks: Resource allocation problems
- Mixed Tasks: Complex scenarios closer to real plant operation
Algorithm Comparison
| Method | Applicable Task | Learning Efficiency | Implementation Difficulty |
|---|---|---|---|
| IQL | Simple cooperation | Low | Low |
| QMIX | Cooperative tasks | High | Medium |
| MADDPG-style CTDE | Mixed tasks | High | Medium |
| CommNet / TarMAC | Complex cooperation | High | High |
Implications for Process Control
- QMIX and CTDE are effective for cooperative control of multiple reactors and distillation columns
- Consider Nash Q-Learning for competitive tasks like utility allocation
- When communication delays exist, include buffered (possibly stale) messages in the observations (a minimal sketch follows this list)
- Mixed tasks are common in real plants, making reward design crucial
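As an example of the delay point above, one simple pattern is to keep a fixed-length buffer of the most recent neighbor messages and concatenate it onto each local observation (a hypothetical helper; the dimensions are illustrative):
# Sketch: augmenting local observations with a buffer of the last k
# (possibly stale) neighbor messages under communication delay
from collections import deque
import numpy as np

class DelayedMessageBuffer:
    def __init__(self, msg_dim, k=3):
        self.buf = deque([np.zeros(msg_dim)] * k, maxlen=k)

    def push(self, msg):
        self.buf.append(msg)  # newest message, which may arrive late

    def augment(self, obs):
        # Observation grows by k * msg_dim
        return np.concatenate([obs] + list(self.buf))

buf = DelayedMessageBuffer(msg_dim=2, k=3)
buf.push(np.array([350.0, 0.5]))
print(buf.augment(np.zeros(4)).shape)  # (10,)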
Next Chapter
Chapter 5: Real Plant Deployment and Safety
Learn about ensuring safety when deploying reinforcement learning agents to actual plants, including sim-to-real transfer, uncertainty quantification, and human override mechanisms.
Disclaimer
- This content is provided solely for educational, research, and informational purposes and does not constitute professional advice (legal, accounting, technical warranty, etc.).
- This content and accompanying code examples are provided "AS IS" without any warranty, express or implied, including but not limited to merchantability, fitness for a particular purpose, non-infringement, accuracy, completeness, operation, or safety.
- The author and Tohoku University assume no responsibility for the content, availability, or safety of external links, third-party data, tools, libraries, etc.
- To the maximum extent permitted by applicable law, the author and Tohoku University shall not be liable for any direct, indirect, incidental, special, consequential, or punitive damages arising from the use, execution, or interpretation of this content.
- The content may be changed, updated, or discontinued without notice.
- The copyright and license of this content are subject to the stated conditions (e.g., CC BY 4.0). Such licenses typically include no-warranty clauses.