🌐 EN | 🇯🇵 JP | Last sync: 2025-11-16

Chapter 4: Multi-Agent Cooperative Control

📖 Reading Time: 30-40 minutes 📊 Difficulty: Advanced
Autonomous Process Operation by AI Agents Series

Chapter 4: Multi-Agent Cooperative Control

This chapter covers Multi. You will learn essential concepts and techniques.

Chapter 4 Overview

In complex process plants, multiple reactors and distillation columns operate while mutually influencing each other. In such distributed systems, rather than using a single AI agent, having multiple agents cooperate for control enables more efficient and flexible operation.

This chapter covers multi-agent reinforcement learning (MARL: Multi-Agent Reinforcement Learning) from fundamentals to practical process control applications with seven implementation examples.

What You Will Learn in This Chapter

  • CTDE (Centralized Training with Decentralized Execution): Centralized during training, distributed during execution
  • Independent Q-Learning: Each agent learns independently
  • QMIX: Credit assignment through value function decomposition and mixing
  • Inter-agent Communication: Cooperation through message passing
  • Cooperative Tasks: Synchronized control of multiple reactors
  • Competitive Tasks: Allocation of limited resources
  • Mixed Tasks: Realistic scenarios with both cooperative and competitive aspects

Fundamentals of Multi-Agent Reinforcement Learning

Formulation

Multi-agent systems are formulated as Partially Observable Stochastic Games:

Markov Game:

\[ \mathcal{G} = \langle \mathcal{N}, \mathcal{S}, \{\mathcal{A}^i\}_{i \in \mathcal{N}}, \mathcal{T}, \{R^i\}_{i \in \mathcal{N}}, \{\mathcal{O}^i\}_{i \in \mathcal{N}}, \gamma \rangle \]
  • \(\mathcal{N} = \{1, 2, \ldots, n\}\): Agent set
  • \(\mathcal{S}\): Global state space
  • \(\mathcal{A}^i\): Action space of agent \(i\)
  • \(\mathcal{T}: \mathcal{S} \times \mathcal{A}^1 \times \cdots \times \mathcal{A}^n \to \Delta(\mathcal{S})\): State transition function
  • \(R^i: \mathcal{S} \times \mathcal{A}^1 \times \cdots \times \mathcal{A}^n \to \mathbb{R}\): Reward function for agent \(i\)
  • \(\mathcal{O}^i\): Observation space of agent \(i\)

Classification: Cooperative, Competitive, Mixed

graph TD A[Multi-Agent Tasks] --> B[Fully Cooperative] A --> C[Fully Competitive] A --> D[Mixed] B --> E[Common Reward
R¹=R²=...=Rⁿ] C --> F[Zero-Sum
Σᵢ Rⁱ = 0] D --> G[General Game
Cooperation+Competition]

In process control, the following situations can be considered:

  • Cooperative Tasks: Coordinating multiple reactors to maximize overall productivity
  • Competitive Tasks: Plants competing for limited utilities (steam, cooling water)
  • Mixed Tasks: Each plant achieving its own production targets while maintaining overall stability
1 CTDE (Centralized Training with Decentralized Execution) Framework

CTDE is a framework that trains centrally using information from all agents during training, while during execution, each agent acts in a distributed manner using only its own observations. This achieves both training efficiency and execution-time scalability.

graph LR subgraph Training[Training (Centralized)] S[Global State] --> C[Central Critic] O1[Obs 1] --> A1[Actor 1] O2[Obs 2] --> A2[Actor 2] O3[Obs 3] --> A3[Actor 3] C --> A1 C --> A2 C --> A3 end subgraph Execution[Execution (Distributed)] O1'[Obs 1] --> A1'[Actor 1] O2' [Obs 2] --> A2'[Actor 2] O3' [Obs 3] --> A3'[Actor 3] end
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# CTDE basic framework implementation
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class Actor(nn.Module):
    """Actor for each agent (executable in distributed manner)"""
    def __init__(self, obs_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, 64), nn.ReLU(),
            nn.Linear(64, 64), nn.ReLU(),
            nn.Linear(64, action_dim), nn.Tanh()
        )

    def forward(self, obs):
        return self.net(obs)

class CentralizedCritic(nn.Module):
    """Central Critic (used only during training)"""
    def __init__(self, state_dim, n_agents, action_dim):
        super().__init__()
        total_action_dim = n_agents * action_dim
        self.net = nn.Sequential(
            nn.Linear(state_dim + total_action_dim, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, state, actions):
        # actions: [n_agents, action_dim] -> flatten
        x = torch.cat([state, actions.flatten()], dim=-1)
        return self.net(x)

class CTDEAgent:
    """CTDE learning agent"""
    def __init__(self, n_agents, obs_dim, action_dim, state_dim):
        self.n_agents = n_agents
        self.actors = [Actor(obs_dim, action_dim) for _ in range(n_agents)]
        self.critic = CentralizedCritic(state_dim, n_agents, action_dim)

        self.actor_opts = [optim.Adam(a.parameters(), lr=3e-4) for a in self.actors]
        self.critic_opt = optim.Adam(self.critic.parameters(), lr=1e-3)

    def select_actions(self, observations):
        """Execution time: Each agent selects actions independently"""
        actions = []
        for i, obs in enumerate(observations):
            with torch.no_grad():
                obs_t = torch.FloatTensor(obs)
                action = self.actors[i](obs_t).numpy()
            actions.append(action)
        return np.array(actions)

    def train_step(self, batch):
        """Training time: Update using central Critic"""
        states, obs, actions, rewards, next_states, next_obs, dones = batch

        # Critic update (TD error)
        with torch.no_grad():
            next_actions = torch.stack([
                self.actors[i](torch.FloatTensor(next_obs[i]))
                for i in range(self.n_agents)
            ])
            target_q = rewards + 0.99 * self.critic(
                torch.FloatTensor(next_states), next_actions
            ) * (1 - dones)

        current_q = self.critic(torch.FloatTensor(states), torch.FloatTensor(actions))
        critic_loss = nn.MSELoss()(current_q, target_q)

        self.critic_opt.zero_grad()
        critic_loss.backward()
        self.critic_opt.step()

        # Actor update (policy gradient)
        for i in range(self.n_agents):
            new_actions = [
                self.actors[j](torch.FloatTensor(obs[j])) if j == i
                else torch.FloatTensor(actions[j])
                for j in range(self.n_agents)
            ]
            new_actions = torch.stack(new_actions)

            actor_loss = -self.critic(torch.FloatTensor(states), new_actions).mean()

            self.actor_opts[i].zero_grad()
            actor_loss.backward()
            self.actor_opts[i].step()

# Usage example: Cooperative control of 3 CSTRs
n_agents = 3
agent = CTDEAgent(n_agents=n_agents, obs_dim=4, action_dim=2, state_dim=12)

# During execution, distributed (each agent uses only its own observation)
observations = [np.random.randn(4) for _ in range(n_agents)]
actions = agent.select_actions(observations)  # Distributed execution
print(f"Actions in distributed execution: {actions.shape}")  # (3, 2)
2 Independent Q-Learning (IQL) for Multi-Agent Learning

The simplest multi-agent learning approach is where each agent independently performs Q-learning. Other agents are treated as part of the environment, leading to non-stationarity issues, but the implementation is simple and practical in many cases.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Independent Q-Learning implementation
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class QNetwork(nn.Module):
    """Independent Q-function for each agent"""
    def __init__(self, obs_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, 64), nn.ReLU(),
            nn.Linear(64, 64), nn.ReLU(),
            nn.Linear(64, action_dim)
        )

    def forward(self, obs):
        return self.net(obs)

class IQLAgent:
    """Independent Q-Learning agent"""
    def __init__(self, obs_dim, action_dim, agent_id):
        self.agent_id = agent_id
        self.action_dim = action_dim
        self.q_net = QNetwork(obs_dim, action_dim)
        self.target_net = QNetwork(obs_dim, action_dim)
        self.target_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=1e-3)
        self.memory = deque(maxlen=10000)
        self.epsilon = 1.0

    def select_action(self, obs):
        """ε-greedy action selection"""
        if random.random() < self.epsilon:
            return np.random.randint(self.action_dim)
        else:
            with torch.no_grad():
                q_values = self.q_net(torch.FloatTensor(obs))
                return q_values.argmax().item()

    def store_transition(self, obs, action, reward, next_obs, done):
        self.memory.append((obs, action, reward, next_obs, done))

    def train_step(self, batch_size=32):
        if len(self.memory) < batch_size:
            return 0.0

        batch = random.sample(self.memory, batch_size)
        obs, actions, rewards, next_obs, dones = zip(*batch)

        obs_t = torch.FloatTensor(obs)
        actions_t = torch.LongTensor(actions)
        rewards_t = torch.FloatTensor(rewards)
        next_obs_t = torch.FloatTensor(next_obs)
        dones_t = torch.FloatTensor(dones)

        # Current Q-values
        q_values = self.q_net(obs_t).gather(1, actions_t.unsqueeze(1)).squeeze()

        # Target Q-values
        with torch.no_grad():
            next_q_values = self.target_net(next_obs_t).max(1)[0]
            target_q = rewards_t + 0.99 * next_q_values * (1 - dones_t)

        # Update
        loss = nn.MSELoss()(q_values, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # ε decay
        self.epsilon = max(0.01, self.epsilon * 0.995)

        return loss.item()

    def update_target(self):
        self.target_net.load_state_dict(self.q_net.state_dict())

# Multi-agent environment simulation
class MultiCSTREnv:
    """Environment with 3 CSTRs connected in series"""
    def __init__(self):
        self.n_agents = 3
        self.reset()

    def reset(self):
        # State of each reactor [temperature, concentration]
        self.states = np.array([[350.0, 0.5], [340.0, 0.3], [330.0, 0.1]])
        return self.states

    def step(self, actions):
        # actions: [0=cooling, 1=heating, 2=maintain] for each agent
        rewards = []
        for i in range(self.n_agents):
            T, C = self.states[i]

            # Temperature change from action
            if actions[i] == 0:  # cooling
                T -= 5
            elif actions[i] == 1:  # heating
                T += 5

            # Reaction progress (simple model)
            k = 0.1 * np.exp((T - 350) / 10)
            C = C * (1 - k * 0.1)

            # Inflow to next reactor
            if i < self.n_agents - 1:
                self.states[i+1, 1] += C * 0.3

            self.states[i] = [T, C]

            # Reward: deviation from target temperature and productivity
            temp_penalty = -abs(T - 350)
            production = k * C
            rewards.append(temp_penalty * 0.1 + production * 10)

        done = False
        return self.states.copy(), np.array(rewards), [done]*self.n_agents

# Training loop
env = MultiCSTREnv()
agents = [IQLAgent(obs_dim=2, action_dim=3, agent_id=i) for i in range(3)]

for episode in range(500):
    obs = env.reset()
    episode_rewards = [0] * 3

    for step in range(100):
        actions = [agents[i].select_action(obs[i]) for i in range(3)]
        next_obs, rewards, dones = env.step(actions)

        # Each agent learns independently
        for i in range(3):
            agents[i].store_transition(obs[i], actions[i], rewards[i],
                                       next_obs[i], dones[i])
            agents[i].train_step()
            episode_rewards[i] += rewards[i]

        obs = next_obs

    # Periodic target update
    if episode % 10 == 0:
        for agent in agents:
            agent.update_target()

    if episode % 50 == 0:
        print(f"Episode {episode}, Total Rewards: {sum(episode_rewards):.2f}")
3 QMIX: Credit Assignment via Value Decomposition Network

QMIX integrates individual Q-values of each agent using a mixing network, achieving credit assignment while maintaining monotonicity constraints (Individual-Global-Max: IGM).

QMIX Value Decomposition:

\[ Q_{tot}(\boldsymbol{\tau}, \mathbf{u}) = f_{mix}(Q_1(\tau^1, u^1), \ldots, Q_n(\tau^n, u^n); s) \]

Monotonicity constraint:

\[ \frac{\partial Q_{tot}}{\partial Q_i} \geq 0, \quad \forall i \]
# Requirements:
# - Python 3.9+
# - torch>=2.0.0, <2.3.0

# QMIX implementation
import torch
import torch.nn as nn
import torch.optim as optim

class AgentQNetwork(nn.Module):
    """Q-function for each agent"""
    def __init__(self, obs_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, 64), nn.ReLU(),
            nn.Linear(64, action_dim)
        )

    def forward(self, obs):
        return self.net(obs)

class QMixerNetwork(nn.Module):
    """Mixing network that guarantees monotonicity"""
    def __init__(self, n_agents, state_dim):
        super().__init__()
        self.n_agents = n_agents

        # Hypernetwork (generates weights from state)
        self.hyper_w1 = nn.Linear(state_dim, n_agents * 32)
        self.hyper_b1 = nn.Linear(state_dim, 32)
        self.hyper_w2 = nn.Linear(state_dim, 32)
        self.hyper_b2 = nn.Sequential(
            nn.Linear(state_dim, 32), nn.ReLU(),
            nn.Linear(32, 1)
        )

    def forward(self, agent_qs, state):
        """
        agent_qs: [batch, n_agents]
        state: [batch, state_dim]
        """
        batch_size = agent_qs.size(0)
        agent_qs = agent_qs.view(batch_size, 1, self.n_agents)

        # First layer weights (monotonicity guaranteed by absolute value)
        w1 = torch.abs(self.hyper_w1(state))
        w1 = w1.view(batch_size, self.n_agents, 32)
        b1 = self.hyper_b1(state).view(batch_size, 1, 32)

        # First layer output
        hidden = torch.bmm(agent_qs, w1) + b1
        hidden = torch.relu(hidden)

        # Second layer weights (monotonicity guaranteed by absolute value)
        w2 = torch.abs(self.hyper_w2(state))
        w2 = w2.view(batch_size, 32, 1)
        b2 = self.hyper_b2(state).view(batch_size, 1, 1)

        # Final output
        q_tot = torch.bmm(hidden, w2) + b2
        return q_tot.view(batch_size)

class QMIX:
    """QMIX learning algorithm"""
    def __init__(self, n_agents, obs_dim, action_dim, state_dim):
        self.n_agents = n_agents
        self.agent_networks = [AgentQNetwork(obs_dim, action_dim)
                               for _ in range(n_agents)]
        self.mixer = QMixerNetwork(n_agents, state_dim)
        self.target_networks = [AgentQNetwork(obs_dim, action_dim)
                                for _ in range(n_agents)]
        self.target_mixer = QMixerNetwork(n_agents, state_dim)

        # Target initialization
        for i in range(n_agents):
            self.target_networks[i].load_state_dict(
                self.agent_networks[i].state_dict())
        self.target_mixer.load_state_dict(self.mixer.state_dict())

        # Optimizer
        params = list(self.mixer.parameters())
        for net in self.agent_networks:
            params += list(net.parameters())
        self.optimizer = optim.Adam(params, lr=5e-4)

    def select_actions(self, observations, epsilon=0.05):
        """Action selection for each agent"""
        actions = []
        for i, obs in enumerate(observations):
            if torch.rand(1).item() < epsilon:
                actions.append(torch.randint(0, 5, (1,)).item())
            else:
                with torch.no_grad():
                    q_vals = self.agent_networks[i](torch.FloatTensor(obs))
                    actions.append(q_vals.argmax().item())
        return actions

    def train_step(self, batch):
        """QMIX update"""
        states, obs_list, actions, rewards, next_states, next_obs_list, dones = batch

        # Current Q-values (each agent)
        agent_qs = []
        for i in range(self.n_agents):
            q_vals = self.agent_networks[i](torch.FloatTensor(obs_list[i]))
            q = q_vals.gather(1, torch.LongTensor(actions[:, i]).unsqueeze(1))
            agent_qs.append(q)
        agent_qs = torch.cat(agent_qs, dim=1)

        # Mixed Q_tot
        q_tot = self.mixer(agent_qs, torch.FloatTensor(states))

        # Target Q-values
        with torch.no_grad():
            target_agent_qs = []
            for i in range(self.n_agents):
                target_q = self.target_networks[i](
                    torch.FloatTensor(next_obs_list[i])).max(1)[0]
                target_agent_qs.append(target_q.unsqueeze(1))
            target_agent_qs = torch.cat(target_agent_qs, dim=1)
            target_q_tot = self.target_mixer(target_agent_qs,
                                             torch.FloatTensor(next_states))
            target = rewards + 0.99 * target_q_tot * (1 - dones)

        # Loss
        loss = nn.MSELoss()(q_tot, target)
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.mixer.parameters(), 10)
        self.optimizer.step()

        return loss.item()

# Usage example
qmix = QMIX(n_agents=3, obs_dim=4, action_dim=5, state_dim=12)
observations = [torch.randn(4) for _ in range(3)]
actions = qmix.select_actions(observations)
print(f"QMIX actions: {actions}")
4 Inter-Agent Communication Protocol (Message Passing)

Exchanging information between agents enables more effective cooperation. Methods like CommNet and TarMAC implement message passing using attention mechanisms.

graph LR A1[Agent 1] -->|msg₁| C[Communication
Channel] A2[Agent 2] -->|msg₂| C A3[Agent 3] -->|msg₃| C C -->|aggregated| A1 C -->|aggregated| A2 C -->|aggregated| A3
# Requirements:
# - Python 3.9+
# - torch>=2.0.0, <2.3.0

# Message passing mechanism implementation
import torch
import torch.nn as nn

class AttentionCommModule(nn.Module):
    """Attention-based communication module"""
    def __init__(self, hidden_dim, n_agents):
        super().__init__()
        self.n_agents = n_agents
        self.hidden_dim = hidden_dim

        # Message generation
        self.msg_encoder = nn.Linear(hidden_dim, hidden_dim)

        # Attention mechanism
        self.query = nn.Linear(hidden_dim, hidden_dim)
        self.key = nn.Linear(hidden_dim, hidden_dim)
        self.value = nn.Linear(hidden_dim, hidden_dim)

    def forward(self, hidden_states):
        """
        hidden_states: [n_agents, hidden_dim]
        returns: [n_agents, hidden_dim] (hidden states after communication)
        """
        # Message generation
        messages = self.msg_encoder(hidden_states)  # [n_agents, hidden_dim]

        # Attention score calculation
        Q = self.query(hidden_states)  # [n_agents, hidden_dim]
        K = self.key(messages)  # [n_agents, hidden_dim]
        V = self.value(messages)  # [n_agents, hidden_dim]

        # Scaled dot-product attention
        attn_scores = torch.matmul(Q, K.T) / (self.hidden_dim ** 0.5)
        attn_weights = torch.softmax(attn_scores, dim=-1)

        # Message aggregation
        aggregated = torch.matmul(attn_weights, V)

        return hidden_states + aggregated  # Residual connection

class CommunicativeAgent(nn.Module):
    """Agent capable of communication"""
    def __init__(self, obs_dim, action_dim, hidden_dim, n_agents):
        super().__init__()
        self.obs_encoder = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim), nn.ReLU()
        )
        self.comm_module = AttentionCommModule(hidden_dim, n_agents)
        self.policy = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, observations, agent_idx):
        """
        observations: [n_agents, obs_dim]
        agent_idx: Index of this agent
        """
        # Encode each agent's observation
        hidden_states = self.obs_encoder(observations)

        # Communication phase
        comm_hidden = self.comm_module(hidden_states)

        # Action selection with own hidden state
        my_hidden = comm_hidden[agent_idx]
        action_logits = self.policy(my_hidden)

        return action_logits, comm_hidden

# Multi-round communication
class MultiRoundCommAgent(nn.Module):
    """Agent performing multiple rounds of communication"""
    def __init__(self, obs_dim, action_dim, hidden_dim, n_agents, n_comm_rounds=2):
        super().__init__()
        self.n_comm_rounds = n_comm_rounds

        self.obs_encoder = nn.Linear(obs_dim, hidden_dim)
        self.comm_modules = nn.ModuleList([
            AttentionCommModule(hidden_dim, n_agents)
            for _ in range(n_comm_rounds)
        ])
        self.policy = nn.Linear(hidden_dim, action_dim)

    def forward(self, observations):
        """
        observations: [n_agents, obs_dim]
        returns: [n_agents, action_dim]
        """
        hidden = torch.relu(self.obs_encoder(observations))

        # Multiple rounds of communication
        for comm_module in self.comm_modules:
            hidden = comm_module(hidden)

        # Each agent selects action
        actions = self.policy(hidden)
        return actions

# Usage example: Cooperative control of 3 reactors
n_agents = 3
obs_dim = 4  # [temperature, concentration, flow rate, pressure]
action_dim = 5  # Discrete actions
hidden_dim = 64

agent = MultiRoundCommAgent(obs_dim, action_dim, hidden_dim, n_agents, n_comm_rounds=2)

# Simulation
observations = torch.randn(n_agents, obs_dim)
actions = agent(observations)
print(f"Actions after communication: {actions.shape}")  # [3, 5]

# Using individual agent
single_agent = CommunicativeAgent(obs_dim, action_dim, hidden_dim, n_agents)
action_logits, comm_hidden = single_agent(observations, agent_idx=0)
print(f"Agent 0 action: {torch.softmax(action_logits, dim=-1)}")
print(f"Hidden state after communication: {comm_hidden.shape}")  # [3, 64]
5 Cooperative Task: Synchronized Control of 3 CSTRs

Implement a cooperative task where three continuous stirred-tank reactors (CSTR) are connected in series, maximizing overall productivity while appropriately controlling the temperature of each reactor.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Cooperative task: Synchronized control of 3 CSTRs
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class CooperativeCSTREnv:
    """Cooperative environment with 3 CSTRs connected in series"""
    def __init__(self):
        self.n_agents = 3
        self.dt = 0.1  # Time step [min]
        self.reset()

    def reset(self):
        # Each CSTR: [temperature T(K), concentration CA(mol/L), flow rate F(L/min)]
        self.states = np.array([
            [350.0, 2.0, 100.0],  # CSTR1
            [340.0, 1.5, 100.0],  # CSTR2
            [330.0, 1.0, 100.0]   # CSTR3
        ])
        self.time = 0
        return self._get_observations()

    def _get_observations(self):
        """Observation for each agent (local information + neighboring information)"""
        obs = []
        for i in range(self.n_agents):
            local = self.states[i].copy()
            # Upstream information (for cooperation)
            prev = self.states[i-1] if i > 0 else np.zeros(3)
            # Downstream information
            next_ = self.states[i+1] if i < self.n_agents-1 else np.zeros(3)
            obs.append(np.concatenate([local, prev, next_]))
        return obs

    def step(self, actions):
        """
        actions: [n_agents, 2] = [[Q1, Tin1], [Q2, Tin2], [Q3, Tin3]]
        Q: Cooling flow rate [L/min] (0-50)
        Tin: Inlet temperature [K] (300-400)
        """
        rewards = []

        for i in range(self.n_agents):
            T, CA, F = self.states[i]
            Q = actions[i][0] * 50  # Denormalization
            Tin = actions[i][1] * 100 + 300

            # Reaction rate constant (Arrhenius equation)
            Ea = 50000  # [J/mol]
            R = 8.314   # [J/(mol·K)]
            k = 1e10 * np.exp(-Ea / (R * T))

            # CSTR model
            V = 1000  # Reactor volume [L]
            rho = 1000  # Density [g/L]
            Cp = 4.18   # Specific heat [J/(g·K)]
            dHr = -50000  # Heat of reaction [J/mol]

            # Inlet concentration (inflow from upstream)
            CA_in = self.states[i-1, 1] if i > 0 else 2.5

            # Mass balance
            dCA = (F / V) * (CA_in - CA) - k * CA
            CA_new = CA + dCA * self.dt

            # Energy balance
            Q_reaction = -dHr * k * CA * V
            Q_cooling = Q * rho * Cp * (T - Tin)
            dT = (Q_reaction - Q_cooling) / (V * rho * Cp)
            T_new = T + dT * self.dt

            self.states[i] = [T_new, max(0, CA_new), F]

            # Cooperative reward: Overall productivity and stability
            production = k * CA  # Reaction rate
            temp_penalty = -abs(T_new - 350) ** 2  # Deviation from target temperature
            flow_continuity = -abs(F - 100) ** 2 if i > 0 else 0

            reward = production * 100 + temp_penalty * 0.1 + flow_continuity * 0.01
            rewards.append(reward)

        # Common reward (cooperative task)
        total_production = sum([self.states[i, 1] for i in range(self.n_agents)])
        common_reward = total_production * 10
        rewards = [r + common_reward for r in rewards]

        self.time += self.dt
        done = self.time >= 10  # 10-minute episode

        return self._get_observations(), np.array(rewards), [done]*self.n_agents

# QMIX training (simplified version)
class SimpleQMIX:
    def __init__(self, n_agents, obs_dim, action_dim):
        self.n_agents = n_agents
        self.q_nets = [nn.Sequential(
            nn.Linear(obs_dim, 64), nn.ReLU(),
            nn.Linear(64, action_dim)
        ) for _ in range(n_agents)]
        self.mixer = nn.Sequential(
            nn.Linear(n_agents, 32), nn.ReLU(),
            nn.Linear(32, 1)
        )
        params = list(self.mixer.parameters())
        for net in self.q_nets:
            params += list(net.parameters())
        self.optimizer = optim.Adam(params, lr=1e-3)

    def select_actions(self, observations):
        actions = []
        for i, obs in enumerate(observations):
            with torch.no_grad():
                q_vals = self.q_nets[i](torch.FloatTensor(obs))
                # Discretize continuous actions (simplified)
                action = torch.rand(2)  # [Q normalized, Tin normalized]
                actions.append(action.numpy())
        return np.array(actions)

# Training execution
env = CooperativeCSTREnv()
agent = SimpleQMIX(n_agents=3, obs_dim=9, action_dim=10)

for episode in range(100):
    obs = env.reset()
    episode_reward = 0

    for step in range(100):
        actions = agent.select_actions(obs)
        next_obs, rewards, dones = env.step(actions)
        episode_reward += sum(rewards)
        obs = next_obs

        if dones[0]:
            break

    if episode % 10 == 0:
        print(f"Episode {episode}, Total Reward: {episode_reward:.2f}")
        print(f"  Final Temps: {[f'{s[0]:.1f}K' for s in env.states]}")
        print(f"  Final Concs: {[f'{s[1]:.3f}mol/L' for s in env.states]}")
6 Competitive Task: Allocation of Limited Utilities

A competitive scenario where multiple plants compete for limited utilities such as steam and cooling water. Each agent needs to maximize its own production while coordinating under resource constraints.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Competitive task: Utility allocation problem
import numpy as np
import torch
import torch.nn as nn

class CompetitiveUtilityEnv:
    """Competition for limited utilities among multiple plants"""
    def __init__(self, n_plants=3):
        self.n_plants = n_plants
        self.total_steam = 500.0  # Total steam supply [kg/h]
        self.total_cooling = 1000.0  # Total cooling water [L/min]
        self.reset()

    def reset(self):
        # Each plant: [production rate, temperature, steam usage, cooling water usage]
        self.states = np.array([
            [50.0, 350.0, 150.0, 300.0] for _ in range(self.n_plants)
        ])
        return self._get_observations()

    def _get_observations(self):
        """Observation for each plant"""
        obs = []
        for i in range(self.n_plants):
            # Own state + resource availability information
            steam_used = sum(self.states[:, 2])
            cooling_used = sum(self.states[:, 3])
            steam_avail = max(0, self.total_steam - steam_used)
            cooling_avail = max(0, self.total_cooling - cooling_used)

            obs_i = np.concatenate([
                self.states[i],
                [steam_avail, cooling_avail]
            ])
            obs.append(obs_i)
        return obs

    def step(self, actions):
        """
        actions: [n_plants, 2] = [[steam_request, cooling_request], ...]
        Each value is normalized to 0-1
        """
        # Convert requests to real values
        steam_requests = actions[:, 0] * 200  # 0-200 kg/h
        cooling_requests = actions[:, 1] * 400  # 0-400 L/min

        # Resource allocation (proportional allocation)
        total_steam_req = sum(steam_requests)
        total_cooling_req = sum(cooling_requests)

        if total_steam_req > self.total_steam:
            steam_allocated = steam_requests * (self.total_steam / total_steam_req)
        else:
            steam_allocated = steam_requests

        if total_cooling_req > self.total_cooling:
            cooling_allocated = cooling_requests * (self.total_cooling / total_cooling_req)
        else:
            cooling_allocated = cooling_requests

        rewards = []
        for i in range(self.n_plants):
            # Production rate depends on resources
            steam_factor = steam_allocated[i] / 200
            cooling_factor = cooling_allocated[i] / 400
            production = 100 * steam_factor * cooling_factor

            # Temperature management (penalty for insufficient cooling)
            temp_change = (steam_allocated[i] * 0.5 - cooling_allocated[i] * 0.3)
            temp_new = self.states[i, 1] + temp_change
            temp_penalty = -abs(temp_new - 350) if temp_new > 380 else 0

            self.states[i] = [production, temp_new,
                            steam_allocated[i], cooling_allocated[i]]

            # Reward: productivity - resource shortage penalty
            shortage_penalty = 0
            if steam_allocated[i] < steam_requests[i]:
                shortage_penalty += (steam_requests[i] - steam_allocated[i]) * 0.5
            if cooling_allocated[i] < cooling_requests[i]:
                shortage_penalty += (cooling_requests[i] - cooling_allocated[i]) * 0.3

            reward = production - shortage_penalty + temp_penalty
            rewards.append(reward)

        done = False
        return self._get_observations(), np.array(rewards), [done]*self.n_plants

# Nash Q-Learning (for competitive tasks)
class NashQLearningAgent:
    """Agent learning Nash equilibrium"""
    def __init__(self, obs_dim, action_dim, agent_id):
        self.agent_id = agent_id
        self.q_net = nn.Sequential(
            nn.Linear(obs_dim, 64), nn.ReLU(),
            nn.Linear(64, 64), nn.ReLU(),
            nn.Linear(64, action_dim)
        )
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=1e-3)
        self.epsilon = 0.3

    def select_action(self, obs):
        """ε-greedy with Nash equilibrium consideration"""
        if np.random.rand() < self.epsilon:
            return np.random.rand(2)
        else:
            with torch.no_grad():
                # Continuous action approximation (simplified)
                return torch.sigmoid(torch.randn(2)).numpy()

    def update_policy(self, obs, action, reward, next_obs):
        """Q-learning update (simplified version)"""
        # Implementation omitted (learning algorithm for competitive environment)
        pass

# Simulation
env = CompetitiveUtilityEnv(n_plants=3)
agents = [NashQLearningAgent(obs_dim=6, action_dim=2, agent_id=i)
          for i in range(3)]

for episode in range(50):
    obs = env.reset()
    episode_rewards = [0] * 3

    for step in range(50):
        actions = np.array([agents[i].select_action(obs[i]) for i in range(3)])
        next_obs, rewards, dones = env.step(actions)

        for i in range(3):
            episode_rewards[i] += rewards[i]

        obs = next_obs

    if episode % 10 == 0:
        print(f"\nEpisode {episode}")
        print(f"  Individual Rewards: {[f'{r:.1f}' for r in episode_rewards]}")
        print(f"  Productions: {[f'{s[0]:.1f}' for s in env.states]}")
        print(f"  Steam Usage: {[f'{s[2]:.1f}' for s in env.states]} / {env.total_steam}")
        print(f"  Cooling Usage: {[f'{s[3]:.1f}' for s in env.states]} / {env.total_cooling}")
7 Mixed Task: Production System with Coexisting Cooperation and Competition

In real plants, both cooperative and competitive aspects exist. Each plant needs to achieve its own production targets (competition) while also considering overall stability and efficiency (cooperation).

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Mixed task: Production system with coexisting cooperation and competition
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class MixedCoopCompEnv:
    """Complex environment with mixed cooperation and competition"""
    def __init__(self, n_plants=3):
        self.n_plants = n_plants
        self.total_energy = 1000.0  # Total energy [kW]
        self.production_targets = [100, 120, 80]  # Production target for each plant
        self.reset()

    def reset(self):
        # Each plant: [production, temperature, energy usage, quality]
        self.states = np.array([
            [50.0, 350.0, 300.0, 0.9] for _ in range(self.n_plants)
        ])
        self.time = 0
        return self._get_observations()

    def _get_observations(self):
        obs = []
        for i in range(self.n_plants):
            # Own state + target + other plants' production (for cooperation)
            others_production = [self.states[j, 0] for j in range(self.n_plants) if j != i]
            total_energy_used = sum(self.states[:, 2])

            obs_i = np.concatenate([
                self.states[i],
                [self.production_targets[i]],
                others_production,
                [total_energy_used, self.total_energy]
            ])
            obs.append(obs_i)
        return obs

    def step(self, actions):
        """
        actions: [n_plants, 3] = [[energy_req, temp_setpoint, quality_target], ...]
        """
        # Energy allocation (competitive element)
        energy_requests = actions[:, 0] * 500
        total_energy_req = sum(energy_requests)

        if total_energy_req > self.total_energy:
            # When insufficient, allocate based on priority (prioritize plants below target)
            priorities = [
                max(0, self.production_targets[i] - self.states[i, 0])
                for i in range(self.n_plants)
            ]
            total_priority = sum(priorities) + 1e-6
            energy_allocated = [
                self.total_energy * (priorities[i] / total_priority)
                for i in range(self.n_plants)
            ]
        else:
            energy_allocated = energy_requests

        rewards = []
        for i in range(self.n_plants):
            temp_setpoint = actions[i, 1] * 100 + 300  # 300-400K
            quality_target = actions[i, 2]  # 0-1

            # Production model
            energy_factor = energy_allocated[i] / 500
            temp_factor = 1.0 - abs(temp_setpoint - 350) / 100
            production = self.production_targets[i] * energy_factor * temp_factor

            # Quality model
            quality = 0.5 + 0.5 * quality_target * temp_factor

            # Temperature update
            temp = self.states[i, 1] + (temp_setpoint - self.states[i, 1]) * 0.3

            self.states[i] = [production, temp, energy_allocated[i], quality]

            # Reward design (mixed)
            # 1. Individual target achievement (competitive element)
            target_achievement = -abs(production - self.production_targets[i])

            # 2. Quality penalty
            quality_reward = quality * 10

            # 3. Energy efficiency (cooperative element)
            energy_efficiency = production / (energy_allocated[i] + 1)

            reward = target_achievement + quality_reward + energy_efficiency * 5
            rewards.append(reward)

        # Cooperation bonus: Overall stability
        total_production = sum(self.states[:, 0])
        stability = -np.std(self.states[:, 1])  # Temperature standard deviation
        cooperation_bonus = (total_production / sum(self.production_targets)) * 50 + stability

        # Final reward = Individual reward + cooperation bonus
        rewards = [r + cooperation_bonus * 0.3 for r in rewards]

        self.time += 1
        done = self.time >= 100

        return self._get_observations(), np.array(rewards), [done]*self.n_plants

# COMA (Counterfactual Multi-Agent Policy Gradient)
class COMAAgent:
    """COMA agent for mixed tasks"""
    def __init__(self, n_agents, obs_dim, action_dim, state_dim):
        self.n_agents = n_agents

        # Actor for each agent
        self.actors = [nn.Sequential(
            nn.Linear(obs_dim, 64), nn.ReLU(),
            nn.Linear(64, action_dim), nn.Tanh()
        ) for _ in range(n_agents)]

        # Central Critic (counterfactual baseline)
        self.critic = nn.Sequential(
            nn.Linear(state_dim + n_agents * action_dim, 128), nn.ReLU(),
            nn.Linear(128, n_agents)  # Q-value for each agent
        )

        self.actor_opts = [optim.Adam(a.parameters(), lr=3e-4) for a in self.actors]
        self.critic_opt = optim.Adam(self.critic.parameters(), lr=1e-3)

    def select_actions(self, observations):
        actions = []
        for i, obs in enumerate(observations):
            with torch.no_grad():
                action = self.actors[i](torch.FloatTensor(obs)).numpy()
            actions.append(action)
        return np.array(actions)

# Training loop
env = MixedCoopCompEnv(n_plants=3)
agent = COMAAgent(n_agents=3, obs_dim=9, action_dim=3, state_dim=12)

for episode in range(100):
    obs = env.reset()
    episode_rewards = [0] * 3

    for step in range(100):
        actions = agent.select_actions(obs)
        next_obs, rewards, dones = env.step(actions)

        for i in range(3):
            episode_rewards[i] += rewards[i]

        obs = next_obs

        if dones[0]:
            break

    if episode % 20 == 0:
        print(f"\nEpisode {episode}")
        print(f"  Individual Rewards: {[f'{r:.1f}' for r in episode_rewards]}")
        print(f"  Productions: {[f'{s[0]:.1f}' for s in env.states]}")
        print(f"  Targets: {env.production_targets}")
        print(f"  Qualities: {[f'{s[3]:.2f}' for s in env.states]}")
        print(f"  Total Energy Used: {sum(env.states[:, 2]):.1f} / {env.total_energy}")

Chapter 4 Summary

What We Learned

  • CTDE: Efficient framework with centralized training and distributed execution
  • Independent Q-Learning: Simplest approach but has non-stationarity issues
  • QMIX: Credit assignment through value decomposition and monotonicity constraints
  • Communication Mechanisms: Strengthening cooperation through attention-based message passing
  • Cooperative Tasks: Pursuing global optimum with common rewards
  • Competitive Tasks: Resource allocation problems
  • Mixed Tasks: Complex scenarios closer to real plant operation

Algorithm Comparison

Method Applicable Task Learning Efficiency Implementation Difficulty
IQL Simple cooperation Low Low
QMIX Cooperative tasks High Medium
CTDE Mixed tasks High Medium
CommNet Complex cooperation High High

Implications for Process Control

  • QMIX and CTDE are effective for cooperative control of multiple reactors and distillation columns
  • Consider Nash Q-Learning for competitive tasks like utility allocation
  • When communication delays exist, include buffer information in observations
  • Mixed tasks are common in real plants, making reward design crucial

Next Chapter

Chapter 5: Real Plant Deployment and Safety

Learn about ensuring safety when deploying reinforcement learning agents to actual plants, including sim-to-real transfer, uncertainty quantification, and human override mechanisms.

References

  1. Montgomery, D. C. (2019). Design and Analysis of Experiments (9th ed.). Wiley.
  2. Box, G. E. P., Hunter, J. S., & Hunter, W. G. (2005). Statistics for Experimenters: Design, Innovation, and Discovery (2nd ed.). Wiley.
  3. Seborg, D. E., Edgar, T. F., Mellichamp, D. A., & Doyle III, F. J. (2016). Process Dynamics and Control (4th ed.). Wiley.
  4. McKay, M. D., Beckman, R. J., & Conover, W. J. (2000). "A Comparison of Three Methods for Selecting Values of Input Variables in the Analysis of Output from a Computer Code." Technometrics, 42(1), 55-61.

Disclaimer