This chapter covers Real. You will learn Build interactive dashboard layouts with Plotly and simulated real-time data streaming.
Learning Objectives
By reading this chapter, you will master the following:
- ✅ Design the architecture of real-time monitoring systems
- ✅ Build interactive dashboard layouts with Plotly
- ✅ Implement simulated real-time data streaming
- ✅ Develop multi-chart monitoring interfaces (temperature, pressure, flow rate)
- ✅ Design and implement alarm notification systems
- ✅ Perform historical data analysis and KPI calculations
- ✅ Visualize process states with finite state machines
- ✅ Build a complete integrated monitoring system (chemical reactor case study)
5.1 Architecture of Real-time Monitoring Systems
Overall System Design
Real-time process monitoring systems consist of four main components: data collection, processing, visualization, and storage.
Real-time Data Flow
In monitoring systems, the following data flow is executed continuously:
- Data Acquisition: Sensors → PLC → Data Collection Server (1 second to 1 minute intervals)
- Buffering: Memory-efficient storage with deque or ring buffers
- Real-time Processing: Statistical calculations, anomaly detection, control loop execution
- Visualization Update: Graphs, gauges, alarm displays (1 to 10 second intervals)
- Database Storage: Long-term storage in Historian (1 minute to 1 hour intervals)
5.2 Code Examples: Implementing Real-time Monitoring Systems
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
"""
Example: 5.2 Code Examples: Implementing Real-time Monitoring Systems
Purpose: Demonstrate data visualization techniques
Target: Intermediate
Execution time: 5-15 seconds
Dependencies: None
"""
<h4>Code Example 1: Real-time Dashboard Layout Design with Plotly</h4>
<p><strong>Purpose</strong>: Design a dashboard layout for process monitoring using Plotly (static demonstration).</p>
<pre><code class="language-python">import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd
# Generate simulation data
np.random.seed(42)
time_points = 100
time = pd.date_range('2025-01-01 00:00:00', periods=time_points, freq='1min')
# Process variable data
temperature = 175 + np.random.normal(0, 2, time_points) + 3 * np.sin(np.linspace(0, 4*np.pi, time_points))
pressure = 1.5 + np.random.normal(0, 0.05, time_points)
flow_rate = 50 + np.random.normal(0, 3, time_points)
# Create dashboard layout
# Arrange 4 subplots: temperature trend, pressure trend, flow rate trend, temperature gauge
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('Reactor Temperature Trend', 'Pressure Trend', 'Flow Rate Trend', 'Current Temperature'),
specs=[
[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "scatter"}, {"type": "indicator"}]
],
vertical_spacing=0.12,
horizontal_spacing=0.1
)
# Temperature trend (upper left)
fig.add_trace(
go.Scatter(
x=time,
y=temperature,
mode='lines',
name='Temperature',
line=dict(color='#11998e', width=2),
fill='tozeroy',
fillcolor='rgba(17, 153, 142, 0.1)'
),
row=1, col=1
)
# Add control limit lines
fig.add_hline(y=175, line_dash="dash", line_color="red",
annotation_text="Target Value", row=1, col=1)
fig.add_hrect(y0=173, y1=177, fillcolor="green", opacity=0.1,
line_width=0, row=1, col=1)
# Pressure trend (upper right)
fig.add_trace(
go.Scatter(
x=time,
y=pressure,
mode='lines',
name='Pressure',
line=dict(color='#f59e0b', width=2)
),
row=1, col=2
)
fig.add_hline(y=1.5, line_dash="dash", line_color="red", row=1, col=2)
# Flow rate trend (lower left)
fig.add_trace(
go.Scatter(
x=time,
y=flow_rate,
mode='lines',
name='Flow Rate',
line=dict(color='#7b2cbf', width=2)
),
row=2, col=1
)
fig.add_hline(y=50, line_dash="dash", line_color="red", row=2, col=1)
# Temperature gauge (lower right)
current_temp = temperature[-1]
fig.add_trace(
go.Indicator(
mode="gauge+number+delta",
value=current_temp,
title={'text': "Reactor Temperature (°C)"},
delta={'reference': 175, 'increasing': {'color': "red"}, 'decreasing': {'color': "blue"}},
gauge={
'axis': {'range': [None, 200]},
'bar': {'color': "#11998e"},
'steps': [
{'range': [0, 173], 'color': "lightblue"},
{'range': [173, 177], 'color': "lightgreen"},
{'range': [177, 200], 'color': "lightcoral"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 175
}
}
),
row=2, col=2
)
# Layout settings
fig.update_xaxes(title_text="Time", row=1, col=1)
fig.update_xaxes(title_text="Time", row=1, col=2)
fig.update_xaxes(title_text="Time", row=2, col=1)
fig.update_yaxes(title_text="Temperature (°C)", row=1, col=1)
fig.update_yaxes(title_text="Pressure (MPa)", row=1, col=2)
fig.update_yaxes(title_text="Flow Rate (m³/h)", row=2, col=1)
fig.update_layout(
title_text="Process Monitoring Dashboard - Chemical Reactor",
title_font_size=20,
title_x=0.5,
height=800,
showlegend=False,
template="plotly_white"
)
# Save as HTML (viewable in browser)
fig.write_html("process_monitoring_dashboard.html")
print("Dashboard saved to 'process_monitoring_dashboard.html'.")
print("Open it in your browser to view.")
# Statistics summary
print("\n=== Current Process Status ===")
print(f"Reactor Temperature: {current_temp:.2f} °C (Target: 175°C)")
print(f"Pressure: {pressure[-1]:.3f} MPa (Target: 1.5 MPa)")
print(f"Flow Rate: {flow_rate[-1]:.2f} m³/h (Target: 50 m³/h)")
# Check alarm status
temp_alarm = "Normal" if 173 <= current_temp <= 177 else "Warning"
pressure_alarm = "Normal" if 1.45 <= pressure[-1] <= 1.55 else "Warning"
flow_alarm = "Normal" if 45 <= flow_rate[-1] <= 55 else "Warning"
print(f"\n=== Alarm Status ===")
print(f"Temperature: {temp_alarm}")
print(f"Pressure: {pressure_alarm}")
print(f"Flow Rate: {flow_alarm}")
Explanation: This code designs a process monitoring dashboard layout using Plotly. By combining trend charts (time-series data) with gauges (current values), it creates an interface that allows operators to intuitively grasp the process status. In actual processes, this is deployed as a web application.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - pandas>=2.0.0, <2.2.0
"""
Example: Explanation: This code designs a process monitoring dashboar
Purpose: Demonstrate data visualization techniques
Target: Intermediate
Execution time: 2-5 seconds
Dependencies: None
"""
<h4>Code Example 2: Simulated Real-time Data Streaming</h4>
<p><strong>Purpose</strong>: Simulate real-time data streaming using a deque buffer.</p>
<pre><code class="language-python">import numpy as np
import pandas as pd
from collections import deque
import matplotlib.pyplot as plt
import time
# Japanese font settings
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
class RealTimeDataStream:
"""
Real-time data streaming simulator
Parameters:
-----------
buffer_size : int
Data buffer size
sampling_interval : float
Sampling interval (seconds)
"""
def __init__(self, buffer_size=100, sampling_interval=1.0):
self.buffer_size = buffer_size
self.sampling_interval = sampling_interval
# Data buffers (fixed-length queues)
self.time_buffer = deque(maxlen=buffer_size)
self.temp_buffer = deque(maxlen=buffer_size)
self.pressure_buffer = deque(maxlen=buffer_size)
self.flow_buffer = deque(maxlen=buffer_size)
# Start time
self.start_time = pd.Timestamp.now()
self.iteration = 0
def generate_sensor_data(self):
"""Generate sensor data (in practice, acquired from PLC/DCS)"""
elapsed = self.iteration * self.sampling_interval
# Reactor temperature (periodic variation + noise)
temp_base = 175.0
temp_variation = 3.0 * np.sin(2 * np.pi * elapsed / 300)
temp_noise = np.random.normal(0, 0.8)
temperature = temp_base + temp_variation + temp_noise
# Pressure (slight variation)
pressure = 1.5 + np.random.normal(0, 0.02)
# Flow rate (simulate step changes)
if elapsed < 60:
flow_base = 50.0
elif elapsed < 120:
flow_base = 55.0 # Increase flow rate at 60 seconds
else:
flow_base = 50.0 # Return to original at 120 seconds
flow_rate = flow_base + np.random.normal(0, 2.0)
return temperature, pressure, flow_rate
def update(self):
"""Update data stream"""
current_time = self.start_time + pd.Timedelta(seconds=self.iteration * self.sampling_interval)
temp, pressure, flow = self.generate_sensor_data()
# Add to buffers
self.time_buffer.append(current_time)
self.temp_buffer.append(temp)
self.pressure_buffer.append(pressure)
self.flow_buffer.append(flow)
self.iteration += 1
return current_time, temp, pressure, flow
def get_statistics(self):
"""Statistics of buffered data"""
if len(self.temp_buffer) == 0:
return None
stats = {
'temp_mean': np.mean(self.temp_buffer),
'temp_std': np.std(self.temp_buffer),
'temp_latest': self.temp_buffer[-1],
'pressure_mean': np.mean(self.pressure_buffer),
'pressure_latest': self.pressure_buffer[-1],
'flow_mean': np.mean(self.flow_buffer),
'flow_latest': self.flow_buffer[-1],
'buffer_utilization': len(self.temp_buffer) / self.buffer_size * 100
}
return stats
# Real-time streaming demonstration
print("=== Real-time Data Streaming Started ===")
print("Collecting data for 180 seconds...\n")
stream = RealTimeDataStream(buffer_size=200, sampling_interval=1.0)
# Data collection (180 seconds, every 1 second)
duration = 180 # seconds
for i in range(duration):
timestamp, temp, pressure, flow = stream.update()
# Display progress every 10 seconds
if (i + 1) % 10 == 0:
stats = stream.get_statistics()
print(f"[{timestamp.strftime('%H:%M:%S')}] "
f"Temp: {temp:.2f}°C (Mean: {stats['temp_mean']:.2f}°C) | "
f"Pressure: {pressure:.3f} MPa | "
f"Flow: {flow:.2f} m³/h")
# In actual real-time systems, use time.sleep()
# Skipped here for speed
print("\nData collection complete!\n")
# Visualize collected data
fig, axes = plt.subplots(3, 1, figsize=(14, 12))
# Temperature trend
axes[0].plot(list(stream.time_buffer), list(stream.temp_buffer),
color='#11998e', linewidth=1.5, label='Temperature')
axes[0].axhline(y=175, color='red', linestyle='--', linewidth=2, label='Target Value')
axes[0].fill_between(list(stream.time_buffer), 173, 177, alpha=0.15, color='green', label='Control Range')
axes[0].set_ylabel('Temperature (°C)', fontsize=12)
axes[0].set_title('Real-time Streaming Data - Reactor Temperature', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# Pressure trend
axes[1].plot(list(stream.time_buffer), list(stream.pressure_buffer),
color='#f59e0b', linewidth=1.5, label='Pressure')
axes[1].axhline(y=1.5, color='red', linestyle='--', linewidth=2, label='Target Value')
axes[1].set_ylabel('Pressure (MPa)', fontsize=12)
axes[1].set_title('Pressure Trend', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
# Flow rate trend
axes[2].plot(list(stream.time_buffer), list(stream.flow_buffer),
color='#7b2cbf', linewidth=1.5, label='Flow Rate')
axes[2].axhline(y=50, color='red', linestyle='--', linewidth=2, label='Target Value')
axes[2].axvline(x=list(stream.time_buffer)[60], color='orange', linestyle=':', alpha=0.5, label='Flow Change')
axes[2].axvline(x=list(stream.time_buffer)[120], color='orange', linestyle=':', alpha=0.5)
axes[2].set_xlabel('Time', fontsize=12)
axes[2].set_ylabel('Flow Rate (m³/h)', fontsize=12)
axes[2].set_title('Flow Rate Trend', fontsize=14, fontweight='bold')
axes[2].legend()
axes[2].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# Final statistics
final_stats = stream.get_statistics()
print("=== Final Statistics ===")
for key, value in final_stats.items():
print(f" {key}: {value:.2f}")
Explanation: This code implements efficient data buffering using deque. Fixed-length queues (deque) are ideal for memory-efficient real-time processing as they automatically remove old data while adding new data. In actual processes, data is continuously acquired from PLC/DCS with this type of buffering.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
<h4>Code Example 3: Multi-chart Monitoring Interface</h4>
<p><strong>Purpose</strong>: Build a comprehensive monitoring interface for simultaneous monitoring of multiple process variables.</p>
<pre><code class="language-python">import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd
class ProcessMonitoringInterface:
"""
Multi-chart process monitoring interface
Parameters:
-----------
process_name : str
Process name
variables : list of dict
List of monitoring variables (name, unit, target, range)
"""
def __init__(self, process_name, variables):
self.process_name = process_name
self.variables = variables
self.n_vars = len(variables)
def create_monitoring_dashboard(self, time_data, variable_data):
"""
Create monitoring dashboard
Parameters:
-----------
time_data : array-like
Time data
variable_data : dict
Data dictionary with variable names as keys
Returns:
--------
fig : plotly.graph_objects.Figure
"""
# Subplot configuration (dynamically arranged based on number of variables)
n_rows = int(np.ceil(self.n_vars / 2))
n_cols = 2 if self.n_vars > 1 else 1
fig = make_subplots(
rows=n_rows,
cols=n_cols,
subplot_titles=[var['name'] for var in self.variables],
vertical_spacing=0.12,
horizontal_spacing=0.1
)
# Add trend chart for each variable
for idx, var in enumerate(self.variables):
row = idx // 2 + 1
col = idx % 2 + 1
data = variable_data[var['name']]
# Trend line
fig.add_trace(
go.Scatter(
x=time_data,
y=data,
mode='lines',
name=var['name'],
line=dict(color=var.get('color', '#11998e'), width=2)
),
row=row, col=col
)
# Target value line
if 'target' in var:
fig.add_hline(
y=var['target'],
line_dash="dash",
line_color="red",
annotation_text=f"Target: {var['target']}",
row=row, col=col
)
# Control range
if 'range' in var:
lower, upper = var['range']
fig.add_hrect(
y0=lower, y1=upper,
fillcolor="green", opacity=0.1,
line_width=0,
row=row, col=col
)
# Axis labels
fig.update_xaxes(title_text="Time", row=row, col=col)
fig.update_yaxes(title_text=f"{var['name']} ({var['unit']})", row=row, col=col)
# Overall layout
fig.update_layout(
title_text=f"{self.process_name} - Multi-variable Monitoring Dashboard",
title_font_size=20,
title_x=0.5,
height=300 * n_rows,
showlegend=False,
template="plotly_white"
)
return fig
# Example of building monitoring interface
# Define monitoring variables for chemical reactor
variables = [
{'name': 'Reactor Temperature', 'unit': '°C', 'target': 175, 'range': (173, 177), 'color': '#11998e'},
{'name': 'Jacket Temperature', 'unit': '°C', 'target': 165, 'range': (163, 167), 'color': '#f59e0b'},
{'name': 'Reactor Pressure', 'unit': 'MPa', 'target': 1.5, 'range': (1.45, 1.55), 'color': '#7b2cbf'},
{'name': 'Feed Flow Rate', 'unit': 'm³/h', 'target': 50, 'range': (48, 52), 'color': '#e63946'},
{'name': 'Cooling Water Flow', 'unit': 'm³/h', 'target': 100, 'range': (95, 105), 'color': '#06a77d'},
{'name': 'pH', 'unit': '-', 'target': 7.0, 'range': (6.8, 7.2), 'color': '#ff006e'}
]
# Generate data (24 hours, 1 minute interval)
np.random.seed(42)
n_points = 1440
time_data = pd.date_range('2025-01-01 00:00:00', periods=n_points, freq='1min')
# Generate data for each variable
variable_data = {}
# Reactor temperature
variable_data['Reactor Temperature'] = 175 + np.random.normal(0, 1.5, n_points) + \
2 * np.sin(2 * np.pi * np.arange(n_points) / 360)
# Jacket temperature
variable_data['Jacket Temperature'] = 165 + np.random.normal(0, 1.2, n_points) + \
1.5 * np.sin(2 * np.pi * np.arange(n_points) / 360)
# Reactor pressure
variable_data['Reactor Pressure'] = 1.5 + np.random.normal(0, 0.02, n_points)
# Feed flow rate
variable_data['Feed Flow Rate'] = 50 + np.random.normal(0, 1.5, n_points)
# Cooling water flow
variable_data['Cooling Water Flow'] = 100 + np.random.normal(0, 2.5, n_points)
# pH
variable_data['pH'] = 7.0 + np.random.normal(0, 0.15, n_points)
# Create monitoring interface
interface = ProcessMonitoringInterface(
process_name="Chemical Reactor R-101",
variables=variables
)
fig = interface.create_monitoring_dashboard(time_data, variable_data)
# Save as HTML
fig.write_html("multi_variable_monitoring_dashboard.html")
print("Multi-variable monitoring dashboard saved.")
# Current status summary
print("\n=== Current Process Status (Latest Values) ===")
for var in variables:
latest_value = variable_data[var['name']][-1]
target = var.get('target', None)
status = "Normal"
if target and 'range' in var:
lower, upper = var['range']
if not (lower <= latest_value <= upper):
status = "Warning"
print(f"{var['name']:<20}: {latest_value:>7.2f} {var['unit']:<5} (Target: {target}) - {status}")
Explanation: This multi-chart monitoring interface dynamically generates a dashboard that can monitor multiple process variables simultaneously. By visualizing target values and control ranges for each variable, operators can grasp the process status at a glance. In actual plants, dozens to hundreds of variables are monitored.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
<h4>Code Example 4: Implementing an Alarm Notification System</h4>
<p><strong>Purpose</strong>: Build an alarm notification system with severity levels and manage alarm logs.</p>
<pre><code class="language-python">import pandas as pd
import numpy as np
from datetime import datetime
from enum import Enum
class AlarmSeverity(Enum):
"""Alarm severity levels"""
INFO = 1 # Information
WARNING = 2 # Warning
ALARM = 3 # Alarm
CRITICAL = 4 # Critical
class AlarmManager:
"""
Process alarm management system
Parameters:
-----------
alarm_rules : list of dict
List of alarm rules
"""
def __init__(self, alarm_rules):
self.alarm_rules = alarm_rules
self.active_alarms = {}
self.alarm_history = []
def check_alarms(self, process_data):
"""
Check alarm conditions
Parameters:
-----------
process_data : dict
Current values of process variables
Returns:
--------
new_alarms : list
Newly triggered alarms
"""
new_alarms = []
current_time = datetime.now()
for rule in self.alarm_rules:
variable = rule['variable']
value = process_data.get(variable, None)
if value is None:
continue
# Check alarm condition
alarm_triggered = self._evaluate_condition(value, rule)
alarm_id = f"{variable}_{rule['name']}"
if alarm_triggered:
# Alarm triggered
if alarm_id not in self.active_alarms:
alarm = {
'id': alarm_id,
'variable': variable,
'name': rule['name'],
'severity': rule['severity'],
'value': value,
'condition': rule['condition'],
'threshold': rule['threshold'],
'timestamp': current_time,
'acknowledged': False
}
self.active_alarms[alarm_id] = alarm
self.alarm_history.append(alarm.copy())
new_alarms.append(alarm)
else:
# Alarm cleared
if alarm_id in self.active_alarms:
alarm = self.active_alarms.pop(alarm_id)
alarm['cleared_time'] = current_time
alarm['duration'] = (current_time - alarm['timestamp']).total_seconds()
return new_alarms
def _evaluate_condition(self, value, rule):
"""Evaluate alarm condition"""
condition = rule['condition']
threshold = rule['threshold']
if condition == 'greater_than':
return value > threshold
elif condition == 'less_than':
return value < threshold
elif condition == 'out_of_range':
lower, upper = threshold
return value < lower or value > upper
elif condition == 'deviation':
target = rule['target']
deviation = rule['threshold']
return abs(value - target) > deviation
else:
return False
def acknowledge_alarm(self, alarm_id):
"""Acknowledge alarm"""
if alarm_id in self.active_alarms:
self.active_alarms[alarm_id]['acknowledged'] = True
return True
return False
def get_active_alarms(self, severity=None):
"""
Get active alarms
Parameters:
-----------
severity : AlarmSeverity or None
Filter by severity
Returns:
--------
alarms : list
"""
alarms = list(self.active_alarms.values())
if severity:
alarms = [a for a in alarms if a['severity'] == severity]
# Sort by severity in descending order
alarms.sort(key=lambda x: x['severity'].value, reverse=True)
return alarms
def get_alarm_statistics(self):
"""Get alarm statistics"""
total_alarms = len(self.alarm_history)
active_count = len(self.active_alarms)
acknowledged_count = sum(1 for a in self.active_alarms.values() if a['acknowledged'])
severity_counts = {}
for severity in AlarmSeverity:
count = sum(1 for a in self.alarm_history if a['severity'] == severity)
severity_counts[severity.name] = count
stats = {
'total_alarms': total_alarms,
'active_alarms': active_count,
'acknowledged_alarms': acknowledged_count,
'unacknowledged_alarms': active_count - acknowledged_count,
'severity_breakdown': severity_counts
}
return stats
# Define alarm rules
alarm_rules = [
{
'variable': 'Reactor Temperature',
'name': 'High Temperature Warning',
'severity': AlarmSeverity.WARNING,
'condition': 'greater_than',
'threshold': 177
},
{
'variable': 'Reactor Temperature',
'name': 'High Temperature Alarm',
'severity': AlarmSeverity.ALARM,
'condition': 'greater_than',
'threshold': 180
},
{
'variable': 'Reactor Temperature',
'name': 'Low Temperature Warning',
'severity': AlarmSeverity.WARNING,
'condition': 'less_than',
'threshold': 173
},
{
'variable': 'Reactor Pressure',
'name': 'Pressure Abnormal',
'severity': AlarmSeverity.CRITICAL,
'condition': 'out_of_range',
'threshold': (1.4, 1.6)
},
{
'variable': 'Feed Flow Rate',
'name': 'Flow Deviation',
'severity': AlarmSeverity.WARNING,
'condition': 'deviation',
'target': 50,
'threshold': 5
}
]
# Initialize alarm manager
alarm_mgr = AlarmManager(alarm_rules)
# Simulation: 1 hour of process operation
np.random.seed(42)
n_samples = 60 # Every 1 minute, for 60 minutes
print("=== Process Alarm Monitoring System ===\n")
print("Starting 1-hour simulation...\n")
for i in range(n_samples):
# Simulate process data
process_data = {
'Reactor Temperature': 175 + np.random.normal(0, 2) + 5 * np.sin(2 * np.pi * i / 60),
'Reactor Pressure': 1.5 + np.random.normal(0, 0.05),
'Feed Flow Rate': 50 + np.random.normal(0, 3)
}
# Check alarms
new_alarms = alarm_mgr.check_alarms(process_data)
# Display new alarms
if new_alarms:
for alarm in new_alarms:
severity_color = {
AlarmSeverity.INFO: '🔵',
AlarmSeverity.WARNING: '🟡',
AlarmSeverity.ALARM: '🟠',
AlarmSeverity.CRITICAL: '🔴'
}
icon = severity_color.get(alarm['severity'], '⚪')
print(f"[{alarm['timestamp'].strftime('%H:%M:%S')}] {icon} {alarm['severity'].name}: "
f"{alarm['variable']} - {alarm['name']} "
f"(Value: {alarm['value']:.2f})")
# Final statistics
print("\n" + "="*60)
print("=== Alarm Statistics Summary ===")
stats = alarm_mgr.get_alarm_statistics()
print(f"\nTotal Alarms: {stats['total_alarms']}")
print(f"Active Alarms: {stats['active_alarms']}")
print(f" - Unacknowledged: {stats['unacknowledged_alarms']}")
print(f" - Acknowledged: {stats['acknowledged_alarms']}")
print("\nBreakdown by Severity:")
for severity, count in stats['severity_breakdown'].items():
print(f" {severity:<10}: {count:>3} alarms")
# List of active alarms
active_alarms = alarm_mgr.get_active_alarms()
if active_alarms:
print("\n=== Current Active Alarms ===")
for alarm in active_alarms:
duration = (datetime.now() - alarm['timestamp']).total_seconds()
ack_status = "Acknowledged" if alarm['acknowledged'] else "Unacknowledged"
print(f" - [{alarm['severity'].name}] {alarm['variable']}: {alarm['name']} "
f"({duration:.0f}s duration, {ack_status})")
else:
print("\nNo active alarms currently.")
# Convert alarm history to DataFrame
if alarm_mgr.alarm_history:
df_alarms = pd.DataFrame(alarm_mgr.alarm_history)
df_alarms['severity_name'] = df_alarms['severity'].apply(lambda x: x.name)
print("\n=== Alarm History Top 10 ===")
print(df_alarms[['timestamp', 'variable', 'name', 'severity_name', 'value']].head(10).to_string(index=False))
Expected Output:
=== Process Alarm Monitoring System ===
Starting 1-hour simulation...
[14:23:12] 🟡 WARNING: Reactor Temperature - High Temperature Warning (Value: 178.45)
[14:31:45] 🟡 WARNING: Feed Flow Rate - Flow Deviation (Value: 56.23)
[14:42:18] 🔴 CRITICAL: Reactor Pressure - Pressure Abnormal (Value: 1.62)
============================================================
=== Alarm Statistics Summary ===
Total Alarms: 8
Active Alarms: 2
- Unacknowledged: 2
- Acknowledged: 0
Breakdown by Severity:
INFO : 0 alarms
WARNING : 5 alarms
ALARM : 1 alarms
CRITICAL : 2 alarms
=== Current Active Alarms ===
- [CRITICAL] Reactor Pressure: Pressure Abnormal (123s duration, Unacknowledged)
- [WARNING] Reactor Temperature: High Temperature Warning (89s duration, Unacknowledged)
Explanation: This alarm system continuously monitors process variables and notifies abnormal conditions by severity level. With alarm history recording, acknowledgement functionality, and statistical reports, operators can manage alarms efficiently. In actual plants, email notifications and Slack integration are also implemented.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - pandas>=2.0.0, <2.2.0
"""
Example: Explanation: This alarm system continuously monitors process
Purpose: Demonstrate data visualization techniques
Target: Intermediate
Execution time: 2-5 seconds
Dependencies: None
"""
<h4>Code Example 5: Historical Data Trend Analysis and Pattern Detection</h4>
<p><strong>Purpose</strong>: Analyze trends from historical data and automatically detect peaks, valleys, and trends.</p>
<pre><code class="language-python">import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import find_peaks, savgol_filter
# Japanese font settings
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
def analyze_historical_trends(time_data, values, variable_name):
"""
Historical data trend analysis
Parameters:
-----------
time_data : array-like
Time data
values : array-like
Variable values
variable_name : str
Variable name
Returns:
--------
analysis : dict
Analysis results
"""
# Extract trend with moving average
window_size = 60 # 60-point moving average
trend = pd.Series(values).rolling(window=window_size, center=True).mean().values
# Smooth with Savitzky-Golay filter
if len(values) > 51:
smoothed = savgol_filter(values, window_length=51, polyorder=3)
else:
smoothed = values
# Peak detection
peaks, peak_props = find_peaks(smoothed, prominence=1.5, distance=30)
valleys, valley_props = find_peaks(-smoothed, prominence=1.5, distance=30)
# Statistics
mean_value = np.mean(values)
std_value = np.std(values)
max_value = np.max(values)
min_value = np.min(values)
# Trend slope (least squares method)
x = np.arange(len(values))
slope = np.polyfit(x, values, 1)[0]
analysis = {
'variable': variable_name,
'trend': trend,
'smoothed': smoothed,
'peaks': peaks,
'valleys': valleys,
'peak_values': values[peaks],
'valley_values': values[valleys],
'mean': mean_value,
'std': std_value,
'max': max_value,
'min': min_value,
'slope': slope,
'n_peaks': len(peaks),
'n_valleys': len(valleys)
}
return analysis
# Generate historical data (7 days, 1 hour interval)
np.random.seed(42)
n_days = 7
n_points = n_days * 24
time_data = pd.date_range('2025-01-01', periods=n_points, freq='1h')
# Reactor temperature data (daily cycle + trend + noise)
base_temp = 175
daily_cycle = 5 * np.sin(2 * np.pi * np.arange(n_points) / 24) # Daily cycle
weekly_trend = 3 * np.sin(2 * np.pi * np.arange(n_points) / (7*24)) # Weekly cycle
noise = np.random.normal(0, 1.2, n_points)
temperature = base_temp + daily_cycle + weekly_trend + noise
# Execute trend analysis
analysis = analyze_historical_trends(time_data, temperature, 'Reactor Temperature')
# Visualization
fig, axes = plt.subplots(3, 1, figsize=(16, 12))
# Original data and trend
axes[0].plot(time_data, temperature, 'b-', linewidth=0.8, alpha=0.5, label='Raw Data')
axes[0].plot(time_data, analysis['smoothed'], 'r-', linewidth=2, label='Smoothed Data')
axes[0].plot(time_data, analysis['trend'], 'g--', linewidth=2, label='Trend (Moving Average)')
axes[0].scatter(time_data[analysis['peaks']], analysis['peak_values'],
color='red', s=80, marker='^', label='Peaks', zorder=5)
axes[0].scatter(time_data[analysis['valleys']], analysis['valley_values'],
color='blue', s=80, marker='v', label='Valleys', zorder=5)
axes[0].axhline(y=analysis['mean'], color='black', linestyle='--', alpha=0.5, label='Mean Value')
axes[0].set_ylabel('Temperature (°C)', fontsize=12)
axes[0].set_title('Historical Data Trend Analysis - Reactor Temperature (7 days)', fontsize=14, fontweight='bold')
axes[0].legend(loc='upper right')
axes[0].grid(alpha=0.3)
# Histogram (distribution analysis)
axes[1].hist(temperature, bins=30, color='#11998e', alpha=0.7, edgecolor='black')
axes[1].axvline(x=analysis['mean'], color='red', linestyle='--', linewidth=2,
label=f"Mean: {analysis['mean']:.2f}°C")
axes[1].axvline(x=analysis['mean'] + analysis['std'], color='orange', linestyle=':',
linewidth=2, label=f"±1σ: {analysis['std']:.2f}°C")
axes[1].axvline(x=analysis['mean'] - analysis['std'], color='orange', linestyle=':', linewidth=2)
axes[1].set_xlabel('Temperature (°C)', fontsize=12)
axes[1].set_ylabel('Frequency', fontsize=12)
axes[1].set_title('Temperature Distribution', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
# Daily statistics (box plot)
df_temp = pd.DataFrame({'timestamp': time_data, 'temperature': temperature})
df_temp['day'] = df_temp['timestamp'].dt.day
daily_data = [df_temp[df_temp['day'] == day]['temperature'].values for day in range(1, n_days+1)]
bp = axes[2].boxplot(daily_data, labels=[f'Day {i+1}' for i in range(n_days)],
patch_artist=True, showmeans=True)
for patch in bp['boxes']:
patch.set_facecolor('#11998e')
patch.set_alpha(0.6)
axes[2].set_xlabel('Day', fontsize=12)
axes[2].set_ylabel('Temperature (°C)', fontsize=12)
axes[2].set_title('Daily Temperature Distribution (Box Plot)', fontsize=13, fontweight='bold')
axes[2].grid(alpha=0.3, axis='y')
plt.tight_layout()
plt.show()
# Analysis results summary
print("=== Historical Data Trend Analysis Results ===\n")
print(f"Variable: {analysis['variable']}")
print(f"Period: {time_data[0]} to {time_data[-1]} ({n_days} days)")
print(f"\nStatistics:")
print(f" Mean: {analysis['mean']:.2f} °C")
print(f" Std Dev: {analysis['std']:.2f} °C")
print(f" Max: {analysis['max']:.2f} °C")
print(f" Min: {analysis['min']:.2f} °C")
print(f" Range: {analysis['max'] - analysis['min']:.2f} °C")
print(f"\nTrend:")
print(f" Slope: {analysis['slope']:.4f} °C/hour")
print(f" 7-day change: {analysis['slope'] * n_points:.2f} °C")
print(f"\nPattern Detection:")
print(f" Number of peaks: {analysis['n_peaks']}")
print(f" Number of valleys: {analysis['n_valleys']}")
if analysis['n_peaks'] > 0:
print(f" Average peak value: {np.mean(analysis['peak_values']):.2f} °C")
if analysis['n_valleys'] > 0:
print(f" Average valley value: {np.mean(analysis['valley_values']):.2f} °C")
# Detect abnormal periods (outside ±3σ range)
outliers = np.abs(temperature - analysis['mean']) > 3 * analysis['std']
outlier_count = np.sum(outliers)
print(f"\nAnomaly Detection:")
print(f" Points outside ±3σ range: {outlier_count} ({outlier_count/len(temperature)*100:.2f}%)")
if outlier_count > 0:
outlier_times = time_data[outliers]
print(f" First anomaly time: {outlier_times[0]}")
print(f" Last anomaly time: {outlier_times[-1]}")
Explanation: Historical data analysis is important for understanding long-term process trends and detecting abnormal patterns. This code implements trend extraction with moving averages, smoothing with Savitzky-Golay filters, and automatic detection of peaks and valleys. Box plots for daily comparisons are effective for identifying periodic patterns and abnormal days.
# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0
# - plotly>=5.14.0
<h4>Code Example 6: KPI Calculation and Dashboard Display</h4>
<p><strong>Purpose</strong>: Calculate and visualize key KPIs (OEE, availability, quality rate) used in process industries.</p>
<pre><code class="language-python">import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
class ProcessKPICalculator:
"""
Process KPI calculation system
Key KPIs:
- OEE (Overall Equipment Effectiveness)
- Availability
- Performance
- Quality
"""
def __init__(self, planned_production_time, ideal_cycle_time):
"""
Parameters:
-----------
planned_production_time : float
Planned operating time (minutes)
ideal_cycle_time : float
Ideal cycle time (minutes/unit)
"""
self.planned_production_time = planned_production_time
self.ideal_cycle_time = ideal_cycle_time
def calculate_availability(self, actual_production_time):
"""
Availability = Actual Operating Time / Planned Operating Time
Parameters:
-----------
actual_production_time : float
Actual operating time (minutes)
Returns:
--------
availability : float
Availability (0-1)
"""
availability = actual_production_time / self.planned_production_time
return min(availability, 1.0)
def calculate_performance(self, actual_output, actual_production_time):
"""
Performance = (Ideal Cycle Time × Actual Output) / Actual Operating Time
Parameters:
-----------
actual_output : int
Actual production quantity (units)
actual_production_time : float
Actual operating time (minutes)
Returns:
--------
performance : float
Performance (0-1)
"""
ideal_production_time = self.ideal_cycle_time * actual_output
performance = ideal_production_time / actual_production_time
return min(performance, 1.0)
def calculate_quality(self, actual_output, good_output):
"""
Quality = Good Units / Total Production
Parameters:
-----------
actual_output : int
Total production (units)
good_output : int
Good units (units)
Returns:
--------
quality : float
Quality rate (0-1)
"""
if actual_output == 0:
return 0.0
quality = good_output / actual_output
return min(quality, 1.0)
def calculate_oee(self, actual_production_time, actual_output, good_output):
"""
OEE = Availability × Performance × Quality
Parameters:
-----------
actual_production_time : float
Actual operating time (minutes)
actual_output : int
Actual production (units)
good_output : int
Good units (units)
Returns:
--------
oee_metrics : dict
OEE and its components
"""
availability = self.calculate_availability(actual_production_time)
performance = self.calculate_performance(actual_output, actual_production_time)
quality = self.calculate_quality(actual_output, good_output)
oee = availability * performance * quality
metrics = {
'OEE': oee,
'Availability': availability,
'Performance': performance,
'Quality': quality
}
return metrics
# KPI calculation example
# 1 week of data for chemical plant
planned_time = 7 * 24 * 60 # 7 days (minutes)
ideal_cycle_time = 2.0 # Ideal cycle time (minutes/batch)
kpi_calc = ProcessKPICalculator(planned_time, ideal_cycle_time)
# Weekly data
week_data = {
'Monday': {'actual_time': 22*60, 'output': 620, 'good': 598},
'Tuesday': {'actual_time': 23*60, 'output': 680, 'good': 672},
'Wednesday': {'actual_time': 21*60, 'output': 610, 'good': 595},
'Thursday': {'actual_time': 23*60, 'output': 685, 'good': 678},
'Friday': {'actual_time': 22*60, 'output': 650, 'good': 640},
'Saturday': {'actual_time': 18*60, 'output': 520, 'good': 512},
'Sunday': {'actual_time': 15*60, 'output': 430, 'good': 425}
}
# Calculate daily KPIs
daily_kpis = {}
for day, data in week_data.items():
kpis = kpi_calc.calculate_oee(
data['actual_time'],
data['output'],
data['good']
)
daily_kpis[day] = kpis
# Create KPI dashboard
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('Daily OEE Trend', 'OEE Components', 'Weekly OEE', 'Production and Quality'),
specs=[
[{"type": "scatter"}, {"type": "bar"}],
[{"type": "indicator"}, {"type": "scatter"}]
],
vertical_spacing=0.15,
horizontal_spacing=0.12
)
days = list(week_data.keys())
oee_values = [daily_kpis[day]['OEE'] * 100 for day in days]
availability_values = [daily_kpis[day]['Availability'] * 100 for day in days]
performance_values = [daily_kpis[day]['Performance'] * 100 for day in days]
quality_values = [daily_kpis[day]['Quality'] * 100 for day in days]
# Daily OEE trend (upper left)
fig.add_trace(
go.Scatter(
x=days,
y=oee_values,
mode='lines+markers',
name='OEE',
line=dict(color='#11998e', width=3),
marker=dict(size=10)
),
row=1, col=1
)
fig.add_hline(y=85, line_dash="dash", line_color="green",
annotation_text="Target: 85%", row=1, col=1)
fig.add_hline(y=60, line_dash="dash", line_color="orange",
annotation_text="Minimum: 60%", row=1, col=1)
# OEE components (upper right)
fig.add_trace(
go.Bar(x=days, y=availability_values, name='Availability',
marker_color='#11998e'),
row=1, col=2
)
fig.add_trace(
go.Bar(x=days, y=performance_values, name='Performance',
marker_color='#f59e0b'),
row=1, col=2
)
fig.add_trace(
go.Bar(x=days, y=quality_values, name='Quality',
marker_color='#7b2cbf'),
row=1, col=2
)
# Weekly OEE (lower left)
weekly_oee = np.mean(oee_values)
fig.add_trace(
go.Indicator(
mode="gauge+number+delta",
value=weekly_oee,
title={'text': "Weekly OEE (%)"},
delta={'reference': 85, 'increasing': {'color': "green"}, 'decreasing': {'color': "red"}},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': "#11998e"},
'steps': [
{'range': [0, 60], 'color': "lightcoral"},
{'range': [60, 85], 'color': "lightyellow"},
{'range': [85, 100], 'color': "lightgreen"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 85
}
}
),
row=2, col=1
)
# Production and quality (lower right)
output_values = [week_data[day]['output'] for day in days]
good_values = [week_data[day]['good'] for day in days]
fig.add_trace(
go.Scatter(
x=days,
y=output_values,
mode='lines+markers',
name='Total Production',
line=dict(color='#11998e', width=2),
yaxis='y'
),
row=2, col=2
)
fig.add_trace(
go.Scatter(
x=days,
y=good_values,
mode='lines+markers',
name='Good Units',
line=dict(color='#4caf50', width=2),
yaxis='y'
),
row=2, col=2
)
# Layout settings
fig.update_xaxes(title_text="Day", row=1, col=1)
fig.update_xaxes(title_text="Day", row=1, col=2)
fig.update_xaxes(title_text="Day", row=2, col=2)
fig.update_yaxes(title_text="OEE (%)", row=1, col=1)
fig.update_yaxes(title_text="Percentage (%)", row=1, col=2)
fig.update_yaxes(title_text="Production (batches)", row=2, col=2)
fig.update_layout(
title_text="Process KPI Dashboard - Weekly Report",
title_font_size=20,
title_x=0.5,
height=900,
showlegend=True,
template="plotly_white"
)
fig.write_html("kpi_dashboard.html")
print("KPI dashboard saved.\n")
# KPI report
print("=== Weekly KPI Report ===\n")
print(f"{'Day':<10} {'OEE':>7} {'Avail.':>7} {'Perf.':>7} {'Quality':>7} {'Output':>8} {'Good':>8}")
print("-" * 65)
for day in days:
kpi = daily_kpis[day]
data = week_data[day]
print(f"{day:<10} "
f"{kpi['OEE']*100:>6.1f}% "
f"{kpi['Availability']*100:>6.1f}% "
f"{kpi['Performance']*100:>6.1f}% "
f"{kpi['Quality']*100:>6.1f}% "
f"{data['output']:>7} "
f"{data['good']:>7}")
print("-" * 65)
print(f"Weekly Average: {weekly_oee:>5.1f}%\n")
# Improvement suggestions
print("=== Improvement Suggestions ===")
avg_availability = np.mean(availability_values)
avg_performance = np.mean(performance_values)
avg_quality = np.mean(quality_values)
bottleneck = min([
('Availability', avg_availability),
('Performance', avg_performance),
('Quality', avg_quality)
], key=lambda x: x[1])
print(f"Main Bottleneck: {bottleneck[0]} ({bottleneck[1]:.1f}%)")
if bottleneck[0] == 'Availability':
print(" → Consider reducing downtime, implementing predictive maintenance")
elif bottleneck[0] == 'Performance':
print(" → Consider process optimization, improving bottleneck operations")
else:
print(" → Consider strengthening quality control, identifying and addressing defect causes")
# OEE classification
if weekly_oee >= 85:
oee_class = "World Class"
elif weekly_oee >= 60:
oee_class = "Average"
else:
oee_class = "Needs Improvement"
print(f"\nOEE Rating: {oee_class} (Target: 85% or higher)")
Expected Output:
=== Weekly KPI Report ===
Day OEE Avail. Perf. Quality Output Good
-----------------------------------------------------------------
Monday 84.4% 91.7% 93.5% 96.5% 620 598
Tuesday 91.2% 95.8% 97.1% 98.8% 680 672
Wednesday 82.1% 87.5% 95.2% 97.5% 610 595
Thursday 91.8% 95.8% 97.5% 99.0% 685 678
Friday 87.9% 91.7% 97.2% 98.5% 650 640
Saturday 85.2% 75.0% 96.3% 98.5% 520 512
Sunday 83.6% 62.5% 95.3% 98.8% 430 425
-----------------------------------------------------------------
Weekly Average: 86.6%
=== Improvement Suggestions ===
Main Bottleneck: Availability (85.7%)
→ Consider reducing downtime, implementing predictive maintenance
OEE Rating: World Class (Target: 85% or higher)
Explanation: OEE (Overall Equipment Effectiveness) is one of the most important KPIs in manufacturing and process industries. Composed of three elements - availability, performance, and quality - it evaluates the overall productivity of equipment. This code automates daily/weekly OEE calculations, visualization, bottleneck analysis, and improvement suggestions.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - pandas>=2.0.0, <2.2.0
"""
Example: Explanation: OEE (Overall Equipment Effectiveness) is one of
Purpose: Demonstrate data visualization techniques
Target: Intermediate
Execution time: 5-15 seconds
Dependencies: None
"""
<h4>Code Example 7: Process State Visualization (Finite State Machine)</h4>
<p><strong>Purpose</strong>: Model process operational states with a finite state machine and visualize state transitions.</p>
<pre><code class="language-python">import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from enum import Enum
# Japanese font settings
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
class ProcessState(Enum):
"""Process operational states"""
STOPPED = 0 # Stopped
STARTUP = 1 # Starting up
RUNNING = 2 # Normal operation
SHUTDOWN = 3 # Shutting down
ALARM = 4 # Alarm state
MAINTENANCE = 5 # Maintenance
class ProcessStateMachine:
"""
Process state machine
State transition rules:
STOPPED → STARTUP → RUNNING
RUNNING → SHUTDOWN → STOPPED
ANY → ALARM (when alarm occurs)
ALARM → previous state (when alarm clears)
STOPPED ↔ MAINTENANCE
"""
def __init__(self, initial_state=ProcessState.STOPPED):
self.current_state = initial_state
self.previous_state = None
self.state_history = [(pd.Timestamp.now(), initial_state)]
# State transition rules
self.transitions = {
ProcessState.STOPPED: [ProcessState.STARTUP, ProcessState.MAINTENANCE],
ProcessState.STARTUP: [ProcessState.RUNNING, ProcessState.ALARM],
ProcessState.RUNNING: [ProcessState.SHUTDOWN, ProcessState.ALARM],
ProcessState.SHUTDOWN: [ProcessState.STOPPED, ProcessState.ALARM],
ProcessState.ALARM: [], # Can return to any state from alarm
ProcessState.MAINTENANCE: [ProcessState.STOPPED]
}
def transition(self, new_state, timestamp=None):
"""
State transition
Parameters:
-----------
new_state : ProcessState
New state
timestamp : pd.Timestamp or None
Transition time
Returns:
--------
success : bool
Success/failure of transition
"""
if timestamp is None:
timestamp = pd.Timestamp.now()
# Transition to alarm state always allowed
if new_state == ProcessState.ALARM:
self.previous_state = self.current_state
self.current_state = new_state
self.state_history.append((timestamp, new_state))
return True
# Return from alarm state
if self.current_state == ProcessState.ALARM:
self.current_state = new_state
self.state_history.append((timestamp, new_state))
return True
# Check normal state transition rules
if new_state in self.transitions[self.current_state]:
self.current_state = new_state
self.state_history.append((timestamp, new_state))
return True
# Invalid transition
return False
def get_state_duration(self):
"""Duration of current state (seconds)"""
if len(self.state_history) < 2:
return 0
current_time = pd.Timestamp.now()
last_transition = self.state_history[-1][0]
duration = (current_time - last_transition).total_seconds()
return duration
def get_state_statistics(self):
"""Statistics by state"""
if len(self.state_history) < 2:
return {}
df_history = pd.DataFrame(self.state_history, columns=['timestamp', 'state'])
df_history['duration'] = df_history['timestamp'].diff().shift(-1).dt.total_seconds()
stats = {}
for state in ProcessState:
state_data = df_history[df_history['state'] == state]
if len(state_data) > 0:
stats[state.name] = {
'count': len(state_data),
'total_duration': state_data['duration'].sum(),
'avg_duration': state_data['duration'].mean()
}
return stats
# Process state machine simulation
print("=== Process State Machine Simulation ===\n")
# Initialization
state_machine = ProcessStateMachine(initial_state=ProcessState.STOPPED)
# 1-day process operation simulation
start_time = pd.Timestamp('2025-01-01 00:00:00')
# State transition scenario
transitions_scenario = [
(start_time + pd.Timedelta(hours=0), ProcessState.STOPPED),
(start_time + pd.Timedelta(hours=1), ProcessState.STARTUP),
(start_time + pd.Timedelta(hours=2), ProcessState.RUNNING),
(start_time + pd.Timedelta(hours=8), ProcessState.ALARM), # Alarm occurs
(start_time + pd.Timedelta(hours=8.5), ProcessState.RUNNING), # Alarm clears
(start_time + pd.Timedelta(hours=16), ProcessState.SHUTDOWN),
(start_time + pd.Timedelta(hours=17), ProcessState.STOPPED),
(start_time + pd.Timedelta(hours=18), ProcessState.MAINTENANCE),
(start_time + pd.Timedelta(hours=22), ProcessState.STOPPED),
]
# Execute state transitions
for timestamp, new_state in transitions_scenario[1:]:
success = state_machine.transition(new_state, timestamp)
if success:
print(f"[{timestamp.strftime('%H:%M')}] {state_machine.current_state.name}")
else:
print(f"[{timestamp.strftime('%H:%M')}] Invalid transition: → {new_state.name}")
# Visualize state history
df_history = pd.DataFrame(state_machine.state_history, columns=['timestamp', 'state'])
df_history['state_code'] = df_history['state'].apply(lambda x: x.value)
df_history['state_name'] = df_history['state'].apply(lambda x: x.name)
# Gantt chart-style visualization
fig, ax = plt.subplots(figsize=(16, 6))
# Define colors for each state
state_colors = {
ProcessState.STOPPED: '#gray',
ProcessState.STARTUP: '#ffeb3b',
ProcessState.RUNNING: '#4caf50',
ProcessState.SHUTDOWN: '#ff9800',
ProcessState.ALARM: '#f44336',
ProcessState.MAINTENANCE: '#2196f3'
}
# Display each state period as a bar
for i in range(len(df_history) - 1):
start = df_history.iloc[i]['timestamp']
end = df_history.iloc[i + 1]['timestamp']
state = df_history.iloc[i]['state']
ax.barh(
0,
width=(end - start).total_seconds() / 3600, # In hours
left=(start - start_time).total_seconds() / 3600,
height=0.5,
color=state_colors[state],
edgecolor='black',
linewidth=1.5,
label=state.name if i == 0 or df_history.iloc[i-1]['state'] != state else ""
)
# Display state name in center of bar
duration = (end - start).total_seconds() / 3600
if duration > 0.5: # Only label states longer than 30 minutes
mid_point = (start - start_time).total_seconds() / 3600 + duration / 2
ax.text(mid_point, 0, state.name, ha='center', va='center',
fontsize=11, fontweight='bold', color='white')
# Legend (remove duplicates)
handles, labels = ax.get_legend_handles_labels()
by_label = dict(zip(labels, handles))
ax.legend(by_label.values(), by_label.keys(), loc='upper right', fontsize=10)
ax.set_xlabel('Time (hours)', fontsize=12)
ax.set_yticks([])
ax.set_xlim(0, 24)
ax.set_title('Process State Transition Timeline (24 hours)', fontsize=14, fontweight='bold')
ax.grid(alpha=0.3, axis='x')
plt.tight_layout()
plt.show()
# Statistics
print("\n=== State Statistics ===")
stats = state_machine.get_state_statistics()
print(f"\n{'State':<15} {'Count':>6} {'Total Time':>12} {'Avg Time':>12}")
print("-" * 50)
for state_name, stat in stats.items():
total_hours = stat['total_duration'] / 3600
avg_hours = stat['avg_duration'] / 3600
print(f"{state_name:<15} {stat['count']:>6} times "
f"{total_hours:>9.2f} hours {avg_hours:>9.2f} hours")
# Calculate availability
total_time = 24 # hours
running_time = stats.get('RUNNING', {}).get('total_duration', 0) / 3600
availability = running_time / total_time * 100
print(f"\nAvailability: {availability:.1f}% ({running_time:.2f} hours / {total_time} hours)")
Expected Output:
=== Process State Machine Simulation ===
[01:00] STARTUP
[02:00] RUNNING
[08:00] ALARM
[08:30] RUNNING
[16:00] SHUTDOWN
[17:00] STOPPED
[18:00] MAINTENANCE
[22:00] STOPPED
=== State Statistics ===
State Count Total Time Avg Time
--------------------------------------------------
STOPPED 2 times 5.00 hours 2.50 hours
STARTUP 1 times 1.00 hours 1.00 hours
RUNNING 2 times 13.50 hours 6.75 hours
ALARM 1 times 0.50 hours 0.50 hours
SHUTDOWN 1 times 1.00 hours 1.00 hours
MAINTENANCE 1 times 4.00 hours 4.00 hours
Availability: 56.2% (13.50 hours / 24 hours)
Explanation: Finite state machines (FSM) are a powerful tool for clearly modeling process operational states and managing state transitions. This code implements startup, normal operation, shutdown, alarm, and maintenance states along with transition rules between them. Timeline visualization allows operators to intuitively understand process operation history.
# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0
# - plotly>=5.14.0
<h4>Code Example 8: Complete Integrated Monitoring System - Chemical Reactor Case Study</h4>
<p><strong>Purpose</strong>: Integrate all elements learned so far to build a complete monitoring system for a chemical reactor.</p>
<pre><code class="language-python">import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from collections import deque
from datetime import datetime
class IntegratedProcessMonitoringSystem:
"""
Integrated process monitoring system
Functions:
- Real-time data collection
- Multi-variable trend monitoring
- Alarm management
- KPI calculation
- State management
"""
def __init__(self, process_name):
self.process_name = process_name
# Data buffers (1 hour, 1 second interval)
self.buffer_size = 3600
self.time_buffer = deque(maxlen=self.buffer_size)
self.data_buffers = {
'Reactor Temperature': deque(maxlen=self.buffer_size),
'Jacket Temperature': deque(maxlen=self.buffer_size),
'Reactor Pressure': deque(maxlen=self.buffer_size),
'Feed Flow Rate': deque(maxlen=self.buffer_size),
'Product Concentration': deque(maxlen=self.buffer_size)
}
# Alarm system
self.active_alarms = []
# KPI data
self.kpi_data = {
'production_count': 0,
'good_count': 0,
'running_time': 0
}
# Process state
self.process_state = 'RUNNING'
self.iteration = 0
def generate_process_data(self):
"""Generate process data (in practice, acquired from PLC/DCS)"""
elapsed = self.iteration
# Reactor temperature (target 175°C)
temp_base = 175.0
temp_noise = np.random.normal(0, 1.2)
temp_disturbance = 3 * np.sin(2 * np.pi * elapsed / 300) # 5-minute cycle
reactor_temp = temp_base + temp_noise + temp_disturbance
# Jacket temperature (lower than reactor temperature)
jacket_temp = reactor_temp - 10 + np.random.normal(0, 0.8)
# Pressure (target 1.5 MPa)
pressure = 1.5 + np.random.normal(0, 0.03)
# Feed flow rate (target 50 m³/h)
flow_rate = 50 + np.random.normal(0, 2.5)
# Product concentration (target 80%)
concentration = 80 + np.random.normal(0, 3) - 0.5 * (reactor_temp - 175)
return {
'Reactor Temperature': reactor_temp,
'Jacket Temperature': jacket_temp,
'Reactor Pressure': pressure,
'Feed Flow Rate': flow_rate,
'Product Concentration': concentration
}
def check_process_alarms(self, data):
"""Check process alarms"""
alarms = []
# Temperature alarms
if data['Reactor Temperature'] > 180:
alarms.append({'severity': 'CRITICAL', 'variable': 'Reactor Temperature',
'message': 'High Temperature Alarm', 'value': data['Reactor Temperature']})
elif data['Reactor Temperature'] > 177:
alarms.append({'severity': 'WARNING', 'variable': 'Reactor Temperature',
'message': 'High Temperature Warning', 'value': data['Reactor Temperature']})
elif data['Reactor Temperature'] < 173:
alarms.append({'severity': 'WARNING', 'variable': 'Reactor Temperature',
'message': 'Low Temperature Warning', 'value': data['Reactor Temperature']})
# Pressure alarms
if data['Reactor Pressure'] > 1.6 or data['Reactor Pressure'] < 1.4:
alarms.append({'severity': 'CRITICAL', 'variable': 'Reactor Pressure',
'message': 'Pressure Abnormal', 'value': data['Reactor Pressure']})
# Concentration alarms
if data['Product Concentration'] < 75:
alarms.append({'severity': 'WARNING', 'variable': 'Product Concentration',
'message': 'Quality Degradation', 'value': data['Product Concentration']})
self.active_alarms = alarms
return alarms
def update(self):
"""System update (called every second)"""
current_time = datetime.now()
data = self.generate_process_data()
# Add to data buffers
self.time_buffer.append(current_time)
for var, value in data.items():
self.data_buffers[var].append(value)
# Check alarms
alarms = self.check_process_alarms(data)
# Update KPIs
self.kpi_data['running_time'] += 1 # seconds
self.iteration += 1
return data, alarms
def create_integrated_dashboard(self):
"""Create integrated dashboard"""
if len(self.time_buffer) < 10:
return None
# Convert data to numpy arrays
time_array = list(self.time_buffer)
reactor_temp = np.array(list(self.data_buffers['Reactor Temperature']))
jacket_temp = np.array(list(self.data_buffers['Jacket Temperature']))
pressure = np.array(list(self.data_buffers['Reactor Pressure']))
flow_rate = np.array(list(self.data_buffers['Feed Flow Rate']))
concentration = np.array(list(self.data_buffers['Product Concentration']))
# Create subplots
fig = make_subplots(
rows=3, cols=2,
subplot_titles=(
'Reactor Temperature Trend', 'Jacket Temperature Trend',
'Reactor Pressure Trend', 'Feed Flow Rate Trend',
'Product Concentration Trend', 'Current Process Status'
),
specs=[
[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "scatter"}, {"type": "indicator"}]
],
vertical_spacing=0.12,
horizontal_spacing=0.12
)
# Reactor temperature
fig.add_trace(
go.Scatter(x=time_array, y=reactor_temp, mode='lines',
name='Reactor Temperature', line=dict(color='#11998e', width=2)),
row=1, col=1
)
fig.add_hline(y=175, line_dash="dash", line_color="red", row=1, col=1)
fig.add_hrect(y0=173, y1=177, fillcolor="green", opacity=0.1,
line_width=0, row=1, col=1)
# Jacket temperature
fig.add_trace(
go.Scatter(x=time_array, y=jacket_temp, mode='lines',
name='Jacket Temperature', line=dict(color='#f59e0b', width=2)),
row=1, col=2
)
# Pressure
fig.add_trace(
go.Scatter(x=time_array, y=pressure, mode='lines',
name='Pressure', line=dict(color='#7b2cbf', width=2)),
row=2, col=1
)
fig.add_hline(y=1.5, line_dash="dash", line_color="red", row=2, col=1)
# Flow rate
fig.add_trace(
go.Scatter(x=time_array, y=flow_rate, mode='lines',
name='Flow Rate', line=dict(color='#e63946', width=2)),
row=2, col=2
)
fig.add_hline(y=50, line_dash="dash", line_color="red", row=2, col=2)
# Concentration
fig.add_trace(
go.Scatter(x=time_array, y=concentration, mode='lines',
name='Concentration', line=dict(color='#06a77d', width=2)),
row=3, col=1
)
fig.add_hline(y=80, line_dash="dash", line_color="red", row=3, col=1)
# Process status indicator
current_temp = reactor_temp[-1]
fig.add_trace(
go.Indicator(
mode="gauge+number",
value=current_temp,
title={'text': "Reactor Temperature (°C)"},
gauge={
'axis': {'range': [None, 200]},
'bar': {'color': "#11998e"},
'steps': [
{'range': [0, 173], 'color': "lightblue"},
{'range': [173, 177], 'color': "lightgreen"},
{'range': [177, 200], 'color': "lightcoral"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 175
}
}
),
row=3, col=2
)
# Axis labels
fig.update_yaxes(title_text="Temperature (°C)", row=1, col=1)
fig.update_yaxes(title_text="Temperature (°C)", row=1, col=2)
fig.update_yaxes(title_text="Pressure (MPa)", row=2, col=1)
fig.update_yaxes(title_text="Flow Rate (m³/h)", row=2, col=2)
fig.update_yaxes(title_text="Concentration (%)", row=3, col=1)
fig.update_layout(
title_text=f"{self.process_name} - Integrated Monitoring Dashboard",
title_font_size=20,
title_x=0.5,
height=1000,
showlegend=False,
template="plotly_white"
)
return fig
# Integrated monitoring system demonstration
print("=== Integrated Process Monitoring System ===")
print("Chemical Reactor R-101 Monitoring System Starting\n")
# System initialization
monitoring_system = IntegratedProcessMonitoringSystem("Chemical Reactor R-101")
# 10-minute simulation (600 seconds)
print("Starting 10-minute process monitoring...\n")
alarm_count = 0
for i in range(600):
data, alarms = monitoring_system.update()
# Display status every 30 seconds
if (i + 1) % 30 == 0:
print(f"[{i+1:>3}s] "
f"Temp: {data['Reactor Temperature']:>6.2f}°C | "
f"Pressure: {data['Reactor Pressure']:>5.3f} MPa | "
f"Conc: {data['Product Concentration']:>5.2f}% | "
f"Alarms: {len(alarms)}")
if alarms:
for alarm in alarms:
severity_icon = '🔴' if alarm['severity'] == 'CRITICAL' else '🟡'
print(f" {severity_icon} {alarm['variable']}: {alarm['message']} ({alarm['value']:.2f})")
alarm_count += len(alarms)
print("\nProcess monitoring complete!\n")
# Generate integrated dashboard
fig = monitoring_system.create_integrated_dashboard()
if fig:
fig.write_html("integrated_monitoring_system.html")
print("Integrated dashboard saved to 'integrated_monitoring_system.html'.\n")
# Final statistics
print("=== Process Statistics Summary ===")
print(f"Monitoring Time: {monitoring_system.kpi_data['running_time']} seconds (10 minutes)")
print(f"Total Alarms: {alarm_count}")
# Statistics for each variable
print("\nVariable Statistics:")
for var_name, buffer in monitoring_system.data_buffers.items():
data_array = np.array(list(buffer))
print(f" {var_name:<25}: Mean {np.mean(data_array):>7.2f}, "
f"Std Dev {np.std(data_array):>5.2f}, "
f"Max {np.max(data_array):>7.2f}, "
f"Min {np.min(data_array):>7.2f}")
print("\nSystem terminated normally.")
Explanation: This code integrates all elements learned in this chapter (real-time data collection, multi-variable monitoring, alarm management, KPI calculation, dashboard visualization) into a complete process monitoring system. Using a chemical reactor as an example, it implements the configuration of monitoring systems used in actual plants. In real operation, this system is deployed as a web application (Dash, Streamlit, etc.) so multiple operators can access it in real-time.
5.3 Chapter Summary
What We Learned
- Real-time Monitoring System Architecture
- Four-layer structure: data collection, processing, visualization, and storage layers
- Data flow: Sensors → Buffering → Processing → Visualization → Storage
- System design principles and scalability
- Dashboard Design and Visualization
- Building interactive dashboards with Plotly
- Effective placement of trend charts, gauges, and indicators
- UI design principles for operators
- Real-time Data Processing
- Efficient data buffering with deque
- Statistical processing of streaming data
- Real-time anomaly detection
- Alarm Management System
- Alarm classification by severity level (INFO, WARNING, ALARM, CRITICAL)
- Alarm history management and acknowledgement
- Alarm flood prevention strategies
- KPIs and Process Analysis
- OEE (Overall Equipment Effectiveness) calculation and evaluation
- Historical data trend analysis and pattern detection
- Bottleneck analysis and improvement suggestions
- Integrated Monitoring System
- Building a complete system integrating all functions
- Practical case study of a chemical reactor
- Deployment strategy for real operations
Key Points
Data Buffering using deque for fixed-length queues enables memory-efficient real-time processing. Alarm Design with severity classification and appropriate threshold settings reduces operator burden. KPI Visualization through visual display of OEE and its components clarifies improvement opportunities. Integrated Dashboards that monitor multiple variables simultaneously provide an overview of the entire process. Scalability through modular design makes it easy to add variables and extend functionality.
Practical Applications
The monitoring systems learned in this chapter can be applied to real processes across various industries. Chemical Plants benefit from integrated monitoring of reactors, distillation columns, and heat exchangers. Pharmaceutical Plants require GMP-compliant record management and alarm systems. Food Plants need HACCP-compliant temperature and pH management. Semiconductor Manufacturing demands precision monitoring of cleanroom environments. Power Plants rely on efficiency monitoring of boilers and turbines.
Next Steps
Having completed this series, you have acquired fundamental knowledge and implementation capability in process monitoring, practical skills in Statistical Process Control (SPC), anomaly detection techniques using machine learning and deep learning, theory and implementation of PID control, and the capability to build real-time monitoring systems.
For Further Learning:
- Model Predictive Control (MPC): Advanced methods for multivariable control
- Digital Twin: Building virtual process models and predictive simulation
- Reinforcement Learning Control: Adaptive process control using AI
- Soft Sensor Development: Estimation of difficult variables using machine learning
- Cloud Integration: Integration with AWS/Azure/GCP and big data analysis
Congratulations on Completing the Series!
You have completed all 5 chapters of the "Introduction to Process Monitoring and Control Series v1.0". From sensor data acquisition to statistical process control, anomaly detection, PID control, and building real-time monitoring systems, you have acquired comprehensive knowledge and skills in process engineering.
Use this knowledge to move on to the next steps: Consider application to your company's processes, publish and share learned code on GitHub, build soft sensors and dashboards with real data, and develop your career as a process control engineer.
We look forward to your feedback!
If you have suggestions for improving this series, questions, or success stories, please let us know.
Contact: yusuke.hashimoto.b8@tohoku.ac.jp