
Chapter 5: Real-time Process Monitoring Systems in Practice

From Building Real-time Dashboards to Integrated Monitoring Systems

📖 Reading Time: 30-35 minutes 📊 Difficulty: Intermediate to Advanced 💻 Code Examples: 8

This chapter covers real-time process monitoring systems in practice. You will learn to build interactive dashboard layouts with Plotly, simulate real-time data streaming, implement alarm notification, and analyze historical trends.

Learning Objectives

By reading this chapter, you will master the following:

  1. Designing the four-layer architecture of a real-time monitoring system (data collection, processing, visualization, and storage)
  2. Building interactive dashboard layouts and gauges with Plotly
  3. Simulating real-time data streaming with memory-efficient deque buffers
  4. Implementing an alarm notification system with severity levels, acknowledgement, and alarm logs
  5. Analyzing historical trends and automatically detecting peaks, valleys, and drift

5.1 Architecture of Real-time Monitoring Systems

Overall System Design

Real-time process monitoring systems consist of four main components: data collection, processing, visualization, and storage.

<pre><code class="language-mermaid">graph TB
    subgraph "Data Collection Layer"
        A[Sensors] --> B[PLC/DCS]
        B --> C[OPC UA Server]
    end
    subgraph "Data Processing Layer"
        C --> D[Data Streaming]
        D --> E[Real-time Processing]
        E --> F[Anomaly Detection]
        E --> G[Control Loop]
    end
    subgraph "Visualization Layer"
        F --> H[Dashboard]
        G --> H
        E --> I[Alarm System]
        I --> H
    end
    subgraph "Data Storage Layer"
        E --> J[Historian Database]
        F --> J
        J --> K[Historical Analysis]
    end
    style A fill:#e8f5e9
    style B fill:#c8e6c9
    style C fill:#a5d6a7
    style D fill:#81c784
    style E fill:#66bb6a
    style F fill:#4caf50
    style G fill:#388e3c
    style H fill:#2e7d32
    style I fill:#1b5e20
    style J fill:#f1f8e9
    style K fill:#dcedc8
</code></pre>

Real-time Data Flow

In monitoring systems, the following data flow is executed continuously:

  1. Data Acquisition: Sensors → PLC → Data Collection Server (1 second to 1 minute intervals)
  2. Buffering: Memory-efficient storage with deque or ring buffers
  3. Real-time Processing: Statistical calculations, anomaly detection, control loop execution
  4. Visualization Update: Graphs, gauges, alarm displays (1 to 10 second intervals)
  5. Database Storage: Long-term storage in Historian (1 minute to 1 hour intervals)
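Step 5 above (Historian storage) can be sketched with SQLite standing in for a commercial historian; the table schema and tag names below are illustrative assumptions, not any specific historian's API.

```python
import sqlite3
from datetime import datetime, timezone

# In-memory SQLite as a stand-in for a plant historian database
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE historian (
        ts    TEXT NOT NULL,   -- ISO-8601 timestamp of the flush
        tag   TEXT NOT NULL,   -- e.g. 'R101.TEMP' (hypothetical tag names)
        value REAL NOT NULL
    )
""")

def flush_to_historian(samples):
    """Persist a batch of (tag, value) samples with one timestamp per flush."""
    ts = datetime.now(timezone.utc).isoformat()
    conn.executemany(
        "INSERT INTO historian (ts, tag, value) VALUES (?, ?, ?)",
        [(ts, tag, value) for tag, value in samples]
    )
    conn.commit()

# One flush cycle: e.g. once per minute, persist the latest buffered readings
flush_to_historian([("R101.TEMP", 175.3), ("R101.PRES", 1.51), ("R101.FLOW", 49.8)])

n_rows = conn.execute("SELECT COUNT(*) FROM historian").fetchone()[0]
print(f"Rows stored: {n_rows}")
```

Batching inserts per flush interval, rather than writing every sample individually, keeps the storage layer from becoming the bottleneck of the data flow.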

5.2 Code Examples: Implementing Real-time Monitoring Systems

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
# - matplotlib>=3.7.0
# - plotly>=5.0.0
# - scipy>=1.10.0

<h4>Code Example 1: Real-time Dashboard Layout Design with Plotly</h4>

<p><strong>Purpose</strong>: Design a dashboard layout for process monitoring using Plotly (static demonstration).</p>

<pre><code class="language-python">import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd

# Generate simulation data
np.random.seed(42)
time_points = 100
time = pd.date_range('2025-01-01 00:00:00', periods=time_points, freq='1min')

# Process variable data
temperature = 175 + np.random.normal(0, 2, time_points) + 3 * np.sin(np.linspace(0, 4*np.pi, time_points))
pressure = 1.5 + np.random.normal(0, 0.05, time_points)
flow_rate = 50 + np.random.normal(0, 3, time_points)

# Create dashboard layout
# Arrange 4 subplots: temperature trend, pressure trend, flow rate trend, temperature gauge
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=('Reactor Temperature Trend', 'Pressure Trend', 'Flow Rate Trend', 'Current Temperature'),
    specs=[
        [{"type": "scatter"}, {"type": "scatter"}],
        [{"type": "scatter"}, {"type": "indicator"}]
    ],
    vertical_spacing=0.12,
    horizontal_spacing=0.1
)

# Temperature trend (upper left)
fig.add_trace(
    go.Scatter(
        x=time,
        y=temperature,
        mode='lines',
        name='Temperature',
        line=dict(color='#11998e', width=2),
        fill='tozeroy',
        fillcolor='rgba(17, 153, 142, 0.1)'
    ),
    row=1, col=1
)

# Add control limit lines
fig.add_hline(y=175, line_dash="dash", line_color="red",
              annotation_text="Target Value", row=1, col=1)
fig.add_hrect(y0=173, y1=177, fillcolor="green", opacity=0.1,
              line_width=0, row=1, col=1)

# Pressure trend (upper right)
fig.add_trace(
    go.Scatter(
        x=time,
        y=pressure,
        mode='lines',
        name='Pressure',
        line=dict(color='#f59e0b', width=2)
    ),
    row=1, col=2
)
fig.add_hline(y=1.5, line_dash="dash", line_color="red", row=1, col=2)

# Flow rate trend (lower left)
fig.add_trace(
    go.Scatter(
        x=time,
        y=flow_rate,
        mode='lines',
        name='Flow Rate',
        line=dict(color='#7b2cbf', width=2)
    ),
    row=2, col=1
)
fig.add_hline(y=50, line_dash="dash", line_color="red", row=2, col=1)

# Temperature gauge (lower right)
current_temp = temperature[-1]
fig.add_trace(
    go.Indicator(
        mode="gauge+number+delta",
        value=current_temp,
        title={'text': "Reactor Temperature (°C)"},
        delta={'reference': 175, 'increasing': {'color': "red"}, 'decreasing': {'color': "blue"}},
        gauge={
            'axis': {'range': [None, 200]},
            'bar': {'color': "#11998e"},
            'steps': [
                {'range': [0, 173], 'color': "lightblue"},
                {'range': [173, 177], 'color': "lightgreen"},
                {'range': [177, 200], 'color': "lightcoral"}
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': 175
            }
        }
    ),
    row=2, col=2
)

# Layout settings
fig.update_xaxes(title_text="Time", row=1, col=1)
fig.update_xaxes(title_text="Time", row=1, col=2)
fig.update_xaxes(title_text="Time", row=2, col=1)

fig.update_yaxes(title_text="Temperature (°C)", row=1, col=1)
fig.update_yaxes(title_text="Pressure (MPa)", row=1, col=2)
fig.update_yaxes(title_text="Flow Rate (m³/h)", row=2, col=1)

fig.update_layout(
    title_text="Process Monitoring Dashboard - Chemical Reactor",
    title_font_size=20,
    title_x=0.5,
    height=800,
    showlegend=False,
    template="plotly_white"
)

# Save as HTML (viewable in browser)
fig.write_html("process_monitoring_dashboard.html")
print("Dashboard saved to 'process_monitoring_dashboard.html'.")
print("Open it in your browser to view.")

# Statistics summary
print("\n=== Current Process Status ===")
print(f"Reactor Temperature: {current_temp:.2f} °C (Target: 175°C)")
print(f"Pressure: {pressure[-1]:.3f} MPa (Target: 1.5 MPa)")
print(f"Flow Rate: {flow_rate[-1]:.2f} m³/h (Target: 50 m³/h)")

# Check alarm status
temp_alarm = "Normal" if 173 <= current_temp <= 177 else "Warning"
pressure_alarm = "Normal" if 1.45 <= pressure[-1] <= 1.55 else "Warning"
flow_alarm = "Normal" if 45 <= flow_rate[-1] <= 55 else "Warning"

print(f"\n=== Alarm Status ===")
print(f"Temperature: {temp_alarm}")
print(f"Pressure: {pressure_alarm}")
print(f"Flow Rate: {flow_alarm}")

</code></pre>

<p><strong>Explanation</strong>: This code designs a process monitoring dashboard layout using Plotly. By combining trend charts (time-series data) with gauges (current values), it creates an interface that allows operators to intuitively grasp the process status. In an actual plant, this would be deployed as a web application.</p>


<h4>Code Example 2: Simulated Real-time Data Streaming</h4>

<p><strong>Purpose</strong>: Simulate real-time data streaming using a deque buffer.</p>

<pre><code class="language-python">import numpy as np
import pandas as pd
from collections import deque
import matplotlib.pyplot as plt
import time

# Font settings (Hiragino Sans is macOS-specific; Arial is the fallback)
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False

class RealTimeDataStream:
    """
    Real-time data streaming simulator

    Parameters:
    -----------
    buffer_size : int
        Data buffer size
    sampling_interval : float
        Sampling interval (seconds)
    """

    def __init__(self, buffer_size=100, sampling_interval=1.0):
        self.buffer_size = buffer_size
        self.sampling_interval = sampling_interval

        # Data buffers (fixed-length queues)
        self.time_buffer = deque(maxlen=buffer_size)
        self.temp_buffer = deque(maxlen=buffer_size)
        self.pressure_buffer = deque(maxlen=buffer_size)
        self.flow_buffer = deque(maxlen=buffer_size)

        # Start time
        self.start_time = pd.Timestamp.now()
        self.iteration = 0

    def generate_sensor_data(self):
        """Generate sensor data (in practice, acquired from PLC/DCS)"""
        elapsed = self.iteration * self.sampling_interval

        # Reactor temperature (periodic variation + noise)
        temp_base = 175.0
        temp_variation = 3.0 * np.sin(2 * np.pi * elapsed / 300)
        temp_noise = np.random.normal(0, 0.8)
        temperature = temp_base + temp_variation + temp_noise

        # Pressure (slight variation)
        pressure = 1.5 + np.random.normal(0, 0.02)

        # Flow rate (simulate step changes)
        if elapsed < 60:
            flow_base = 50.0
        elif elapsed < 120:
            flow_base = 55.0  # Increase flow rate at 60 seconds
        else:
            flow_base = 50.0  # Return to original at 120 seconds

        flow_rate = flow_base + np.random.normal(0, 2.0)

        return temperature, pressure, flow_rate

    def update(self):
        """Update data stream"""
        current_time = self.start_time + pd.Timedelta(seconds=self.iteration * self.sampling_interval)
        temp, pressure, flow = self.generate_sensor_data()

        # Add to buffers
        self.time_buffer.append(current_time)
        self.temp_buffer.append(temp)
        self.pressure_buffer.append(pressure)
        self.flow_buffer.append(flow)

        self.iteration += 1

        return current_time, temp, pressure, flow

    def get_statistics(self):
        """Statistics of buffered data"""
        if len(self.temp_buffer) == 0:
            return None

        stats = {
            'temp_mean': np.mean(self.temp_buffer),
            'temp_std': np.std(self.temp_buffer),
            'temp_latest': self.temp_buffer[-1],
            'pressure_mean': np.mean(self.pressure_buffer),
            'pressure_latest': self.pressure_buffer[-1],
            'flow_mean': np.mean(self.flow_buffer),
            'flow_latest': self.flow_buffer[-1],
            'buffer_utilization': len(self.temp_buffer) / self.buffer_size * 100
        }

        return stats


# Real-time streaming demonstration
print("=== Real-time Data Streaming Started ===")
print("Collecting data for 180 seconds...\n")

stream = RealTimeDataStream(buffer_size=200, sampling_interval=1.0)

# Data collection (180 seconds, every 1 second)
duration = 180  # seconds
for i in range(duration):
    timestamp, temp, pressure, flow = stream.update()

    # Display progress every 10 seconds
    if (i + 1) % 10 == 0:
        stats = stream.get_statistics()
        print(f"[{timestamp.strftime('%H:%M:%S')}] "
              f"Temp: {temp:.2f}°C (Mean: {stats['temp_mean']:.2f}°C) | "
              f"Pressure: {pressure:.3f} MPa | "
              f"Flow: {flow:.2f} m³/h")

    # In actual real-time systems, use time.sleep()
    # Skipped here for speed

print("\nData collection complete!\n")

# Visualize collected data
fig, axes = plt.subplots(3, 1, figsize=(14, 12))

# Temperature trend
axes[0].plot(list(stream.time_buffer), list(stream.temp_buffer),
             color='#11998e', linewidth=1.5, label='Temperature')
axes[0].axhline(y=175, color='red', linestyle='--', linewidth=2, label='Target Value')
axes[0].fill_between(list(stream.time_buffer), 173, 177, alpha=0.15, color='green', label='Control Range')
axes[0].set_ylabel('Temperature (°C)', fontsize=12)
axes[0].set_title('Real-time Streaming Data - Reactor Temperature', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)

# Pressure trend
axes[1].plot(list(stream.time_buffer), list(stream.pressure_buffer),
             color='#f59e0b', linewidth=1.5, label='Pressure')
axes[1].axhline(y=1.5, color='red', linestyle='--', linewidth=2, label='Target Value')
axes[1].set_ylabel('Pressure (MPa)', fontsize=12)
axes[1].set_title('Pressure Trend', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)

# Flow rate trend
axes[2].plot(list(stream.time_buffer), list(stream.flow_buffer),
             color='#7b2cbf', linewidth=1.5, label='Flow Rate')
axes[2].axhline(y=50, color='red', linestyle='--', linewidth=2, label='Target Value')
axes[2].axvline(x=list(stream.time_buffer)[60], color='orange', linestyle=':', alpha=0.5, label='Flow Change')
axes[2].axvline(x=list(stream.time_buffer)[120], color='orange', linestyle=':', alpha=0.5)
axes[2].set_xlabel('Time', fontsize=12)
axes[2].set_ylabel('Flow Rate (m³/h)', fontsize=12)
axes[2].set_title('Flow Rate Trend', fontsize=14, fontweight='bold')
axes[2].legend()
axes[2].grid(alpha=0.3)

plt.tight_layout()
plt.show()

# Final statistics
final_stats = stream.get_statistics()
print("=== Final Statistics ===")
for key, value in final_stats.items():
    print(f"  {key}: {value:.2f}")

</code></pre>

<p><strong>Explanation</strong>: This code implements efficient data buffering with deque. Fixed-length queues automatically discard the oldest data as new data is appended, making them ideal for memory-efficient real-time processing. In actual processes, data acquired continuously from the PLC/DCS is buffered in exactly this way.</p>
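The fixed-length eviction behavior described above can be seen in isolation (a minimal illustration, separate from the streaming class):

```python
from collections import deque

buf = deque(maxlen=3)               # fixed-length ring buffer
for reading in [174.8, 175.2, 175.9, 176.4]:
    buf.append(reading)             # appending past maxlen drops the oldest item

print(list(buf))                    # the first reading (174.8) has been evicted
```

Because eviction is O(1) and automatic, the monitoring loop never has to trim the buffer itself, no matter how long it runs.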


<h4>Code Example 3: Multi-chart Monitoring Interface</h4>

<p><strong>Purpose</strong>: Build a comprehensive monitoring interface for simultaneous monitoring of multiple process variables.</p>

<pre><code class="language-python">import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd

class ProcessMonitoringInterface:
    """
    Multi-chart process monitoring interface

    Parameters:
    -----------
    process_name : str
        Process name
    variables : list of dict
        List of monitoring variables (name, unit, target, range)
    """

    def __init__(self, process_name, variables):
        self.process_name = process_name
        self.variables = variables
        self.n_vars = len(variables)

    def create_monitoring_dashboard(self, time_data, variable_data):
        """
        Create monitoring dashboard

        Parameters:
        -----------
        time_data : array-like
            Time data
        variable_data : dict
            Data dictionary with variable names as keys

        Returns:
        --------
        fig : plotly.graph_objects.Figure
        """
        # Subplot configuration (dynamically arranged based on number of variables)
        n_rows = int(np.ceil(self.n_vars / 2))
        n_cols = 2 if self.n_vars > 1 else 1

        fig = make_subplots(
            rows=n_rows,
            cols=n_cols,
            subplot_titles=[var['name'] for var in self.variables],
            vertical_spacing=0.12,
            horizontal_spacing=0.1
        )

        # Add trend chart for each variable
        for idx, var in enumerate(self.variables):
            row = idx // 2 + 1
            col = idx % 2 + 1

            data = variable_data[var['name']]

            # Trend line
            fig.add_trace(
                go.Scatter(
                    x=time_data,
                    y=data,
                    mode='lines',
                    name=var['name'],
                    line=dict(color=var.get('color', '#11998e'), width=2)
                ),
                row=row, col=col
            )

            # Target value line
            if 'target' in var:
                fig.add_hline(
                    y=var['target'],
                    line_dash="dash",
                    line_color="red",
                    annotation_text=f"Target: {var['target']}",
                    row=row, col=col
                )

            # Control range
            if 'range' in var:
                lower, upper = var['range']
                fig.add_hrect(
                    y0=lower, y1=upper,
                    fillcolor="green", opacity=0.1,
                    line_width=0,
                    row=row, col=col
                )

            # Axis labels
            fig.update_xaxes(title_text="Time", row=row, col=col)
            fig.update_yaxes(title_text=f"{var['name']} ({var['unit']})", row=row, col=col)

        # Overall layout
        fig.update_layout(
            title_text=f"{self.process_name} - Multi-variable Monitoring Dashboard",
            title_font_size=20,
            title_x=0.5,
            height=300 * n_rows,
            showlegend=False,
            template="plotly_white"
        )

        return fig


# Example of building monitoring interface
# Define monitoring variables for chemical reactor
variables = [
    {'name': 'Reactor Temperature', 'unit': '°C', 'target': 175, 'range': (173, 177), 'color': '#11998e'},
    {'name': 'Jacket Temperature', 'unit': '°C', 'target': 165, 'range': (163, 167), 'color': '#f59e0b'},
    {'name': 'Reactor Pressure', 'unit': 'MPa', 'target': 1.5, 'range': (1.45, 1.55), 'color': '#7b2cbf'},
    {'name': 'Feed Flow Rate', 'unit': 'm³/h', 'target': 50, 'range': (48, 52), 'color': '#e63946'},
    {'name': 'Cooling Water Flow', 'unit': 'm³/h', 'target': 100, 'range': (95, 105), 'color': '#06a77d'},
    {'name': 'pH', 'unit': '-', 'target': 7.0, 'range': (6.8, 7.2), 'color': '#ff006e'}
]

# Generate data (24 hours, 1 minute interval)
np.random.seed(42)
n_points = 1440
time_data = pd.date_range('2025-01-01 00:00:00', periods=n_points, freq='1min')

# Generate data for each variable
variable_data = {}

# Reactor temperature
variable_data['Reactor Temperature'] = 175 + np.random.normal(0, 1.5, n_points) + \
                              2 * np.sin(2 * np.pi * np.arange(n_points) / 360)

# Jacket temperature
variable_data['Jacket Temperature'] = 165 + np.random.normal(0, 1.2, n_points) + \
                                 1.5 * np.sin(2 * np.pi * np.arange(n_points) / 360)

# Reactor pressure
variable_data['Reactor Pressure'] = 1.5 + np.random.normal(0, 0.02, n_points)

# Feed flow rate
variable_data['Feed Flow Rate'] = 50 + np.random.normal(0, 1.5, n_points)

# Cooling water flow
variable_data['Cooling Water Flow'] = 100 + np.random.normal(0, 2.5, n_points)

# pH
variable_data['pH'] = 7.0 + np.random.normal(0, 0.15, n_points)

# Create monitoring interface
interface = ProcessMonitoringInterface(
    process_name="Chemical Reactor R-101",
    variables=variables
)

fig = interface.create_monitoring_dashboard(time_data, variable_data)

# Save as HTML
fig.write_html("multi_variable_monitoring_dashboard.html")
print("Multi-variable monitoring dashboard saved.")

# Current status summary
print("\n=== Current Process Status (Latest Values) ===")
for var in variables:
    latest_value = variable_data[var['name']][-1]
    target = var.get('target', None)
    status = "Normal"

    if target and 'range' in var:
        lower, upper = var['range']
        if not (lower <= latest_value <= upper):
            status = "Warning"

    print(f"{var['name']:<20}: {latest_value:>7.2f} {var['unit']:<5} (Target: {target}) - {status}")

</code></pre>

<p><strong>Explanation</strong>: This multi-chart monitoring interface dynamically generates a dashboard for monitoring multiple process variables simultaneously. By visualizing the target value and control range for each variable, operators can grasp the process status at a glance. In actual plants, dozens to hundreds of variables are monitored.</p>


<h4>Code Example 4: Implementing an Alarm Notification System</h4>

<p><strong>Purpose</strong>: Build an alarm notification system with severity levels and manage alarm logs.</p>

<pre><code class="language-python">import pandas as pd
import numpy as np
from datetime import datetime
from enum import Enum

class AlarmSeverity(Enum):
    """Alarm severity levels"""
    INFO = 1      # Information
    WARNING = 2   # Warning
    ALARM = 3     # Alarm
    CRITICAL = 4  # Critical

class AlarmManager:
    """
    Process alarm management system

    Parameters:
    -----------
    alarm_rules : list of dict
        List of alarm rules
    """

    def __init__(self, alarm_rules):
        self.alarm_rules = alarm_rules
        self.active_alarms = {}
        self.alarm_history = []

    def check_alarms(self, process_data):
        """
        Check alarm conditions

        Parameters:
        -----------
        process_data : dict
            Current values of process variables

        Returns:
        --------
        new_alarms : list
            Newly triggered alarms
        """
        new_alarms = []
        current_time = datetime.now()

        for rule in self.alarm_rules:
            variable = rule['variable']
            value = process_data.get(variable, None)

            if value is None:
                continue

            # Check alarm condition
            alarm_triggered = self._evaluate_condition(value, rule)

            alarm_id = f"{variable}_{rule['name']}"

            if alarm_triggered:
                # Alarm triggered
                if alarm_id not in self.active_alarms:
                    alarm = {
                        'id': alarm_id,
                        'variable': variable,
                        'name': rule['name'],
                        'severity': rule['severity'],
                        'value': value,
                        'condition': rule['condition'],
                        'threshold': rule['threshold'],
                        'timestamp': current_time,
                        'acknowledged': False
                    }

                    self.active_alarms[alarm_id] = alarm
                    self.alarm_history.append(alarm.copy())
                    new_alarms.append(alarm)
            else:
                # Alarm cleared
                if alarm_id in self.active_alarms:
                    alarm = self.active_alarms.pop(alarm_id)
                    alarm['cleared_time'] = current_time
                    alarm['duration'] = (current_time - alarm['timestamp']).total_seconds()

        return new_alarms

    def _evaluate_condition(self, value, rule):
        """Evaluate alarm condition"""
        condition = rule['condition']
        threshold = rule['threshold']

        if condition == 'greater_than':
            return value > threshold
        elif condition == 'less_than':
            return value < threshold
        elif condition == 'out_of_range':
            lower, upper = threshold
            return value < lower or value > upper
        elif condition == 'deviation':
            target = rule['target']
            deviation = rule['threshold']
            return abs(value - target) > deviation
        else:
            return False

    def acknowledge_alarm(self, alarm_id):
        """Acknowledge alarm"""
        if alarm_id in self.active_alarms:
            self.active_alarms[alarm_id]['acknowledged'] = True
            return True
        return False

    def get_active_alarms(self, severity=None):
        """
        Get active alarms

        Parameters:
        -----------
        severity : AlarmSeverity or None
            Filter by severity

        Returns:
        --------
        alarms : list
        """
        alarms = list(self.active_alarms.values())

        if severity:
            alarms = [a for a in alarms if a['severity'] == severity]

        # Sort by severity in descending order
        alarms.sort(key=lambda x: x['severity'].value, reverse=True)

        return alarms

    def get_alarm_statistics(self):
        """Get alarm statistics"""
        total_alarms = len(self.alarm_history)
        active_count = len(self.active_alarms)
        acknowledged_count = sum(1 for a in self.active_alarms.values() if a['acknowledged'])

        severity_counts = {}
        for severity in AlarmSeverity:
            count = sum(1 for a in self.alarm_history if a['severity'] == severity)
            severity_counts[severity.name] = count

        stats = {
            'total_alarms': total_alarms,
            'active_alarms': active_count,
            'acknowledged_alarms': acknowledged_count,
            'unacknowledged_alarms': active_count - acknowledged_count,
            'severity_breakdown': severity_counts
        }

        return stats


# Define alarm rules
alarm_rules = [
    {
        'variable': 'Reactor Temperature',
        'name': 'High Temperature Warning',
        'severity': AlarmSeverity.WARNING,
        'condition': 'greater_than',
        'threshold': 177
    },
    {
        'variable': 'Reactor Temperature',
        'name': 'High Temperature Alarm',
        'severity': AlarmSeverity.ALARM,
        'condition': 'greater_than',
        'threshold': 180
    },
    {
        'variable': 'Reactor Temperature',
        'name': 'Low Temperature Warning',
        'severity': AlarmSeverity.WARNING,
        'condition': 'less_than',
        'threshold': 173
    },
    {
        'variable': 'Reactor Pressure',
        'name': 'Pressure Abnormal',
        'severity': AlarmSeverity.CRITICAL,
        'condition': 'out_of_range',
        'threshold': (1.4, 1.6)
    },
    {
        'variable': 'Feed Flow Rate',
        'name': 'Flow Deviation',
        'severity': AlarmSeverity.WARNING,
        'condition': 'deviation',
        'target': 50,
        'threshold': 5
    }
]

# Initialize alarm manager
alarm_mgr = AlarmManager(alarm_rules)

# Simulation: 1 hour of process operation
np.random.seed(42)
n_samples = 60  # Every 1 minute, for 60 minutes

print("=== Process Alarm Monitoring System ===\n")
print("Starting 1-hour simulation...\n")

for i in range(n_samples):
    # Simulate process data
    process_data = {
        'Reactor Temperature': 175 + np.random.normal(0, 2) + 5 * np.sin(2 * np.pi * i / 60),
        'Reactor Pressure': 1.5 + np.random.normal(0, 0.05),
        'Feed Flow Rate': 50 + np.random.normal(0, 3)
    }

    # Check alarms
    new_alarms = alarm_mgr.check_alarms(process_data)

    # Display new alarms
    if new_alarms:
        for alarm in new_alarms:
            severity_color = {
                AlarmSeverity.INFO: '🔵',
                AlarmSeverity.WARNING: '🟡',
                AlarmSeverity.ALARM: '🟠',
                AlarmSeverity.CRITICAL: '🔴'
            }
            icon = severity_color.get(alarm['severity'], '⚪')

            print(f"[{alarm['timestamp'].strftime('%H:%M:%S')}] {icon} {alarm['severity'].name}: "
                  f"{alarm['variable']} - {alarm['name']} "
                  f"(Value: {alarm['value']:.2f})")

# Final statistics
print("\n" + "="*60)
print("=== Alarm Statistics Summary ===")
stats = alarm_mgr.get_alarm_statistics()

print(f"\nTotal Alarms: {stats['total_alarms']}")
print(f"Active Alarms: {stats['active_alarms']}")
print(f"  - Unacknowledged: {stats['unacknowledged_alarms']}")
print(f"  - Acknowledged: {stats['acknowledged_alarms']}")

print("\nBreakdown by Severity:")
for severity, count in stats['severity_breakdown'].items():
    print(f"  {severity:<10}: {count:>3} alarms")

# List of active alarms
active_alarms = alarm_mgr.get_active_alarms()
if active_alarms:
    print("\n=== Current Active Alarms ===")
    for alarm in active_alarms:
        duration = (datetime.now() - alarm['timestamp']).total_seconds()
        ack_status = "Acknowledged" if alarm['acknowledged'] else "Unacknowledged"
        print(f"  - [{alarm['severity'].name}] {alarm['variable']}: {alarm['name']} "
              f"({duration:.0f}s duration, {ack_status})")
else:
    print("\nNo active alarms currently.")

# Convert alarm history to DataFrame
if alarm_mgr.alarm_history:
    df_alarms = pd.DataFrame(alarm_mgr.alarm_history)
    df_alarms['severity_name'] = df_alarms['severity'].apply(lambda x: x.name)

    print("\n=== Alarm History Top 10 ===")
    print(df_alarms[['timestamp', 'variable', 'name', 'severity_name', 'value']].head(10).to_string(index=False))

</code></pre>

<p><strong>Expected Output</strong> (representative; timestamps and durations depend on the run time):</p>

<pre><code>=== Process Alarm Monitoring System ===

Starting 1-hour simulation...

[14:23:12] 🟡 WARNING: Reactor Temperature - High Temperature Warning (Value: 178.45)
[14:31:45] 🟡 WARNING: Feed Flow Rate - Flow Deviation (Value: 56.23)
[14:42:18] 🔴 CRITICAL: Reactor Pressure - Pressure Abnormal (Value: 1.62)

============================================================
=== Alarm Statistics Summary ===

Total Alarms: 8
Active Alarms: 2
  - Unacknowledged: 2
  - Acknowledged: 0

Breakdown by Severity:
  INFO      :   0 alarms
  WARNING   :   5 alarms
  ALARM     :   1 alarms
  CRITICAL  :   2 alarms

=== Current Active Alarms ===
  - [CRITICAL] Reactor Pressure: Pressure Abnormal (123s duration, Unacknowledged)
  - [WARNING] Reactor Temperature: High Temperature Warning (89s duration, Unacknowledged)
</code></pre>

<p><strong>Explanation</strong>: This alarm system continuously monitors process variables and reports abnormal conditions by severity level. With alarm history recording, acknowledgement functionality, and statistical reports, operators can manage alarms efficiently. In actual plants, email notifications and Slack integration are also implemented.</p>
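The email/Slack integration mentioned above is typically hooked in through a small dispatcher that fans an alarm out to registered channels. The sketch below is a minimal, hypothetical design, not part of the AlarmManager API; a real deployment would register an SMTP sender or a webhook poster where `print` is used here.

```python
def format_alarm_message(alarm):
    """Render an alarm dict (shaped like AlarmManager's alarms) as one line of text."""
    return (f"[{alarm['severity']}] {alarm['variable']}: {alarm['name']} "
            f"(value={alarm['value']:.2f}, threshold={alarm['threshold']})")

class AlarmNotifier:
    """Fan out alarm messages to any number of channel callbacks."""

    def __init__(self):
        self.channels = []              # callables taking a message string

    def register(self, channel):
        self.channels.append(channel)

    def dispatch(self, alarm):
        message = format_alarm_message(alarm)
        for send in self.channels:      # each channel gets the same message
            send(message)
        return message

# Console channel; an SMTP or Slack-webhook channel would slot in the same way
notifier = AlarmNotifier()
notifier.register(print)
msg = notifier.dispatch({
    'severity': 'WARNING', 'variable': 'Reactor Temperature',
    'name': 'High Temperature Warning', 'value': 178.45, 'threshold': 177,
})
```

Keeping formatting separate from delivery means new channels can be added without touching the alarm-checking logic.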


<h4>Code Example 5: Historical Data Trend Analysis and Pattern Detection</h4>

<p><strong>Purpose</strong>: Analyze trends from historical data and automatically detect peaks, valleys, and trends.</p>

<pre><code class="language-python">import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import find_peaks, savgol_filter

# Font settings (Hiragino Sans is macOS-specific; Arial is the fallback)
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False

def analyze_historical_trends(time_data, values, variable_name):
    """
    Historical data trend analysis

    Parameters:
    -----------
    time_data : array-like
        Time data
    values : array-like
        Variable values
    variable_name : str
        Variable name

    Returns:
    --------
    analysis : dict
        Analysis results
    """
    # Extract trend with moving average
    window_size = 60  # 60-point moving average
    trend = pd.Series(values).rolling(window=window_size, center=True).mean().values

    # Smooth with Savitzky-Golay filter
    if len(values) > 51:
        smoothed = savgol_filter(values, window_length=51, polyorder=3)
    else:
        smoothed = values

    # Peak detection
    peaks, peak_props = find_peaks(smoothed, prominence=1.5, distance=30)
    valleys, valley_props = find_peaks(-smoothed, prominence=1.5, distance=30)

    # Statistics
    mean_value = np.mean(values)
    std_value = np.std(values)
    max_value = np.max(values)
    min_value = np.min(values)

    # Trend slope (least squares method)
    x = np.arange(len(values))
    slope = np.polyfit(x, values, 1)[0]

    analysis = {
        'variable': variable_name,
        'trend': trend,
        'smoothed': smoothed,
        'peaks': peaks,
        'valleys': valleys,
        'peak_values': values[peaks],
        'valley_values': values[valleys],
        'mean': mean_value,
        'std': std_value,
        'max': max_value,
        'min': min_value,
        'slope': slope,
        'n_peaks': len(peaks),
        'n_valleys': len(valleys)
    }

    return analysis


# Generate historical data (7 days, 1 hour interval)
np.random.seed(42)
n_days = 7
n_points = n_days * 24
time_data = pd.date_range('2025-01-01', periods=n_points, freq='1h')

# Reactor temperature data (daily cycle + trend + noise)
base_temp = 175
daily_cycle = 5 * np.sin(2 * np.pi * np.arange(n_points) / 24)  # Daily cycle
weekly_trend = 3 * np.sin(2 * np.pi * np.arange(n_points) / (7*24))  # Weekly cycle
noise = np.random.normal(0, 1.2, n_points)
temperature = base_temp + daily_cycle + weekly_trend + noise

# Execute trend analysis
analysis = analyze_historical_trends(time_data, temperature, 'Reactor Temperature')

# Visualization
fig, axes = plt.subplots(3, 1, figsize=(16, 12))

# Original data and trend
axes[0].plot(time_data, temperature, 'b-', linewidth=0.8, alpha=0.5, label='Raw Data')
axes[0].plot(time_data, analysis['smoothed'], 'r-', linewidth=2, label='Smoothed Data')
axes[0].plot(time_data, analysis['trend'], 'g--', linewidth=2, label='Trend (Moving Average)')
axes[0].scatter(time_data[analysis['peaks']], analysis['peak_values'],
                color='red', s=80, marker='^', label='Peaks', zorder=5)
axes[0].scatter(time_data[analysis['valleys']], analysis['valley_values'],
                color='blue', s=80, marker='v', label='Valleys', zorder=5)
axes[0].axhline(y=analysis['mean'], color='black', linestyle='--', alpha=0.5, label='Mean Value')
axes[0].set_ylabel('Temperature (°C)', fontsize=12)
axes[0].set_title('Historical Data Trend Analysis - Reactor Temperature (7 days)', fontsize=14, fontweight='bold')
axes[0].legend(loc='upper right')
axes[0].grid(alpha=0.3)

# Histogram (distribution analysis)
axes[1].hist(temperature, bins=30, color='#11998e', alpha=0.7, edgecolor='black')
axes[1].axvline(x=analysis['mean'], color='red', linestyle='--', linewidth=2,
                label=f"Mean: {analysis['mean']:.2f}°C")
axes[1].axvline(x=analysis['mean'] + analysis['std'], color='orange', linestyle=':',
                linewidth=2, label=f"±1σ: {analysis['std']:.2f}°C")
axes[1].axvline(x=analysis['mean'] - analysis['std'], color='orange', linestyle=':', linewidth=2)
axes[1].set_xlabel('Temperature (°C)', fontsize=12)
axes[1].set_ylabel('Frequency', fontsize=12)
axes[1].set_title('Temperature Distribution', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)

# Daily statistics (box plot)
df_temp = pd.DataFrame({'timestamp': time_data, 'temperature': temperature})
df_temp['day'] = df_temp['timestamp'].dt.day
daily_data = [df_temp[df_temp['day'] == day]['temperature'].values for day in range(1, n_days+1)]

bp = axes[2].boxplot(daily_data, labels=[f'Day {i+1}' for i in range(n_days)],
                      patch_artist=True, showmeans=True)

for patch in bp['boxes']:
    patch.set_facecolor('#11998e')
    patch.set_alpha(0.6)

axes[2].set_xlabel('Day', fontsize=12)
axes[2].set_ylabel('Temperature (°C)', fontsize=12)
axes[2].set_title('Daily Temperature Distribution (Box Plot)', fontsize=13, fontweight='bold')
axes[2].grid(alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

# Analysis results summary
print("=== Historical Data Trend Analysis Results ===\n")
print(f"Variable: {analysis['variable']}")
print(f"Period: {time_data[0]} to {time_data[-1]} ({n_days} days)")
print(f"\nStatistics:")
print(f"  Mean: {analysis['mean']:.2f} °C")
print(f"  Std Dev: {analysis['std']:.2f} °C")
print(f"  Max: {analysis['max']:.2f} °C")
print(f"  Min: {analysis['min']:.2f} °C")
print(f"  Range: {analysis['max'] - analysis['min']:.2f} °C")

print(f"\nTrend:")
print(f"  Slope: {analysis['slope']:.4f} °C/hour")
print(f"  7-day change: {analysis['slope'] * n_points:.2f} °C")

print(f"\nPattern Detection:")
print(f"  Number of peaks: {analysis['n_peaks']}")
print(f"  Number of valleys: {analysis['n_valleys']}")

if analysis['n_peaks'] > 0:
    print(f"  Average peak value: {np.mean(analysis['peak_values']):.2f} °C")
if analysis['n_valleys'] > 0:
    print(f"  Average valley value: {np.mean(analysis['valley_values']):.2f} °C")

# Detect abnormal periods (outside ±3σ range)
outliers = np.abs(temperature - analysis['mean']) > 3 * analysis['std']
outlier_count = np.sum(outliers)

print(f"\nAnomaly Detection:")
print(f"  Points outside ±3σ range: {outlier_count} ({outlier_count/len(temperature)*100:.2f}%)")

if outlier_count > 0:
    outlier_times = time_data[outliers]
    print(f"  First anomaly time: {outlier_times[0]}")
    print(f"  Last anomaly time: {outlier_times[-1]}")

Explanation: Historical data analysis is important for understanding long-term process trends and detecting abnormal patterns. This code implements trend extraction with moving averages, smoothing with Savitzky-Golay filters, and automatic detection of peaks and valleys. Box plots for daily comparisons are effective for identifying periodic patterns and abnormal days.
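A practical wrinkle in the moving-average trend above: with `center=True`, pandas leaves NaNs at both ends of the series where the full window does not fit. A minimal sketch of this behavior (the synthetic hourly series is an assumption for illustration), showing how `min_periods` keeps the trend defined at the edges:

```python
import numpy as np
import pandas as pd

# Synthetic hourly series (one week, 168 points, as in the example above)
rng = np.random.default_rng(0)
values = pd.Series(np.sin(np.linspace(0, 12, 168)) + rng.normal(0, 0.1, 168))

# Centered 60-point window: edges become NaN where the full window does not fit
trend_strict = values.rolling(window=60, center=True).mean()

# min_periods=1 shrinks the window at the edges instead of emitting NaN
trend_padded = values.rolling(window=60, center=True, min_periods=1).mean()

print(trend_strict.isna().sum(), trend_padded.isna().sum())  # 59 0
```

The trade-off is that edge values of the padded trend are averaged over fewer points and are therefore noisier; for display purposes this is usually acceptable.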

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
# - plotly>=5.14.0


<h4>Code Example 6: KPI Calculation and Dashboard Display</h4>

<p><strong>Purpose</strong>: Calculate and visualize key KPIs (OEE, availability, quality rate) used in process industries.</p>

<pre><code class="language-python">import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class ProcessKPICalculator:
    """
    Process KPI calculation system

    Key KPIs:
    - OEE (Overall Equipment Effectiveness)
    - Availability
    - Performance
    - Quality
    """

    def __init__(self, planned_production_time, ideal_cycle_time):
        """
        Parameters:
        -----------
        planned_production_time : float
            Planned operating time (minutes)
        ideal_cycle_time : float
            Ideal cycle time (minutes/unit)
        """
        self.planned_production_time = planned_production_time
        self.ideal_cycle_time = ideal_cycle_time

    def calculate_availability(self, actual_production_time):
        """
        Availability = Actual Operating Time / Planned Operating Time

        Parameters:
        -----------
        actual_production_time : float
            Actual operating time (minutes)

        Returns:
        --------
        availability : float
            Availability (0-1)
        """
        availability = actual_production_time / self.planned_production_time
        return min(availability, 1.0)

    def calculate_performance(self, actual_output, actual_production_time):
        """
        Performance = (Ideal Cycle Time × Actual Output) / Actual Operating Time

        Parameters:
        -----------
        actual_output : int
            Actual production quantity (units)
        actual_production_time : float
            Actual operating time (minutes)

        Returns:
        --------
        performance : float
            Performance (0-1)
        """
        ideal_production_time = self.ideal_cycle_time * actual_output
        performance = ideal_production_time / actual_production_time
        return min(performance, 1.0)

    def calculate_quality(self, actual_output, good_output):
        """
        Quality = Good Units / Total Production

        Parameters:
        -----------
        actual_output : int
            Total production (units)
        good_output : int
            Good units (units)

        Returns:
        --------
        quality : float
            Quality rate (0-1)
        """
        if actual_output == 0:
            return 0.0

        quality = good_output / actual_output
        return min(quality, 1.0)

    def calculate_oee(self, actual_production_time, actual_output, good_output):
        """
        OEE = Availability × Performance × Quality

        Parameters:
        -----------
        actual_production_time : float
            Actual operating time (minutes)
        actual_output : int
            Actual production (units)
        good_output : int
            Good units (units)

        Returns:
        --------
        oee_metrics : dict
            OEE and its components
        """
        availability = self.calculate_availability(actual_production_time)
        performance = self.calculate_performance(actual_output, actual_production_time)
        quality = self.calculate_quality(actual_output, good_output)

        oee = availability * performance * quality

        metrics = {
            'OEE': oee,
            'Availability': availability,
            'Performance': performance,
            'Quality': quality
        }

        return metrics


# KPI calculation example
# Daily KPIs for a chemical plant; daily figures are computed against the
# daily planned operating time (24 h)
planned_time_per_day = 24 * 60  # Planned operating time per day (minutes)
ideal_cycle_time = 2.0  # Ideal cycle time (minutes/batch)

kpi_calc = ProcessKPICalculator(planned_time_per_day, ideal_cycle_time)

# Weekly data
week_data = {
    'Monday': {'actual_time': 22*60, 'output': 620, 'good': 598},
    'Tuesday': {'actual_time': 23*60, 'output': 680, 'good': 672},
    'Wednesday': {'actual_time': 21*60, 'output': 610, 'good': 595},
    'Thursday': {'actual_time': 23*60, 'output': 685, 'good': 678},
    'Friday': {'actual_time': 22*60, 'output': 650, 'good': 640},
    'Saturday': {'actual_time': 18*60, 'output': 520, 'good': 512},
    'Sunday': {'actual_time': 15*60, 'output': 430, 'good': 425}
}

# Calculate daily KPIs
daily_kpis = {}
for day, data in week_data.items():
    kpis = kpi_calc.calculate_oee(
        data['actual_time'],
        data['output'],
        data['good']
    )
    daily_kpis[day] = kpis

# Create KPI dashboard
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=('Daily OEE Trend', 'OEE Components', 'Weekly OEE', 'Production and Quality'),
    specs=[
        [{"type": "scatter"}, {"type": "bar"}],
        [{"type": "indicator"}, {"type": "scatter"}]
    ],
    vertical_spacing=0.15,
    horizontal_spacing=0.12
)

days = list(week_data.keys())
oee_values = [daily_kpis[day]['OEE'] * 100 for day in days]
availability_values = [daily_kpis[day]['Availability'] * 100 for day in days]
performance_values = [daily_kpis[day]['Performance'] * 100 for day in days]
quality_values = [daily_kpis[day]['Quality'] * 100 for day in days]

# Daily OEE trend (upper left)
fig.add_trace(
    go.Scatter(
        x=days,
        y=oee_values,
        mode='lines+markers',
        name='OEE',
        line=dict(color='#11998e', width=3),
        marker=dict(size=10)
    ),
    row=1, col=1
)
fig.add_hline(y=85, line_dash="dash", line_color="green",
              annotation_text="Target: 85%", row=1, col=1)
fig.add_hline(y=60, line_dash="dash", line_color="orange",
              annotation_text="Minimum: 60%", row=1, col=1)

# OEE components (upper right)
fig.add_trace(
    go.Bar(x=days, y=availability_values, name='Availability',
           marker_color='#11998e'),
    row=1, col=2
)
fig.add_trace(
    go.Bar(x=days, y=performance_values, name='Performance',
           marker_color='#f59e0b'),
    row=1, col=2
)
fig.add_trace(
    go.Bar(x=days, y=quality_values, name='Quality',
           marker_color='#7b2cbf'),
    row=1, col=2
)

# Weekly OEE (lower left)
weekly_oee = np.mean(oee_values)
fig.add_trace(
    go.Indicator(
        mode="gauge+number+delta",
        value=weekly_oee,
        title={'text': "Weekly OEE (%)"},
        delta={'reference': 85, 'increasing': {'color': "green"}, 'decreasing': {'color': "red"}},
        gauge={
            'axis': {'range': [None, 100]},
            'bar': {'color': "#11998e"},
            'steps': [
                {'range': [0, 60], 'color': "lightcoral"},
                {'range': [60, 85], 'color': "lightyellow"},
                {'range': [85, 100], 'color': "lightgreen"}
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': 85
            }
        }
    ),
    row=2, col=1
)

# Production and quality (lower right)
output_values = [week_data[day]['output'] for day in days]
good_values = [week_data[day]['good'] for day in days]

fig.add_trace(
    go.Scatter(
        x=days,
        y=output_values,
        mode='lines+markers',
        name='Total Production',
        line=dict(color='#11998e', width=2),
        yaxis='y'
    ),
    row=2, col=2
)

fig.add_trace(
    go.Scatter(
        x=days,
        y=good_values,
        mode='lines+markers',
        name='Good Units',
        line=dict(color='#4caf50', width=2),
        yaxis='y'
    ),
    row=2, col=2
)

# Layout settings
fig.update_xaxes(title_text="Day", row=1, col=1)
fig.update_xaxes(title_text="Day", row=1, col=2)
fig.update_xaxes(title_text="Day", row=2, col=2)

fig.update_yaxes(title_text="OEE (%)", row=1, col=1)
fig.update_yaxes(title_text="Percentage (%)", row=1, col=2)
fig.update_yaxes(title_text="Production (batches)", row=2, col=2)

fig.update_layout(
    title_text="Process KPI Dashboard - Weekly Report",
    title_font_size=20,
    title_x=0.5,
    height=900,
    showlegend=True,
    template="plotly_white"
)

fig.write_html("kpi_dashboard.html")
print("KPI dashboard saved to kpi_dashboard.html\n")

# KPI report
print("=== Weekly KPI Report ===\n")
print(f"{'Day':<10} {'OEE':>7} {'Avail.':>7} {'Perf.':>7} {'Quality':>7} {'Output':>8} {'Good':>8}")
print("-" * 65)

for day in days:
    kpi = daily_kpis[day]
    data = week_data[day]
    print(f"{day:<10} "
          f"{kpi['OEE']*100:>6.1f}% "
          f"{kpi['Availability']*100:>6.1f}% "
          f"{kpi['Performance']*100:>6.1f}% "
          f"{kpi['Quality']*100:>6.1f}% "
          f"{data['output']:>7} "
          f"{data['good']:>7}")

print("-" * 65)
print(f"Weekly Average: {weekly_oee:>5.1f}%\n")

# Improvement suggestions
print("=== Improvement Suggestions ===")
avg_availability = np.mean(availability_values)
avg_performance = np.mean(performance_values)
avg_quality = np.mean(quality_values)

bottleneck = min([
    ('Availability', avg_availability),
    ('Performance', avg_performance),
    ('Quality', avg_quality)
], key=lambda x: x[1])

print(f"Main Bottleneck: {bottleneck[0]} ({bottleneck[1]:.1f}%)")

if bottleneck[0] == 'Availability':
    print("  → Consider reducing downtime, implementing predictive maintenance")
elif bottleneck[0] == 'Performance':
    print("  → Consider process optimization, improving bottleneck operations")
else:
    print("  → Consider strengthening quality control, identifying and addressing defect causes")

# OEE classification
if weekly_oee >= 85:
    oee_class = "World Class"
elif weekly_oee >= 60:
    oee_class = "Average"
else:
    oee_class = "Needs Improvement"

print(f"\nOEE Rating: {oee_class} (Target: 85% or higher)")

Expected Output:

=== Weekly KPI Report ===

Day            OEE  Avail.   Perf. Quality   Output     Good
-----------------------------------------------------------------
Monday       83.1%   91.7%   93.9%   96.5%     620     598
Tuesday      93.3%   95.8%   98.6%   98.8%     680     672
Wednesday    82.6%   87.5%   96.8%   97.5%     610     595
Thursday     94.2%   95.8%   99.3%   99.0%     685     678
Friday       88.9%   91.7%   98.5%   98.5%     650     640
Saturday     71.1%   75.0%   96.3%   98.5%     520     512
Sunday       59.0%   62.5%   95.6%   98.8%     430     425
-----------------------------------------------------------------
Weekly Average:  81.7%

=== Improvement Suggestions ===
Main Bottleneck: Availability (85.7%)
  → Consider reducing downtime, implementing predictive maintenance

OEE Rating: Average (Target: 85% or higher)

Explanation: OEE (Overall Equipment Effectiveness) is one of the most important KPIs in manufacturing and process industries. Composed of three elements - availability, performance, and quality - it evaluates the overall productivity of equipment. This code automates daily/weekly OEE calculations, visualization, bottleneck analysis, and improvement suggestions.
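Because availability, performance, and quality multiply, the intermediate terms cancel: OEE = A × P × Q = (ideal cycle time × good units) / planned time. This telescoped form gives a quick cross-check on the component-wise calculation. A minimal sketch with illustrative figures (1440 min planned, 1320 min run, 620 batches, 598 good; the identity holds only while none of the factors is capped at 1.0):

```python
def oee_from_components(planned_min, run_min, output, good, ideal_cycle_min):
    """OEE as the product of availability, performance, and quality."""
    availability = run_min / planned_min
    performance = (ideal_cycle_min * output) / run_min
    quality = good / output
    return availability * performance * quality

def oee_direct(planned_min, good, ideal_cycle_min):
    """Telescoped form: run time and total output cancel out."""
    return (ideal_cycle_min * good) / planned_min

oee1 = oee_from_components(1440, 22 * 60, 620, 598, 2.0)
oee2 = oee_direct(1440, 598, 2.0)
print(f"component-wise: {oee1:.4f}, telescoped: {oee2:.4f}")  # both 0.8306
```

The shortcut is a useful sanity check on dashboards: if the reported OEE differs from ideal_cycle_time × good / planned time, one of the components was capped or mis-measured.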

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - matplotlib>=3.7.0
# - pandas>=2.0.0, <2.2.0


<h4>Code Example 7: Process State Visualization (Finite State Machine)</h4>

<p><strong>Purpose</strong>: Model process operational states with a finite state machine and visualize state transitions.</p>

<pre><code class="language-python">import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from enum import Enum

# Font settings (Hiragino Sans if available, with Arial as fallback)
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False

class ProcessState(Enum):
    """Process operational states"""
    STOPPED = 0      # Stopped
    STARTUP = 1      # Starting up
    RUNNING = 2      # Normal operation
    SHUTDOWN = 3     # Shutting down
    ALARM = 4        # Alarm state
    MAINTENANCE = 5  # Maintenance

class ProcessStateMachine:
    """
    Process state machine

    State transition rules:
    STOPPED → STARTUP → RUNNING
    RUNNING → SHUTDOWN → STOPPED
    ANY → ALARM (when alarm occurs)
    ALARM → previous state (when alarm clears)
    STOPPED ↔ MAINTENANCE
    """

    def __init__(self, initial_state=ProcessState.STOPPED):
        self.current_state = initial_state
        self.previous_state = None
        self.state_history = [(pd.Timestamp.now(), initial_state)]

        # State transition rules
        self.transitions = {
            ProcessState.STOPPED: [ProcessState.STARTUP, ProcessState.MAINTENANCE],
            ProcessState.STARTUP: [ProcessState.RUNNING, ProcessState.ALARM],
            ProcessState.RUNNING: [ProcessState.SHUTDOWN, ProcessState.ALARM],
            ProcessState.SHUTDOWN: [ProcessState.STOPPED, ProcessState.ALARM],
            ProcessState.ALARM: [],  # Returns from ALARM are handled separately in transition()
            ProcessState.MAINTENANCE: [ProcessState.STOPPED]
        }

    def transition(self, new_state, timestamp=None):
        """
        State transition

        Parameters:
        -----------
        new_state : ProcessState
            New state
        timestamp : pd.Timestamp or None
            Transition time

        Returns:
        --------
        success : bool
            Success/failure of transition
        """
        if timestamp is None:
            timestamp = pd.Timestamp.now()

        # Transition to alarm state always allowed
        if new_state == ProcessState.ALARM:
            self.previous_state = self.current_state
            self.current_state = new_state
            self.state_history.append((timestamp, new_state))
            return True

        # Return from alarm state
        if self.current_state == ProcessState.ALARM:
            self.current_state = new_state
            self.state_history.append((timestamp, new_state))
            return True

        # Check normal state transition rules
        if new_state in self.transitions[self.current_state]:
            self.current_state = new_state
            self.state_history.append((timestamp, new_state))
            return True

        # Invalid transition
        return False

    def get_state_duration(self):
        """Duration of current state (seconds)"""
        if len(self.state_history) < 2:
            return 0

        current_time = pd.Timestamp.now()
        last_transition = self.state_history[-1][0]
        duration = (current_time - last_transition).total_seconds()

        return duration

    def get_state_statistics(self):
        """Statistics by state"""
        if len(self.state_history) < 2:
            return {}

        df_history = pd.DataFrame(self.state_history, columns=['timestamp', 'state'])
        df_history['duration'] = df_history['timestamp'].diff().shift(-1).dt.total_seconds()

        stats = {}
        for state in ProcessState:
            state_data = df_history[df_history['state'] == state]
            if len(state_data) > 0:
                stats[state.name] = {
                    'count': len(state_data),
                    'total_duration': state_data['duration'].sum(),
                    'avg_duration': state_data['duration'].mean()
                }

        return stats


# Process state machine simulation
print("=== Process State Machine Simulation ===\n")

# Initialization
state_machine = ProcessStateMachine(initial_state=ProcessState.STOPPED)

# 1-day process operation simulation
start_time = pd.Timestamp('2025-01-01 00:00:00')

# Anchor the initial history entry at the simulation start time so that state
# durations are computed against the scenario clock, not the wall clock
state_machine.state_history = [(start_time, ProcessState.STOPPED)]

# State transition scenario
transitions_scenario = [
    (start_time + pd.Timedelta(hours=0), ProcessState.STOPPED),
    (start_time + pd.Timedelta(hours=1), ProcessState.STARTUP),
    (start_time + pd.Timedelta(hours=2), ProcessState.RUNNING),
    (start_time + pd.Timedelta(hours=8), ProcessState.ALARM),      # Alarm occurs
    (start_time + pd.Timedelta(hours=8.5), ProcessState.RUNNING),  # Alarm clears
    (start_time + pd.Timedelta(hours=16), ProcessState.SHUTDOWN),
    (start_time + pd.Timedelta(hours=17), ProcessState.STOPPED),
    (start_time + pd.Timedelta(hours=18), ProcessState.MAINTENANCE),
    (start_time + pd.Timedelta(hours=22), ProcessState.STOPPED),
]

# Execute state transitions
for timestamp, new_state in transitions_scenario[1:]:
    success = state_machine.transition(new_state, timestamp)
    if success:
        print(f"[{timestamp.strftime('%H:%M')}] {state_machine.current_state.name}")
    else:
        print(f"[{timestamp.strftime('%H:%M')}] Invalid transition: → {new_state.name}")

# Visualize state history
df_history = pd.DataFrame(state_machine.state_history, columns=['timestamp', 'state'])
df_history['state_code'] = df_history['state'].apply(lambda x: x.value)
df_history['state_name'] = df_history['state'].apply(lambda x: x.name)

# Gantt chart-style visualization
fig, ax = plt.subplots(figsize=(16, 6))

# Define colors for each state
state_colors = {
    ProcessState.STOPPED: 'gray',
    ProcessState.STARTUP: '#ffeb3b',
    ProcessState.RUNNING: '#4caf50',
    ProcessState.SHUTDOWN: '#ff9800',
    ProcessState.ALARM: '#f44336',
    ProcessState.MAINTENANCE: '#2196f3'
}

# Display each state period as a bar
for i in range(len(df_history) - 1):
    start = df_history.iloc[i]['timestamp']
    end = df_history.iloc[i + 1]['timestamp']
    state = df_history.iloc[i]['state']

    ax.barh(
        0,
        width=(end - start).total_seconds() / 3600,  # In hours
        left=(start - start_time).total_seconds() / 3600,
        height=0.5,
        color=state_colors[state],
        edgecolor='black',
        linewidth=1.5,
        label=state.name if i == 0 or df_history.iloc[i-1]['state'] != state else ""
    )

    # Display state name in center of bar
    duration = (end - start).total_seconds() / 3600
    if duration > 0.5:  # Only label states longer than 30 minutes
        mid_point = (start - start_time).total_seconds() / 3600 + duration / 2
        ax.text(mid_point, 0, state.name, ha='center', va='center',
                fontsize=11, fontweight='bold', color='white')

# Legend (remove duplicates)
handles, labels = ax.get_legend_handles_labels()
by_label = dict(zip(labels, handles))
ax.legend(by_label.values(), by_label.keys(), loc='upper right', fontsize=10)

ax.set_xlabel('Time (hours)', fontsize=12)
ax.set_yticks([])
ax.set_xlim(0, 24)
ax.set_title('Process State Transition Timeline (24 hours)', fontsize=14, fontweight='bold')
ax.grid(alpha=0.3, axis='x')

plt.tight_layout()
plt.show()

# Statistics
print("\n=== State Statistics ===")
stats = state_machine.get_state_statistics()

print(f"\n{'State':<15} {'Count':>6} {'Total Time':>12} {'Avg Time':>12}")
print("-" * 50)

for state_name, stat in stats.items():
    total_hours = stat['total_duration'] / 3600
    avg_hours = stat['avg_duration'] / 3600
    print(f"{state_name:<15} {stat['count']:>6} times "
          f"{total_hours:>9.2f} hours {avg_hours:>9.2f} hours")

# Calculate availability
total_time = 24  # hours
running_time = stats.get('RUNNING', {}).get('total_duration', 0) / 3600
availability = running_time / total_time * 100

print(f"\nAvailability: {availability:.1f}% ({running_time:.2f} hours / {total_time} hours)")

Expected Output:

=== Process State Machine Simulation ===

[01:00] STARTUP
[02:00] RUNNING
[08:00] ALARM
[08:30] RUNNING
[16:00] SHUTDOWN
[17:00] STOPPED
[18:00] MAINTENANCE
[22:00] STOPPED

=== State Statistics ===

State           Count   Total Time     Avg Time
--------------------------------------------------
STOPPED              3 times      2.00 hours      1.00 hours
STARTUP              1 times      1.00 hours      1.00 hours
RUNNING              2 times     13.50 hours      6.75 hours
SHUTDOWN             1 times      1.00 hours      1.00 hours
ALARM                1 times      0.50 hours      0.50 hours
MAINTENANCE          1 times      4.00 hours      4.00 hours

Availability: 56.2% (13.50 hours / 24 hours)

Explanation: Finite state machines (FSM) are a powerful tool for clearly modeling process operational states and managing state transitions. This code implements startup, normal operation, shutdown, alarm, and maintenance states along with transition rules between them. Timeline visualization allows operators to intuitively understand process operation history.
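The transition rules above can also be enforced with a plain table lookup: declare the allowed moves once and fail loudly on anything else. A minimal, self-contained sketch with a three-state machine (the names are illustrative, not the ProcessState class above):

```python
from enum import Enum

class State(Enum):
    STOPPED = 0
    RUNNING = 1
    ALARM = 2

# Allowed transitions; ALARM is additionally reachable from any state
ALLOWED = {
    State.STOPPED: {State.RUNNING},
    State.RUNNING: {State.STOPPED},
    State.ALARM: {State.STOPPED, State.RUNNING},
}

def step(current, target):
    """Return the new state, or raise on an invalid transition."""
    if target is State.ALARM or target in ALLOWED[current]:
        return target
    raise ValueError(f"invalid transition {current.name} -> {target.name}")

s = step(State.STOPPED, State.RUNNING)   # normal startup
s = step(s, State.ALARM)                 # always allowed
s = step(s, State.RUNNING)               # recover from alarm
try:
    step(State.STOPPED, State.STOPPED)
except ValueError as e:
    print(e)  # invalid transition STOPPED -> STOPPED
```

Raising on an invalid move, rather than silently returning False, is a design choice: it surfaces scenario bugs immediately during simulation, while a production system would typically log and reject instead.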

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
# - plotly>=5.14.0


<h4>Code Example 8: Complete Integrated Monitoring System - Chemical Reactor Case Study</h4>

<p><strong>Purpose</strong>: Integrate all elements learned so far to build a complete monitoring system for a chemical reactor.</p>
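The monitoring class below keeps the last hour of samples for each variable in fixed-size ring buffers, so memory stays bounded no matter how long the system runs. A minimal sketch of the `collections.deque(maxlen=...)` behavior it relies on (buffer shortened to 5 samples for illustration):

```python
from collections import deque

buffer = deque(maxlen=5)  # ring buffer: oldest sample is dropped automatically

for sample in range(8):
    buffer.append(sample)

print(list(buffer))  # [3, 4, 5, 6, 7] - only the 5 newest samples survive
```

With `maxlen` set, `append` is O(1) and eviction is automatic, which is why the dashboard code never needs to trim its history explicitly.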

<pre><code class="language-python">import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from collections import deque
from datetime import datetime

class IntegratedProcessMonitoringSystem:
    """
    Integrated process monitoring system

    Functions:
    - Real-time data collection
    - Multi-variable trend monitoring
    - Alarm management
    - KPI calculation
    - State management
    """

    def __init__(self, process_name):
        self.process_name = process_name

        # Data buffers (1 hour, 1 second interval)
        self.buffer_size = 3600
        self.time_buffer = deque(maxlen=self.buffer_size)
        self.data_buffers = {
            'Reactor Temperature': deque(maxlen=self.buffer_size),
            'Jacket Temperature': deque(maxlen=self.buffer_size),
            'Reactor Pressure': deque(maxlen=self.buffer_size),
            'Feed Flow Rate': deque(maxlen=self.buffer_size),
            'Product Concentration': deque(maxlen=self.buffer_size)
        }

        # Alarm system
        self.active_alarms = []

        # KPI data
        self.kpi_data = {
            'production_count': 0,
            'good_count': 0,
            'running_time': 0
        }

        # Process state
        self.process_state = 'RUNNING'
        self.iteration = 0

    def generate_process_data(self):
        """Generate process data (in practice, acquired from PLC/DCS)"""
        elapsed = self.iteration

        # Reactor temperature (target 175°C)
        temp_base = 175.0
        temp_noise = np.random.normal(0, 1.2)
        temp_disturbance = 3 * np.sin(2 * np.pi * elapsed / 300)  # 5-minute cycle
        reactor_temp = temp_base + temp_noise + temp_disturbance

        # Jacket temperature (lower than reactor temperature)
        jacket_temp = reactor_temp - 10 + np.random.normal(0, 0.8)

        # Pressure (target 1.5 MPa)
        pressure = 1.5 + np.random.normal(0, 0.03)

        # Feed flow rate (target 50 m³/h)
        flow_rate = 50 + np.random.normal(0, 2.5)

        # Product concentration (target 80%)
        concentration = 80 + np.random.normal(0, 3) - 0.5 * (reactor_temp - 175)

        return {
            'Reactor Temperature': reactor_temp,
            'Jacket Temperature': jacket_temp,
            'Reactor Pressure': pressure,
            'Feed Flow Rate': flow_rate,
            'Product Concentration': concentration
        }

    def check_process_alarms(self, data):
        """Check process alarms"""
        alarms = []

        # Temperature alarms
        if data['Reactor Temperature'] > 180:
            alarms.append({'severity': 'CRITICAL', 'variable': 'Reactor Temperature',
                          'message': 'High Temperature Alarm', 'value': data['Reactor Temperature']})
        elif data['Reactor Temperature'] > 177:
            alarms.append({'severity': 'WARNING', 'variable': 'Reactor Temperature',
                          'message': 'High Temperature Warning', 'value': data['Reactor Temperature']})
        elif data['Reactor Temperature'] < 173:
            alarms.append({'severity': 'WARNING', 'variable': 'Reactor Temperature',
                          'message': 'Low Temperature Warning', 'value': data['Reactor Temperature']})

        # Pressure alarms
        if data['Reactor Pressure'] > 1.6 or data['Reactor Pressure'] < 1.4:
            alarms.append({'severity': 'CRITICAL', 'variable': 'Reactor Pressure',
                          'message': 'Pressure Abnormal', 'value': data['Reactor Pressure']})

        # Concentration alarms
        if data['Product Concentration'] < 75:
            alarms.append({'severity': 'WARNING', 'variable': 'Product Concentration',
                          'message': 'Quality Degradation', 'value': data['Product Concentration']})

        self.active_alarms = alarms
        return alarms

    def update(self):
        """System update (called every second)"""
        current_time = datetime.now()
        data = self.generate_process_data()

        # Add to data buffers
        self.time_buffer.append(current_time)
        for var, value in data.items():
            self.data_buffers[var].append(value)

        # Check alarms
        alarms = self.check_process_alarms(data)

        # Update KPIs
        self.kpi_data['running_time'] += 1  # seconds

        self.iteration += 1

        return data, alarms

    def create_integrated_dashboard(self):
        """Create integrated dashboard"""
        if len(self.time_buffer) < 10:
            return None

        # Convert data to numpy arrays
        time_array = list(self.time_buffer)
        reactor_temp = np.array(list(self.data_buffers['Reactor Temperature']))
        jacket_temp = np.array(list(self.data_buffers['Jacket Temperature']))
        pressure = np.array(list(self.data_buffers['Reactor Pressure']))
        flow_rate = np.array(list(self.data_buffers['Feed Flow Rate']))
        concentration = np.array(list(self.data_buffers['Product Concentration']))

        # Create subplots
        fig = make_subplots(
            rows=3, cols=2,
            subplot_titles=(
                'Reactor Temperature Trend', 'Jacket Temperature Trend',
                'Reactor Pressure Trend', 'Feed Flow Rate Trend',
                'Product Concentration Trend', 'Current Process Status'
            ),
            specs=[
                [{"type": "scatter"}, {"type": "scatter"}],
                [{"type": "scatter"}, {"type": "scatter"}],
                [{"type": "scatter"}, {"type": "indicator"}]
            ],
            vertical_spacing=0.12,
            horizontal_spacing=0.12
        )

        # Reactor temperature
        fig.add_trace(
            go.Scatter(x=time_array, y=reactor_temp, mode='lines',
                      name='Reactor Temperature', line=dict(color='#11998e', width=2)),
            row=1, col=1
        )
        fig.add_hline(y=175, line_dash="dash", line_color="red", row=1, col=1)
        fig.add_hrect(y0=173, y1=177, fillcolor="green", opacity=0.1,
                      line_width=0, row=1, col=1)

        # Jacket temperature
        fig.add_trace(
            go.Scatter(x=time_array, y=jacket_temp, mode='lines',
                      name='Jacket Temperature', line=dict(color='#f59e0b', width=2)),
            row=1, col=2
        )

        # Pressure
        fig.add_trace(
            go.Scatter(x=time_array, y=pressure, mode='lines',
                      name='Pressure', line=dict(color='#7b2cbf', width=2)),
            row=2, col=1
        )
        fig.add_hline(y=1.5, line_dash="dash", line_color="red", row=2, col=1)

        # Flow rate
        fig.add_trace(
            go.Scatter(x=time_array, y=flow_rate, mode='lines',
                      name='Flow Rate', line=dict(color='#e63946', width=2)),
            row=2, col=2
        )
        fig.add_hline(y=50, line_dash="dash", line_color="red", row=2, col=2)

        # Concentration
        fig.add_trace(
            go.Scatter(x=time_array, y=concentration, mode='lines',
                      name='Concentration', line=dict(color='#06a77d', width=2)),
            row=3, col=1
        )
        fig.add_hline(y=80, line_dash="dash", line_color="red", row=3, col=1)

        # Process status indicator
        current_temp = reactor_temp[-1]
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=current_temp,
                title={'text': "Reactor Temperature (°C)"},
                gauge={
                    'axis': {'range': [None, 200]},
                    'bar': {'color': "#11998e"},
                    'steps': [
                        {'range': [0, 173], 'color': "lightblue"},
                        {'range': [173, 177], 'color': "lightgreen"},
                        {'range': [177, 200], 'color': "lightcoral"}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 175
                    }
                }
            ),
            row=3, col=2
        )

        # Axis labels
        fig.update_yaxes(title_text="Temperature (°C)", row=1, col=1)
        fig.update_yaxes(title_text="Temperature (°C)", row=1, col=2)
        fig.update_yaxes(title_text="Pressure (MPa)", row=2, col=1)
        fig.update_yaxes(title_text="Flow Rate (m³/h)", row=2, col=2)
        fig.update_yaxes(title_text="Concentration (%)", row=3, col=1)

        fig.update_layout(
            title_text=f"{self.process_name} - Integrated Monitoring Dashboard",
            title_font_size=20,
            title_x=0.5,
            height=1000,
            showlegend=False,
            template="plotly_white"
        )

        return fig


# Integrated monitoring system demonstration
print("=== Integrated Process Monitoring System ===")
print("Chemical Reactor R-101 Monitoring System Starting\n")

# System initialization
monitoring_system = IntegratedProcessMonitoringSystem("Chemical Reactor R-101")

# 10-minute simulation (600 seconds)
print("Starting 10-minute process monitoring...\n")

alarm_count = 0
for i in range(600):
    data, alarms = monitoring_system.update()

    # Display status every 30 seconds
    if (i + 1) % 30 == 0:
        print(f"[{i+1:>3}s] "
              f"Temp: {data['Reactor Temperature']:>6.2f}°C | "
              f"Pressure: {data['Reactor Pressure']:>5.3f} MPa | "
              f"Conc: {data['Product Concentration']:>5.2f}% | "
              f"Alarms: {len(alarms)}")

        if alarms:
            for alarm in alarms:
                severity_icon = '🔴' if alarm['severity'] == 'CRITICAL' else '🟡'
                print(f"  {severity_icon} {alarm['variable']}: {alarm['message']} ({alarm['value']:.2f})")
            alarm_count += len(alarms)

print("\nProcess monitoring complete!\n")

# Generate integrated dashboard
fig = monitoring_system.create_integrated_dashboard()
if fig:
    fig.write_html("integrated_monitoring_system.html")
    print("Integrated dashboard saved to 'integrated_monitoring_system.html'.\n")

# Final statistics
print("=== Process Statistics Summary ===")
print(f"Monitoring Time: {monitoring_system.kpi_data['running_time']} seconds (10 minutes)")
print(f"Total Alarms: {alarm_count}")

# Statistics for each variable
print("\nVariable Statistics:")
for var_name, buffer in monitoring_system.data_buffers.items():
    data_array = np.array(list(buffer))
    print(f"  {var_name:<25}: Mean {np.mean(data_array):>7.2f}, "
          f"Std Dev {np.std(data_array):>5.2f}, "
          f"Max {np.max(data_array):>7.2f}, "
          f"Min {np.min(data_array):>7.2f}")

print("\nSystem terminated normally.")

Explanation: This code integrates all the elements covered in this chapter (real-time data collection, multi-variable monitoring, alarm management, KPI calculation, and dashboard visualization) into a complete process monitoring system. Using a chemical reactor as an example, it mirrors the configuration of monitoring systems used in actual plants. In real operation, such a system would be deployed as a web application (with Dash, Streamlit, etc.) so that multiple operators can access it in real time.
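The alarm handling seen in the demo loop can be sketched as a simple threshold check. This is a minimal illustration only: the function name `check_alarm` and the limit values below are hypothetical, not the chapter's actual class API.

```python
# Minimal sketch of severity-based alarm checking.
# NOTE: check_alarm and the thresholds are illustrative assumptions,
# not the monitoring system's real implementation.
def check_alarm(variable, value, warn_limits, crit_limits):
    """Return an alarm dict like those printed in the demo loop, or None."""
    lo_w, hi_w = warn_limits    # normal operating band
    lo_c, hi_c = crit_limits    # hard safety limits
    if value < lo_c or value > hi_c:
        severity = 'CRITICAL'
    elif value < lo_w or value > hi_w:
        severity = 'WARNING'
    else:
        return None             # within the normal band: no alarm
    return {'variable': variable, 'severity': severity,
            'message': f'{severity} limit violated', 'value': value}

# Example: reactor temperature with a 173-177 degC target band
alarm = check_alarm('Reactor Temperature', 178.5,
                    warn_limits=(173, 177), crit_limits=(170, 180))
print(alarm)  # WARNING-level alarm dict
```

In a production system this check would run once per variable per scan cycle, with the resulting dicts appended to an alarm history for acknowledgement tracking.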


5.3 Chapter Summary

What We Learned

  1. Real-time Monitoring System Architecture
    • Four-layer structure: data collection, processing, visualization, and storage layers
    • Data flow: Sensors → Buffering → Processing → Visualization → Storage
    • System design principles and scalability
  2. Dashboard Design and Visualization
    • Building interactive dashboards with Plotly
    • Effective placement of trend charts, gauges, and indicators
    • UI design principles for operators
  3. Real-time Data Processing
    • Efficient data buffering with deque
    • Statistical processing of streaming data
    • Real-time anomaly detection
  4. Alarm Management System
    • Alarm classification by severity level (INFO, WARNING, ALARM, CRITICAL)
    • Alarm history management and acknowledgement
    • Alarm flood prevention strategies
  5. KPIs and Process Analysis
    • OEE (Overall Equipment Effectiveness) calculation and evaluation
    • Historical data trend analysis and pattern detection
    • Bottleneck analysis and improvement suggestions
  6. Integrated Monitoring System
    • Building a complete system integrating all functions
    • Practical case study of a chemical reactor
    • Deployment strategy for real operations
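The OEE calculation mentioned in item 5 is simply the product of three fractions. A minimal sketch (the function name and sample figures are illustrative, not taken from the chapter's class):

```python
def compute_oee(availability, performance, quality):
    """OEE = Availability x Performance x Quality (each a fraction in 0-1)."""
    return availability * performance * quality

# Example: 95% availability, 90% performance, 98% quality
oee = compute_oee(0.95, 0.90, 0.98)
print(f"OEE = {oee:.1%}")  # prints "OEE = 83.8%"
```

Because OEE is multiplicative, a modest loss in any one component drags down the whole figure, which is why dashboards typically show the three components alongside the product.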

Key Points

  • Data Buffering: deque-based fixed-length queues enable memory-efficient real-time processing.
  • Alarm Design: severity classification and appropriate threshold settings reduce operator burden.
  • KPI Visualization: visual display of OEE and its components clarifies improvement opportunities.
  • Integrated Dashboards: monitoring multiple variables simultaneously provides an overview of the entire process.
  • Scalability: modular design makes it easy to add variables and extend functionality.
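The fixed-length buffering in the Data Buffering point can be sketched with `collections.deque`: once `maxlen` is reached, each append silently discards the oldest sample, so memory use stays constant no matter how long the stream runs. (The buffer size and values here are illustrative.)

```python
from collections import deque
import numpy as np

buffer = deque(maxlen=5)          # keeps only the 5 most recent samples
for t in range(8):
    buffer.append(100.0 + t)      # simulated sensor readings

print(list(buffer))               # [103.0, 104.0, 105.0, 106.0, 107.0]
data = np.array(buffer)           # deque -> array for statistics
print(f"mean={data.mean():.1f}, std={data.std():.2f}")
```

This is the same pattern the chapter's `data_buffers` dictionary uses: one bounded deque per process variable, converted to a NumPy array only when statistics or plots are needed.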

Practical Applications

The monitoring systems learned in this chapter can be applied to real processes across various industries:

  • Chemical Plants: integrated monitoring of reactors, distillation columns, and heat exchangers
  • Pharmaceutical Plants: GMP-compliant record management and alarm systems
  • Food Plants: HACCP-compliant temperature and pH management
  • Semiconductor Manufacturing: precision monitoring of cleanroom environments
  • Power Plants: efficiency monitoring of boilers and turbines

Next Steps

Having completed this series, you have acquired:

  • Fundamental knowledge and implementation capability in process monitoring
  • Practical skills in Statistical Process Control (SPC)
  • Anomaly detection techniques using machine learning and deep learning
  • Theory and implementation of PID control
  • The capability to build real-time monitoring systems

For Further Learning:

  1. Model Predictive Control (MPC): Advanced methods for multivariable control
  2. Digital Twin: Building virtual process models and predictive simulation
  3. Reinforcement Learning Control: Adaptive process control using AI
  4. Soft Sensor Development: Estimation of difficult variables using machine learning
  5. Cloud Integration: Integration with AWS/Azure/GCP and big data analysis

Congratulations on Completing the Series!

You have completed all 5 chapters of the "Introduction to Process Monitoring and Control Series v1.0". From sensor data acquisition to statistical process control, anomaly detection, PID control, and building real-time monitoring systems, you have acquired comprehensive knowledge and skills in process engineering.

Use this knowledge to move on to the next steps: apply it to your company's processes, publish and share your code on GitHub, build soft sensors and dashboards with real data, and develop your career as a process control engineer.

We look forward to your feedback!
If you have suggestions for improving this series, questions, or success stories, please let us know.

Contact: yusuke.hashimoto.b8@tohoku.ac.jp


Disclaimer