
Chapter 5: Real Plant Deployment and Safety

📖 Reading Time: 30-40 minutes 📊 Difficulty: Advanced
Autonomous Process Operation with AI Agents Series


Chapter 5 Overview

Training reinforcement learning agents in simulation alone is not sufficient. Deploying to a real plant raises many additional challenges: ensuring safety, closing the gap between simulation and reality, and handling uncertainty.

This chapter explains seven key technologies for real plant deployment, each with an implementation example. Chemical plants handle high temperatures, high pressures, and hazardous materials, so autonomous control by AI demands a particularly careful approach.

Important Safety Notice

The technologies covered in this chapter assume deployment to real plants. Actual deployment requires process safety expertise, regulatory compliance, and thorough verification and testing. This code is for educational purposes, and appropriate safety assessment is essential for use in real plants.

What You Will Learn in This Chapter

  • Sim-to-Real Transfer: Robustness improvement through domain randomization
  • Safe Exploration: Avoiding dangerous regions with action constraints
  • Conservative Q-Learning (CQL): Offline learning to prevent overestimation
  • Human Override: Emergency human intervention mechanism
  • Uncertainty Quantification: Confidence interval estimation using Bayesian NNs and ensembles
  • Performance Monitoring and Drift Detection: Continuous monitoring
  • Integrated Deployment Framework: Implementation combining all elements

Real Plant Deployment Challenges

Sim-to-Real Gap

graph LR
    A[Simulation Environment] -->|Ideal Model| B[Perfect Control]
    C[Real Plant] -->|Model Error<br/>Disturbance<br/>Sensor Noise| D[Performance Degradation]
    B -.->|Sim-to-Real Gap| D
    E[Domain Randomization] --> F[Robust Policy]
    F --> C

Safety Hierarchy

Layer                        | Function                        | Implementation
1. Action Constraints        | Prohibition of dangerous actions | Hard limits, safety filters
2. Uncertainty Consideration | Confidence interval assessment   | Bayesian NN, ensembles
3. Performance Monitoring    | Anomaly detection                | Drift detection, KPI monitoring
4. Human Intervention        | Emergency stop                   | Override mechanism
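
These layers are applied in order, each able to veto or soften the action coming from the layer above. The sketch below shows one way of chaining them; the function names and thresholds are illustrative assumptions, not the classes implemented later in this chapter.

# Minimal sketch of the 4-layer safety hierarchy (all names/thresholds assumed)
import numpy as np

def request_operator_action(state, suggestion):
    """Hypothetical operator hook; a real system would block on HMI input."""
    return suggestion

def layered_action(state, proposed, q_std, prev_action):
    T = state[0]
    # Layer 1: hard action limits and rate-of-change clipping
    action = float(np.clip(proposed, -1.0, 1.0))
    action = float(np.clip(action, prev_action - 0.2, prev_action + 0.2))
    # Layer 2: back off when the value estimate is uncertain
    if q_std > 0.5:
        action *= 0.5
    # Layer 3: flag anomalous states for the monitoring system
    anomaly = not (310.0 <= T <= 390.0)
    # Layer 4: escalate to a human operator when an anomaly is flagged
    if anomaly:
        action = request_operator_action(state, action)
    return action, anomaly

action, anomaly = layered_action(np.array([395.0, 1.2]),
                                 proposed=0.8, q_std=0.7, prev_action=0.0)
print(f"action={action:.2f}, anomaly={anomaly}")
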
1. Sim-to-Real Transfer Through Domain Randomization

By training on randomized parameters in simulation, the agent acquires a policy that is robust to the uncertainties of the real environment.

Domain Randomization:

\[ \theta \sim p(\Theta), \quad \pi^* = \arg\max_\pi \mathbb{E}_{\theta \sim p(\Theta)} [J(\pi; \theta)] \]

where \(\theta\) denotes the environment parameters and \(p(\Theta)\) their distribution.
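
In practice the expectation over \(p(\Theta)\) is approximated by Monte Carlo: sample a fresh \(\theta\) for each episode and average the returns. A minimal sketch of that estimator follows; `sample_theta` and `evaluate_policy` are placeholder hooks for the environment and rollout logic implemented below, and the toy demo values are assumptions.

# Monte Carlo estimate of E_{theta ~ p(Theta)}[J(pi; theta)] (illustrative)
import numpy as np

def mc_objective(policy, sample_theta, evaluate_policy, n_samples=32):
    """Average return of a policy over sampled environment parameters."""
    returns = [evaluate_policy(policy, sample_theta()) for _ in range(n_samples)]
    return float(np.mean(returns))

# Toy demo with placeholder components (not the chapter's CSTR environment)
sample_theta = lambda: np.random.uniform(40000, 60000)        # e.g. activation energy
toy_eval = lambda policy, theta: -abs(theta - 50000) / 1000.0  # stand-in return
print(f"Estimated J(pi) = {mc_objective(None, sample_theta, toy_eval):.2f}")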

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Domain Randomization Implementation
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class RandomizedCSTREnv:
    """CSTR environment with randomized parameters"""
    def __init__(self, randomize=True):
        self.randomize = randomize
        self.reset()

    def _sample_parameters(self):
        """Random sampling of physical parameters"""
        if self.randomize:
            # Activation energy (±20% variation)
            self.Ea = np.random.uniform(40000, 60000)

            # Reaction enthalpy (±15% variation)
            self.dHr = np.random.uniform(-57500, -42500)

            # Heat transfer coefficient (±25% variation)
            self.U = np.random.uniform(300, 500)

            # Volume (manufacturing variation ±5%)
            self.V = np.random.uniform(950, 1050)

            # Sensor noise standard deviation
            self.temp_noise = np.random.uniform(0.1, 1.0)
            self.conc_noise = np.random.uniform(0.01, 0.05)

            # Control delay (communication delay + valve response)
            self.control_delay = np.random.randint(1, 4)
        else:
            # Nominal values
            self.Ea = 50000
            self.dHr = -50000
            self.U = 400
            self.V = 1000
            self.temp_noise = 0.5
            self.conc_noise = 0.02
            self.control_delay = 2

    def reset(self):
        self._sample_parameters()
        self.state = np.array([350.0, 2.0])  # [temperature, concentration]
        self.action_buffer = [0.5] * self.control_delay
        return self._get_observation()

    def _get_observation(self):
        """Noisy observation"""
        T, CA = self.state
        T_obs = T + np.random.normal(0, self.temp_noise)
        CA_obs = CA + np.random.normal(0, self.conc_noise)
        return np.array([T_obs, CA_obs])

    def step(self, action):
        """Step with control delay"""
        # Apply delayed action
        self.action_buffer.append(action)
        actual_action = self.action_buffer.pop(0)

        T, CA = self.state

        # Reaction rate (randomized parameters)
        R = 8.314
        k = 1e10 * np.exp(-self.Ea / (R * T))

        # CSTR dynamics
        dt = 0.1
        F = 100  # Flow rate [L/min]
        CA_in = 2.5
        Tin = 350
        rho = 1000
        Cp = 4.18

        # Cooling duty (action); action in [-1, 1] maps to -10 to +10 kW
        Q_cool = actual_action * 10000  # [W]

        # Material balance
        dCA = (F / self.V) * (CA_in - CA) - k * CA
        CA_new = CA + dCA * dt

        # Energy balance
        Q_rxn = -self.dHr * k * CA * self.V
        Q_jacket = self.U * 10 * (T - Tin) + Q_cool  # U * A, with A = 10 m^2 assumed
        dT = (Q_rxn - Q_jacket) / (self.V * rho * Cp)
        T_new = T + dT * dt

        self.state = np.array([T_new, max(0, CA_new)])

        # Reward
        temp_penalty = -abs(T_new - 350) ** 2 * 0.01
        production = k * CA * 10
        reward = temp_penalty + production

        done = T_new > 400 or T_new < 300  # Outside safe range
        return self._get_observation(), reward, done

# Robust SAC learning
class RobustSACAgent:
    """SAC agent using domain randomization"""
    def __init__(self, obs_dim, action_dim):
        self.actor = nn.Sequential(
            nn.Linear(obs_dim, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
            nn.Linear(128, action_dim), nn.Tanh()
        )
        self.optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)

    def select_action(self, obs):
        with torch.no_grad():
            return self.actor(torch.FloatTensor(obs)).numpy()

# Training: collect experience in the randomized environment
# (the SAC gradient update itself is omitted here for brevity)
train_env = RandomizedCSTREnv(randomize=True)
agent = RobustSACAgent(obs_dim=2, action_dim=1)

print("Training in randomized environment...")
for episode in range(500):
    obs = train_env.reset()
    episode_reward = 0

    for step in range(100):
        action = agent.select_action(obs)
        next_obs, reward, done = train_env.step(action[0])
        episode_reward += reward
        obs = next_obs

        if done:
            break

    if episode % 100 == 0:
        print(f"Episode {episode}, Reward: {episode_reward:.2f}")
        print(f"  Env params: Ea={train_env.Ea:.0f}, V={train_env.V:.0f}, "
              f"delay={train_env.control_delay}")

# Evaluation: test in nominal environment (assuming real plant)
test_env = RandomizedCSTREnv(randomize=False)
print("\nTesting in nominal environment...")

obs = test_env.reset()
test_reward = 0
temps = []

for step in range(100):
    action = agent.select_action(obs)
    obs, reward, done = test_env.step(action[0])
    test_reward += reward
    temps.append(obs[0])

    if done:
        break

print(f"Test Reward: {test_reward:.2f}")
print(f"Temp Mean: {np.mean(temps):.2f}K, Std: {np.std(temps):.2f}K")

2. Safe Exploration: Safety Filter with Action Constraints

Physical and safety constraints prevent the reinforcement learning agent from taking dangerous actions. This example implements a safety layer with action projection and a Control Barrier Function (CBF).

graph LR
    A[RL Policy π] -->|Dangerous Action?| B[Safety Filter]
    B -->|Safe Action| C[Execute]
    B -->|Constraint Violation| D[Safe Alternative Action]
    D --> C

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Safe Exploration: Safety Filter Implementation
import numpy as np
import torch
import torch.nn as nn

class SafetyConstraints:
    """Safety constraints for CSTR"""
    def __init__(self):
        # Temperature constraints
        self.T_min = 310.0  # [K]
        self.T_max = 390.0  # [K]
        self.T_target = 350.0

        # Concentration constraints
        self.CA_min = 0.1  # [mol/L]
        self.CA_max = 3.0

        # Control input constraints
        self.u_min = -100.0  # Maximum cooling [kW]
        self.u_max = 50.0    # Maximum heating [kW]

        # Rate of change constraints
        self.du_max = 20.0  # [kW/step]

    def is_safe_state(self, state):
        """Check if state is safe"""
        T, CA = state
        return (self.T_min <= T <= self.T_max and
                self.CA_min <= CA <= self.CA_max)

    def is_safe_action(self, state, action, prev_action):
        """Check if action is safe"""
        # Control input range
        if not (self.u_min <= action <= self.u_max):
            return False

        # Rate of change constraint
        if abs(action - prev_action) > self.du_max:
            return False

        return True

    def project_to_safe(self, action, prev_action):
        """Project action to safe region"""
        # Range constraint
        action = np.clip(action, self.u_min, self.u_max)

        # Rate of change constraint
        delta = action - prev_action
        if abs(delta) > self.du_max:
            action = prev_action + np.sign(delta) * self.du_max

        return action

class ControlBarrierFunction:
    """Safety assurance with Control Barrier Function (CBF)"""
    def __init__(self, safety_constraints):
        self.constraints = safety_constraints
        self.alpha = 0.5  # Class K function gain

    def barrier_function(self, state):
        """Barrier function h(x) >= 0 is safe region"""
        T, CA = state

        # Temperature barrier (distance function)
        h_T_min = T - self.constraints.T_min
        h_T_max = self.constraints.T_max - T

        # Concentration barrier
        h_CA_min = CA - self.constraints.CA_min
        h_CA_max = self.constraints.CA_max - CA

        # Minimum value (strictest constraint)
        return min(h_T_min, h_T_max, h_CA_min, h_CA_max)

    def safe_action(self, state, desired_action, env_model):
        """Calculate safe action satisfying CBF constraint"""
        h = self.barrier_function(state)

        # Do nothing if in safe region
        if h > 10.0:
            return desired_action

        # Impose constraint near boundary
        # Simple implementation: check barrier condition for predicted next state
        next_state_pred = env_model.predict(state, desired_action)
        h_next = self.barrier_function(next_state_pred)

        # Discrete-time CBF condition: h(x_{k+1}) - h(x_k) >= -alpha * h(x_k),
        # i.e. h_next >= (1 - alpha) * h, which keeps h nonnegative
        if h_next >= (1 - self.alpha) * h:
            return desired_action
        else:
            # Modify to safe side (conservative action)
            T, CA = state
            if T > self.constraints.T_target:
                # Enhance cooling
                return max(desired_action, 0)
            else:
                # Suppress heating
                return min(desired_action, 0)

class SimpleCSTRModel:
    """Simple prediction model for CSTR"""
    def predict(self, state, action, dt=0.1):
        T, CA = state
        k = 1e10 * np.exp(-50000 / (8.314 * T))

        # Simple dynamics (sign convention matches the environment:
        # positive action = cooling duty, which lowers the temperature)
        dCA = -k * CA * dt
        dT = (-action * 1000 - 400 * (T - 350)) / 4180 * dt

        return np.array([T + dT, CA + dCA])

# Usage example
safety = SafetyConstraints()
cbf = ControlBarrierFunction(safety)
model = SimpleCSTRModel()

# Apply safety filter to RL agent actions
class SafeRLAgent:
    def __init__(self, base_agent, safety_filter):
        self.base_agent = base_agent
        self.safety_filter = safety_filter
        self.prev_action = 0.0

    def select_safe_action(self, state):
        # Base agent action (scale the normalized [-1, 1] output to kW)
        desired_action = self.base_agent.select_action(state)[0] * 100

        # Apply safety filter
        safe_action = self.safety_filter.project_to_safe(
            desired_action, self.prev_action)

        # CBF constraint
        safe_action = cbf.safe_action(state, safe_action, model)

        self.prev_action = safe_action
        return safe_action

# Simulation (assumes the Example 1 code is saved as example1.py)
from example1 import RobustSACAgent, RandomizedCSTREnv

base_agent = RobustSACAgent(obs_dim=2, action_dim=1)
safe_agent = SafeRLAgent(base_agent, safety)
env = RandomizedCSTREnv(randomize=False)

print("Running with safety constraints...")
state = env.reset()
unsafe_count = 0

for step in range(200):
    action = safe_agent.select_safe_action(state)
    next_state, reward, done = env.step(action / 100)

    if not safety.is_safe_state(state):
        unsafe_count += 1
        print(f"Step {step}: UNSAFE STATE! T={state[0]:.1f}K, CA={state[1]:.3f}")

    state = next_state

    if done:
        break

print(f"\nUnsafe states encountered: {unsafe_count} / {step+1}")
print(f"Safety rate: {(1 - unsafe_count/(step+1))*100:.1f}%")

3. Conservative Q-Learning (CQL): Conservative Offline Learning

In real plants, exploration is dangerous, so the agent learns offline from historical data. CQL deliberately underestimates Q-values for out-of-distribution actions, so the learned policy stays close to behaviors seen in the data.

CQL Objective Function:

\[ \min_Q \; \alpha \left( \mathbb{E}_{s \sim \mathcal{D}} \left[ \log \sum_a \exp Q(s,a) \right] - \mathbb{E}_{(s,a) \sim \mathcal{D}} [Q(s,a)] \right) + \mathcal{L}_{TD}(Q) \]

The first term pushes Q-values down for out-of-distribution actions; the second preserves Q-values for state-action pairs in the dataset. The coefficient \(\alpha\) controls the degree of conservatism.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Conservative Q-Learning (CQL) Implementation
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random

class CQLQNetwork(nn.Module):
    """Q function for CQL"""
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256), nn.ReLU(),
            nn.Linear(256, 256), nn.ReLU(),
            nn.Linear(256, 1)
        )

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        return self.net(x)

class CQLAgent:
    """Conservative Q-Learning Agent"""
    def __init__(self, state_dim, action_dim, alpha=1.0):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.alpha = alpha  # CQL regularization coefficient

        self.q_net = CQLQNetwork(state_dim, action_dim)
        self.target_q = CQLQNetwork(state_dim, action_dim)
        self.target_q.load_state_dict(self.q_net.state_dict())

        self.policy = nn.Sequential(
            nn.Linear(state_dim, 256), nn.ReLU(),
            nn.Linear(256, action_dim), nn.Tanh()
        )

        self.q_optimizer = optim.Adam(self.q_net.parameters(), lr=3e-4)
        self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=3e-4)

    def select_action(self, state, deterministic=False):
        with torch.no_grad():
            state_t = torch.FloatTensor(state).unsqueeze(0)
            action = self.policy(state_t)
            if not deterministic:
                action += torch.randn_like(action) * 0.1
            # Squeeze only the batch dimension so a 1-D action array is returned
            return action.squeeze(0).clamp(-1, 1).numpy()

    def train_step(self, batch):
        """CQL update"""
        states, actions, rewards, next_states, dones = batch

        states_t = torch.FloatTensor(states)
        actions_t = torch.FloatTensor(actions)
        rewards_t = torch.FloatTensor(rewards).unsqueeze(1)
        next_states_t = torch.FloatTensor(next_states)
        dones_t = torch.FloatTensor(dones).unsqueeze(1)

        # Q function update
        # 1. Bellman error
        with torch.no_grad():
            next_actions = self.policy(next_states_t)
            target_q = rewards_t + 0.99 * self.target_q(
                next_states_t, next_actions) * (1 - dones_t)

        current_q = self.q_net(states_t, actions_t)
        bellman_loss = nn.MSELoss()(current_q, target_q)

        # 2. CQL regularization term
        # Calculate Q values for random actions
        num_random = 10
        random_actions = torch.FloatTensor(
            np.random.uniform(-1, 1, (states_t.shape[0], num_random, self.action_dim)))

        random_q = []
        for i in range(num_random):
            q = self.q_net(states_t, random_actions[:, i, :])
            random_q.append(q)
        random_q = torch.cat(random_q, dim=1)

        # Q values for policy actions
        policy_actions = self.policy(states_t)
        policy_q = self.q_net(states_t, policy_actions)

        # CQL term: minimizing this pushes Q-values down for random
        # (out-of-distribution) actions and up for the policy's actions
        # (a variant that uses policy actions in place of dataset actions)
        cql_loss = (torch.logsumexp(random_q, dim=1).mean() -
                    policy_q.mean())

        # Total loss
        q_loss = bellman_loss + self.alpha * cql_loss

        self.q_optimizer.zero_grad()
        q_loss.backward()
        self.q_optimizer.step()

        # 3. Policy update (maximize Q-value)
        policy_loss = -self.q_net(states_t, self.policy(states_t)).mean()

        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()

        # Target update
        for target_param, param in zip(self.target_q.parameters(),
                                        self.q_net.parameters()):
            target_param.data.copy_(0.995 * target_param.data + 0.005 * param.data)

        return q_loss.item(), cql_loss.item(), policy_loss.item()

# Offline dataset generation (past operational data)
def generate_offline_data(n_trajectories=100):
    from example1 import RandomizedCSTREnv

    env = RandomizedCSTREnv(randomize=True)
    dataset = []

    for _ in range(n_trajectories):
        state = env.reset()
        for _ in range(50):
            # Random behavior policy (stand-in for historical operating data)
            action = np.random.uniform(-1, 1, 1)
            next_state, reward, done = env.step(action[0])
            dataset.append((state, action, reward, next_state, done))
            state = next_state
            if done:
                break

    return dataset

# CQL learning
print("Generating offline data...")
offline_data = generate_offline_data(n_trajectories=200)
print(f"Dataset size: {len(offline_data)}")

agent = CQLAgent(state_dim=2, action_dim=1, alpha=1.0)

print("\nCQL training in progress...")
batch_size = 256
for epoch in range(100):
    # Mini-batch sampling
    batch = random.sample(offline_data, min(batch_size, len(offline_data)))
    states, actions, rewards, next_states, dones = zip(*batch)

    batch_tuple = (
        np.array(states),
        np.array(actions),
        np.array(rewards),
        np.array(next_states),
        np.array(dones, dtype=np.float32)
    )

    q_loss, cql_loss, p_loss = agent.train_step(batch_tuple)

    if epoch % 20 == 0:
        print(f"Epoch {epoch}: Q_loss={q_loss:.3f}, CQL_loss={cql_loss:.3f}, "
              f"Policy_loss={p_loss:.3f}")

# Test
print("\nTesting CQL policy...")
from example1 import RandomizedCSTREnv
test_env = RandomizedCSTREnv(randomize=False)

state = test_env.reset()
total_reward = 0

for step in range(100):
    action = agent.select_action(state, deterministic=True)
    next_state, reward, done = test_env.step(action[0])
    total_reward += reward
    state = next_state
    if done:
        break

print(f"Test Reward: {total_reward:.2f}")

4. Human-in-the-Loop: Human Override Mechanism

When an AI agent takes unexpected actions, operators must be able to intervene. This example implements the intervention decision, smooth control handover, and history logging.

graph TD
    A[AI Control] -->|Anomaly Detection| B{Human Decision}
    B -->|OK| A
    B -->|Intervene| C[Manual Control]
    C -->|Stabilization| D{Return Condition}
    D -->|Satisfied| A
    D -->|Not Satisfied| C

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

# Human-in-the-Loop System Implementation
import numpy as np
from datetime import datetime
from enum import Enum

class ControlMode(Enum):
    """Control mode"""
    AI_CONTROL = "AI"
    HUMAN_OVERRIDE = "Human"
    TRANSITION = "Transition"

class HumanOverrideSystem:
    """Human intervention system"""
    def __init__(self):
        self.mode = ControlMode.AI_CONTROL
        self.intervention_history = []
        self.confidence_threshold = 0.7

    def check_intervention_needed(self, state, ai_action, confidence):
        """Check if intervention is needed"""
        T, CA = state

        # Trigger conditions
        triggers = {
            'high_temp': T > 380,
            'low_temp': T < 320,
            'low_confidence': confidence < self.confidence_threshold,
            'extreme_action': abs(ai_action) > 0.9,
            'unstable_state': CA < 0.2 or CA > 2.8
        }

        if any(triggers.values()):
            reason = [k for k, v in triggers.items() if v]
            return True, reason
        return False, []

    def request_human_action(self, state, ai_suggestion):
        """Request action from operator (GUI/CLI in practice)"""
        T, CA = state
        print(f"\n{'='*60}")
        print(f"Human Intervention Requested!")
        print(f"Current State: T={T:.2f}K, CA={CA:.3f}mol/L")
        print(f"AI Suggested Action: {ai_suggestion:.3f}")
        print(f"{'='*60}")

        # Simplified: simulate with rule-based
        # In practice, wait for operator input
        if T > 380:
            human_action = -0.8  # Strong cooling
            override = True
        elif T < 320:
            human_action = 0.6   # Heating
            override = True
        else:
            human_action = ai_suggestion
            override = False

        return human_action, override

    def smooth_transition(self, from_action, to_action, alpha=0.3):
        """Smooth control transition"""
        return alpha * to_action + (1 - alpha) * from_action

    def log_intervention(self, timestamp, state, ai_action, human_action, reason):
        """Log intervention history"""
        log_entry = {
            'timestamp': timestamp,
            'state': state.copy(),
            'ai_action': ai_action,
            'human_action': human_action,
            'reason': reason
        }
        self.intervention_history.append(log_entry)

    def generate_report(self):
        """Generate intervention report"""
        if not self.intervention_history:
            return "No interventions recorded."

        report = "\n" + "="*60 + "\n"
        report += "Human Intervention Report\n"
        report += "="*60 + "\n"
        report += f"Total interventions: {len(self.intervention_history)}\n\n"

        for i, entry in enumerate(self.intervention_history):
            report += f"Intervention {i+1}:\n"
            report += f"  Time: {entry['timestamp']}\n"
            report += f"  State: T={entry['state'][0]:.2f}K, CA={entry['state'][1]:.3f}\n"
            report += f"  AI action: {entry['ai_action']:.3f}\n"
            report += f"  Human action: {entry['human_action']:.3f}\n"
            report += f"  Reason: {', '.join(entry['reason'])}\n\n"

        return report

class HITLController:
    """Human-in-the-Loop control system"""
    def __init__(self, ai_agent, override_system):
        self.ai_agent = ai_agent
        self.override_system = override_system
        self.prev_action = 0.0

    def select_action(self, state, confidence=1.0):
        """Action selection considering human intervention"""
        # AI suggestion
        ai_action = self.ai_agent.select_action(state)[0]

        # Intervention decision
        need_intervention, reasons = self.override_system.check_intervention_needed(
            state, ai_action, confidence)

        if need_intervention:
            # Query human
            human_action, overridden = self.override_system.request_human_action(
                state, ai_action)

            if overridden:
                # Record intervention
                self.override_system.log_intervention(
                    datetime.now(), state, ai_action, human_action, reasons)
                self.override_system.mode = ControlMode.HUMAN_OVERRIDE
                final_action = human_action
            else:
                final_action = ai_action
        else:
            final_action = ai_action
            self.override_system.mode = ControlMode.AI_CONTROL

        # Smooth transition
        final_action = self.override_system.smooth_transition(
            self.prev_action, final_action)

        self.prev_action = final_action
        return final_action

# Usage example
from example1 import RobustSACAgent, RandomizedCSTREnv

print("Launching Human-in-the-Loop system...")

ai_agent = RobustSACAgent(obs_dim=2, action_dim=1)
override_system = HumanOverrideSystem()
controller = HITLController(ai_agent, override_system)

env = RandomizedCSTREnv(randomize=True)
state = env.reset()

# Test under severe conditions
for step in range(50):
    # Simulate confidence (high uncertainty situation)
    confidence = np.random.uniform(0.5, 1.0)

    action = controller.select_action(state, confidence)
    next_state, reward, done = env.step(action)

    print(f"Step {step}: Mode={override_system.mode.value}, "
          f"T={state[0]:.1f}K, Action={action:.3f}")

    state = next_state

    if done:
        print("Episode ended (abnormal state)")
        break

# Generate report
print(override_system.generate_report())

5. Uncertainty Quantification: Bayesian NNs and Ensemble Methods

By quantifying how confident the AI's predictions are, the system can fall back to conservative actions when uncertainty is high. Confidence intervals are estimated with Bayesian neural networks (approximated here by MC Dropout) or with ensembles.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Uncertainty Quantification: Ensemble and Bayesian Approximation
import torch
import torch.nn as nn
import numpy as np

class EnsembleQNetwork:
    """Ensemble of Q functions"""
    def __init__(self, state_dim, action_dim, n_models=5):
        self.n_models = n_models
        self.models = [
            nn.Sequential(
                nn.Linear(state_dim + action_dim, 128), nn.ReLU(),
                nn.Linear(128, 128), nn.ReLU(),
                nn.Linear(128, 1)
            ) for _ in range(n_models)
        ]
        self.optimizers = [
            torch.optim.Adam(m.parameters(), lr=1e-3) for m in self.models
        ]

    def predict_with_uncertainty(self, state, action):
        """Return prediction mean and uncertainty (standard deviation)"""
        state_t = torch.FloatTensor(state).unsqueeze(0)
        action_t = torch.FloatTensor(action).unsqueeze(0)
        x = torch.cat([state_t, action_t], dim=-1)

        predictions = []
        for model in self.models:
            with torch.no_grad():
                pred = model(x).item()
            predictions.append(pred)

        mean = np.mean(predictions)
        std = np.std(predictions)

        return mean, std

    def train_step(self, batch):
        """Train all models with different bootstrap samples"""
        states, actions, targets = batch

        losses = []
        for i, (model, opt) in enumerate(zip(self.models, self.optimizers)):
            # Bootstrap sampling
            indices = np.random.choice(len(states), len(states), replace=True)
            s = torch.FloatTensor(states[indices])
            a = torch.FloatTensor(actions[indices])
            t = torch.FloatTensor(targets[indices])

            x = torch.cat([s, a], dim=-1)
            pred = model(x).squeeze()

            loss = nn.MSELoss()(pred, t)
            opt.zero_grad()
            loss.backward()
            opt.step()

            losses.append(loss.item())

        return np.mean(losses)

class MCDropoutQNetwork(nn.Module):
    """Monte Carlo Dropout (Bayesian approximation)"""
    def __init__(self, state_dim, action_dim, dropout_rate=0.1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, 128),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(128, 1)
        )

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        return self.net(x)

    def predict_with_uncertainty(self, state, action, n_samples=20):
        """Uncertainty estimation with MC Dropout"""
        self.train()  # Enable Dropout

        state_t = torch.FloatTensor(state).unsqueeze(0)
        action_t = torch.FloatTensor(action).unsqueeze(0)

        predictions = []
        for _ in range(n_samples):
            pred = self.forward(state_t, action_t).item()
            predictions.append(pred)

        mean = np.mean(predictions)
        std = np.std(predictions)

        return mean, std

class UncertaintyAwareAgent:
    """Agent considering uncertainty"""
    def __init__(self, state_dim, action_dim, method='ensemble'):
        self.method = method
        if method == 'ensemble':
            self.q_network = EnsembleQNetwork(state_dim, action_dim, n_models=5)
        else:
            self.q_network = MCDropoutQNetwork(state_dim, action_dim)

        self.policy = nn.Sequential(
            nn.Linear(state_dim, 128), nn.ReLU(),
            nn.Linear(128, action_dim), nn.Tanh()
        )
        self.uncertainty_threshold = 0.5  # Uncertainty threshold

    def select_action(self, state):
        """Action selection considering uncertainty"""
        with torch.no_grad():
            nominal_action = self.policy(torch.FloatTensor(state)).numpy()

        # Evaluate uncertainty of multiple candidates
        action_candidates = [
            nominal_action,
            nominal_action * 0.5,  # Conservative
            np.zeros_like(nominal_action)  # Maintain current
        ]

        best_action = nominal_action
        min_uncertainty = float('inf')

        for action in action_candidates:
            # Both estimators expose the same interface, so no branching is needed
            q_mean, q_std = self.q_network.predict_with_uncertainty(state, action)

            # Prefer the candidate with the lowest predictive uncertainty below
            # the threshold (the Q-value mean could also be traded off here)
            if q_std < self.uncertainty_threshold and q_std < min_uncertainty:
                best_action = action
                min_uncertainty = q_std

        return best_action, min_uncertainty

# Demonstration
print("Uncertainty Quantification Demo\n")

# Ensemble method
ensemble_agent = UncertaintyAwareAgent(state_dim=2, action_dim=1, method='ensemble')

test_states = [
    np.array([350.0, 1.5]),  # Normal
    np.array([385.0, 0.8]),  # High temperature
    np.array([310.0, 2.5])   # Low temperature
]

print("Ensemble Method:")
for i, state in enumerate(test_states):
    action, uncertainty = ensemble_agent.select_action(state)
    print(f"State {i+1}: T={state[0]}K, CA={state[1]}")
    print(f"  Action: {action[0]:.3f}, Uncertainty: {uncertainty:.3f}")

    if uncertainty > ensemble_agent.uncertainty_threshold:
        print(f"  WARNING: High uncertainty! Consider human oversight.")
    print()

# MC Dropout method
mc_agent = UncertaintyAwareAgent(state_dim=2, action_dim=1, method='mcdropout')

print("\nMC Dropout Method:")
for i, state in enumerate(test_states):
    action, uncertainty = mc_agent.select_action(state)
    print(f"State {i+1}: T={state[0]}K, CA={state[1]}")
    print(f"  Action: {action[0]:.3f}, Uncertainty: {uncertainty:.3f}")

    if uncertainty > mc_agent.uncertainty_threshold:
        print(f"  WARNING: High uncertainty!")
    print()

6. Performance Monitoring and Drift Detection

Continuously monitor performance after deployment to detect plant aging and model drift. This example implements KPI monitoring, statistical testing, and anomaly detection.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - scipy>=1.11.0

# Performance Monitoring and Drift Detection System
import numpy as np
from collections import deque
from scipy import stats

class PerformanceMonitor:
    """Performance monitoring system"""
    def __init__(self, window_size=100):
        self.window_size = window_size

        # KPI history
        self.rewards = deque(maxlen=window_size)
        self.temperatures = deque(maxlen=window_size)
        self.concentrations = deque(maxlen=window_size)
        self.actions = deque(maxlen=window_size)

        # Baseline statistics
        self.baseline_reward_mean = None
        self.baseline_reward_std = None

        # Anomaly counter
        self.anomaly_count = 0
        self.total_steps = 0

    def update(self, state, action, reward):
        """Update KPI"""
        T, CA = state
        self.rewards.append(reward)
        self.temperatures.append(T)
        self.concentrations.append(CA)
        self.actions.append(action)
        self.total_steps += 1

    def set_baseline(self):
        """Set baseline performance"""
        if len(self.rewards) >= self.window_size:
            self.baseline_reward_mean = np.mean(self.rewards)
            self.baseline_reward_std = np.std(self.rewards)
            print(f"Baseline set: μ={self.baseline_reward_mean:.2f}, "
                  f"σ={self.baseline_reward_std:.2f}")

    def detect_drift(self, alpha=0.05):
        """Statistical drift detection (t-test)"""
        if self.baseline_reward_mean is None or len(self.rewards) < 50:
            return False, None

        current_mean = np.mean(list(self.rewards)[-50:])

        # t-test
        t_stat, p_value = stats.ttest_1samp(
            list(self.rewards)[-50:],
            self.baseline_reward_mean
        )

        drift_detected = p_value < alpha and current_mean < self.baseline_reward_mean

        return drift_detected, {
            't_stat': t_stat,
            'p_value': p_value,
            'current_mean': current_mean,
            'baseline_mean': self.baseline_reward_mean
        }

    def detect_anomaly(self, state, action):
        """Anomaly detection (3-sigma rule)"""
        if len(self.rewards) < 30:
            return False

        T, CA = state

        # Temperature anomaly
        temp_mean = np.mean(self.temperatures)
        temp_std = np.std(self.temperatures)
        temp_anomaly = abs(T - temp_mean) > 3 * temp_std

        # Concentration anomaly
        conc_mean = np.mean(self.concentrations)
        conc_std = np.std(self.concentrations)
        conc_anomaly = abs(CA - conc_mean) > 3 * conc_std

        # Action anomaly
        action_mean = np.mean(self.actions)
        action_std = np.std(self.actions)
        action_anomaly = abs(action - action_mean) > 3 * action_std

        is_anomaly = temp_anomaly or conc_anomaly or action_anomaly

        if is_anomaly:
            self.anomaly_count += 1

        return is_anomaly

    def generate_report(self):
        """Generate monitoring report"""
        if len(self.rewards) == 0:
            return "No data collected."

        report = "\n" + "="*60 + "\n"
        report += "Performance Monitoring Report\n"
        report += "="*60 + "\n\n"

        report += f"Total steps: {self.total_steps}\n"
        report += f"Anomalies detected: {self.anomaly_count} "
        report += f"({self.anomaly_count/self.total_steps*100:.2f}%)\n\n"

        report += "KPI Statistics (last {} steps):\n".format(len(self.rewards))
        report += f"  Reward: μ={np.mean(self.rewards):.2f}, "
        report += f"σ={np.std(self.rewards):.2f}\n"
        report += f"  Temperature: μ={np.mean(self.temperatures):.2f}K, "
        report += f"σ={np.std(self.temperatures):.2f}K\n"
        report += f"  Concentration: μ={np.mean(self.concentrations):.3f}, "
        report += f"σ={np.std(self.concentrations):.3f}\n"
        report += f"  Action: μ={np.mean(self.actions):.3f}, "
        report += f"σ={np.std(self.actions):.3f}\n\n"

        # Drift detection
        drift, drift_info = self.detect_drift()
        if drift:
            report += "WARNING: Performance drift detected!\n"
            report += f"  Current mean reward: {drift_info['current_mean']:.2f}\n"
            report += f"  Baseline mean reward: {drift_info['baseline_mean']:.2f}\n"
            report += f"  p-value: {drift_info['p_value']:.4f}\n"
        else:
            report += "No significant drift detected.\n"

        report += "="*60 + "\n"
        return report

class DriftDetector:
    """More advanced drift detection (ADWIN)"""
    def __init__(self, delta=0.002):
        self.delta = delta
        self.window = deque()
        self.drift_detected = False

    def add_element(self, value):
        """Add data point and check for drift"""
        self.window.append(value)

        # Simplified ADWIN: split window in half and compare means
        if len(self.window) > 50:
            mid = len(self.window) // 2
            window1 = list(self.window)[:mid]
            window2 = list(self.window)[mid:]

            # Welch's t-test (does not assume equal variance)
            t_stat, p_value = stats.ttest_ind(window1, window2, equal_var=False)

            if p_value < self.delta:
                self.drift_detected = True
                self.window.clear()  # Reset after drift detection
                return True

        return False

# Demonstration
from example1 import RandomizedCSTREnv, RobustSACAgent

print("Launching performance monitoring system...\n")

env = RandomizedCSTREnv(randomize=False)
agent = RobustSACAgent(obs_dim=2, action_dim=1)
monitor = PerformanceMonitor(window_size=100)
drift_detector = DriftDetector()

# Phase 1: Normal operation (baseline setting)
print("Phase 1: Setting baseline...")
state = env.reset()

for step in range(100):
    action = agent.select_action(state)
    next_state, reward, done = env.step(action[0])

    monitor.update(state, action[0], reward)

    state = next_state if not done else env.reset()

monitor.set_baseline()

# Phase 2: Degradation simulation
print("\nPhase 2: Simulating plant degradation...")

for step in range(200):
    action = agent.select_action(state)

    # Simulate performance degradation over time
    degradation = step / 200 * 0.3
    next_state, reward, done = env.step(action[0])
    reward -= degradation * 10  # Performance decline

    monitor.update(state, action[0], reward)

    # Anomaly detection
    if monitor.detect_anomaly(state, action[0]):
        print(f"Step {step+100}: Anomaly detected!")

    # Drift detection
    if drift_detector.add_element(reward):
        print(f"Step {step+100}: DRIFT DETECTED by ADWIN!")

    state = next_state if not done else env.reset()

    # Periodic drift check
    if step % 50 == 0:
        drift, info = monitor.detect_drift()
        if drift:
            print(f"\nStep {step+100}: Statistical drift detected!")
            print(f"  Current: {info['current_mean']:.2f}, "
                  f"Baseline: {info['baseline_mean']:.2f}")

# Final report
print(monitor.generate_report())

7. Integrated Deployment Framework

A practical deployment system integrating all of the previous elements: domain randomization, safety constraints, human override, uncertainty quantification, and performance monitoring.

graph TD
    A[Sensor Data] --> B[Preprocessing]
    B --> C[Uncertainty Estimation]
    C --> D{High Uncertainty?}
    D -->|Yes| E[Conservative Action]
    D -->|No| F[AI Policy]
    F --> G[Safety Filter]
    E --> G
    G --> H{Safe?}
    H -->|No| I[Human Override]
    H -->|Yes| J[Actuator]
    I --> J
    J --> K[Performance Monitoring]
    K --> L{Drift?}
    L -->|Yes| M[Alert + Retraining]
    L -->|No| A

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

# Integrated Deployment Framework
import numpy as np
from datetime import datetime

class IntegratedDeploymentSystem:
    """Fully integrated deployment system"""
    def __init__(self, agent, env):
        # Components
        self.agent = agent
        self.env = env

        # Example 2: Safety constraints
        from example2 import SafetyConstraints, ControlBarrierFunction, SimpleCSTRModel
        self.safety = SafetyConstraints()
        self.cbf = ControlBarrierFunction(self.safety)
        self.model = SimpleCSTRModel()

        # Example 4: Human override
        from example4 import HumanOverrideSystem, ControlMode
        self.override_system = HumanOverrideSystem()

        # Example 5: Uncertainty quantification
        from example5 import EnsembleQNetwork
        self.uncertainty_estimator = EnsembleQNetwork(
            state_dim=2, action_dim=1, n_models=5)

        # Example 6: Performance monitoring
        from example6 import PerformanceMonitor, DriftDetector
        self.monitor = PerformanceMonitor(window_size=100)
        self.drift_detector = DriftDetector()

        # System state
        self.prev_action = 0.0
        self.running = True
        self.emergency_stop = False

    def preprocess_observation(self, raw_obs):
        """Preprocess sensor data"""
        # Outlier removal (simple)
        T, CA = raw_obs
        T = np.clip(T, 250, 450)
        CA = np.clip(CA, 0, 5)
        return np.array([T, CA])

    def estimate_uncertainty(self, state, action):
        """Estimate uncertainty"""
        q_mean, q_std = self.uncertainty_estimator.predict_with_uncertainty(
            state, action)
        return q_std

    def apply_safety_filter(self, state, action):
        """Apply safety filter"""
        # The constraint limits are defined in kW while the policy outputs
        # normalized actions in [-1, 1]: convert, project, then convert back
        action_kw = action * 100.0
        prev_kw = self.prev_action * 100.0

        # Constraint projection
        safe_kw = self.safety.project_to_safe(action_kw, prev_kw)

        # CBF constraint
        safe_kw = self.cbf.safe_action(state, safe_kw, self.model)

        return safe_kw / 100.0

    def check_human_override(self, state, action, uncertainty):
        """Check for human intervention"""
        need_intervention, reasons = self.override_system.check_intervention_needed(
            state, action, confidence=1.0 - uncertainty)

        if need_intervention:
            human_action, overridden = self.override_system.request_human_action(
                state, action)

            if overridden:
                self.override_system.log_intervention(
                    datetime.now(), state, action, human_action, reasons)
                return human_action, True

        return action, False

    def monitor_performance(self, state, action, reward):
        """Performance monitoring and drift detection"""
        self.monitor.update(state, action, reward)

        # Anomaly detection
        if self.monitor.detect_anomaly(state, action):
            print(f"  [MONITOR] Anomaly detected at step {self.monitor.total_steps}")

        # Drift detection
        if self.drift_detector.add_element(reward):
            print(f"  [MONITOR] Performance drift detected!")
            return True  # Retraining trigger

        # Periodic statistical drift check
        if self.monitor.total_steps % 100 == 0:
            drift, info = self.monitor.detect_drift()
            if drift:
                print(f"  [MONITOR] Statistical drift: "
                      f"current={info['current_mean']:.2f}, "
                      f"baseline={info['baseline_mean']:.2f}")
                return True

        return False

    def control_loop(self, n_steps=500):
        """Main control loop"""
        print("Integrated Deployment System Launch\n")
        print("="*60)

        state = self.env.reset()
        state = self.preprocess_observation(state)

        # Baseline setting
        for step in range(100):
            action = self.agent.select_action(state)[0]
            next_state, reward, done = self.env.step(action)
            next_state = self.preprocess_observation(next_state)

            self.monitor.update(state, action, reward)
            state = next_state if not done else self.env.reset()

        self.monitor.set_baseline()
        print("Baseline setting complete\n")

        # Main loop
        for step in range(n_steps):
            if self.emergency_stop:
                print("Emergency stop!")
                break

            # 1. AI policy
            raw_action = self.agent.select_action(state)[0]

            # 2. Uncertainty estimation
            uncertainty = self.estimate_uncertainty(state, np.array([raw_action]))

            # Conservative when high uncertainty
            if uncertainty > 0.5:
                raw_action *= 0.5
                print(f"  [UNCERTAINTY] High uncertainty ({uncertainty:.3f}), "
                      f"conservative action")

            # 3. Safety filter
            safe_action = self.apply_safety_filter(state, raw_action)

            # 4. Human override
            final_action, overridden = self.check_human_override(
                state, safe_action, uncertainty)

            # 5. Execute
            next_state, reward, done = self.env.step(final_action)
            next_state = self.preprocess_observation(next_state)

            # 6. Performance monitoring
            need_retraining = self.monitor_performance(state, final_action, reward)

            if need_retraining:
                print(f"  [SYSTEM] Retraining recommended")

            # Periodic reporting
            if step % 100 == 0:
                print(f"\nStep {step}:")
                print(f"  State: T={state[0]:.2f}K, CA={state[1]:.3f}")
                print(f"  Action: {final_action:.3f}, Uncertainty: {uncertainty:.3f}")
                print(f"  Mode: {self.override_system.mode.value}")
                print(f"  Anomalies: {self.monitor.anomaly_count}")

            self.prev_action = final_action
            state = next_state if not done else self.env.reset()

        # Final report
        print("\n" + "="*60)
        print("Operation ended")
        print("="*60)
        print(self.monitor.generate_report())
        print(self.override_system.generate_report())

# Execution
from example1 import RobustSACAgent, RandomizedCSTREnv

print("Integrated Deployment System Demonstration\n")

env = RandomizedCSTREnv(randomize=True)
agent = RobustSACAgent(obs_dim=2, action_dim=1)

system = IntegratedDeploymentSystem(agent, env)
system.control_loop(n_steps=300)

Chapter 5 Summary

What We Learned

  • Sim-to-Real Transfer: Achieving robustness through domain randomization
  • Safe Exploration: Danger avoidance with safety filters and CBF
  • Conservative Q-Learning: Safe learning from offline data
  • Human Override: Emergency intervention mechanism
  • Uncertainty Quantification: Confidence assessment with ensemble or MC Dropout
  • Performance Monitoring: Drift detection and anomaly detection
  • Integrated Framework: Practical system combining all elements

Deployment Maturity Model

Level          | Description                     | Required Technology
L1: Laboratory | Simulation only                 | Basic RL
L2: Testing    | Pilot plant                     | Sim-to-real, safety constraints
L3: Supervised | Real plant (human supervision)  | Override, performance monitoring
L4: Autonomous | Fully autonomous operation      | Full integration, continual learning
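
One way to operationalize this table is to gate each level behind its required safeguards, so a deployment cannot advance until every prerequisite is verified. The sketch below is illustrative: the level names follow the table, but the safeguard keys and gating rule are assumptions.

# Illustrative maturity gate (safeguard keys and rules are assumptions)
from enum import IntEnum

class MaturityLevel(IntEnum):
    L1_LABORATORY = 1
    L2_TESTING = 2
    L3_SUPERVISED = 3
    L4_AUTONOMOUS = 4

REQUIRED_SAFEGUARDS = {
    MaturityLevel.L2_TESTING: {"sim_validation", "safety_constraints"},
    MaturityLevel.L3_SUPERVISED: {"human_override", "performance_monitoring"},
    MaturityLevel.L4_AUTONOMOUS: {"drift_detection", "retraining_plan"},
}

def can_advance(current, verified):
    """Advance one level only when all safeguards for the next level are verified."""
    if current == MaturityLevel.L4_AUTONOMOUS:
        return False  # already at the highest level
    target = MaturityLevel(current + 1)
    return REQUIRED_SAFEGUARDS[target] <= verified

# Example: pilot-plant deployment with override and monitoring in place
print(can_advance(MaturityLevel.L2_TESTING,
                  {"human_override", "performance_monitoring"}))  # True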

Real Plant Deployment Checklist

  • ✓ Sufficient simulation validation (1000+ episodes)
  • ✓ Validity confirmation of parameter randomization
  • ✓ Comprehensive definition of safety constraints
  • ✓ Implementation and testing of emergency stop protocol
  • ✓ Operator training and manual preparation
  • ✓ Construction of performance monitoring dashboard
  • ✓ Periodic performance evaluation and retraining plan
  • ✓ Reporting structure to regulatory authorities

Final Cautionary Note

Real plant deployment of reinforcement learning is still an emerging technology. Especially in safety-critical environments like chemical plants, a phased approach is essential:

  1. Thorough validation in simulation
  2. Demonstration in pilot plant
  3. Limited operation under human supervision
  4. Gradual increase in autonomy level

It is important to always collaborate with human experts and use AI as a tool.

Series Complete

Congratulations!

You have completed all 5 chapters of the "Autonomous Process Operation with AI Agents" series. You have acquired a wide range of knowledge from reinforcement learning basics to real plant deployment.


