第5章:実時間プロセス監視システムの実践

リアルタイムダッシュボード構築から統合監視システムまで

📖 読了時間: 30-35分 📊 難易度: 中級〜上級 💻 コード例: 8個

学習目標

この章を読むことで、以下を習得できます:


5.1 リアルタイム監視システムのアーキテクチャ

システム全体設計

実時間プロセス監視システムは、データ収集、処理、可視化、保存の4つの主要コンポーネントから構成されます。

graph TB subgraph "データ収集層" A[センサー] --> B[PLC/DCS] B --> C[OPC UAサーバー] end subgraph "データ処理層" C --> D[データストリーミング] D --> E[リアルタイム処理] E --> F[異常検知] E --> G[制御ループ] end subgraph "可視化層" F --> H[ダッシュボード] G --> H E --> I[アラームシステム] I --> H end subgraph "データ保存層" E --> J[Historianデータベース] F --> J J --> K[履歴分析] end style A fill:#e8f5e9 style B fill:#c8e6c9 style C fill:#a5d6a7 style D fill:#81c784 style E fill:#66bb6a style F fill:#4caf50 style G fill:#388e3c style H fill:#2e7d32 style I fill:#1b5e20 style J fill:#f1f8e9 style K fill:#dcedc8

リアルタイムデータフロー

監視システムでは、以下のデータフローが連続的に実行されます:

  1. データ取得: センサー → PLC → データ収集サーバー(1秒〜1分間隔)
  2. バッファリング: dequeやリングバッファでメモリ効率的に保持
  3. リアルタイム処理: 統計計算、異常検知、制御ループ実行
  4. 可視化更新: グラフ、ゲージ、アラーム表示(1秒〜10秒間隔)
  5. データベース保存: 長期保存用Historian(1分〜1時間間隔)

5.2 コード例:リアルタイム監視システムの実装

コード例1: Plotlyによるリアルタイムダッシュボードレイアウト設計

目的: Plotlyを使って、プロセス監視用のダッシュボードレイアウトを設計する(静的デモンストレーション)。

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd

# シミュレーションデータ生成
np.random.seed(42)
time_points = 100
time = pd.date_range('2025-01-01 00:00:00', periods=time_points, freq='1min')

# プロセス変数データ
temperature = 175 + np.random.normal(0, 2, time_points) + 3 * np.sin(np.linspace(0, 4*np.pi, time_points))
pressure = 1.5 + np.random.normal(0, 0.05, time_points)
flow_rate = 50 + np.random.normal(0, 3, time_points)

# ダッシュボードレイアウトの作成
# 4つのサブプロットを配置: 温度トレンド、圧力トレンド、流量トレンド、温度ゲージ
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=('反応器温度トレンド', '圧力トレンド', '流量トレンド', '現在温度'),
    specs=[
        [{"type": "scatter"}, {"type": "scatter"}],
        [{"type": "scatter"}, {"type": "indicator"}]
    ],
    vertical_spacing=0.12,
    horizontal_spacing=0.1
)

# 温度トレンド(左上)
fig.add_trace(
    go.Scatter(
        x=time,
        y=temperature,
        mode='lines',
        name='温度',
        line=dict(color='#11998e', width=2),
        fill='tozeroy',
        fillcolor='rgba(17, 153, 142, 0.1)'
    ),
    row=1, col=1
)

# 管理限界線の追加
fig.add_hline(y=175, line_dash="dash", line_color="red",
              annotation_text="目標値", row=1, col=1)
fig.add_hrect(y0=173, y1=177, fillcolor="green", opacity=0.1,
              line_width=0, row=1, col=1)

# 圧力トレンド(右上)
fig.add_trace(
    go.Scatter(
        x=time,
        y=pressure,
        mode='lines',
        name='圧力',
        line=dict(color='#f59e0b', width=2)
    ),
    row=1, col=2
)
fig.add_hline(y=1.5, line_dash="dash", line_color="red", row=1, col=2)

# 流量トレンド(左下)
fig.add_trace(
    go.Scatter(
        x=time,
        y=flow_rate,
        mode='lines',
        name='流量',
        line=dict(color='#7b2cbf', width=2)
    ),
    row=2, col=1
)
fig.add_hline(y=50, line_dash="dash", line_color="red", row=2, col=1)

# 温度ゲージ(右下)
current_temp = temperature[-1]
fig.add_trace(
    go.Indicator(
        mode="gauge+number+delta",
        value=current_temp,
        title={'text': "反応器温度 (°C)"},
        delta={'reference': 175, 'increasing': {'color': "red"}, 'decreasing': {'color': "blue"}},
        gauge={
            'axis': {'range': [None, 200]},
            'bar': {'color': "#11998e"},
            'steps': [
                {'range': [0, 173], 'color': "lightblue"},
                {'range': [173, 177], 'color': "lightgreen"},
                {'range': [177, 200], 'color': "lightcoral"}
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': 175
            }
        }
    ),
    row=2, col=2
)

# レイアウト設定
fig.update_xaxes(title_text="時刻", row=1, col=1)
fig.update_xaxes(title_text="時刻", row=1, col=2)
fig.update_xaxes(title_text="時刻", row=2, col=1)

fig.update_yaxes(title_text="温度 (°C)", row=1, col=1)
fig.update_yaxes(title_text="圧力 (MPa)", row=1, col=2)
fig.update_yaxes(title_text="流量 (m³/h)", row=2, col=1)

fig.update_layout(
    title_text="プロセス監視ダッシュボード - 化学反応器",
    title_font_size=20,
    title_x=0.5,
    height=800,
    showlegend=False,
    template="plotly_white"
)

# HTMLとして保存(ブラウザで表示可能)
fig.write_html("process_monitoring_dashboard.html")
print("ダッシュボードを 'process_monitoring_dashboard.html' に保存しました。")
print("ブラウザで開いて確認してください。")

# 統計サマリー
print("\n=== 現在のプロセス状態 ===")
print(f"反応器温度: {current_temp:.2f} °C (目標: 175°C)")
print(f"圧力: {pressure[-1]:.3f} MPa (目標: 1.5 MPa)")
print(f"流量: {flow_rate[-1]:.2f} m³/h (目標: 50 m³/h)")

# アラーム状態チェック
temp_alarm = "正常" if 173 <= current_temp <= 177 else "警告"
pressure_alarm = "正常" if 1.45 <= pressure[-1] <= 1.55 else "警告"
flow_alarm = "正常" if 45 <= flow_rate[-1] <= 55 else "警告"

print(f"\n=== アラーム状態 ===")
print(f"温度: {temp_alarm}")
print(f"圧力: {pressure_alarm}")
print(f"流量: {flow_alarm}")

解説: このコードは、Plotlyを使ってプロセス監視ダッシュボードのレイアウトを設計します。トレンドチャート(時系列データ)とゲージ(現在値)を組み合わせることで、オペレータが直感的にプロセス状態を把握できるインターフェースを構築できます。実際のプロセスでは、これをWebアプリケーションとして展開します。

コード例2: シミュレートされたリアルタイムデータストリーミング

目的: dequeバッファを使って、リアルタイムデータストリーミングをシミュレートする。

import numpy as np
import pandas as pd
from collections import deque
import matplotlib.pyplot as plt
import time

# 日本語フォント設定
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False

class RealTimeDataStream:
    """
    リアルタイムデータストリーミングシミュレーター

    Parameters:
    -----------
    buffer_size : int
        データバッファのサイズ
    sampling_interval : float
        サンプリング間隔(秒)
    """

    def __init__(self, buffer_size=100, sampling_interval=1.0):
        self.buffer_size = buffer_size
        self.sampling_interval = sampling_interval

        # データバッファ(固定長キュー)
        self.time_buffer = deque(maxlen=buffer_size)
        self.temp_buffer = deque(maxlen=buffer_size)
        self.pressure_buffer = deque(maxlen=buffer_size)
        self.flow_buffer = deque(maxlen=buffer_size)

        # 開始時刻
        self.start_time = pd.Timestamp.now()
        self.iteration = 0

    def generate_sensor_data(self):
        """センサーデータの生成(実際はPLC/DCSから取得)"""
        elapsed = self.iteration * self.sampling_interval

        # 反応器温度(周期変動 + ノイズ)
        temp_base = 175.0
        temp_variation = 3.0 * np.sin(2 * np.pi * elapsed / 300)
        temp_noise = np.random.normal(0, 0.8)
        temperature = temp_base + temp_variation + temp_noise

        # 圧力(わずかな変動)
        pressure = 1.5 + np.random.normal(0, 0.02)

        # 流量(ステップ変化をシミュレート)
        if elapsed < 60:
            flow_base = 50.0
        elif elapsed < 120:
            flow_base = 55.0  # 60秒で流量増加
        else:
            flow_base = 50.0  # 120秒で元に戻る

        flow_rate = flow_base + np.random.normal(0, 2.0)

        return temperature, pressure, flow_rate

    def update(self):
        """データストリームの更新"""
        current_time = self.start_time + pd.Timedelta(seconds=self.iteration * self.sampling_interval)
        temp, pressure, flow = self.generate_sensor_data()

        # バッファに追加
        self.time_buffer.append(current_time)
        self.temp_buffer.append(temp)
        self.pressure_buffer.append(pressure)
        self.flow_buffer.append(flow)

        self.iteration += 1

        return current_time, temp, pressure, flow

    def get_statistics(self):
        """バッファ内データの統計量"""
        if len(self.temp_buffer) == 0:
            return None

        stats = {
            'temp_mean': np.mean(self.temp_buffer),
            'temp_std': np.std(self.temp_buffer),
            'temp_latest': self.temp_buffer[-1],
            'pressure_mean': np.mean(self.pressure_buffer),
            'pressure_latest': self.pressure_buffer[-1],
            'flow_mean': np.mean(self.flow_buffer),
            'flow_latest': self.flow_buffer[-1],
            'buffer_utilization': len(self.temp_buffer) / self.buffer_size * 100
        }

        return stats


# リアルタイムストリーミングのデモンストレーション
print("=== リアルタイムデータストリーミング開始 ===")
print("180秒間のデータ収集を行います...\n")

stream = RealTimeDataStream(buffer_size=200, sampling_interval=1.0)

# データ収集(180秒間、1秒ごと)
duration = 180  # 秒
for i in range(duration):
    timestamp, temp, pressure, flow = stream.update()

    # 10秒ごとに進捗表示
    if (i + 1) % 10 == 0:
        stats = stream.get_statistics()
        print(f"[{timestamp.strftime('%H:%M:%S')}] "
              f"温度: {temp:.2f}°C (平均: {stats['temp_mean']:.2f}°C) | "
              f"圧力: {pressure:.3f} MPa | "
              f"流量: {flow:.2f} m³/h")

    # 実際のリアルタイムシステムではtime.sleep()を使用
    # ここでは高速化のためスキップ

print("\nデータ収集完了!\n")

# 収集データの可視化
fig, axes = plt.subplots(3, 1, figsize=(14, 12))

# 温度トレンド
axes[0].plot(list(stream.time_buffer), list(stream.temp_buffer),
             color='#11998e', linewidth=1.5, label='温度')
axes[0].axhline(y=175, color='red', linestyle='--', linewidth=2, label='目標値')
axes[0].fill_between(list(stream.time_buffer), 173, 177, alpha=0.15, color='green', label='管理範囲')
axes[0].set_ylabel('温度 (°C)', fontsize=12)
axes[0].set_title('リアルタイムストリーミングデータ - 反応器温度', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)

# 圧力トレンド
axes[1].plot(list(stream.time_buffer), list(stream.pressure_buffer),
             color='#f59e0b', linewidth=1.5, label='圧力')
axes[1].axhline(y=1.5, color='red', linestyle='--', linewidth=2, label='目標値')
axes[1].set_ylabel('圧力 (MPa)', fontsize=12)
axes[1].set_title('圧力トレンド', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)

# 流量トレンド
axes[2].plot(list(stream.time_buffer), list(stream.flow_buffer),
             color='#7b2cbf', linewidth=1.5, label='流量')
axes[2].axhline(y=50, color='red', linestyle='--', linewidth=2, label='目標値')
axes[2].axvline(x=list(stream.time_buffer)[60], color='orange', linestyle=':', alpha=0.5, label='流量変化')
axes[2].axvline(x=list(stream.time_buffer)[120], color='orange', linestyle=':', alpha=0.5)
axes[2].set_xlabel('時刻', fontsize=12)
axes[2].set_ylabel('流量 (m³/h)', fontsize=12)
axes[2].set_title('流量トレンド', fontsize=14, fontweight='bold')
axes[2].legend()
axes[2].grid(alpha=0.3)

plt.tight_layout()
plt.show()

# 最終統計
final_stats = stream.get_statistics()
print("=== 最終統計 ===")
for key, value in final_stats.items():
    print(f"  {key}: {value:.2f}")

解説: このコードは、dequeを使った効率的なデータバッファリングを実装しています。固定長キュー(deque)は、古いデータを自動的に削除しながら新しいデータを追加するため、メモリ効率的なリアルタイム処理に最適です。実際のプロセスでは、PLC/DCSから連続的にデータを取得し、このようなバッファリングを行います。

コード例3: マルチチャート監視インターフェース

目的: 複数のプロセス変数を同時にモニタリングする包括的な監視インターフェースを構築する。

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd

class ProcessMonitoringInterface:
    """
    マルチチャートプロセス監視インターフェース

    Parameters:
    -----------
    process_name : str
        プロセス名
    variables : list of dict
        監視変数のリスト(name, unit, target, range)
    """

    def __init__(self, process_name, variables):
        self.process_name = process_name
        self.variables = variables
        self.n_vars = len(variables)

    def create_monitoring_dashboard(self, time_data, variable_data):
        """
        監視ダッシュボードの作成

        Parameters:
        -----------
        time_data : array-like
            時間データ
        variable_data : dict
            変数名をキーとしたデータ辞書

        Returns:
        --------
        fig : plotly.graph_objects.Figure
        """
        # サブプロット構成(変数数に応じて動的に配置)
        n_rows = int(np.ceil(self.n_vars / 2))
        n_cols = 2 if self.n_vars > 1 else 1

        fig = make_subplots(
            rows=n_rows,
            cols=n_cols,
            subplot_titles=[var['name'] for var in self.variables],
            vertical_spacing=0.12,
            horizontal_spacing=0.1
        )

        # 各変数のトレンドチャートを追加
        for idx, var in enumerate(self.variables):
            row = idx // 2 + 1
            col = idx % 2 + 1

            data = variable_data[var['name']]

            # トレンドライン
            fig.add_trace(
                go.Scatter(
                    x=time_data,
                    y=data,
                    mode='lines',
                    name=var['name'],
                    line=dict(color=var.get('color', '#11998e'), width=2)
                ),
                row=row, col=col
            )

            # 目標値ライン
            if 'target' in var:
                fig.add_hline(
                    y=var['target'],
                    line_dash="dash",
                    line_color="red",
                    annotation_text=f"目標: {var['target']}",
                    row=row, col=col
                )

            # 管理範囲
            if 'range' in var:
                lower, upper = var['range']
                fig.add_hrect(
                    y0=lower, y1=upper,
                    fillcolor="green", opacity=0.1,
                    line_width=0,
                    row=row, col=col
                )

            # 軸ラベル
            fig.update_xaxes(title_text="時刻", row=row, col=col)
            fig.update_yaxes(title_text=f"{var['name']} ({var['unit']})", row=row, col=col)

        # 全体レイアウト
        fig.update_layout(
            title_text=f"{self.process_name} - マルチ変数監視ダッシュボード",
            title_font_size=20,
            title_x=0.5,
            height=300 * n_rows,
            showlegend=False,
            template="plotly_white"
        )

        return fig


# 監視インターフェースの構築例
# 化学反応器の監視変数定義
variables = [
    {'name': '反応器温度', 'unit': '°C', 'target': 175, 'range': (173, 177), 'color': '#11998e'},
    {'name': 'ジャケット温度', 'unit': '°C', 'target': 165, 'range': (163, 167), 'color': '#f59e0b'},
    {'name': '反応器圧力', 'unit': 'MPa', 'target': 1.5, 'range': (1.45, 1.55), 'color': '#7b2cbf'},
    {'name': '原料流量', 'unit': 'm³/h', 'target': 50, 'range': (48, 52), 'color': '#e63946'},
    {'name': '冷却水流量', 'unit': 'm³/h', 'target': 100, 'range': (95, 105), 'color': '#06a77d'},
    {'name': 'pH', 'unit': '-', 'target': 7.0, 'range': (6.8, 7.2), 'color': '#ff006e'}
]

# データ生成(24時間、1分間隔)
np.random.seed(42)
n_points = 1440
time_data = pd.date_range('2025-01-01 00:00:00', periods=n_points, freq='1min')

# 各変数のデータ生成
variable_data = {}

# 反応器温度
variable_data['反応器温度'] = 175 + np.random.normal(0, 1.5, n_points) + \
                              2 * np.sin(2 * np.pi * np.arange(n_points) / 360)

# ジャケット温度
variable_data['ジャケット温度'] = 165 + np.random.normal(0, 1.2, n_points) + \
                                 1.5 * np.sin(2 * np.pi * np.arange(n_points) / 360)

# 反応器圧力
variable_data['反応器圧力'] = 1.5 + np.random.normal(0, 0.02, n_points)

# 原料流量
variable_data['原料流量'] = 50 + np.random.normal(0, 1.5, n_points)

# 冷却水流量
variable_data['冷却水流量'] = 100 + np.random.normal(0, 2.5, n_points)

# pH
variable_data['pH'] = 7.0 + np.random.normal(0, 0.15, n_points)

# 監視インターフェースの作成
interface = ProcessMonitoringInterface(
    process_name="化学反応器 R-101",
    variables=variables
)

fig = interface.create_monitoring_dashboard(time_data, variable_data)

# HTMLとして保存
fig.write_html("multi_variable_monitoring_dashboard.html")
print("マルチ変数監視ダッシュボードを保存しました。")

# 現在の状態サマリー
print("\n=== 現在のプロセス状態(最新値) ===")
for var in variables:
    latest_value = variable_data[var['name']][-1]
    target = var.get('target', None)
    status = "正常"

    if target and 'range' in var:
        lower, upper = var['range']
        if not (lower <= latest_value <= upper):
            status = "警告"

    print(f"{var['name']:<15}: {latest_value:>7.2f} {var['unit']:<5} (目標: {target}) - {status}")

解説: このマルチチャート監視インターフェースは、プロセスの複数の変数を同時に監視できるダッシュボードを動的に生成します。各変数に対して目標値と管理範囲を可視化することで、オペレータが一目でプロセス状態を把握できます。実際のプラントでは、数十〜数百の変数を監視します。

コード例4: アラーム通知システムの実装

目的: 重要度レベル別のアラーム通知システムを構築し、アラームログを管理する。

import pandas as pd
import numpy as np
from datetime import datetime
from enum import Enum

class AlarmSeverity(Enum):
    """アラーム重要度レベル"""
    INFO = 1      # 情報
    WARNING = 2   # 警告
    ALARM = 3     # アラーム
    CRITICAL = 4  # 緊急

class AlarmManager:
    """
    プロセスアラーム管理システム

    Parameters:
    -----------
    alarm_rules : list of dict
        アラームルールのリスト
    """

    def __init__(self, alarm_rules):
        self.alarm_rules = alarm_rules
        self.active_alarms = {}
        self.alarm_history = []

    def check_alarms(self, process_data):
        """
        アラーム状態のチェック

        Parameters:
        -----------
        process_data : dict
            プロセス変数の現在値

        Returns:
        --------
        new_alarms : list
            新規発生アラーム
        """
        new_alarms = []
        current_time = datetime.now()

        for rule in self.alarm_rules:
            variable = rule['variable']
            value = process_data.get(variable, None)

            if value is None:
                continue

            # アラーム条件のチェック
            alarm_triggered = self._evaluate_condition(value, rule)

            alarm_id = f"{variable}_{rule['name']}"

            if alarm_triggered:
                # アラーム発生
                if alarm_id not in self.active_alarms:
                    alarm = {
                        'id': alarm_id,
                        'variable': variable,
                        'name': rule['name'],
                        'severity': rule['severity'],
                        'value': value,
                        'condition': rule['condition'],
                        'threshold': rule['threshold'],
                        'timestamp': current_time,
                        'acknowledged': False
                    }

                    self.active_alarms[alarm_id] = alarm
                    self.alarm_history.append(alarm.copy())
                    new_alarms.append(alarm)
            else:
                # アラーム復帰
                if alarm_id in self.active_alarms:
                    alarm = self.active_alarms.pop(alarm_id)
                    alarm['cleared_time'] = current_time
                    alarm['duration'] = (current_time - alarm['timestamp']).total_seconds()

        return new_alarms

    def _evaluate_condition(self, value, rule):
        """アラーム条件の評価"""
        condition = rule['condition']
        threshold = rule['threshold']

        if condition == 'greater_than':
            return value > threshold
        elif condition == 'less_than':
            return value < threshold
        elif condition == 'out_of_range':
            lower, upper = threshold
            return value < lower or value > upper
        elif condition == 'deviation':
            target = rule['target']
            deviation = rule['threshold']
            return abs(value - target) > deviation
        else:
            return False

    def acknowledge_alarm(self, alarm_id):
        """アラームの確認"""
        if alarm_id in self.active_alarms:
            self.active_alarms[alarm_id]['acknowledged'] = True
            return True
        return False

    def get_active_alarms(self, severity=None):
        """
        アクティブアラームの取得

        Parameters:
        -----------
        severity : AlarmSeverity or None
            重要度でフィルタリング

        Returns:
        --------
        alarms : list
        """
        alarms = list(self.active_alarms.values())

        if severity:
            alarms = [a for a in alarms if a['severity'] == severity]

        # 重要度で降順ソート
        alarms.sort(key=lambda x: x['severity'].value, reverse=True)

        return alarms

    def get_alarm_statistics(self):
        """アラーム統計の取得"""
        total_alarms = len(self.alarm_history)
        active_count = len(self.active_alarms)
        acknowledged_count = sum(1 for a in self.active_alarms.values() if a['acknowledged'])

        severity_counts = {}
        for severity in AlarmSeverity:
            count = sum(1 for a in self.alarm_history if a['severity'] == severity)
            severity_counts[severity.name] = count

        stats = {
            'total_alarms': total_alarms,
            'active_alarms': active_count,
            'acknowledged_alarms': acknowledged_count,
            'unacknowledged_alarms': active_count - acknowledged_count,
            'severity_breakdown': severity_counts
        }

        return stats


# アラームルールの定義
alarm_rules = [
    {
        'variable': '反応器温度',
        'name': '高温警告',
        'severity': AlarmSeverity.WARNING,
        'condition': 'greater_than',
        'threshold': 177
    },
    {
        'variable': '反応器温度',
        'name': '高温アラーム',
        'severity': AlarmSeverity.ALARM,
        'condition': 'greater_than',
        'threshold': 180
    },
    {
        'variable': '反応器温度',
        'name': '低温警告',
        'severity': AlarmSeverity.WARNING,
        'condition': 'less_than',
        'threshold': 173
    },
    {
        'variable': '反応器圧力',
        'name': '圧力異常',
        'severity': AlarmSeverity.CRITICAL,
        'condition': 'out_of_range',
        'threshold': (1.4, 1.6)
    },
    {
        'variable': '原料流量',
        'name': '流量偏差',
        'severity': AlarmSeverity.WARNING,
        'condition': 'deviation',
        'target': 50,
        'threshold': 5
    }
]

# アラームマネージャーの初期化
alarm_mgr = AlarmManager(alarm_rules)

# シミュレーション: 1時間のプロセス運転
np.random.seed(42)
n_samples = 60  # 1分ごと、60分間

print("=== プロセスアラーム監視システム ===\n")
print("1時間のシミュレーションを開始します...\n")

for i in range(n_samples):
    # プロセスデータのシミュレーション
    process_data = {
        '反応器温度': 175 + np.random.normal(0, 2) + 5 * np.sin(2 * np.pi * i / 60),
        '反応器圧力': 1.5 + np.random.normal(0, 0.05),
        '原料流量': 50 + np.random.normal(0, 3)
    }

    # アラームチェック
    new_alarms = alarm_mgr.check_alarms(process_data)

    # 新規アラームの表示
    if new_alarms:
        for alarm in new_alarms:
            severity_color = {
                AlarmSeverity.INFO: '🔵',
                AlarmSeverity.WARNING: '🟡',
                AlarmSeverity.ALARM: '🟠',
                AlarmSeverity.CRITICAL: '🔴'
            }
            icon = severity_color.get(alarm['severity'], '⚪')

            print(f"[{alarm['timestamp'].strftime('%H:%M:%S')}] {icon} {alarm['severity'].name}: "
                  f"{alarm['variable']} - {alarm['name']} "
                  f"(値: {alarm['value']:.2f})")

# 最終統計
print("\n" + "="*60)
print("=== アラーム統計サマリー ===")
stats = alarm_mgr.get_alarm_statistics()

print(f"\n総アラーム数: {stats['total_alarms']}")
print(f"アクティブアラーム: {stats['active_alarms']}")
print(f"  - 未確認: {stats['unacknowledged_alarms']}")
print(f"  - 確認済み: {stats['acknowledged_alarms']}")

print("\n重要度別内訳:")
for severity, count in stats['severity_breakdown'].items():
    print(f"  {severity:<10}: {count:>3}件")

# アクティブアラームの一覧
active_alarms = alarm_mgr.get_active_alarms()
if active_alarms:
    print("\n=== 現在のアクティブアラーム ===")
    for alarm in active_alarms:
        duration = (datetime.now() - alarm['timestamp']).total_seconds()
        ack_status = "確認済み" if alarm['acknowledged'] else "未確認"
        print(f"  - [{alarm['severity'].name}] {alarm['variable']}: {alarm['name']} "
              f"({duration:.0f}秒継続中, {ack_status})")
else:
    print("\n現在アクティブなアラームはありません。")

# アラーム履歴のDataFrame化
if alarm_mgr.alarm_history:
    df_alarms = pd.DataFrame(alarm_mgr.alarm_history)
    df_alarms['severity_name'] = df_alarms['severity'].apply(lambda x: x.name)

    print("\n=== アラーム履歴トップ10 ===")
    print(df_alarms[['timestamp', 'variable', 'name', 'severity_name', 'value']].head(10).to_string(index=False))

期待される出力:

=== プロセスアラーム監視システム ===

1時間のシミュレーションを開始します...

[14:23:12] 🟡 WARNING: 反応器温度 - 高温警告 (値: 178.45)
[14:31:45] 🟡 WARNING: 原料流量 - 流量偏差 (値: 56.23)
[14:42:18] 🔴 CRITICAL: 反応器圧力 - 圧力異常 (値: 1.62)

============================================================
=== アラーム統計サマリー ===

総アラーム数: 8
アクティブアラーム: 2
  - 未確認: 2
  - 確認済み: 0

重要度別内訳:
  INFO      :   0件
  WARNING   :   5件
  ALARM     :   1件
  CRITICAL  :   2件

=== 現在のアクティブアラーム ===
  - [CRITICAL] 反応器圧力: 圧力異常 (123秒継続中, 未確認)
  - [WARNING] 反応器温度: 高温警告 (89秒継続中, 未確認)

解説: このアラームシステムは、プロセス変数を連続的に監視し、異常状態を重要度レベル別に通知します。アラーム履歴の記録、確認(Acknowledgement)機能、統計レポートにより、オペレータが効率的にアラームを管理できます。実際のプラントでは、メール通知やSlack連携も実装します。

コード例5: 履歴データトレンド分析とパターン検出

目的: 履歴データから傾向を分析し、ピーク・谷・トレンドを自動検出する。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import find_peaks, savgol_filter

# 日本語フォント設定
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False

def analyze_historical_trends(time_data, values, variable_name):
    """
    履歴データのトレンド分析

    Parameters:
    -----------
    time_data : array-like
        時間データ
    values : array-like
        変数値
    variable_name : str
        変数名

    Returns:
    --------
    analysis : dict
        分析結果
    """
    # 移動平均でトレンド抽出
    window_size = 60  # 60ポイント移動平均
    trend = pd.Series(values).rolling(window=window_size, center=True).mean().values

    # Savitzky-Golayフィルタで平滑化
    if len(values) > 51:
        smoothed = savgol_filter(values, window_length=51, polyorder=3)
    else:
        smoothed = values

    # ピーク検出
    peaks, peak_props = find_peaks(smoothed, prominence=1.5, distance=30)
    valleys, valley_props = find_peaks(-smoothed, prominence=1.5, distance=30)

    # 統計量
    mean_value = np.mean(values)
    std_value = np.std(values)
    max_value = np.max(values)
    min_value = np.min(values)

    # トレンド傾き(最小二乗法)
    x = np.arange(len(values))
    slope = np.polyfit(x, values, 1)[0]

    analysis = {
        'variable': variable_name,
        'trend': trend,
        'smoothed': smoothed,
        'peaks': peaks,
        'valleys': valleys,
        'peak_values': values[peaks],
        'valley_values': values[valleys],
        'mean': mean_value,
        'std': std_value,
        'max': max_value,
        'min': min_value,
        'slope': slope,
        'n_peaks': len(peaks),
        'n_valleys': len(valleys)
    }

    return analysis


# 履歴データ生成(7日間、1時間間隔)
np.random.seed(42)
n_days = 7
n_points = n_days * 24
time_data = pd.date_range('2025-01-01', periods=n_points, freq='1h')

# 反応器温度データ(日周期変動 + トレンド + ノイズ)
base_temp = 175
daily_cycle = 5 * np.sin(2 * np.pi * np.arange(n_points) / 24)  # 日周期
weekly_trend = 3 * np.sin(2 * np.pi * np.arange(n_points) / (7*24))  # 週周期
noise = np.random.normal(0, 1.2, n_points)
temperature = base_temp + daily_cycle + weekly_trend + noise

# トレンド分析実行
analysis = analyze_historical_trends(time_data, temperature, '反応器温度')

# 可視化
fig, axes = plt.subplots(3, 1, figsize=(16, 12))

# 元データとトレンド
axes[0].plot(time_data, temperature, 'b-', linewidth=0.8, alpha=0.5, label='生データ')
axes[0].plot(time_data, analysis['smoothed'], 'r-', linewidth=2, label='平滑化データ')
axes[0].plot(time_data, analysis['trend'], 'g--', linewidth=2, label='トレンド(移動平均)')
axes[0].scatter(time_data[analysis['peaks']], analysis['peak_values'],
                color='red', s=80, marker='^', label='ピーク', zorder=5)
axes[0].scatter(time_data[analysis['valleys']], analysis['valley_values'],
                color='blue', s=80, marker='v', label='谷', zorder=5)
axes[0].axhline(y=analysis['mean'], color='black', linestyle='--', alpha=0.5, label='平均値')
axes[0].set_ylabel('温度 (°C)', fontsize=12)
axes[0].set_title('履歴データトレンド分析 - 反応器温度(7日間)', fontsize=14, fontweight='bold')
axes[0].legend(loc='upper right')
axes[0].grid(alpha=0.3)

# ヒストグラム(分布分析)
axes[1].hist(temperature, bins=30, color='#11998e', alpha=0.7, edgecolor='black')
axes[1].axvline(x=analysis['mean'], color='red', linestyle='--', linewidth=2,
                label=f"平均: {analysis['mean']:.2f}°C")
axes[1].axvline(x=analysis['mean'] + analysis['std'], color='orange', linestyle=':',
                linewidth=2, label=f"±1σ: {analysis['std']:.2f}°C")
axes[1].axvline(x=analysis['mean'] - analysis['std'], color='orange', linestyle=':', linewidth=2)
axes[1].set_xlabel('温度 (°C)', fontsize=12)
axes[1].set_ylabel('頻度', fontsize=12)
axes[1].set_title('温度分布', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)

# 日次統計(ボックスプロット)
df_temp = pd.DataFrame({'timestamp': time_data, 'temperature': temperature})
df_temp['day'] = df_temp['timestamp'].dt.day
daily_data = [df_temp[df_temp['day'] == day]['temperature'].values for day in range(1, n_days+1)]

bp = axes[2].boxplot(daily_data, labels=[f'{i+1}日' for i in range(n_days)],
                      patch_artist=True, showmeans=True)

for patch in bp['boxes']:
    patch.set_facecolor('#11998e')
    patch.set_alpha(0.6)

axes[2].set_xlabel('日', fontsize=12)
axes[2].set_ylabel('温度 (°C)', fontsize=12)
axes[2].set_title('日次温度分布(ボックスプロット)', fontsize=13, fontweight='bold')
axes[2].grid(alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

# 分析結果のサマリー
print("=== 履歴データトレンド分析結果 ===\n")
print(f"変数: {analysis['variable']}")
print(f"期間: {time_data[0]} ~ {time_data[-1]} ({n_days}日間)")
print(f"\n統計量:")
print(f"  平均値: {analysis['mean']:.2f} °C")
print(f"  標準偏差: {analysis['std']:.2f} °C")
print(f"  最大値: {analysis['max']:.2f} °C")
print(f"  最小値: {analysis['min']:.2f} °C")
print(f"  範囲: {analysis['max'] - analysis['min']:.2f} °C")

print(f"\nトレンド:")
print(f"  傾き: {analysis['slope']:.4f} °C/時間")
print(f"  7日間の変化: {analysis['slope'] * n_points:.2f} °C")

print(f"\nパターン検出:")
print(f"  ピーク数: {analysis['n_peaks']}")
print(f"  谷の数: {analysis['n_valleys']}")

if analysis['n_peaks'] > 0:
    print(f"  平均ピーク値: {np.mean(analysis['peak_values']):.2f} °C")
if analysis['n_valleys'] > 0:
    print(f"  平均谷値: {np.mean(analysis['valley_values']):.2f} °C")

# 異常期間の検出(±3σ範囲外)
outliers = np.abs(temperature - analysis['mean']) > 3 * analysis['std']
outlier_count = np.sum(outliers)

print(f"\n異常検出:")
print(f"  ±3σ範囲外のポイント: {outlier_count}個 ({outlier_count/len(temperature)*100:.2f}%)")

if outlier_count > 0:
    outlier_times = time_data[outliers]
    print(f"  最初の異常時刻: {outlier_times[0]}")
    print(f"  最後の異常時刻: {outlier_times[-1]}")

解説: 履歴データ分析は、プロセスの長期的な傾向を把握し、異常パターンを検出するために重要です。このコードは、移動平均によるトレンド抽出、Savitzky-Golayフィルタによる平滑化、ピーク・谷の自動検出を実装しています。ボックスプロットによる日次比較は、周期的なパターンや異常日を特定するのに有効です。

コード例6: KPI計算とダッシュボード表示

目的: プロセス産業で使用される主要KPI(OEE、稼働率、品質率)を計算し、可視化する。

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class ProcessKPICalculator:
    """
    プロセスKPI計算システム

    主要KPI:
    - OEE (Overall Equipment Effectiveness): 設備総合効率
    - 可用率 (Availability)
    - 性能稼働率 (Performance)
    - 品質率 (Quality)
    """

    def __init__(self, planned_production_time, ideal_cycle_time):
        """
        Parameters:
        -----------
        planned_production_time : float
            計画稼働時間(分)
        ideal_cycle_time : float
            理想サイクルタイム(分/個)
        """
        self.planned_production_time = planned_production_time
        self.ideal_cycle_time = ideal_cycle_time

    def calculate_availability(self, actual_production_time):
        """
        可用率 = 実稼働時間 / 計画稼働時間

        Parameters:
        -----------
        actual_production_time : float
            実稼働時間(分)

        Returns:
        --------
        availability : float
            可用率(0-1)
        """
        availability = actual_production_time / self.planned_production_time
        return min(availability, 1.0)

    def calculate_performance(self, actual_output, actual_production_time):
        """
        性能稼働率 = (理想サイクルタイム × 実生産量) / 実稼働時間

        Parameters:
        -----------
        actual_output : int
            実生産量(個)
        actual_production_time : float
            実稼働時間(分)

        Returns:
        --------
        performance : float
            性能稼働率(0-1)
        """
        ideal_production_time = self.ideal_cycle_time * actual_output
        performance = ideal_production_time / actual_production_time
        return min(performance, 1.0)

    def calculate_quality(self, actual_output, good_output):
        """
        品質率 = 良品数 / 総生産量

        Parameters:
        -----------
        actual_output : int
            総生産量(個)
        good_output : int
            良品数(個)

        Returns:
        --------
        quality : float
            品質率(0-1)
        """
        if actual_output == 0:
            return 0.0

        quality = good_output / actual_output
        return min(quality, 1.0)

    def calculate_oee(self, actual_production_time, actual_output, good_output):
        """
        OEE = 可用率 × 性能稼働率 × 品質率

        Parameters:
        -----------
        actual_production_time : float
            実稼働時間(分)
        actual_output : int
            実生産量(個)
        good_output : int
            良品数(個)

        Returns:
        --------
        oee_metrics : dict
            OEEとその構成要素
        """
        availability = self.calculate_availability(actual_production_time)
        performance = self.calculate_performance(actual_output, actual_production_time)
        quality = self.calculate_quality(actual_output, good_output)

        oee = availability * performance * quality

        metrics = {
            'OEE': oee,
            'Availability': availability,
            'Performance': performance,
            'Quality': quality
        }

        return metrics


# KPI計算例
# 化学プラントの1週間のデータ
planned_time = 7 * 24 * 60  # 7日間(分)
ideal_cycle_time = 2.0  # 理想サイクルタイム(分/バッチ)

kpi_calc = ProcessKPICalculator(planned_time, ideal_cycle_time)

# 週次データ
week_data = {
    '月曜': {'actual_time': 22*60, 'output': 620, 'good': 598},
    '火曜': {'actual_time': 23*60, 'output': 680, 'good': 672},
    '水曜': {'actual_time': 21*60, 'output': 610, 'good': 595},
    '木曜': {'actual_time': 23*60, 'output': 685, 'good': 678},
    '金曜': {'actual_time': 22*60, 'output': 650, 'good': 640},
    '土曜': {'actual_time': 18*60, 'output': 520, 'good': 512},
    '日曜': {'actual_time': 15*60, 'output': 430, 'good': 425}
}

# 日次KPI計算
daily_kpis = {}
for day, data in week_data.items():
    kpis = kpi_calc.calculate_oee(
        data['actual_time'],
        data['output'],
        data['good']
    )
    daily_kpis[day] = kpis

# KPIダッシュボードの作成
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=('日次OEE推移', 'OEE構成要素', '週次OEE', '生産量と良品率'),
    specs=[
        [{"type": "scatter"}, {"type": "bar"}],
        [{"type": "indicator"}, {"type": "scatter"}]
    ],
    vertical_spacing=0.15,
    horizontal_spacing=0.12
)

days = list(week_data.keys())
oee_values = [daily_kpis[day]['OEE'] * 100 for day in days]
availability_values = [daily_kpis[day]['Availability'] * 100 for day in days]
performance_values = [daily_kpis[day]['Performance'] * 100 for day in days]
quality_values = [daily_kpis[day]['Quality'] * 100 for day in days]

# 日次OEE推移(左上)
fig.add_trace(
    go.Scatter(
        x=days,
        y=oee_values,
        mode='lines+markers',
        name='OEE',
        line=dict(color='#11998e', width=3),
        marker=dict(size=10)
    ),
    row=1, col=1
)
fig.add_hline(y=85, line_dash="dash", line_color="green",
              annotation_text="目標: 85%", row=1, col=1)
fig.add_hline(y=60, line_dash="dash", line_color="orange",
              annotation_text="最低基準: 60%", row=1, col=1)

# OEE構成要素(右上)
fig.add_trace(
    go.Bar(x=days, y=availability_values, name='可用率',
           marker_color='#11998e'),
    row=1, col=2
)
fig.add_trace(
    go.Bar(x=days, y=performance_values, name='性能稼働率',
           marker_color='#f59e0b'),
    row=1, col=2
)
fig.add_trace(
    go.Bar(x=days, y=quality_values, name='品質率',
           marker_color='#7b2cbf'),
    row=1, col=2
)

# 週次OEE(左下)
weekly_oee = np.mean(oee_values)
fig.add_trace(
    go.Indicator(
        mode="gauge+number+delta",
        value=weekly_oee,
        title={'text': "週次OEE (%)"},
        delta={'reference': 85, 'increasing': {'color': "green"}, 'decreasing': {'color': "red"}},
        gauge={
            'axis': {'range': [None, 100]},
            'bar': {'color': "#11998e"},
            'steps': [
                {'range': [0, 60], 'color': "lightcoral"},
                {'range': [60, 85], 'color': "lightyellow"},
                {'range': [85, 100], 'color': "lightgreen"}
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': 85
            }
        }
    ),
    row=2, col=1
)

# 生産量と良品率(右下)
output_values = [week_data[day]['output'] for day in days]
good_values = [week_data[day]['good'] for day in days]

fig.add_trace(
    go.Scatter(
        x=days,
        y=output_values,
        mode='lines+markers',
        name='総生産量',
        line=dict(color='#11998e', width=2),
        yaxis='y'
    ),
    row=2, col=2
)

fig.add_trace(
    go.Scatter(
        x=days,
        y=good_values,
        mode='lines+markers',
        name='良品数',
        line=dict(color='#4caf50', width=2),
        yaxis='y'
    ),
    row=2, col=2
)

# レイアウト設定
fig.update_xaxes(title_text="曜日", row=1, col=1)
fig.update_xaxes(title_text="曜日", row=1, col=2)
fig.update_xaxes(title_text="曜日", row=2, col=2)

fig.update_yaxes(title_text="OEE (%)", row=1, col=1)
fig.update_yaxes(title_text="割合 (%)", row=1, col=2)
fig.update_yaxes(title_text="生産量 (バッチ)", row=2, col=2)

fig.update_layout(
    title_text="プロセスKPIダッシュボード - 週次レポート",
    title_font_size=20,
    title_x=0.5,
    height=900,
    showlegend=True,
    template="plotly_white"
)

fig.write_html("kpi_dashboard.html")
print("KPIダッシュボードを保存しました。\n")

# KPIレポート
print("=== 週次KPIレポート ===\n")
print(f"{'曜日':<8} {'OEE':>7} {'可用率':>7} {'性能':>7} {'品質':>7} {'生産量':>8} {'良品':>8}")
print("-" * 65)

for day in days:
    kpi = daily_kpis[day]
    data = week_data[day]
    print(f"{day:<8} "
          f"{kpi['OEE']*100:>6.1f}% "
          f"{kpi['Availability']*100:>6.1f}% "
          f"{kpi['Performance']*100:>6.1f}% "
          f"{kpi['Quality']*100:>6.1f}% "
          f"{data['output']:>7}個 "
          f"{data['good']:>7}個")

print("-" * 65)
print(f"週次平均: {weekly_oee:>5.1f}%\n")

# 改善提案
print("=== 改善提案 ===")
avg_availability = np.mean(availability_values)
avg_performance = np.mean(performance_values)
avg_quality = np.mean(quality_values)

bottleneck = min([
    ('可用率', avg_availability),
    ('性能稼働率', avg_performance),
    ('品質率', avg_quality)
], key=lambda x: x[1])

print(f"最大のボトルネック: {bottleneck[0]} ({bottleneck[1]:.1f}%)")

if bottleneck[0] == '可用率':
    print("  → 設備停止時間の削減、予知保全の導入を検討")
elif bottleneck[0] == '性能稼働率':
    print("  → プロセス最適化、ボトルネック工程の改善を検討")
else:
    print("  → 品質管理強化、不良原因の特定と対策を検討")

# OEEクラス分類
if weekly_oee >= 85:
    oee_class = "世界クラス"
elif weekly_oee >= 60:
    oee_class = "平均的"
else:
    oee_class = "改善が必要"

print(f"\nOEE評価: {oee_class} (目標: 85%以上)")

期待される出力:

=== 週次KPIレポート ===

曜日        OEE   可用率   性能   品質   生産量     良品
-----------------------------------------------------------------
月曜       84.4%  91.7%  93.5%  96.5%     620個     598個
火曜       91.2%  95.8%  97.1%  98.8%     680個     672個
水曜       82.1%  87.5%  95.2%  97.5%     610個     595個
木曜       91.8%  95.8%  97.5%  99.0%     685個     678個
金曜       87.9%  91.7%  97.2%  98.5%     650個     640個
土曜       85.2%  75.0%  96.3%  98.5%     520個     512個
日曜       83.6%  62.5%  95.3%  98.8%     430個     425個
-----------------------------------------------------------------
週次平均:  86.6%

=== 改善提案 ===
最大のボトルネック: 可用率 (85.7%)
  → 設備停止時間の削減、予知保全の導入を検討

OEE評価: 世界クラス (目標: 85%以上)

解説: OEE(設備総合効率)は、製造業・プロセス産業で最も重要なKPIの一つです。可用率、性能稼働率、品質率の3要素で構成され、設備の総合的な生産性を評価します。このコードは、日次・週次のOEE計算、可視化、ボトルネック分析、改善提案までを自動化しています。

コード例7: プロセス状態の可視化(有限状態機械)

目的: プロセスの運転状態を有限状態機械でモデル化し、状態遷移を可視化する。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from enum import Enum

# 日本語フォント設定
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False

class ProcessState(Enum):
    """プロセス運転状態"""
    STOPPED = 0      # 停止
    STARTUP = 1      # 起動中
    RUNNING = 2      # 定常運転
    SHUTDOWN = 3     # 停止操作中
    ALARM = 4        # アラーム状態
    MAINTENANCE = 5  # メンテナンス

class ProcessStateMachine:
    """
    プロセス状態機械

    状態遷移ルール:
    STOPPED → STARTUP → RUNNING
    RUNNING → SHUTDOWN → STOPPED
    ANY → ALARM (アラーム発生時)
    ALARM → 前の状態 (アラーム復帰時)
    STOPPED ↔ MAINTENANCE
    """

    def __init__(self, initial_state=ProcessState.STOPPED):
        self.current_state = initial_state
        self.previous_state = None
        self.state_history = [(pd.Timestamp.now(), initial_state)]

        # 状態遷移ルール
        self.transitions = {
            ProcessState.STOPPED: [ProcessState.STARTUP, ProcessState.MAINTENANCE],
            ProcessState.STARTUP: [ProcessState.RUNNING, ProcessState.ALARM],
            ProcessState.RUNNING: [ProcessState.SHUTDOWN, ProcessState.ALARM],
            ProcessState.SHUTDOWN: [ProcessState.STOPPED, ProcessState.ALARM],
            ProcessState.ALARM: [],  # アラームからは任意の状態に復帰可能
            ProcessState.MAINTENANCE: [ProcessState.STOPPED]
        }

    def transition(self, new_state, timestamp=None):
        """
        状態遷移

        Parameters:
        -----------
        new_state : ProcessState
            新しい状態
        timestamp : pd.Timestamp or None
            遷移時刻

        Returns:
        --------
        success : bool
            遷移の成功/失敗
        """
        if timestamp is None:
            timestamp = pd.Timestamp.now()

        # アラーム状態への遷移は常に許可
        if new_state == ProcessState.ALARM:
            self.previous_state = self.current_state
            self.current_state = new_state
            self.state_history.append((timestamp, new_state))
            return True

        # アラーム状態からの復帰
        if self.current_state == ProcessState.ALARM:
            self.current_state = new_state
            self.state_history.append((timestamp, new_state))
            return True

        # 通常の状態遷移ルールチェック
        if new_state in self.transitions[self.current_state]:
            self.current_state = new_state
            self.state_history.append((timestamp, new_state))
            return True

        # 不正な遷移
        return False

    def get_state_duration(self):
        """現在の状態の継続時間(秒)"""
        if len(self.state_history) < 2:
            return 0

        current_time = pd.Timestamp.now()
        last_transition = self.state_history[-1][0]
        duration = (current_time - last_transition).total_seconds()

        return duration

    def get_state_statistics(self):
        """状態別の統計情報"""
        if len(self.state_history) < 2:
            return {}

        df_history = pd.DataFrame(self.state_history, columns=['timestamp', 'state'])
        df_history['duration'] = df_history['timestamp'].diff().shift(-1).dt.total_seconds()

        stats = {}
        for state in ProcessState:
            state_data = df_history[df_history['state'] == state]
            if len(state_data) > 0:
                stats[state.name] = {
                    'count': len(state_data),
                    'total_duration': state_data['duration'].sum(),
                    'avg_duration': state_data['duration'].mean()
                }

        return stats


# プロセス状態機械のシミュレーション
print("=== プロセス状態機械シミュレーション ===\n")

# 初期化
state_machine = ProcessStateMachine(initial_state=ProcessState.STOPPED)

# 1日間のプロセス運転シミュレーション
start_time = pd.Timestamp('2025-01-01 00:00:00')

# 状態遷移シナリオ
transitions_scenario = [
    (start_time + pd.Timedelta(hours=0), ProcessState.STOPPED),
    (start_time + pd.Timedelta(hours=1), ProcessState.STARTUP),
    (start_time + pd.Timedelta(hours=2), ProcessState.RUNNING),
    (start_time + pd.Timedelta(hours=8), ProcessState.ALARM),      # アラーム発生
    (start_time + pd.Timedelta(hours=8.5), ProcessState.RUNNING),  # アラーム復帰
    (start_time + pd.Timedelta(hours=16), ProcessState.SHUTDOWN),
    (start_time + pd.Timedelta(hours=17), ProcessState.STOPPED),
    (start_time + pd.Timedelta(hours=18), ProcessState.MAINTENANCE),
    (start_time + pd.Timedelta(hours=22), ProcessState.STOPPED),
]

# 状態遷移実行
for timestamp, new_state in transitions_scenario[1:]:
    success = state_machine.transition(new_state, timestamp)
    if success:
        print(f"[{timestamp.strftime('%H:%M')}] {state_machine.current_state.name}")
    else:
        print(f"[{timestamp.strftime('%H:%M')}] 不正な遷移: → {new_state.name}")

# 状態履歴の可視化
df_history = pd.DataFrame(state_machine.state_history, columns=['timestamp', 'state'])
df_history['state_code'] = df_history['state'].apply(lambda x: x.value)
df_history['state_name'] = df_history['state'].apply(lambda x: x.name)

# Ganttチャート風の可視化
fig, ax = plt.subplots(figsize=(16, 6))

# 状態ごとに色を定義
state_colors = {
    ProcessState.STOPPED: '#gray',
    ProcessState.STARTUP: '#ffeb3b',
    ProcessState.RUNNING: '#4caf50',
    ProcessState.SHUTDOWN: '#ff9800',
    ProcessState.ALARM: '#f44336',
    ProcessState.MAINTENANCE: '#2196f3'
}

# 各状態期間をバーで表示
for i in range(len(df_history) - 1):
    start = df_history.iloc[i]['timestamp']
    end = df_history.iloc[i + 1]['timestamp']
    state = df_history.iloc[i]['state']

    ax.barh(
        0,
        width=(end - start).total_seconds() / 3600,  # 時間単位
        left=(start - start_time).total_seconds() / 3600,
        height=0.5,
        color=state_colors[state],
        edgecolor='black',
        linewidth=1.5,
        label=state.name if i == 0 or df_history.iloc[i-1]['state'] != state else ""
    )

    # 状態名をバーの中央に表示
    duration = (end - start).total_seconds() / 3600
    if duration > 0.5:  # 30分以上の状態のみラベル表示
        mid_point = (start - start_time).total_seconds() / 3600 + duration / 2
        ax.text(mid_point, 0, state.name, ha='center', va='center',
                fontsize=11, fontweight='bold', color='white')

# 凡例(重複を除去)
handles, labels = ax.get_legend_handles_labels()
by_label = dict(zip(labels, handles))
ax.legend(by_label.values(), by_label.keys(), loc='upper right', fontsize=10)

ax.set_xlabel('時刻(時)', fontsize=12)
ax.set_yticks([])
ax.set_xlim(0, 24)
ax.set_title('プロセス状態遷移タイムライン(24時間)', fontsize=14, fontweight='bold')
ax.grid(alpha=0.3, axis='x')

plt.tight_layout()
plt.show()

# 統計情報
print("\n=== 状態統計 ===")
stats = state_machine.get_state_statistics()

print(f"\n{'状態':<15} {'回数':>6} {'合計時間':>10} {'平均時間':>10}")
print("-" * 50)

for state_name, stat in stats.items():
    total_hours = stat['total_duration'] / 3600
    avg_hours = stat['avg_duration'] / 3600
    print(f"{state_name:<15} {stat['count']:>6}回 "
          f"{total_hours:>9.2f}時間 {avg_hours:>9.2f}時間")

# 稼働率計算
total_time = 24  # 時間
running_time = stats.get('RUNNING', {}).get('total_duration', 0) / 3600
availability = running_time / total_time * 100

print(f"\n稼働率: {availability:.1f}% ({running_time:.2f}時間 / {total_time}時間)")

期待される出力:

=== プロセス状態機械シミュレーション ===

[01:00] STARTUP
[02:00] RUNNING
[08:00] ALARM
[08:30] RUNNING
[16:00] SHUTDOWN
[17:00] STOPPED
[18:00] MAINTENANCE
[22:00] STOPPED

=== 状態統計 ===

状態                回数     合計時間     平均時間
--------------------------------------------------
STOPPED             2回      5.00時間      2.50時間
STARTUP             1回      1.00時間      1.00時間
RUNNING             2回     13.50時間      6.75時間
ALARM               1回      0.50時間      0.50時間
SHUTDOWN            1回      1.00時間      1.00時間
MAINTENANCE         1回      4.00時間      4.00時間

稼働率: 56.2% (13.50時間 / 24時間)

解説: 有限状態機械(FSM)は、プロセスの運転状態を明確にモデル化し、状態遷移を管理する強力なツールです。このコードは、プロセスの起動、定常運転、停止、アラーム、メンテナンスの各状態と、それらの間の遷移ルールを実装しています。タイムラインの可視化により、オペレータがプロセスの運転履歴を直感的に把握できます。

コード例8: 完全な統合監視システム - 化学反応器ケーススタディ

目的: これまで学んだ全ての要素を統合し、化学反応器の完全な監視システムを構築する。

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from collections import deque
from datetime import datetime

class IntegratedProcessMonitoringSystem:
    """
    統合プロセス監視システム

    機能:
    - リアルタイムデータ収集
    - マルチ変数トレンド監視
    - アラーム管理
    - KPI計算
    - 状態管理
    """

    def __init__(self, process_name):
        self.process_name = process_name

        # データバッファ(1時間分、1秒間隔)
        self.buffer_size = 3600
        self.time_buffer = deque(maxlen=self.buffer_size)
        self.data_buffers = {
            '反応器温度': deque(maxlen=self.buffer_size),
            'ジャケット温度': deque(maxlen=self.buffer_size),
            '反応器圧力': deque(maxlen=self.buffer_size),
            '原料流量': deque(maxlen=self.buffer_size),
            '生成物濃度': deque(maxlen=self.buffer_size)
        }

        # アラームシステム
        self.active_alarms = []

        # KPIデータ
        self.kpi_data = {
            'production_count': 0,
            'good_count': 0,
            'running_time': 0
        }

        # プロセス状態
        self.process_state = 'RUNNING'
        self.iteration = 0

    def generate_process_data(self):
        """プロセスデータの生成(実際はPLC/DCSから取得)"""
        elapsed = self.iteration

        # 反応器温度(目標175°C)
        temp_base = 175.0
        temp_noise = np.random.normal(0, 1.2)
        temp_disturbance = 3 * np.sin(2 * np.pi * elapsed / 300)  # 5分周期
        reactor_temp = temp_base + temp_noise + temp_disturbance

        # ジャケット温度(反応器温度より低い)
        jacket_temp = reactor_temp - 10 + np.random.normal(0, 0.8)

        # 圧力(目標1.5 MPa)
        pressure = 1.5 + np.random.normal(0, 0.03)

        # 原料流量(目標50 m³/h)
        flow_rate = 50 + np.random.normal(0, 2.5)

        # 生成物濃度(目標80%)
        concentration = 80 + np.random.normal(0, 3) - 0.5 * (reactor_temp - 175)

        return {
            '反応器温度': reactor_temp,
            'ジャケット温度': jacket_temp,
            '反応器圧力': pressure,
            '原料流量': flow_rate,
            '生成物濃度': concentration
        }

    def check_process_alarms(self, data):
        """プロセスアラームのチェック"""
        alarms = []

        # 温度アラーム
        if data['反応器温度'] > 180:
            alarms.append({'severity': 'CRITICAL', 'variable': '反応器温度',
                          'message': '高温アラーム', 'value': data['反応器温度']})
        elif data['反応器温度'] > 177:
            alarms.append({'severity': 'WARNING', 'variable': '反応器温度',
                          'message': '高温警告', 'value': data['反応器温度']})
        elif data['反応器温度'] < 173:
            alarms.append({'severity': 'WARNING', 'variable': '反応器温度',
                          'message': '低温警告', 'value': data['反応器温度']})

        # 圧力アラーム
        if data['反応器圧力'] > 1.6 or data['反応器圧力'] < 1.4:
            alarms.append({'severity': 'CRITICAL', 'variable': '反応器圧力',
                          'message': '圧力異常', 'value': data['反応器圧力']})

        # 濃度アラーム
        if data['生成物濃度'] < 75:
            alarms.append({'severity': 'WARNING', 'variable': '生成物濃度',
                          'message': '品質低下', 'value': data['生成物濃度']})

        self.active_alarms = alarms
        return alarms

    def update(self):
        """システムの更新(1秒ごとに呼び出される)"""
        current_time = datetime.now()
        data = self.generate_process_data()

        # データバッファに追加
        self.time_buffer.append(current_time)
        for var, value in data.items():
            self.data_buffers[var].append(value)

        # アラームチェック
        alarms = self.check_process_alarms(data)

        # KPI更新
        self.kpi_data['running_time'] += 1  # 秒

        self.iteration += 1

        return data, alarms

    def create_integrated_dashboard(self):
        """統合ダッシュボードの作成"""
        if len(self.time_buffer) < 10:
            return None

        # データをnumpy配列に変換
        time_array = list(self.time_buffer)
        reactor_temp = np.array(list(self.data_buffers['反応器温度']))
        jacket_temp = np.array(list(self.data_buffers['ジャケット温度']))
        pressure = np.array(list(self.data_buffers['反応器圧力']))
        flow_rate = np.array(list(self.data_buffers['原料流量']))
        concentration = np.array(list(self.data_buffers['生成物濃度']))

        # サブプロットの作成
        fig = make_subplots(
            rows=3, cols=2,
            subplot_titles=(
                '反応器温度トレンド', 'ジャケット温度トレンド',
                '反応器圧力トレンド', '原料流量トレンド',
                '生成物濃度トレンド', '現在のプロセス状態'
            ),
            specs=[
                [{"type": "scatter"}, {"type": "scatter"}],
                [{"type": "scatter"}, {"type": "scatter"}],
                [{"type": "scatter"}, {"type": "indicator"}]
            ],
            vertical_spacing=0.12,
            horizontal_spacing=0.12
        )

        # 反応器温度
        fig.add_trace(
            go.Scatter(x=time_array, y=reactor_temp, mode='lines',
                      name='反応器温度', line=dict(color='#11998e', width=2)),
            row=1, col=1
        )
        fig.add_hline(y=175, line_dash="dash", line_color="red", row=1, col=1)
        fig.add_hrect(y0=173, y1=177, fillcolor="green", opacity=0.1,
                      line_width=0, row=1, col=1)

        # ジャケット温度
        fig.add_trace(
            go.Scatter(x=time_array, y=jacket_temp, mode='lines',
                      name='ジャケット温度', line=dict(color='#f59e0b', width=2)),
            row=1, col=2
        )

        # 圧力
        fig.add_trace(
            go.Scatter(x=time_array, y=pressure, mode='lines',
                      name='圧力', line=dict(color='#7b2cbf', width=2)),
            row=2, col=1
        )
        fig.add_hline(y=1.5, line_dash="dash", line_color="red", row=2, col=1)

        # 流量
        fig.add_trace(
            go.Scatter(x=time_array, y=flow_rate, mode='lines',
                      name='流量', line=dict(color='#e63946', width=2)),
            row=2, col=2
        )
        fig.add_hline(y=50, line_dash="dash", line_color="red", row=2, col=2)

        # 濃度
        fig.add_trace(
            go.Scatter(x=time_array, y=concentration, mode='lines',
                      name='濃度', line=dict(color='#06a77d', width=2)),
            row=3, col=1
        )
        fig.add_hline(y=80, line_dash="dash", line_color="red", row=3, col=1)

        # プロセス状態インジケータ
        current_temp = reactor_temp[-1]
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=current_temp,
                title={'text': "反応器温度 (°C)"},
                gauge={
                    'axis': {'range': [None, 200]},
                    'bar': {'color': "#11998e"},
                    'steps': [
                        {'range': [0, 173], 'color': "lightblue"},
                        {'range': [173, 177], 'color': "lightgreen"},
                        {'range': [177, 200], 'color': "lightcoral"}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 175
                    }
                }
            ),
            row=3, col=2
        )

        # 軸ラベル
        fig.update_yaxes(title_text="温度 (°C)", row=1, col=1)
        fig.update_yaxes(title_text="温度 (°C)", row=1, col=2)
        fig.update_yaxes(title_text="圧力 (MPa)", row=2, col=1)
        fig.update_yaxes(title_text="流量 (m³/h)", row=2, col=2)
        fig.update_yaxes(title_text="濃度 (%)", row=3, col=1)

        fig.update_layout(
            title_text=f"{self.process_name} - 統合監視ダッシュボード",
            title_font_size=20,
            title_x=0.5,
            height=1000,
            showlegend=False,
            template="plotly_white"
        )

        return fig


# 統合監視システムのデモンストレーション
print("=== 統合プロセス監視システム ===")
print("化学反応器 R-101 監視システム起動\n")

# システム初期化
monitoring_system = IntegratedProcessMonitoringSystem("化学反応器 R-101")

# 10分間のシミュレーション(600秒)
print("10分間のプロセス監視を開始します...\n")

alarm_count = 0
for i in range(600):
    data, alarms = monitoring_system.update()

    # 30秒ごとに状態表示
    if (i + 1) % 30 == 0:
        print(f"[{i+1:>3}秒] "
              f"温度: {data['反応器温度']:>6.2f}°C | "
              f"圧力: {data['反応器圧力']:>5.3f} MPa | "
              f"濃度: {data['生成物濃度']:>5.2f}% | "
              f"アラーム: {len(alarms)}件")

        if alarms:
            for alarm in alarms:
                severity_icon = '🔴' if alarm['severity'] == 'CRITICAL' else '🟡'
                print(f"  {severity_icon} {alarm['variable']}: {alarm['message']} ({alarm['value']:.2f})")
            alarm_count += len(alarms)

print("\nプロセス監視完了!\n")

# 統合ダッシュボードの生成
fig = monitoring_system.create_integrated_dashboard()
if fig:
    fig.write_html("integrated_monitoring_system.html")
    print("統合ダッシュボードを 'integrated_monitoring_system.html' に保存しました。\n")

# 最終統計
print("=== プロセス統計サマリー ===")
print(f"監視時間: {monitoring_system.kpi_data['running_time']} 秒 (10分)")
print(f"総アラーム数: {alarm_count}件")

# 各変数の統計
print("\n変数統計:")
for var_name, buffer in monitoring_system.data_buffers.items():
    data_array = np.array(list(buffer))
    print(f"  {var_name:<15}: 平均 {np.mean(data_array):>7.2f}, "
          f"標準偏差 {np.std(data_array):>5.2f}, "
          f"最大 {np.max(data_array):>7.2f}, "
          f"最小 {np.min(data_array):>7.2f}")

print("\nシステム正常終了。")

解説: このコードは、本章で学んだ全ての要素(リアルタイムデータ収集、マルチ変数監視、アラーム管理、KPI計算、ダッシュボード可視化)を統合した完全なプロセス監視システムです。化学反応器を例に、実際のプラントで使用される監視システムの構成を実装しています。実運用では、このシステムをWebアプリケーション(Dash, Streamlit等)として展開し、複数のオペレータがリアルタイムでアクセスできるようにします。


5.3 本章のまとめ

学んだこと

  1. リアルタイム監視システムのアーキテクチャ
    • データ収集層、処理層、可視化層、保存層の4層構造
    • データフロー: センサー → バッファリング → 処理 → 可視化 → 保存
    • システム設計の原則とスケーラビリティ
  2. ダッシュボード設計と可視化
    • Plotlyによるインタラクティブなダッシュボード構築
    • トレンドチャート、ゲージ、インジケータの効果的な配置
    • オペレータ向けUIデザインの原則
  3. リアルタイムデータ処理
    • dequeによる効率的なデータバッファリング
    • ストリーミングデータの統計処理
    • リアルタイム異常検知
  4. アラーム管理システム
    • 重要度レベル別のアラーム分類(INFO, WARNING, ALARM, CRITICAL)
    • アラーム履歴管理とアクノリッジメント
    • アラームフラッド防止戦略
  5. KPIとプロセス分析
    • OEE(設備総合効率)の計算と評価
    • 履歴データトレンド分析とパターン検出
    • ボトルネック分析と改善提案
  6. 統合監視システム
    • 全機能を統合した完全なシステム構築
    • 化学反応器の実践的なケーススタディ
    • 実運用への展開戦略

重要なポイント

実務での応用

本章で学んだ監視システムは、以下のような実プロセスで活用できます:

次のステップ

シリーズを完了したあなたは、以下のスキルを習得しました:

さらなる学習のために:

  1. モデル予測制御(MPC): 多変数制御の高度な手法
  2. デジタルツイン: プロセスの仮想モデル構築と予測シミュレーション
  3. 強化学習制御: AIによる適応的プロセス制御
  4. ソフトセンサー開発: 機械学習による困難な変数の推定
  5. クラウド統合: AWS/Azure/GCPとの連携とビッグデータ分析

シリーズ完了おめでとうございます!

あなたは「プロセスモニタリング・制御入門シリーズ v1.0」の全5章を完了しました。センサーデータの取得から、統計的プロセス管理、異常検知、PID制御、そしてリアルタイム監視システムの構築まで、プロセスエンジニアリングの包括的な知識とスキルを習得しました。

この知識を活かして、次のステップへ進みましょう:

あなたのフィードバックをお待ちしています!
このシリーズの改善提案、質問、成功事例などがあれば、ぜひお知らせください。

連絡先: yusuke.hashimoto.b8@tohoku.ac.jp

免責事項