Chapter 5: Real Plant Deployment and Safety
This chapter covers the techniques required to deploy reinforcement learning controllers to real plants safely, from sim-to-real transfer through to continuous performance monitoring.
Chapter 5 Overview
Training reinforcement learning agents in simulation alone is not sufficient. Deployment to a real plant raises additional challenges: ensuring safety, bridging the gap between simulation and reality, and coping with uncertainty.
This chapter explains seven key technologies for real plant deployment with implementation examples. Chemical plants handle high temperatures, high pressures, and hazardous materials, requiring particularly careful approaches to autonomous control by AI.
Important Safety Notice
The technologies covered in this chapter assume deployment to real plants. Actual deployment requires process safety expertise, regulatory compliance, and thorough verification and testing. This code is for educational purposes, and appropriate safety assessment is essential for use in real plants.
What You Will Learn in This Chapter
- Sim-to-Real Transfer: Robustness improvement through domain randomization
- Safe Exploration: Avoiding dangerous regions with action constraints
- Conservative Q-Learning (CQL): Offline learning to prevent overestimation
- Human Override: Emergency human intervention mechanism
- Uncertainty Quantification: Confidence interval estimation using Bayesian NNs and ensembles
- Performance Monitoring and Drift Detection: Continuous monitoring
- Integrated Deployment Framework: Implementation combining all elements
Real Plant Deployment Challenges
Sim-to-Real Gap
Figure: disturbances, sensor noise, and model mismatch degrade the performance of a policy trained only in simulation (the sim-to-real gap); training with domain randomization instead yields a robust policy that transfers to the real plant.
Safety Hierarchy
| Layer | Function | Implementation |
|---|---|---|
| 1. Action Constraints | Prohibition of dangerous actions | Hard limits, safety filters |
| 2. Uncertainty Consideration | Confidence interval assessment | Bayesian NN, ensembles |
| 3. Performance Monitoring | Anomaly detection | Drift detection, KPI monitoring |
| 4. Human Intervention | Emergency stop | Override mechanism |
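Conceptually, these four layers wrap the learned policy from the inside out: the policy proposes an action, constraints and uncertainty checks filter it, monitoring runs continuously in the background, and a human can override at any time. The sketch below shows one way the layers might be chained within a single control step; the callables clamp_action, estimate_uncertainty, update_monitor, and request_operator_action are hypothetical placeholders for the concrete components implemented later in this chapter.
# Layered safety sketch (illustrative only; concrete classes appear later in this chapter)
def guarded_control_step(policy_action, state, clamp_action, estimate_uncertainty,
                         update_monitor, request_operator_action,
                         uncertainty_limit=0.5):
    """Pass one proposed action through the four safety layers."""
    # Layer 1: action constraints (hard limits / safety filter)
    action = clamp_action(state, policy_action)
    # Layer 2: uncertainty consideration (be conservative when unsure)
    if estimate_uncertainty(state, action) > uncertainty_limit:
        action *= 0.5  # fall back to a milder action
    # Layer 3: performance monitoring (runs every step, may raise an alert)
    alert = update_monitor(state, action)
    # Layer 4: human intervention (operator can override on alert)
    if alert:
        action = request_operator_action(state, action)
    return action
# Minimal usage with dummy layer implementations
guarded = guarded_control_step(
    policy_action=0.8, state=(352.0, 1.9),
    clamp_action=lambda s, a: max(-1.0, min(1.0, a)),
    estimate_uncertainty=lambda s, a: 0.6,
    update_monitor=lambda s, a: False,
    request_operator_action=lambda s, a: 0.0)
print(f"Guarded action: {guarded:.2f}")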
Sim-to-Real Transfer: Domain Randomization
Training with randomized parameters in simulation yields policies that remain robust to the uncertainties of the real environment.
Domain Randomization:
\[ \theta \sim p(\Theta), \quad \pi^* = \arg\max_\pi \mathbb{E}_{\theta \sim p(\Theta)} [J(\pi; \theta)] \]
where \(\theta\) denotes the environment parameters and \(p(\Theta)\) their distribution.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Domain Randomization Implementation
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
class RandomizedCSTREnv:
"""CSTR environment with randomized parameters"""
def __init__(self, randomize=True):
self.randomize = randomize
self.reset()
def _sample_parameters(self):
"""Random sampling of physical parameters"""
if self.randomize:
# Activation energy (±20% variation)
self.Ea = np.random.uniform(40000, 60000)
# Reaction enthalpy (±15% variation)
self.dHr = np.random.uniform(-57500, -42500)
# Heat transfer coefficient (±25% variation)
self.U = np.random.uniform(300, 500)
# Volume (manufacturing variation ±5%)
self.V = np.random.uniform(950, 1050)
# Sensor noise standard deviation
self.temp_noise = np.random.uniform(0.1, 1.0)
self.conc_noise = np.random.uniform(0.01, 0.05)
# Control delay (communication delay + valve response)
self.control_delay = np.random.randint(1, 4)
else:
# Nominal values
self.Ea = 50000
self.dHr = -50000
self.U = 400
self.V = 1000
self.temp_noise = 0.5
self.conc_noise = 0.02
self.control_delay = 2
def reset(self):
self._sample_parameters()
self.state = np.array([350.0, 2.0]) # [temperature, concentration]
self.action_buffer = [0.5] * self.control_delay
return self._get_observation()
def _get_observation(self):
"""Noisy observation"""
T, CA = self.state
T_obs = T + np.random.normal(0, self.temp_noise)
CA_obs = CA + np.random.normal(0, self.conc_noise)
return np.array([T_obs, CA_obs])
def step(self, action):
"""Step with control delay"""
# Apply delayed action
self.action_buffer.append(action)
actual_action = self.action_buffer.pop(0)
T, CA = self.state
# Reaction rate (randomized parameters)
R = 8.314
k = 1e10 * np.exp(-self.Ea / (R * T))
# CSTR dynamics
dt = 0.1
F = 100 # Flow rate [L/min]
CA_in = 2.5
Tin = 350
rho = 1000
Cp = 4.18
        # Cooling/heating duty from the action (Tanh actor output in [-1, 1] -> roughly -10 to +10 kW)
        Q_cool = actual_action * 10000  # [W]
# Material balance
dCA = (F / self.V) * (CA_in - CA) - k * CA
CA_new = CA + dCA * dt
# Energy balance
Q_rxn = -self.dHr * k * CA * self.V
Q_jacket = self.U * 10 * (T - Tin) + Q_cool
dT = (Q_rxn - Q_jacket) / (self.V * rho * Cp)
T_new = T + dT * dt
self.state = np.array([T_new, max(0, CA_new)])
# Reward
temp_penalty = -abs(T_new - 350) ** 2 * 0.01
production = k * CA * 10
reward = temp_penalty + production
done = T_new > 400 or T_new < 300 # Outside safe range
return self._get_observation(), reward, done
# Robust policy learning with domain randomization
class RobustSACAgent:
    """Simplified SAC-style agent (actor only; critics and SAC updates are omitted for brevity)"""
def __init__(self, obs_dim, action_dim):
self.actor = nn.Sequential(
nn.Linear(obs_dim, 128), nn.ReLU(),
nn.Linear(128, 128), nn.ReLU(),
nn.Linear(128, action_dim), nn.Tanh()
)
self.optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)
def select_action(self, obs):
with torch.no_grad():
return self.actor(torch.FloatTensor(obs)).numpy()
# Rollout loop in the randomized environment (policy updates omitted in this simplified sketch)
train_env = RandomizedCSTREnv(randomize=True)
agent = RobustSACAgent(obs_dim=2, action_dim=1)
print("Training in randomized environment...")
for episode in range(500):
obs = train_env.reset()
episode_reward = 0
for step in range(100):
action = agent.select_action(obs)
next_obs, reward, done = train_env.step(action[0])
episode_reward += reward
obs = next_obs
if done:
break
if episode % 100 == 0:
print(f"Episode {episode}, Reward: {episode_reward:.2f}")
print(f" Env params: Ea={train_env.Ea:.0f}, V={train_env.V:.0f}, "
f"delay={train_env.control_delay}")
# Evaluation: test in nominal environment (assuming real plant)
test_env = RandomizedCSTREnv(randomize=False)
print("\nTesting in nominal environment...")
obs = test_env.reset()
test_reward = 0
temps = []
for step in range(100):
action = agent.select_action(obs)
obs, reward, done = test_env.step(action[0])
test_reward += reward
temps.append(obs[0])
if done:
break
print(f"Test Reward: {test_reward:.2f}")
print(f"Temp Mean: {np.mean(temps):.2f}K, Std: {np.std(temps):.2f}K")
Safe Exploration: Action Constraints and Control Barrier Functions
Physical and safety constraints keep the reinforcement learning agent from taking dangerous actions. This example implements a safety filter with action projection and a Control Barrier Function (CBF).
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Safe Exploration: Safety Filter Implementation
import numpy as np
import torch
import torch.nn as nn
class SafetyConstraints:
"""Safety constraints for CSTR"""
def __init__(self):
# Temperature constraints
self.T_min = 310.0 # [K]
self.T_max = 390.0 # [K]
self.T_target = 350.0
# Concentration constraints
self.CA_min = 0.1 # [mol/L]
self.CA_max = 3.0
# Control input constraints
self.u_min = -100.0 # Maximum cooling [kW]
self.u_max = 50.0 # Maximum heating [kW]
# Rate of change constraints
self.du_max = 20.0 # [kW/step]
def is_safe_state(self, state):
"""Check if state is safe"""
T, CA = state
return (self.T_min <= T <= self.T_max and
self.CA_min <= CA <= self.CA_max)
def is_safe_action(self, state, action, prev_action):
"""Check if action is safe"""
# Control input range
if not (self.u_min <= action <= self.u_max):
return False
# Rate of change constraint
if abs(action - prev_action) > self.du_max:
return False
return True
def project_to_safe(self, action, prev_action):
"""Project action to safe region"""
# Range constraint
action = np.clip(action, self.u_min, self.u_max)
# Rate of change constraint
delta = action - prev_action
if abs(delta) > self.du_max:
action = prev_action + np.sign(delta) * self.du_max
return action
class ControlBarrierFunction:
"""Safety assurance with Control Barrier Function (CBF)"""
def __init__(self, safety_constraints):
self.constraints = safety_constraints
self.alpha = 0.5 # Class K function gain
def barrier_function(self, state):
"""Barrier function h(x) >= 0 is safe region"""
T, CA = state
# Temperature barrier (distance function)
h_T_min = T - self.constraints.T_min
h_T_max = self.constraints.T_max - T
# Concentration barrier
h_CA_min = CA - self.constraints.CA_min
h_CA_max = self.constraints.CA_max - CA
# Minimum value (strictest constraint)
return min(h_T_min, h_T_max, h_CA_min, h_CA_max)
def safe_action(self, state, desired_action, env_model):
"""Calculate safe action satisfying CBF constraint"""
h = self.barrier_function(state)
# Do nothing if in safe region
if h > 10.0:
return desired_action
# Impose constraint near boundary
# Simple implementation: check barrier condition for predicted next state
next_state_pred = env_model.predict(state, desired_action)
h_next = self.barrier_function(next_state_pred)
        # Discrete-time CBF condition: h(x_{k+1}) >= (1 - alpha) * h(x_k)
        if h_next >= (1 - self.alpha) * h:
return desired_action
else:
# Modify to safe side (conservative action)
T, CA = state
if T > self.constraints.T_target:
# Enhance cooling
return max(desired_action, 0)
else:
# Suppress heating
return min(desired_action, 0)
class SimpleCSTRModel:
"""Simple prediction model for CSTR"""
def predict(self, state, action, dt=0.1):
T, CA = state
k = 1e10 * np.exp(-50000 / (8.314 * T))
# Simple dynamics
dCA = -k * CA * dt
dT = (action * 1000 - 400 * (T - 350)) / 4180 * dt
return np.array([T + dT, CA + dCA])
# Usage example
safety = SafetyConstraints()
cbf = ControlBarrierFunction(safety)
model = SimpleCSTRModel()
# Apply safety filter to RL agent actions
class SafeRLAgent:
def __init__(self, base_agent, safety_filter):
self.base_agent = base_agent
self.safety_filter = safety_filter
self.prev_action = 0.0
def select_safe_action(self, state):
# Base agent action
desired_action = self.base_agent.select_action(state)[0] * 100
# Apply safety filter
safe_action = self.safety_filter.project_to_safe(
desired_action, self.prev_action)
# CBF constraint
safe_action = cbf.safe_action(state, safe_action, model)
self.prev_action = safe_action
return safe_action
# Simulation
from example1 import RobustSACAgent, RandomizedCSTREnv
base_agent = RobustSACAgent(obs_dim=2, action_dim=1)
safe_agent = SafeRLAgent(base_agent, safety)
env = RandomizedCSTREnv(randomize=False)
print("Running with safety constraints...")
state = env.reset()
unsafe_count = 0
for step in range(200):
action = safe_agent.select_safe_action(state)
next_state, reward, done = env.step(action / 100)
if not safety.is_safe_state(state):
unsafe_count += 1
print(f"Step {step}: UNSAFE STATE! T={state[0]:.1f}K, CA={state[1]:.3f}")
state = next_state
if done:
break
print(f"\nUnsafe states encountered: {unsafe_count} / {step+1}")
print(f"Safety rate: {(1 - unsafe_count/(step+1))*100:.1f}%")
Conservative Q-Learning (CQL)
In a real plant, free exploration is dangerous, so the policy is learned offline from historical operating data. CQL penalizes Q-values for out-of-distribution actions, preventing overestimation and yielding safer policies.
CQL Objective Function:
\[ \min_Q \; \alpha \, \mathbb{E}_{s \sim \mathcal{D}} \left[ \log \sum_a \exp Q(s,a) - \mathbb{E}_{a \sim \hat{\pi}_\beta(a|s)} [Q(s,a)] \right] + \mathcal{L}_{TD}(Q) \]
where \(\hat{\pi}_\beta\) is the behavior policy that generated the dataset \(\mathcal{D}\). The first term pushes down Q-values for out-of-distribution actions; the second term preserves Q-values for actions that appear in the data.
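Before the full agent, the regularizer can be illustrated in isolation: the logsumexp term grows whenever the Q-network assigns high values to arbitrary out-of-distribution actions, while the subtracted term rewards keeping the dataset actions highly valued. The toy sketch below uses a randomly initialized Q-network and made-up batch shapes purely for illustration.
# Toy illustration of the CQL regularizer (not part of the full agent below)
import torch
import torch.nn as nn
state_dim, action_dim, batch_size, n_random = 2, 1, 8, 10
q_net = nn.Sequential(nn.Linear(state_dim + action_dim, 64), nn.ReLU(),
                      nn.Linear(64, 1))
states = torch.randn(batch_size, state_dim)            # states from the dataset D
dataset_actions = torch.rand(batch_size, action_dim)   # actions logged in D
random_actions = torch.rand(batch_size, n_random, action_dim) * 2 - 1  # OOD probes
# Q-values of in-dataset actions and of random (out-of-distribution) actions
q_data = q_net(torch.cat([states, dataset_actions], dim=-1))            # (B, 1)
q_rand = torch.stack([q_net(torch.cat([states, random_actions[:, i, :]], dim=-1))
                      for i in range(n_random)], dim=1).squeeze(-1)     # (B, n_random)
# CQL penalty: push down OOD Q-values relative to in-dataset Q-values
cql_penalty = torch.logsumexp(q_rand, dim=1).mean() - q_data.mean()
print(f"CQL penalty on this toy batch: {cql_penalty.item():.3f}")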
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Conservative Q-Learning (CQL) Implementation
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
class CQLQNetwork(nn.Module):
"""Q function for CQL"""
def __init__(self, state_dim, action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, 256), nn.ReLU(),
nn.Linear(256, 256), nn.ReLU(),
nn.Linear(256, 1)
)
def forward(self, state, action):
x = torch.cat([state, action], dim=-1)
return self.net(x)
class CQLAgent:
"""Conservative Q-Learning Agent"""
def __init__(self, state_dim, action_dim, alpha=1.0):
self.state_dim = state_dim
self.action_dim = action_dim
self.alpha = alpha # CQL regularization coefficient
self.q_net = CQLQNetwork(state_dim, action_dim)
self.target_q = CQLQNetwork(state_dim, action_dim)
self.target_q.load_state_dict(self.q_net.state_dict())
self.policy = nn.Sequential(
nn.Linear(state_dim, 256), nn.ReLU(),
nn.Linear(256, action_dim), nn.Tanh()
)
self.q_optimizer = optim.Adam(self.q_net.parameters(), lr=3e-4)
self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=3e-4)
def select_action(self, state, deterministic=False):
with torch.no_grad():
state_t = torch.FloatTensor(state).unsqueeze(0)
action = self.policy(state_t)
if not deterministic:
action += torch.randn_like(action) * 0.1
            return action.squeeze(0).numpy()  # keep shape (action_dim,) so indexing action[0] works
def train_step(self, batch):
"""CQL update"""
states, actions, rewards, next_states, dones = batch
states_t = torch.FloatTensor(states)
actions_t = torch.FloatTensor(actions)
rewards_t = torch.FloatTensor(rewards).unsqueeze(1)
next_states_t = torch.FloatTensor(next_states)
dones_t = torch.FloatTensor(dones).unsqueeze(1)
# Q function update
# 1. Bellman error
with torch.no_grad():
next_actions = self.policy(next_states_t)
target_q = rewards_t + 0.99 * self.target_q(
next_states_t, next_actions) * (1 - dones_t)
current_q = self.q_net(states_t, actions_t)
bellman_loss = nn.MSELoss()(current_q, target_q)
# 2. CQL regularization term
# Calculate Q values for random actions
num_random = 10
random_actions = torch.FloatTensor(
np.random.uniform(-1, 1, (states_t.shape[0], num_random, self.action_dim)))
random_q = []
for i in range(num_random):
q = self.q_net(states_t, random_actions[:, i, :])
random_q.append(q)
random_q = torch.cat(random_q, dim=1)
        # CQL regularizer: push down Q-values of random (out-of-distribution)
        # actions while keeping Q-values of the dataset actions high
        cql_loss = (torch.logsumexp(random_q, dim=1).mean() -
                    current_q.mean())
# Total loss
q_loss = bellman_loss + self.alpha * cql_loss
self.q_optimizer.zero_grad()
q_loss.backward()
self.q_optimizer.step()
# 3. Policy update (maximize Q-value)
policy_loss = -self.q_net(states_t, self.policy(states_t)).mean()
self.policy_optimizer.zero_grad()
policy_loss.backward()
self.policy_optimizer.step()
# Target update
for target_param, param in zip(self.target_q.parameters(),
self.q_net.parameters()):
target_param.data.copy_(0.995 * target_param.data + 0.005 * param.data)
return q_loss.item(), cql_loss.item(), policy_loss.item()
# Offline dataset generation (past operational data)
def generate_offline_data(n_trajectories=100):
from example1 import RandomizedCSTREnv
env = RandomizedCSTREnv(randomize=True)
dataset = []
for _ in range(n_trajectories):
state = env.reset()
for _ in range(50):
# Random policy + noise (realistic data)
action = np.random.uniform(-1, 1, 1)
next_state, reward, done = env.step(action[0])
dataset.append((state, action, reward, next_state, done))
state = next_state
if done:
break
return dataset
# CQL learning
print("Generating offline data...")
offline_data = generate_offline_data(n_trajectories=200)
print(f"Dataset size: {len(offline_data)}")
agent = CQLAgent(state_dim=2, action_dim=1, alpha=1.0)
print("\nCQL training in progress...")
batch_size = 256
for epoch in range(100):
# Mini-batch sampling
batch = random.sample(offline_data, min(batch_size, len(offline_data)))
states, actions, rewards, next_states, dones = zip(*batch)
batch_tuple = (
np.array(states),
np.array(actions),
np.array(rewards),
np.array(next_states),
np.array(dones)
)
q_loss, cql_loss, p_loss = agent.train_step(batch_tuple)
if epoch % 20 == 0:
print(f"Epoch {epoch}: Q_loss={q_loss:.3f}, CQL_loss={cql_loss:.3f}, "
f"Policy_loss={p_loss:.3f}")
# Test
print("\nTesting CQL policy...")
from example1 import RandomizedCSTREnv
test_env = RandomizedCSTREnv(randomize=False)
state = test_env.reset()
total_reward = 0
for step in range(100):
action = agent.select_action(state, deterministic=True)
next_state, reward, done = test_env.step(action[0])
total_reward += reward
state = next_state
if done:
break
print(f"Test Reward: {total_reward:.2f}")
Human Override: Human-in-the-Loop Control
When the AI agent behaves unexpectedly, operators must be able to intervene. This example implements intervention triggers, smooth hand-over of control, and logging of every intervention.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# Human-in-the-Loop System Implementation
import numpy as np
import time
from datetime import datetime
from enum import Enum
class ControlMode(Enum):
"""Control mode"""
AI_CONTROL = "AI"
HUMAN_OVERRIDE = "Human"
TRANSITION = "Transition"
class HumanOverrideSystem:
"""Human intervention system"""
def __init__(self):
self.mode = ControlMode.AI_CONTROL
self.intervention_history = []
self.confidence_threshold = 0.7
def check_intervention_needed(self, state, ai_action, confidence):
"""Check if intervention is needed"""
T, CA = state
# Trigger conditions
triggers = {
'high_temp': T > 380,
'low_temp': T < 320,
'low_confidence': confidence < self.confidence_threshold,
'extreme_action': abs(ai_action) > 0.9,
'unstable_state': CA < 0.2 or CA > 2.8
}
if any(triggers.values()):
reason = [k for k, v in triggers.items() if v]
return True, reason
return False, []
def request_human_action(self, state, ai_suggestion):
"""Request action from operator (GUI/CLI in practice)"""
T, CA = state
print(f"\n{'='*60}")
print(f"Human Intervention Requested!")
print(f"Current State: T={T:.2f}K, CA={CA:.3f}mol/L")
print(f"AI Suggested Action: {ai_suggestion:.3f}")
print(f"{'='*60}")
# Simplified: simulate with rule-based
# In practice, wait for operator input
if T > 380:
human_action = -0.8 # Strong cooling
override = True
elif T < 320:
human_action = 0.6 # Heating
override = True
else:
human_action = ai_suggestion
override = False
return human_action, override
def smooth_transition(self, from_action, to_action, alpha=0.3):
"""Smooth control transition"""
return alpha * to_action + (1 - alpha) * from_action
def log_intervention(self, timestamp, state, ai_action, human_action, reason):
"""Log intervention history"""
log_entry = {
'timestamp': timestamp,
'state': state.copy(),
'ai_action': ai_action,
'human_action': human_action,
'reason': reason
}
self.intervention_history.append(log_entry)
def generate_report(self):
"""Generate intervention report"""
if not self.intervention_history:
return "No interventions recorded."
report = "\n" + "="*60 + "\n"
report += "Human Intervention Report\n"
report += "="*60 + "\n"
report += f"Total interventions: {len(self.intervention_history)}\n\n"
for i, entry in enumerate(self.intervention_history):
report += f"Intervention {i+1}:\n"
report += f" Time: {entry['timestamp']}\n"
report += f" State: T={entry['state'][0]:.2f}K, CA={entry['state'][1]:.3f}\n"
report += f" AI action: {entry['ai_action']:.3f}\n"
report += f" Human action: {entry['human_action']:.3f}\n"
report += f" Reason: {', '.join(entry['reason'])}\n\n"
return report
class HITLController:
"""Human-in-the-Loop control system"""
def __init__(self, ai_agent, override_system):
self.ai_agent = ai_agent
self.override_system = override_system
self.prev_action = 0.0
def select_action(self, state, confidence=1.0):
"""Action selection considering human intervention"""
# AI suggestion
ai_action = self.ai_agent.select_action(state)[0]
# Intervention decision
need_intervention, reasons = self.override_system.check_intervention_needed(
state, ai_action, confidence)
if need_intervention:
# Query human
human_action, overridden = self.override_system.request_human_action(
state, ai_action)
if overridden:
# Record intervention
self.override_system.log_intervention(
datetime.now(), state, ai_action, human_action, reasons)
self.override_system.mode = ControlMode.HUMAN_OVERRIDE
final_action = human_action
else:
final_action = ai_action
else:
final_action = ai_action
self.override_system.mode = ControlMode.AI_CONTROL
# Smooth transition
final_action = self.override_system.smooth_transition(
self.prev_action, final_action)
self.prev_action = final_action
return final_action
# Usage example
from example1 import RobustSACAgent, RandomizedCSTREnv
print("Launching Human-in-the-Loop system...")
ai_agent = RobustSACAgent(obs_dim=2, action_dim=1)
override_system = HumanOverrideSystem()
controller = HITLController(ai_agent, override_system)
env = RandomizedCSTREnv(randomize=True)
state = env.reset()
# Test under severe conditions
for step in range(50):
# Simulate confidence (high uncertainty situation)
confidence = np.random.uniform(0.5, 1.0)
action = controller.select_action(state, confidence)
next_state, reward, done = env.step(action)
print(f"Step {step}: Mode={override_system.mode.value}, "
f"T={state[0]:.1f}K, Action={action:.3f}")
state = next_state
if done:
print("Episode ended (abnormal state)")
break
# Generate report
print(override_system.generate_report())
Uncertainty Quantification
Quantifying how confident the AI's predictions are allows the controller to fall back on conservative actions when uncertainty is high. Confidence intervals are estimated with ensembles or a Bayesian approximation (MC Dropout).
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Uncertainty Quantification: Ensemble and Bayesian Approximation
import torch
import torch.nn as nn
import numpy as np
class EnsembleQNetwork:
"""Ensemble of Q functions"""
def __init__(self, state_dim, action_dim, n_models=5):
self.n_models = n_models
self.models = [
nn.Sequential(
nn.Linear(state_dim + action_dim, 128), nn.ReLU(),
nn.Linear(128, 128), nn.ReLU(),
nn.Linear(128, 1)
) for _ in range(n_models)
]
self.optimizers = [
torch.optim.Adam(m.parameters(), lr=1e-3) for m in self.models
]
def predict_with_uncertainty(self, state, action):
"""Return prediction mean and uncertainty (standard deviation)"""
state_t = torch.FloatTensor(state).unsqueeze(0)
action_t = torch.FloatTensor(action).unsqueeze(0)
x = torch.cat([state_t, action_t], dim=-1)
predictions = []
for model in self.models:
with torch.no_grad():
pred = model(x).item()
predictions.append(pred)
mean = np.mean(predictions)
std = np.std(predictions)
return mean, std
def train_step(self, batch):
"""Train all models with different bootstrap samples"""
states, actions, targets = batch
losses = []
for i, (model, opt) in enumerate(zip(self.models, self.optimizers)):
# Bootstrap sampling
indices = np.random.choice(len(states), len(states), replace=True)
s = torch.FloatTensor(states[indices])
a = torch.FloatTensor(actions[indices])
t = torch.FloatTensor(targets[indices])
x = torch.cat([s, a], dim=-1)
pred = model(x).squeeze()
loss = nn.MSELoss()(pred, t)
opt.zero_grad()
loss.backward()
opt.step()
losses.append(loss.item())
return np.mean(losses)
class MCDropoutQNetwork(nn.Module):
"""Monte Carlo Dropout (Bayesian approximation)"""
def __init__(self, state_dim, action_dim, dropout_rate=0.1):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, 128),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(128, 128),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(128, 1)
)
def forward(self, state, action):
x = torch.cat([state, action], dim=-1)
return self.net(x)
def predict_with_uncertainty(self, state, action, n_samples=20):
"""Uncertainty estimation with MC Dropout"""
self.train() # Enable Dropout
state_t = torch.FloatTensor(state).unsqueeze(0)
action_t = torch.FloatTensor(action).unsqueeze(0)
predictions = []
for _ in range(n_samples):
pred = self.forward(state_t, action_t).item()
predictions.append(pred)
mean = np.mean(predictions)
std = np.std(predictions)
return mean, std
class UncertaintyAwareAgent:
"""Agent considering uncertainty"""
def __init__(self, state_dim, action_dim, method='ensemble'):
self.method = method
if method == 'ensemble':
self.q_network = EnsembleQNetwork(state_dim, action_dim, n_models=5)
else:
self.q_network = MCDropoutQNetwork(state_dim, action_dim)
self.policy = nn.Sequential(
nn.Linear(state_dim, 128), nn.ReLU(),
nn.Linear(128, action_dim), nn.Tanh()
)
self.uncertainty_threshold = 0.5 # Uncertainty threshold
def select_action(self, state):
"""Action selection considering uncertainty"""
with torch.no_grad():
nominal_action = self.policy(torch.FloatTensor(state)).numpy()
# Evaluate uncertainty of multiple candidates
action_candidates = [
nominal_action,
nominal_action * 0.5, # Conservative
np.zeros_like(nominal_action) # Maintain current
]
best_action = nominal_action
min_uncertainty = float('inf')
        for action in action_candidates:
            # Both estimators expose the same predict_with_uncertainty interface
            q_mean, q_std = self.q_network.predict_with_uncertainty(state, action)
            # Prefer the candidate with the lowest predictive uncertainty
            # (only candidates below the threshold are accepted)
            if q_std < self.uncertainty_threshold and q_std < min_uncertainty:
                best_action = action
                min_uncertainty = q_std
return best_action, min_uncertainty
# Demonstration
print("Uncertainty Quantification Demo\n")
# Ensemble method
ensemble_agent = UncertaintyAwareAgent(state_dim=2, action_dim=1, method='ensemble')
test_states = [
np.array([350.0, 1.5]), # Normal
np.array([385.0, 0.8]), # High temperature
np.array([310.0, 2.5]) # Low temperature
]
print("Ensemble Method:")
for i, state in enumerate(test_states):
action, uncertainty = ensemble_agent.select_action(state)
print(f"State {i+1}: T={state[0]}K, CA={state[1]}")
print(f" Action: {action[0]:.3f}, Uncertainty: {uncertainty:.3f}")
if uncertainty > ensemble_agent.uncertainty_threshold:
print(f" WARNING: High uncertainty! Consider human oversight.")
print()
# MC Dropout method
mc_agent = UncertaintyAwareAgent(state_dim=2, action_dim=1, method='mcdropout')
print("\nMC Dropout Method:")
for i, state in enumerate(test_states):
action, uncertainty = mc_agent.select_action(state)
print(f"State {i+1}: T={state[0]}K, CA={state[1]}")
print(f" Action: {action[0]:.3f}, Uncertainty: {uncertainty:.3f}")
if uncertainty > mc_agent.uncertainty_threshold:
print(f" WARNING: High uncertainty!")
print()
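Example 4's override system expects a confidence score in [0, 1] (threshold 0.7), whereas the estimators above return a standard deviation. The mapping between the two is a design choice; the sketch below shows one simple monotone mapping, stated as an assumption for illustration (the integrated framework in the final example instead uses confidence = 1 - std directly).
# One possible mapping from predictive uncertainty (std) to a confidence in (0, 1]
def uncertainty_to_confidence(q_std, scale=1.0):
    """Larger std -> lower confidence; 'scale' sets how quickly confidence decays."""
    return 1.0 / (1.0 + q_std / scale)
for std in [0.05, 0.3, 1.0, 3.0]:
    conf = uncertainty_to_confidence(std)
    flag = "OK" if conf >= 0.7 else "request human oversight"
    print(f"std={std:.2f} -> confidence={conf:.2f} ({flag})")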
Performance Monitoring and Drift Detection
After deployment, performance must be monitored continuously to detect plant aging and model drift. This example implements KPI monitoring, statistical drift tests, and anomaly detection.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - scipy>=1.11.0
# Performance Monitoring and Drift Detection System
import numpy as np
from collections import deque
from scipy import stats
class PerformanceMonitor:
"""Performance monitoring system"""
def __init__(self, window_size=100):
self.window_size = window_size
# KPI history
self.rewards = deque(maxlen=window_size)
self.temperatures = deque(maxlen=window_size)
self.concentrations = deque(maxlen=window_size)
self.actions = deque(maxlen=window_size)
# Baseline statistics
self.baseline_reward_mean = None
self.baseline_reward_std = None
# Anomaly counter
self.anomaly_count = 0
self.total_steps = 0
def update(self, state, action, reward):
"""Update KPI"""
T, CA = state
self.rewards.append(reward)
self.temperatures.append(T)
self.concentrations.append(CA)
self.actions.append(action)
self.total_steps += 1
def set_baseline(self):
"""Set baseline performance"""
if len(self.rewards) >= self.window_size:
self.baseline_reward_mean = np.mean(self.rewards)
self.baseline_reward_std = np.std(self.rewards)
print(f"Baseline set: μ={self.baseline_reward_mean:.2f}, "
f"σ={self.baseline_reward_std:.2f}")
def detect_drift(self, alpha=0.05):
"""Statistical drift detection (t-test)"""
if self.baseline_reward_mean is None or len(self.rewards) < 50:
return False, None
current_mean = np.mean(list(self.rewards)[-50:])
# t-test
t_stat, p_value = stats.ttest_1samp(
list(self.rewards)[-50:],
self.baseline_reward_mean
)
drift_detected = p_value < alpha and current_mean < self.baseline_reward_mean
return drift_detected, {
't_stat': t_stat,
'p_value': p_value,
'current_mean': current_mean,
'baseline_mean': self.baseline_reward_mean
}
def detect_anomaly(self, state, action):
"""Anomaly detection (3-sigma rule)"""
if len(self.rewards) < 30:
return False
T, CA = state
# Temperature anomaly
temp_mean = np.mean(self.temperatures)
temp_std = np.std(self.temperatures)
temp_anomaly = abs(T - temp_mean) > 3 * temp_std
# Concentration anomaly
conc_mean = np.mean(self.concentrations)
conc_std = np.std(self.concentrations)
conc_anomaly = abs(CA - conc_mean) > 3 * conc_std
# Action anomaly
action_mean = np.mean(self.actions)
action_std = np.std(self.actions)
action_anomaly = abs(action - action_mean) > 3 * action_std
is_anomaly = temp_anomaly or conc_anomaly or action_anomaly
if is_anomaly:
self.anomaly_count += 1
return is_anomaly
def generate_report(self):
"""Generate monitoring report"""
if len(self.rewards) == 0:
return "No data collected."
report = "\n" + "="*60 + "\n"
report += "Performance Monitoring Report\n"
report += "="*60 + "\n\n"
report += f"Total steps: {self.total_steps}\n"
report += f"Anomalies detected: {self.anomaly_count} "
report += f"({self.anomaly_count/self.total_steps*100:.2f}%)\n\n"
report += "KPI Statistics (last {} steps):\n".format(len(self.rewards))
report += f" Reward: μ={np.mean(self.rewards):.2f}, "
report += f"σ={np.std(self.rewards):.2f}\n"
report += f" Temperature: μ={np.mean(self.temperatures):.2f}K, "
report += f"σ={np.std(self.temperatures):.2f}K\n"
report += f" Concentration: μ={np.mean(self.concentrations):.3f}, "
report += f"σ={np.std(self.concentrations):.3f}\n"
report += f" Action: μ={np.mean(self.actions):.3f}, "
report += f"σ={np.std(self.actions):.3f}\n\n"
# Drift detection
drift, drift_info = self.detect_drift()
if drift:
report += "WARNING: Performance drift detected!\n"
report += f" Current mean reward: {drift_info['current_mean']:.2f}\n"
report += f" Baseline mean reward: {drift_info['baseline_mean']:.2f}\n"
report += f" p-value: {drift_info['p_value']:.4f}\n"
else:
report += "No significant drift detected.\n"
report += "="*60 + "\n"
return report
class DriftDetector:
"""More advanced drift detection (ADWIN)"""
def __init__(self, delta=0.002):
self.delta = delta
self.window = deque()
self.drift_detected = False
def add_element(self, value):
"""Add data point and check for drift"""
self.window.append(value)
# Simplified ADWIN: split window in half and compare means
if len(self.window) > 50:
mid = len(self.window) // 2
window1 = list(self.window)[:mid]
window2 = list(self.window)[mid:]
# Welch's t-test (does not assume equal variance)
t_stat, p_value = stats.ttest_ind(window1, window2, equal_var=False)
if p_value < self.delta:
self.drift_detected = True
self.window.clear() # Reset after drift detection
return True
return False
# Demonstration
from example1 import RandomizedCSTREnv, RobustSACAgent
print("Launching performance monitoring system...\n")
env = RandomizedCSTREnv(randomize=False)
agent = RobustSACAgent(obs_dim=2, action_dim=1)
monitor = PerformanceMonitor(window_size=100)
drift_detector = DriftDetector()
# Phase 1: Normal operation (baseline setting)
print("Phase 1: Setting baseline...")
state = env.reset()
for step in range(100):
action = agent.select_action(state)
next_state, reward, done = env.step(action[0])
monitor.update(state, action[0], reward)
state = next_state if not done else env.reset()
monitor.set_baseline()
# Phase 2: Degradation simulation
print("\nPhase 2: Simulating plant degradation...")
for step in range(200):
action = agent.select_action(state)
# Simulate performance degradation over time
degradation = step / 200 * 0.3
next_state, reward, done = env.step(action[0])
reward -= degradation * 10 # Performance decline
monitor.update(state, action[0], reward)
# Anomaly detection
if monitor.detect_anomaly(state, action[0]):
print(f"Step {step+100}: Anomaly detected!")
# Drift detection
if drift_detector.add_element(reward):
print(f"Step {step+100}: DRIFT DETECTED by ADWIN!")
state = next_state if not done else env.reset()
# Periodic drift check
if step % 50 == 0:
drift, info = monitor.detect_drift()
if drift:
print(f"\nStep {step+100}: Statistical drift detected!")
print(f" Current: {info['current_mean']:.2f}, "
f"Baseline: {info['baseline_mean']:.2f}")
# Final report
print(monitor.generate_report())
Integrated Deployment Framework
Finally, all of the previous elements (domain randomization, safety constraints, human override, uncertainty quantification, and performance monitoring) are combined into a practical deployment system.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# Integrated Deployment Framework
import numpy as np
import torch
from datetime import datetime
class IntegratedDeploymentSystem:
"""Fully integrated deployment system"""
def __init__(self, agent, env):
# Components
self.agent = agent
self.env = env
# Example 2: Safety constraints
from example2 import SafetyConstraints, ControlBarrierFunction, SimpleCSTRModel
self.safety = SafetyConstraints()
self.cbf = ControlBarrierFunction(self.safety)
self.model = SimpleCSTRModel()
# Example 4: Human override
from example4 import HumanOverrideSystem, ControlMode
self.override_system = HumanOverrideSystem()
# Example 5: Uncertainty quantification
from example5 import EnsembleQNetwork
self.uncertainty_estimator = EnsembleQNetwork(
state_dim=2, action_dim=1, n_models=5)
# Example 6: Performance monitoring
from example6 import PerformanceMonitor, DriftDetector
self.monitor = PerformanceMonitor(window_size=100)
self.drift_detector = DriftDetector()
# System state
self.prev_action = 0.0
self.running = True
self.emergency_stop = False
def preprocess_observation(self, raw_obs):
"""Preprocess sensor data"""
# Outlier removal (simple)
T, CA = raw_obs
T = np.clip(T, 250, 450)
CA = np.clip(CA, 0, 5)
return np.array([T, CA])
def estimate_uncertainty(self, state, action):
"""Estimate uncertainty"""
q_mean, q_std = self.uncertainty_estimator.predict_with_uncertainty(
state, action)
return q_std
def apply_safety_filter(self, state, action):
"""Apply safety filter"""
# Constraint projection
safe_action = self.safety.project_to_safe(action, self.prev_action)
# CBF constraint
safe_action = self.cbf.safe_action(state, safe_action, self.model)
return safe_action
def check_human_override(self, state, action, uncertainty):
"""Check for human intervention"""
need_intervention, reasons = self.override_system.check_intervention_needed(
state, action, confidence=1.0 - uncertainty)
if need_intervention:
human_action, overridden = self.override_system.request_human_action(
state, action)
if overridden:
self.override_system.log_intervention(
datetime.now(), state, action, human_action, reasons)
return human_action, True
return action, False
def monitor_performance(self, state, action, reward):
"""Performance monitoring and drift detection"""
self.monitor.update(state, action, reward)
# Anomaly detection
if self.monitor.detect_anomaly(state, action):
print(f" [MONITOR] Anomaly detected at step {self.monitor.total_steps}")
# Drift detection
if self.drift_detector.add_element(reward):
print(f" [MONITOR] Performance drift detected!")
return True # Retraining trigger
# Periodic statistical drift check
if self.monitor.total_steps % 100 == 0:
drift, info = self.monitor.detect_drift()
if drift:
print(f" [MONITOR] Statistical drift: "
f"current={info['current_mean']:.2f}, "
f"baseline={info['baseline_mean']:.2f}")
return True
return False
def control_loop(self, n_steps=500):
"""Main control loop"""
print("Integrated Deployment System Launch\n")
print("="*60)
state = self.env.reset()
state = self.preprocess_observation(state)
# Baseline setting
for step in range(100):
action = self.agent.select_action(state)[0]
next_state, reward, done = self.env.step(action)
next_state = self.preprocess_observation(next_state)
self.monitor.update(state, action, reward)
state = next_state if not done else self.env.reset()
self.monitor.set_baseline()
print("Baseline setting complete\n")
# Main loop
for step in range(n_steps):
if self.emergency_stop:
print("Emergency stop!")
break
# 1. AI policy
raw_action = self.agent.select_action(state)[0]
# 2. Uncertainty estimation
uncertainty = self.estimate_uncertainty(state, np.array([raw_action]))
# Conservative when high uncertainty
if uncertainty > 0.5:
raw_action *= 0.5
print(f" [UNCERTAINTY] High uncertainty ({uncertainty:.3f}), "
f"conservative action")
# 3. Safety filter
safe_action = self.apply_safety_filter(state, raw_action)
# 4. Human override
final_action, overridden = self.check_human_override(
state, safe_action, uncertainty)
# 5. Execute
next_state, reward, done = self.env.step(final_action)
next_state = self.preprocess_observation(next_state)
# 6. Performance monitoring
need_retraining = self.monitor_performance(state, final_action, reward)
if need_retraining:
print(f" [SYSTEM] Retraining recommended")
# Periodic reporting
if step % 100 == 0:
print(f"\nStep {step}:")
print(f" State: T={state[0]:.2f}K, CA={state[1]:.3f}")
print(f" Action: {final_action:.3f}, Uncertainty: {uncertainty:.3f}")
print(f" Mode: {self.override_system.mode.value}")
print(f" Anomalies: {self.monitor.anomaly_count}")
self.prev_action = final_action
state = next_state if not done else self.env.reset()
# Final report
print("\n" + "="*60)
print("Operation ended")
print("="*60)
print(self.monitor.generate_report())
print(self.override_system.generate_report())
# Execution
from example1 import RobustSACAgent, RandomizedCSTREnv
print("Integrated Deployment System Demonstration\n")
env = RandomizedCSTREnv(randomize=True)
agent = RobustSACAgent(obs_dim=2, action_dim=1)
system = IntegratedDeploymentSystem(agent, env)
system.control_loop(n_steps=300)
Chapter 5 Summary
What We Learned
- Sim-to-Real Transfer: Achieving robustness through domain randomization
- Safe Exploration: Danger avoidance with safety filters and CBF
- Conservative Q-Learning: Safe learning from offline data
- Human Override: Emergency intervention mechanism
- Uncertainty Quantification: Confidence assessment with ensemble or MC Dropout
- Performance Monitoring: Drift detection and anomaly detection
- Integrated Framework: Practical system combining all elements
Deployment Maturity Model
| Level | Description | Required Technology |
|---|---|---|
| L1: Laboratory | Simulation only | Basic RL |
| L2: Testing | Pilot plant | Sim-to-real, safety constraints |
| L3: Supervised | Real plant (human supervision) | Override, performance monitoring |
| L4: Autonomous | Fully autonomous operation | Full integration, continual learning |
Real Plant Deployment Checklist
- ✓ Sufficient simulation validation (1000+ episodes)
- ✓ Validity confirmation of parameter randomization
- ✓ Comprehensive definition of safety constraints
- ✓ Implementation and testing of emergency stop protocol
- ✓ Operator training and manual preparation
- ✓ Construction of performance monitoring dashboard
- ✓ Periodic performance evaluation and retraining plan
- ✓ Reporting structure to regulatory authorities
Final Cautionary Note
Real plant deployment of reinforcement learning is still an emerging technology. Especially in safety-critical environments like chemical plants, a phased approach is essential:
- Thorough validation in simulation
- Demonstration in pilot plant
- Limited operation under human supervision
- Gradual increase in autonomy level
It is important to always collaborate with human experts and use AI as a tool.
Series Complete
Congratulations!
You have completed all 5 chapters of the "Autonomous Process Operation with AI Agents" series. You have acquired a wide range of knowledge from reinforcement learning basics to real plant deployment.
For further learning, please also make use of the related series.
Disclaimer
- This content is provided solely for educational, research, and informational purposes and does not constitute professional advice (legal, accounting, technical warranty, etc.).
- This content and accompanying code examples are provided "AS IS" without any warranty, express or implied, including but not limited to merchantability, fitness for a particular purpose, non-infringement, accuracy, completeness, operation, or safety.
- The author and Tohoku University assume no responsibility for the content, availability, or safety of external links, third-party data, tools, libraries, etc.
- To the maximum extent permitted by applicable law, the author and Tohoku University shall not be liable for any direct, indirect, incidental, special, consequential, or punitive damages arising from the use, execution, or interpretation of this content.
- The content may be changed, updated, or discontinued without notice.
- The copyright and license of this content are subject to the stated conditions (e.g., CC BY 4.0). Such licenses typically include no-warranty clauses.