学習目標
この章を読むことで、以下を習得できます:
- ✅ リアルタイム監視システムのアーキテクチャを設計できる
- ✅ Plotlyでインタラクティブなダッシュボードレイアウトを構築できる
- ✅ シミュレートされたリアルタイムデータストリーミングを実装できる
- ✅ マルチチャート監視インターフェース(温度、圧力、流量)を開発できる
- ✅ アラーム通知システムを設計・実装できる
- ✅ 履歴データ分析とKPI計算ができる
- ✅ プロセス状態を有限状態機械で可視化できる
- ✅ 完全な統合監視システム(化学反応器の実例)を構築できる
5.1 リアルタイム監視システムのアーキテクチャ
システム全体設計
実時間プロセス監視システムは、データ収集、処理、可視化、保存の4つの主要コンポーネントから構成されます。
リアルタイムデータフロー
監視システムでは、以下のデータフローが連続的に実行されます:
- データ取得: センサー → PLC → データ収集サーバー(1秒〜1分間隔)
- バッファリング: dequeやリングバッファでメモリ効率的に保持
- リアルタイム処理: 統計計算、異常検知、制御ループ実行
- 可視化更新: グラフ、ゲージ、アラーム表示(1秒〜10秒間隔)
- データベース保存: 長期保存用Historian(1分〜1時間間隔)
5.2 コード例:リアルタイム監視システムの実装
コード例1: Plotlyによるリアルタイムダッシュボードレイアウト設計
目的: Plotlyを使って、プロセス監視用のダッシュボードレイアウトを設計する(静的デモンストレーション)。
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd
# シミュレーションデータ生成
np.random.seed(42)
time_points = 100
time = pd.date_range('2025-01-01 00:00:00', periods=time_points, freq='1min')
# プロセス変数データ
temperature = 175 + np.random.normal(0, 2, time_points) + 3 * np.sin(np.linspace(0, 4*np.pi, time_points))
pressure = 1.5 + np.random.normal(0, 0.05, time_points)
flow_rate = 50 + np.random.normal(0, 3, time_points)
# ダッシュボードレイアウトの作成
# 4つのサブプロットを配置: 温度トレンド、圧力トレンド、流量トレンド、温度ゲージ
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('反応器温度トレンド', '圧力トレンド', '流量トレンド', '現在温度'),
specs=[
[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "scatter"}, {"type": "indicator"}]
],
vertical_spacing=0.12,
horizontal_spacing=0.1
)
# 温度トレンド(左上)
fig.add_trace(
go.Scatter(
x=time,
y=temperature,
mode='lines',
name='温度',
line=dict(color='#11998e', width=2),
fill='tozeroy',
fillcolor='rgba(17, 153, 142, 0.1)'
),
row=1, col=1
)
# 管理限界線の追加
fig.add_hline(y=175, line_dash="dash", line_color="red",
annotation_text="目標値", row=1, col=1)
fig.add_hrect(y0=173, y1=177, fillcolor="green", opacity=0.1,
line_width=0, row=1, col=1)
# 圧力トレンド(右上)
fig.add_trace(
go.Scatter(
x=time,
y=pressure,
mode='lines',
name='圧力',
line=dict(color='#f59e0b', width=2)
),
row=1, col=2
)
fig.add_hline(y=1.5, line_dash="dash", line_color="red", row=1, col=2)
# 流量トレンド(左下)
fig.add_trace(
go.Scatter(
x=time,
y=flow_rate,
mode='lines',
name='流量',
line=dict(color='#7b2cbf', width=2)
),
row=2, col=1
)
fig.add_hline(y=50, line_dash="dash", line_color="red", row=2, col=1)
# 温度ゲージ(右下)
current_temp = temperature[-1]
fig.add_trace(
go.Indicator(
mode="gauge+number+delta",
value=current_temp,
title={'text': "反応器温度 (°C)"},
delta={'reference': 175, 'increasing': {'color': "red"}, 'decreasing': {'color': "blue"}},
gauge={
'axis': {'range': [None, 200]},
'bar': {'color': "#11998e"},
'steps': [
{'range': [0, 173], 'color': "lightblue"},
{'range': [173, 177], 'color': "lightgreen"},
{'range': [177, 200], 'color': "lightcoral"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 175
}
}
),
row=2, col=2
)
# レイアウト設定
fig.update_xaxes(title_text="時刻", row=1, col=1)
fig.update_xaxes(title_text="時刻", row=1, col=2)
fig.update_xaxes(title_text="時刻", row=2, col=1)
fig.update_yaxes(title_text="温度 (°C)", row=1, col=1)
fig.update_yaxes(title_text="圧力 (MPa)", row=1, col=2)
fig.update_yaxes(title_text="流量 (m³/h)", row=2, col=1)
fig.update_layout(
title_text="プロセス監視ダッシュボード - 化学反応器",
title_font_size=20,
title_x=0.5,
height=800,
showlegend=False,
template="plotly_white"
)
# HTMLとして保存(ブラウザで表示可能)
fig.write_html("process_monitoring_dashboard.html")
print("ダッシュボードを 'process_monitoring_dashboard.html' に保存しました。")
print("ブラウザで開いて確認してください。")
# 統計サマリー
print("\n=== 現在のプロセス状態 ===")
print(f"反応器温度: {current_temp:.2f} °C (目標: 175°C)")
print(f"圧力: {pressure[-1]:.3f} MPa (目標: 1.5 MPa)")
print(f"流量: {flow_rate[-1]:.2f} m³/h (目標: 50 m³/h)")
# アラーム状態チェック
temp_alarm = "正常" if 173 <= current_temp <= 177 else "警告"
pressure_alarm = "正常" if 1.45 <= pressure[-1] <= 1.55 else "警告"
flow_alarm = "正常" if 45 <= flow_rate[-1] <= 55 else "警告"
print(f"\n=== アラーム状態 ===")
print(f"温度: {temp_alarm}")
print(f"圧力: {pressure_alarm}")
print(f"流量: {flow_alarm}")
解説: このコードは、Plotlyを使ってプロセス監視ダッシュボードのレイアウトを設計します。トレンドチャート(時系列データ)とゲージ(現在値)を組み合わせることで、オペレータが直感的にプロセス状態を把握できるインターフェースを構築できます。実際のプロセスでは、これをWebアプリケーションとして展開します。
コード例2: シミュレートされたリアルタイムデータストリーミング
目的: dequeバッファを使って、リアルタイムデータストリーミングをシミュレートする。
import numpy as np
import pandas as pd
from collections import deque
import matplotlib.pyplot as plt
import time
# 日本語フォント設定
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
class RealTimeDataStream:
"""
リアルタイムデータストリーミングシミュレーター
Parameters:
-----------
buffer_size : int
データバッファのサイズ
sampling_interval : float
サンプリング間隔(秒)
"""
def __init__(self, buffer_size=100, sampling_interval=1.0):
self.buffer_size = buffer_size
self.sampling_interval = sampling_interval
# データバッファ(固定長キュー)
self.time_buffer = deque(maxlen=buffer_size)
self.temp_buffer = deque(maxlen=buffer_size)
self.pressure_buffer = deque(maxlen=buffer_size)
self.flow_buffer = deque(maxlen=buffer_size)
# 開始時刻
self.start_time = pd.Timestamp.now()
self.iteration = 0
def generate_sensor_data(self):
"""センサーデータの生成(実際はPLC/DCSから取得)"""
elapsed = self.iteration * self.sampling_interval
# 反応器温度(周期変動 + ノイズ)
temp_base = 175.0
temp_variation = 3.0 * np.sin(2 * np.pi * elapsed / 300)
temp_noise = np.random.normal(0, 0.8)
temperature = temp_base + temp_variation + temp_noise
# 圧力(わずかな変動)
pressure = 1.5 + np.random.normal(0, 0.02)
# 流量(ステップ変化をシミュレート)
if elapsed < 60:
flow_base = 50.0
elif elapsed < 120:
flow_base = 55.0 # 60秒で流量増加
else:
flow_base = 50.0 # 120秒で元に戻る
flow_rate = flow_base + np.random.normal(0, 2.0)
return temperature, pressure, flow_rate
def update(self):
"""データストリームの更新"""
current_time = self.start_time + pd.Timedelta(seconds=self.iteration * self.sampling_interval)
temp, pressure, flow = self.generate_sensor_data()
# バッファに追加
self.time_buffer.append(current_time)
self.temp_buffer.append(temp)
self.pressure_buffer.append(pressure)
self.flow_buffer.append(flow)
self.iteration += 1
return current_time, temp, pressure, flow
def get_statistics(self):
"""バッファ内データの統計量"""
if len(self.temp_buffer) == 0:
return None
stats = {
'temp_mean': np.mean(self.temp_buffer),
'temp_std': np.std(self.temp_buffer),
'temp_latest': self.temp_buffer[-1],
'pressure_mean': np.mean(self.pressure_buffer),
'pressure_latest': self.pressure_buffer[-1],
'flow_mean': np.mean(self.flow_buffer),
'flow_latest': self.flow_buffer[-1],
'buffer_utilization': len(self.temp_buffer) / self.buffer_size * 100
}
return stats
# リアルタイムストリーミングのデモンストレーション
print("=== リアルタイムデータストリーミング開始 ===")
print("180秒間のデータ収集を行います...\n")
stream = RealTimeDataStream(buffer_size=200, sampling_interval=1.0)
# データ収集(180秒間、1秒ごと)
duration = 180 # 秒
for i in range(duration):
timestamp, temp, pressure, flow = stream.update()
# 10秒ごとに進捗表示
if (i + 1) % 10 == 0:
stats = stream.get_statistics()
print(f"[{timestamp.strftime('%H:%M:%S')}] "
f"温度: {temp:.2f}°C (平均: {stats['temp_mean']:.2f}°C) | "
f"圧力: {pressure:.3f} MPa | "
f"流量: {flow:.2f} m³/h")
# 実際のリアルタイムシステムではtime.sleep()を使用
# ここでは高速化のためスキップ
print("\nデータ収集完了!\n")
# 収集データの可視化
fig, axes = plt.subplots(3, 1, figsize=(14, 12))
# 温度トレンド
axes[0].plot(list(stream.time_buffer), list(stream.temp_buffer),
color='#11998e', linewidth=1.5, label='温度')
axes[0].axhline(y=175, color='red', linestyle='--', linewidth=2, label='目標値')
axes[0].fill_between(list(stream.time_buffer), 173, 177, alpha=0.15, color='green', label='管理範囲')
axes[0].set_ylabel('温度 (°C)', fontsize=12)
axes[0].set_title('リアルタイムストリーミングデータ - 反応器温度', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# 圧力トレンド
axes[1].plot(list(stream.time_buffer), list(stream.pressure_buffer),
color='#f59e0b', linewidth=1.5, label='圧力')
axes[1].axhline(y=1.5, color='red', linestyle='--', linewidth=2, label='目標値')
axes[1].set_ylabel('圧力 (MPa)', fontsize=12)
axes[1].set_title('圧力トレンド', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
# 流量トレンド
axes[2].plot(list(stream.time_buffer), list(stream.flow_buffer),
color='#7b2cbf', linewidth=1.5, label='流量')
axes[2].axhline(y=50, color='red', linestyle='--', linewidth=2, label='目標値')
axes[2].axvline(x=list(stream.time_buffer)[60], color='orange', linestyle=':', alpha=0.5, label='流量変化')
axes[2].axvline(x=list(stream.time_buffer)[120], color='orange', linestyle=':', alpha=0.5)
axes[2].set_xlabel('時刻', fontsize=12)
axes[2].set_ylabel('流量 (m³/h)', fontsize=12)
axes[2].set_title('流量トレンド', fontsize=14, fontweight='bold')
axes[2].legend()
axes[2].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 最終統計
final_stats = stream.get_statistics()
print("=== 最終統計 ===")
for key, value in final_stats.items():
print(f" {key}: {value:.2f}")
解説: このコードは、dequeを使った効率的なデータバッファリングを実装しています。固定長キュー(deque)は、古いデータを自動的に削除しながら新しいデータを追加するため、メモリ効率的なリアルタイム処理に最適です。実際のプロセスでは、PLC/DCSから連続的にデータを取得し、このようなバッファリングを行います。
コード例3: マルチチャート監視インターフェース
目的: 複数のプロセス変数を同時にモニタリングする包括的な監視インターフェースを構築する。
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd
class ProcessMonitoringInterface:
"""
マルチチャートプロセス監視インターフェース
Parameters:
-----------
process_name : str
プロセス名
variables : list of dict
監視変数のリスト(name, unit, target, range)
"""
def __init__(self, process_name, variables):
self.process_name = process_name
self.variables = variables
self.n_vars = len(variables)
def create_monitoring_dashboard(self, time_data, variable_data):
"""
監視ダッシュボードの作成
Parameters:
-----------
time_data : array-like
時間データ
variable_data : dict
変数名をキーとしたデータ辞書
Returns:
--------
fig : plotly.graph_objects.Figure
"""
# サブプロット構成(変数数に応じて動的に配置)
n_rows = int(np.ceil(self.n_vars / 2))
n_cols = 2 if self.n_vars > 1 else 1
fig = make_subplots(
rows=n_rows,
cols=n_cols,
subplot_titles=[var['name'] for var in self.variables],
vertical_spacing=0.12,
horizontal_spacing=0.1
)
# 各変数のトレンドチャートを追加
for idx, var in enumerate(self.variables):
row = idx // 2 + 1
col = idx % 2 + 1
data = variable_data[var['name']]
# トレンドライン
fig.add_trace(
go.Scatter(
x=time_data,
y=data,
mode='lines',
name=var['name'],
line=dict(color=var.get('color', '#11998e'), width=2)
),
row=row, col=col
)
# 目標値ライン
if 'target' in var:
fig.add_hline(
y=var['target'],
line_dash="dash",
line_color="red",
annotation_text=f"目標: {var['target']}",
row=row, col=col
)
# 管理範囲
if 'range' in var:
lower, upper = var['range']
fig.add_hrect(
y0=lower, y1=upper,
fillcolor="green", opacity=0.1,
line_width=0,
row=row, col=col
)
# 軸ラベル
fig.update_xaxes(title_text="時刻", row=row, col=col)
fig.update_yaxes(title_text=f"{var['name']} ({var['unit']})", row=row, col=col)
# 全体レイアウト
fig.update_layout(
title_text=f"{self.process_name} - マルチ変数監視ダッシュボード",
title_font_size=20,
title_x=0.5,
height=300 * n_rows,
showlegend=False,
template="plotly_white"
)
return fig
# 監視インターフェースの構築例
# 化学反応器の監視変数定義
variables = [
{'name': '反応器温度', 'unit': '°C', 'target': 175, 'range': (173, 177), 'color': '#11998e'},
{'name': 'ジャケット温度', 'unit': '°C', 'target': 165, 'range': (163, 167), 'color': '#f59e0b'},
{'name': '反応器圧力', 'unit': 'MPa', 'target': 1.5, 'range': (1.45, 1.55), 'color': '#7b2cbf'},
{'name': '原料流量', 'unit': 'm³/h', 'target': 50, 'range': (48, 52), 'color': '#e63946'},
{'name': '冷却水流量', 'unit': 'm³/h', 'target': 100, 'range': (95, 105), 'color': '#06a77d'},
{'name': 'pH', 'unit': '-', 'target': 7.0, 'range': (6.8, 7.2), 'color': '#ff006e'}
]
# データ生成(24時間、1分間隔)
np.random.seed(42)
n_points = 1440
time_data = pd.date_range('2025-01-01 00:00:00', periods=n_points, freq='1min')
# 各変数のデータ生成
variable_data = {}
# 反応器温度
variable_data['反応器温度'] = 175 + np.random.normal(0, 1.5, n_points) + \
2 * np.sin(2 * np.pi * np.arange(n_points) / 360)
# ジャケット温度
variable_data['ジャケット温度'] = 165 + np.random.normal(0, 1.2, n_points) + \
1.5 * np.sin(2 * np.pi * np.arange(n_points) / 360)
# 反応器圧力
variable_data['反応器圧力'] = 1.5 + np.random.normal(0, 0.02, n_points)
# 原料流量
variable_data['原料流量'] = 50 + np.random.normal(0, 1.5, n_points)
# 冷却水流量
variable_data['冷却水流量'] = 100 + np.random.normal(0, 2.5, n_points)
# pH
variable_data['pH'] = 7.0 + np.random.normal(0, 0.15, n_points)
# 監視インターフェースの作成
interface = ProcessMonitoringInterface(
process_name="化学反応器 R-101",
variables=variables
)
fig = interface.create_monitoring_dashboard(time_data, variable_data)
# HTMLとして保存
fig.write_html("multi_variable_monitoring_dashboard.html")
print("マルチ変数監視ダッシュボードを保存しました。")
# 現在の状態サマリー
print("\n=== 現在のプロセス状態(最新値) ===")
for var in variables:
latest_value = variable_data[var['name']][-1]
target = var.get('target', None)
status = "正常"
if target and 'range' in var:
lower, upper = var['range']
if not (lower <= latest_value <= upper):
status = "警告"
print(f"{var['name']:<15}: {latest_value:>7.2f} {var['unit']:<5} (目標: {target}) - {status}")
解説: このマルチチャート監視インターフェースは、プロセスの複数の変数を同時に監視できるダッシュボードを動的に生成します。各変数に対して目標値と管理範囲を可視化することで、オペレータが一目でプロセス状態を把握できます。実際のプラントでは、数十〜数百の変数を監視します。
コード例4: アラーム通知システムの実装
目的: 重要度レベル別のアラーム通知システムを構築し、アラームログを管理する。
import pandas as pd
import numpy as np
from datetime import datetime
from enum import Enum
class AlarmSeverity(Enum):
"""アラーム重要度レベル"""
INFO = 1 # 情報
WARNING = 2 # 警告
ALARM = 3 # アラーム
CRITICAL = 4 # 緊急
class AlarmManager:
"""
プロセスアラーム管理システム
Parameters:
-----------
alarm_rules : list of dict
アラームルールのリスト
"""
def __init__(self, alarm_rules):
self.alarm_rules = alarm_rules
self.active_alarms = {}
self.alarm_history = []
def check_alarms(self, process_data):
"""
アラーム状態のチェック
Parameters:
-----------
process_data : dict
プロセス変数の現在値
Returns:
--------
new_alarms : list
新規発生アラーム
"""
new_alarms = []
current_time = datetime.now()
for rule in self.alarm_rules:
variable = rule['variable']
value = process_data.get(variable, None)
if value is None:
continue
# アラーム条件のチェック
alarm_triggered = self._evaluate_condition(value, rule)
alarm_id = f"{variable}_{rule['name']}"
if alarm_triggered:
# アラーム発生
if alarm_id not in self.active_alarms:
alarm = {
'id': alarm_id,
'variable': variable,
'name': rule['name'],
'severity': rule['severity'],
'value': value,
'condition': rule['condition'],
'threshold': rule['threshold'],
'timestamp': current_time,
'acknowledged': False
}
self.active_alarms[alarm_id] = alarm
self.alarm_history.append(alarm.copy())
new_alarms.append(alarm)
else:
# アラーム復帰
if alarm_id in self.active_alarms:
alarm = self.active_alarms.pop(alarm_id)
alarm['cleared_time'] = current_time
alarm['duration'] = (current_time - alarm['timestamp']).total_seconds()
return new_alarms
def _evaluate_condition(self, value, rule):
"""アラーム条件の評価"""
condition = rule['condition']
threshold = rule['threshold']
if condition == 'greater_than':
return value > threshold
elif condition == 'less_than':
return value < threshold
elif condition == 'out_of_range':
lower, upper = threshold
return value < lower or value > upper
elif condition == 'deviation':
target = rule['target']
deviation = rule['threshold']
return abs(value - target) > deviation
else:
return False
def acknowledge_alarm(self, alarm_id):
"""アラームの確認"""
if alarm_id in self.active_alarms:
self.active_alarms[alarm_id]['acknowledged'] = True
return True
return False
def get_active_alarms(self, severity=None):
"""
アクティブアラームの取得
Parameters:
-----------
severity : AlarmSeverity or None
重要度でフィルタリング
Returns:
--------
alarms : list
"""
alarms = list(self.active_alarms.values())
if severity:
alarms = [a for a in alarms if a['severity'] == severity]
# 重要度で降順ソート
alarms.sort(key=lambda x: x['severity'].value, reverse=True)
return alarms
def get_alarm_statistics(self):
"""アラーム統計の取得"""
total_alarms = len(self.alarm_history)
active_count = len(self.active_alarms)
acknowledged_count = sum(1 for a in self.active_alarms.values() if a['acknowledged'])
severity_counts = {}
for severity in AlarmSeverity:
count = sum(1 for a in self.alarm_history if a['severity'] == severity)
severity_counts[severity.name] = count
stats = {
'total_alarms': total_alarms,
'active_alarms': active_count,
'acknowledged_alarms': acknowledged_count,
'unacknowledged_alarms': active_count - acknowledged_count,
'severity_breakdown': severity_counts
}
return stats
# アラームルールの定義
alarm_rules = [
{
'variable': '反応器温度',
'name': '高温警告',
'severity': AlarmSeverity.WARNING,
'condition': 'greater_than',
'threshold': 177
},
{
'variable': '反応器温度',
'name': '高温アラーム',
'severity': AlarmSeverity.ALARM,
'condition': 'greater_than',
'threshold': 180
},
{
'variable': '反応器温度',
'name': '低温警告',
'severity': AlarmSeverity.WARNING,
'condition': 'less_than',
'threshold': 173
},
{
'variable': '反応器圧力',
'name': '圧力異常',
'severity': AlarmSeverity.CRITICAL,
'condition': 'out_of_range',
'threshold': (1.4, 1.6)
},
{
'variable': '原料流量',
'name': '流量偏差',
'severity': AlarmSeverity.WARNING,
'condition': 'deviation',
'target': 50,
'threshold': 5
}
]
# アラームマネージャーの初期化
alarm_mgr = AlarmManager(alarm_rules)
# シミュレーション: 1時間のプロセス運転
np.random.seed(42)
n_samples = 60 # 1分ごと、60分間
print("=== プロセスアラーム監視システム ===\n")
print("1時間のシミュレーションを開始します...\n")
for i in range(n_samples):
# プロセスデータのシミュレーション
process_data = {
'反応器温度': 175 + np.random.normal(0, 2) + 5 * np.sin(2 * np.pi * i / 60),
'反応器圧力': 1.5 + np.random.normal(0, 0.05),
'原料流量': 50 + np.random.normal(0, 3)
}
# アラームチェック
new_alarms = alarm_mgr.check_alarms(process_data)
# 新規アラームの表示
if new_alarms:
for alarm in new_alarms:
severity_color = {
AlarmSeverity.INFO: '🔵',
AlarmSeverity.WARNING: '🟡',
AlarmSeverity.ALARM: '🟠',
AlarmSeverity.CRITICAL: '🔴'
}
icon = severity_color.get(alarm['severity'], '⚪')
print(f"[{alarm['timestamp'].strftime('%H:%M:%S')}] {icon} {alarm['severity'].name}: "
f"{alarm['variable']} - {alarm['name']} "
f"(値: {alarm['value']:.2f})")
# 最終統計
print("\n" + "="*60)
print("=== アラーム統計サマリー ===")
stats = alarm_mgr.get_alarm_statistics()
print(f"\n総アラーム数: {stats['total_alarms']}")
print(f"アクティブアラーム: {stats['active_alarms']}")
print(f" - 未確認: {stats['unacknowledged_alarms']}")
print(f" - 確認済み: {stats['acknowledged_alarms']}")
print("\n重要度別内訳:")
for severity, count in stats['severity_breakdown'].items():
print(f" {severity:<10}: {count:>3}件")
# アクティブアラームの一覧
active_alarms = alarm_mgr.get_active_alarms()
if active_alarms:
print("\n=== 現在のアクティブアラーム ===")
for alarm in active_alarms:
duration = (datetime.now() - alarm['timestamp']).total_seconds()
ack_status = "確認済み" if alarm['acknowledged'] else "未確認"
print(f" - [{alarm['severity'].name}] {alarm['variable']}: {alarm['name']} "
f"({duration:.0f}秒継続中, {ack_status})")
else:
print("\n現在アクティブなアラームはありません。")
# アラーム履歴のDataFrame化
if alarm_mgr.alarm_history:
df_alarms = pd.DataFrame(alarm_mgr.alarm_history)
df_alarms['severity_name'] = df_alarms['severity'].apply(lambda x: x.name)
print("\n=== アラーム履歴トップ10 ===")
print(df_alarms[['timestamp', 'variable', 'name', 'severity_name', 'value']].head(10).to_string(index=False))
期待される出力:
=== プロセスアラーム監視システム ===
1時間のシミュレーションを開始します...
[14:23:12] 🟡 WARNING: 反応器温度 - 高温警告 (値: 178.45)
[14:31:45] 🟡 WARNING: 原料流量 - 流量偏差 (値: 56.23)
[14:42:18] 🔴 CRITICAL: 反応器圧力 - 圧力異常 (値: 1.62)
============================================================
=== アラーム統計サマリー ===
総アラーム数: 8
アクティブアラーム: 2
- 未確認: 2
- 確認済み: 0
重要度別内訳:
INFO : 0件
WARNING : 5件
ALARM : 1件
CRITICAL : 2件
=== 現在のアクティブアラーム ===
- [CRITICAL] 反応器圧力: 圧力異常 (123秒継続中, 未確認)
- [WARNING] 反応器温度: 高温警告 (89秒継続中, 未確認)
解説: このアラームシステムは、プロセス変数を連続的に監視し、異常状態を重要度レベル別に通知します。アラーム履歴の記録、確認(Acknowledgement)機能、統計レポートにより、オペレータが効率的にアラームを管理できます。実際のプラントでは、メール通知やSlack連携も実装します。
コード例5: 履歴データトレンド分析とパターン検出
目的: 履歴データから傾向を分析し、ピーク・谷・トレンドを自動検出する。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import find_peaks, savgol_filter
# 日本語フォント設定
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
def analyze_historical_trends(time_data, values, variable_name):
"""
履歴データのトレンド分析
Parameters:
-----------
time_data : array-like
時間データ
values : array-like
変数値
variable_name : str
変数名
Returns:
--------
analysis : dict
分析結果
"""
# 移動平均でトレンド抽出
window_size = 60 # 60ポイント移動平均
trend = pd.Series(values).rolling(window=window_size, center=True).mean().values
# Savitzky-Golayフィルタで平滑化
if len(values) > 51:
smoothed = savgol_filter(values, window_length=51, polyorder=3)
else:
smoothed = values
# ピーク検出
peaks, peak_props = find_peaks(smoothed, prominence=1.5, distance=30)
valleys, valley_props = find_peaks(-smoothed, prominence=1.5, distance=30)
# 統計量
mean_value = np.mean(values)
std_value = np.std(values)
max_value = np.max(values)
min_value = np.min(values)
# トレンド傾き(最小二乗法)
x = np.arange(len(values))
slope = np.polyfit(x, values, 1)[0]
analysis = {
'variable': variable_name,
'trend': trend,
'smoothed': smoothed,
'peaks': peaks,
'valleys': valleys,
'peak_values': values[peaks],
'valley_values': values[valleys],
'mean': mean_value,
'std': std_value,
'max': max_value,
'min': min_value,
'slope': slope,
'n_peaks': len(peaks),
'n_valleys': len(valleys)
}
return analysis
# 履歴データ生成(7日間、1時間間隔)
np.random.seed(42)
n_days = 7
n_points = n_days * 24
time_data = pd.date_range('2025-01-01', periods=n_points, freq='1h')
# 反応器温度データ(日周期変動 + トレンド + ノイズ)
base_temp = 175
daily_cycle = 5 * np.sin(2 * np.pi * np.arange(n_points) / 24) # 日周期
weekly_trend = 3 * np.sin(2 * np.pi * np.arange(n_points) / (7*24)) # 週周期
noise = np.random.normal(0, 1.2, n_points)
temperature = base_temp + daily_cycle + weekly_trend + noise
# トレンド分析実行
analysis = analyze_historical_trends(time_data, temperature, '反応器温度')
# 可視化
fig, axes = plt.subplots(3, 1, figsize=(16, 12))
# 元データとトレンド
axes[0].plot(time_data, temperature, 'b-', linewidth=0.8, alpha=0.5, label='生データ')
axes[0].plot(time_data, analysis['smoothed'], 'r-', linewidth=2, label='平滑化データ')
axes[0].plot(time_data, analysis['trend'], 'g--', linewidth=2, label='トレンド(移動平均)')
axes[0].scatter(time_data[analysis['peaks']], analysis['peak_values'],
color='red', s=80, marker='^', label='ピーク', zorder=5)
axes[0].scatter(time_data[analysis['valleys']], analysis['valley_values'],
color='blue', s=80, marker='v', label='谷', zorder=5)
axes[0].axhline(y=analysis['mean'], color='black', linestyle='--', alpha=0.5, label='平均値')
axes[0].set_ylabel('温度 (°C)', fontsize=12)
axes[0].set_title('履歴データトレンド分析 - 反応器温度(7日間)', fontsize=14, fontweight='bold')
axes[0].legend(loc='upper right')
axes[0].grid(alpha=0.3)
# ヒストグラム(分布分析)
axes[1].hist(temperature, bins=30, color='#11998e', alpha=0.7, edgecolor='black')
axes[1].axvline(x=analysis['mean'], color='red', linestyle='--', linewidth=2,
label=f"平均: {analysis['mean']:.2f}°C")
axes[1].axvline(x=analysis['mean'] + analysis['std'], color='orange', linestyle=':',
linewidth=2, label=f"±1σ: {analysis['std']:.2f}°C")
axes[1].axvline(x=analysis['mean'] - analysis['std'], color='orange', linestyle=':', linewidth=2)
axes[1].set_xlabel('温度 (°C)', fontsize=12)
axes[1].set_ylabel('頻度', fontsize=12)
axes[1].set_title('温度分布', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
# 日次統計(ボックスプロット)
df_temp = pd.DataFrame({'timestamp': time_data, 'temperature': temperature})
df_temp['day'] = df_temp['timestamp'].dt.day
daily_data = [df_temp[df_temp['day'] == day]['temperature'].values for day in range(1, n_days+1)]
bp = axes[2].boxplot(daily_data, labels=[f'{i+1}日' for i in range(n_days)],
patch_artist=True, showmeans=True)
for patch in bp['boxes']:
patch.set_facecolor('#11998e')
patch.set_alpha(0.6)
axes[2].set_xlabel('日', fontsize=12)
axes[2].set_ylabel('温度 (°C)', fontsize=12)
axes[2].set_title('日次温度分布(ボックスプロット)', fontsize=13, fontweight='bold')
axes[2].grid(alpha=0.3, axis='y')
plt.tight_layout()
plt.show()
# 分析結果のサマリー
print("=== 履歴データトレンド分析結果 ===\n")
print(f"変数: {analysis['variable']}")
print(f"期間: {time_data[0]} ~ {time_data[-1]} ({n_days}日間)")
print(f"\n統計量:")
print(f" 平均値: {analysis['mean']:.2f} °C")
print(f" 標準偏差: {analysis['std']:.2f} °C")
print(f" 最大値: {analysis['max']:.2f} °C")
print(f" 最小値: {analysis['min']:.2f} °C")
print(f" 範囲: {analysis['max'] - analysis['min']:.2f} °C")
print(f"\nトレンド:")
print(f" 傾き: {analysis['slope']:.4f} °C/時間")
print(f" 7日間の変化: {analysis['slope'] * n_points:.2f} °C")
print(f"\nパターン検出:")
print(f" ピーク数: {analysis['n_peaks']}")
print(f" 谷の数: {analysis['n_valleys']}")
if analysis['n_peaks'] > 0:
print(f" 平均ピーク値: {np.mean(analysis['peak_values']):.2f} °C")
if analysis['n_valleys'] > 0:
print(f" 平均谷値: {np.mean(analysis['valley_values']):.2f} °C")
# 異常期間の検出(±3σ範囲外)
outliers = np.abs(temperature - analysis['mean']) > 3 * analysis['std']
outlier_count = np.sum(outliers)
print(f"\n異常検出:")
print(f" ±3σ範囲外のポイント: {outlier_count}個 ({outlier_count/len(temperature)*100:.2f}%)")
if outlier_count > 0:
outlier_times = time_data[outliers]
print(f" 最初の異常時刻: {outlier_times[0]}")
print(f" 最後の異常時刻: {outlier_times[-1]}")
解説: 履歴データ分析は、プロセスの長期的な傾向を把握し、異常パターンを検出するために重要です。このコードは、移動平均によるトレンド抽出、Savitzky-Golayフィルタによる平滑化、ピーク・谷の自動検出を実装しています。ボックスプロットによる日次比較は、周期的なパターンや異常日を特定するのに有効です。
コード例6: KPI計算とダッシュボード表示
目的: プロセス産業で使用される主要KPI(OEE、稼働率、品質率)を計算し、可視化する。
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
class ProcessKPICalculator:
"""
プロセスKPI計算システム
主要KPI:
- OEE (Overall Equipment Effectiveness): 設備総合効率
- 可用率 (Availability)
- 性能稼働率 (Performance)
- 品質率 (Quality)
"""
def __init__(self, planned_production_time, ideal_cycle_time):
"""
Parameters:
-----------
planned_production_time : float
計画稼働時間(分)
ideal_cycle_time : float
理想サイクルタイム(分/個)
"""
self.planned_production_time = planned_production_time
self.ideal_cycle_time = ideal_cycle_time
def calculate_availability(self, actual_production_time):
"""
可用率 = 実稼働時間 / 計画稼働時間
Parameters:
-----------
actual_production_time : float
実稼働時間(分)
Returns:
--------
availability : float
可用率(0-1)
"""
availability = actual_production_time / self.planned_production_time
return min(availability, 1.0)
def calculate_performance(self, actual_output, actual_production_time):
"""
性能稼働率 = (理想サイクルタイム × 実生産量) / 実稼働時間
Parameters:
-----------
actual_output : int
実生産量(個)
actual_production_time : float
実稼働時間(分)
Returns:
--------
performance : float
性能稼働率(0-1)
"""
ideal_production_time = self.ideal_cycle_time * actual_output
performance = ideal_production_time / actual_production_time
return min(performance, 1.0)
def calculate_quality(self, actual_output, good_output):
"""
品質率 = 良品数 / 総生産量
Parameters:
-----------
actual_output : int
総生産量(個)
good_output : int
良品数(個)
Returns:
--------
quality : float
品質率(0-1)
"""
if actual_output == 0:
return 0.0
quality = good_output / actual_output
return min(quality, 1.0)
def calculate_oee(self, actual_production_time, actual_output, good_output):
"""
OEE = 可用率 × 性能稼働率 × 品質率
Parameters:
-----------
actual_production_time : float
実稼働時間(分)
actual_output : int
実生産量(個)
good_output : int
良品数(個)
Returns:
--------
oee_metrics : dict
OEEとその構成要素
"""
availability = self.calculate_availability(actual_production_time)
performance = self.calculate_performance(actual_output, actual_production_time)
quality = self.calculate_quality(actual_output, good_output)
oee = availability * performance * quality
metrics = {
'OEE': oee,
'Availability': availability,
'Performance': performance,
'Quality': quality
}
return metrics
# KPI計算例
# 化学プラントの1週間のデータ
planned_time = 7 * 24 * 60 # 7日間(分)
ideal_cycle_time = 2.0 # 理想サイクルタイム(分/バッチ)
kpi_calc = ProcessKPICalculator(planned_time, ideal_cycle_time)
# 週次データ
week_data = {
'月曜': {'actual_time': 22*60, 'output': 620, 'good': 598},
'火曜': {'actual_time': 23*60, 'output': 680, 'good': 672},
'水曜': {'actual_time': 21*60, 'output': 610, 'good': 595},
'木曜': {'actual_time': 23*60, 'output': 685, 'good': 678},
'金曜': {'actual_time': 22*60, 'output': 650, 'good': 640},
'土曜': {'actual_time': 18*60, 'output': 520, 'good': 512},
'日曜': {'actual_time': 15*60, 'output': 430, 'good': 425}
}
# 日次KPI計算
daily_kpis = {}
for day, data in week_data.items():
kpis = kpi_calc.calculate_oee(
data['actual_time'],
data['output'],
data['good']
)
daily_kpis[day] = kpis
# KPIダッシュボードの作成
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('日次OEE推移', 'OEE構成要素', '週次OEE', '生産量と良品率'),
specs=[
[{"type": "scatter"}, {"type": "bar"}],
[{"type": "indicator"}, {"type": "scatter"}]
],
vertical_spacing=0.15,
horizontal_spacing=0.12
)
days = list(week_data.keys())
oee_values = [daily_kpis[day]['OEE'] * 100 for day in days]
availability_values = [daily_kpis[day]['Availability'] * 100 for day in days]
performance_values = [daily_kpis[day]['Performance'] * 100 for day in days]
quality_values = [daily_kpis[day]['Quality'] * 100 for day in days]
# 日次OEE推移(左上)
fig.add_trace(
go.Scatter(
x=days,
y=oee_values,
mode='lines+markers',
name='OEE',
line=dict(color='#11998e', width=3),
marker=dict(size=10)
),
row=1, col=1
)
fig.add_hline(y=85, line_dash="dash", line_color="green",
annotation_text="目標: 85%", row=1, col=1)
fig.add_hline(y=60, line_dash="dash", line_color="orange",
annotation_text="最低基準: 60%", row=1, col=1)
# OEE構成要素(右上)
fig.add_trace(
go.Bar(x=days, y=availability_values, name='可用率',
marker_color='#11998e'),
row=1, col=2
)
fig.add_trace(
go.Bar(x=days, y=performance_values, name='性能稼働率',
marker_color='#f59e0b'),
row=1, col=2
)
fig.add_trace(
go.Bar(x=days, y=quality_values, name='品質率',
marker_color='#7b2cbf'),
row=1, col=2
)
# 週次OEE(左下)
weekly_oee = np.mean(oee_values)
fig.add_trace(
go.Indicator(
mode="gauge+number+delta",
value=weekly_oee,
title={'text': "週次OEE (%)"},
delta={'reference': 85, 'increasing': {'color': "green"}, 'decreasing': {'color': "red"}},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': "#11998e"},
'steps': [
{'range': [0, 60], 'color': "lightcoral"},
{'range': [60, 85], 'color': "lightyellow"},
{'range': [85, 100], 'color': "lightgreen"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 85
}
}
),
row=2, col=1
)
# 生産量と良品率(右下)
output_values = [week_data[day]['output'] for day in days]
good_values = [week_data[day]['good'] for day in days]
fig.add_trace(
go.Scatter(
x=days,
y=output_values,
mode='lines+markers',
name='総生産量',
line=dict(color='#11998e', width=2),
yaxis='y'
),
row=2, col=2
)
fig.add_trace(
go.Scatter(
x=days,
y=good_values,
mode='lines+markers',
name='良品数',
line=dict(color='#4caf50', width=2),
yaxis='y'
),
row=2, col=2
)
# レイアウト設定
fig.update_xaxes(title_text="曜日", row=1, col=1)
fig.update_xaxes(title_text="曜日", row=1, col=2)
fig.update_xaxes(title_text="曜日", row=2, col=2)
fig.update_yaxes(title_text="OEE (%)", row=1, col=1)
fig.update_yaxes(title_text="割合 (%)", row=1, col=2)
fig.update_yaxes(title_text="生産量 (バッチ)", row=2, col=2)
fig.update_layout(
title_text="プロセスKPIダッシュボード - 週次レポート",
title_font_size=20,
title_x=0.5,
height=900,
showlegend=True,
template="plotly_white"
)
fig.write_html("kpi_dashboard.html")
print("KPIダッシュボードを保存しました。\n")
# KPIレポート
print("=== 週次KPIレポート ===\n")
print(f"{'曜日':<8} {'OEE':>7} {'可用率':>7} {'性能':>7} {'品質':>7} {'生産量':>8} {'良品':>8}")
print("-" * 65)
for day in days:
kpi = daily_kpis[day]
data = week_data[day]
print(f"{day:<8} "
f"{kpi['OEE']*100:>6.1f}% "
f"{kpi['Availability']*100:>6.1f}% "
f"{kpi['Performance']*100:>6.1f}% "
f"{kpi['Quality']*100:>6.1f}% "
f"{data['output']:>7}個 "
f"{data['good']:>7}個")
print("-" * 65)
print(f"週次平均: {weekly_oee:>5.1f}%\n")
# 改善提案
print("=== 改善提案 ===")
avg_availability = np.mean(availability_values)
avg_performance = np.mean(performance_values)
avg_quality = np.mean(quality_values)
bottleneck = min([
('可用率', avg_availability),
('性能稼働率', avg_performance),
('品質率', avg_quality)
], key=lambda x: x[1])
print(f"最大のボトルネック: {bottleneck[0]} ({bottleneck[1]:.1f}%)")
if bottleneck[0] == '可用率':
print(" → 設備停止時間の削減、予知保全の導入を検討")
elif bottleneck[0] == '性能稼働率':
print(" → プロセス最適化、ボトルネック工程の改善を検討")
else:
print(" → 品質管理強化、不良原因の特定と対策を検討")
# OEEクラス分類
if weekly_oee >= 85:
oee_class = "世界クラス"
elif weekly_oee >= 60:
oee_class = "平均的"
else:
oee_class = "改善が必要"
print(f"\nOEE評価: {oee_class} (目標: 85%以上)")
期待される出力:
=== 週次KPIレポート ===
曜日 OEE 可用率 性能 品質 生産量 良品
-----------------------------------------------------------------
月曜 84.4% 91.7% 93.5% 96.5% 620個 598個
火曜 91.2% 95.8% 97.1% 98.8% 680個 672個
水曜 82.1% 87.5% 95.2% 97.5% 610個 595個
木曜 91.8% 95.8% 97.5% 99.0% 685個 678個
金曜 87.9% 91.7% 97.2% 98.5% 650個 640個
土曜 85.2% 75.0% 96.3% 98.5% 520個 512個
日曜 83.6% 62.5% 95.3% 98.8% 430個 425個
-----------------------------------------------------------------
週次平均: 86.6%
=== 改善提案 ===
最大のボトルネック: 可用率 (85.7%)
→ 設備停止時間の削減、予知保全の導入を検討
OEE評価: 世界クラス (目標: 85%以上)
解説: OEE(設備総合効率)は、製造業・プロセス産業で最も重要なKPIの一つです。可用率、性能稼働率、品質率の3要素で構成され、設備の総合的な生産性を評価します。このコードは、日次・週次のOEE計算、可視化、ボトルネック分析、改善提案までを自動化しています。
コード例7: プロセス状態の可視化(有限状態機械)
目的: プロセスの運転状態を有限状態機械でモデル化し、状態遷移を可視化する。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from enum import Enum
# 日本語フォント設定
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
class ProcessState(Enum):
"""プロセス運転状態"""
STOPPED = 0 # 停止
STARTUP = 1 # 起動中
RUNNING = 2 # 定常運転
SHUTDOWN = 3 # 停止操作中
ALARM = 4 # アラーム状態
MAINTENANCE = 5 # メンテナンス
class ProcessStateMachine:
"""
プロセス状態機械
状態遷移ルール:
STOPPED → STARTUP → RUNNING
RUNNING → SHUTDOWN → STOPPED
ANY → ALARM (アラーム発生時)
ALARM → 前の状態 (アラーム復帰時)
STOPPED ↔ MAINTENANCE
"""
def __init__(self, initial_state=ProcessState.STOPPED):
self.current_state = initial_state
self.previous_state = None
self.state_history = [(pd.Timestamp.now(), initial_state)]
# 状態遷移ルール
self.transitions = {
ProcessState.STOPPED: [ProcessState.STARTUP, ProcessState.MAINTENANCE],
ProcessState.STARTUP: [ProcessState.RUNNING, ProcessState.ALARM],
ProcessState.RUNNING: [ProcessState.SHUTDOWN, ProcessState.ALARM],
ProcessState.SHUTDOWN: [ProcessState.STOPPED, ProcessState.ALARM],
ProcessState.ALARM: [], # アラームからは任意の状態に復帰可能
ProcessState.MAINTENANCE: [ProcessState.STOPPED]
}
def transition(self, new_state, timestamp=None):
"""
状態遷移
Parameters:
-----------
new_state : ProcessState
新しい状態
timestamp : pd.Timestamp or None
遷移時刻
Returns:
--------
success : bool
遷移の成功/失敗
"""
if timestamp is None:
timestamp = pd.Timestamp.now()
# アラーム状態への遷移は常に許可
if new_state == ProcessState.ALARM:
self.previous_state = self.current_state
self.current_state = new_state
self.state_history.append((timestamp, new_state))
return True
# アラーム状態からの復帰
if self.current_state == ProcessState.ALARM:
self.current_state = new_state
self.state_history.append((timestamp, new_state))
return True
# 通常の状態遷移ルールチェック
if new_state in self.transitions[self.current_state]:
self.current_state = new_state
self.state_history.append((timestamp, new_state))
return True
# 不正な遷移
return False
def get_state_duration(self):
"""現在の状態の継続時間(秒)"""
if len(self.state_history) < 2:
return 0
current_time = pd.Timestamp.now()
last_transition = self.state_history[-1][0]
duration = (current_time - last_transition).total_seconds()
return duration
def get_state_statistics(self):
"""状態別の統計情報"""
if len(self.state_history) < 2:
return {}
df_history = pd.DataFrame(self.state_history, columns=['timestamp', 'state'])
df_history['duration'] = df_history['timestamp'].diff().shift(-1).dt.total_seconds()
stats = {}
for state in ProcessState:
state_data = df_history[df_history['state'] == state]
if len(state_data) > 0:
stats[state.name] = {
'count': len(state_data),
'total_duration': state_data['duration'].sum(),
'avg_duration': state_data['duration'].mean()
}
return stats
# プロセス状態機械のシミュレーション
print("=== プロセス状態機械シミュレーション ===\n")
# 初期化
state_machine = ProcessStateMachine(initial_state=ProcessState.STOPPED)
# 1日間のプロセス運転シミュレーション
start_time = pd.Timestamp('2025-01-01 00:00:00')
# 状態遷移シナリオ
transitions_scenario = [
(start_time + pd.Timedelta(hours=0), ProcessState.STOPPED),
(start_time + pd.Timedelta(hours=1), ProcessState.STARTUP),
(start_time + pd.Timedelta(hours=2), ProcessState.RUNNING),
(start_time + pd.Timedelta(hours=8), ProcessState.ALARM), # アラーム発生
(start_time + pd.Timedelta(hours=8.5), ProcessState.RUNNING), # アラーム復帰
(start_time + pd.Timedelta(hours=16), ProcessState.SHUTDOWN),
(start_time + pd.Timedelta(hours=17), ProcessState.STOPPED),
(start_time + pd.Timedelta(hours=18), ProcessState.MAINTENANCE),
(start_time + pd.Timedelta(hours=22), ProcessState.STOPPED),
]
# 状態遷移実行
for timestamp, new_state in transitions_scenario[1:]:
success = state_machine.transition(new_state, timestamp)
if success:
print(f"[{timestamp.strftime('%H:%M')}] {state_machine.current_state.name}")
else:
print(f"[{timestamp.strftime('%H:%M')}] 不正な遷移: → {new_state.name}")
# 状態履歴の可視化
df_history = pd.DataFrame(state_machine.state_history, columns=['timestamp', 'state'])
df_history['state_code'] = df_history['state'].apply(lambda x: x.value)
df_history['state_name'] = df_history['state'].apply(lambda x: x.name)
# Ganttチャート風の可視化
fig, ax = plt.subplots(figsize=(16, 6))
# 状態ごとに色を定義
state_colors = {
ProcessState.STOPPED: '#gray',
ProcessState.STARTUP: '#ffeb3b',
ProcessState.RUNNING: '#4caf50',
ProcessState.SHUTDOWN: '#ff9800',
ProcessState.ALARM: '#f44336',
ProcessState.MAINTENANCE: '#2196f3'
}
# 各状態期間をバーで表示
for i in range(len(df_history) - 1):
start = df_history.iloc[i]['timestamp']
end = df_history.iloc[i + 1]['timestamp']
state = df_history.iloc[i]['state']
ax.barh(
0,
width=(end - start).total_seconds() / 3600, # 時間単位
left=(start - start_time).total_seconds() / 3600,
height=0.5,
color=state_colors[state],
edgecolor='black',
linewidth=1.5,
label=state.name if i == 0 or df_history.iloc[i-1]['state'] != state else ""
)
# 状態名をバーの中央に表示
duration = (end - start).total_seconds() / 3600
if duration > 0.5: # 30分以上の状態のみラベル表示
mid_point = (start - start_time).total_seconds() / 3600 + duration / 2
ax.text(mid_point, 0, state.name, ha='center', va='center',
fontsize=11, fontweight='bold', color='white')
# 凡例(重複を除去)
handles, labels = ax.get_legend_handles_labels()
by_label = dict(zip(labels, handles))
ax.legend(by_label.values(), by_label.keys(), loc='upper right', fontsize=10)
ax.set_xlabel('時刻(時)', fontsize=12)
ax.set_yticks([])
ax.set_xlim(0, 24)
ax.set_title('プロセス状態遷移タイムライン(24時間)', fontsize=14, fontweight='bold')
ax.grid(alpha=0.3, axis='x')
plt.tight_layout()
plt.show()
# 統計情報
print("\n=== 状態統計 ===")
stats = state_machine.get_state_statistics()
print(f"\n{'状態':<15} {'回数':>6} {'合計時間':>10} {'平均時間':>10}")
print("-" * 50)
for state_name, stat in stats.items():
total_hours = stat['total_duration'] / 3600
avg_hours = stat['avg_duration'] / 3600
print(f"{state_name:<15} {stat['count']:>6}回 "
f"{total_hours:>9.2f}時間 {avg_hours:>9.2f}時間")
# 稼働率計算
total_time = 24 # 時間
running_time = stats.get('RUNNING', {}).get('total_duration', 0) / 3600
availability = running_time / total_time * 100
print(f"\n稼働率: {availability:.1f}% ({running_time:.2f}時間 / {total_time}時間)")
期待される出力:
=== プロセス状態機械シミュレーション ===
[01:00] STARTUP
[02:00] RUNNING
[08:00] ALARM
[08:30] RUNNING
[16:00] SHUTDOWN
[17:00] STOPPED
[18:00] MAINTENANCE
[22:00] STOPPED
=== 状態統計 ===
状態 回数 合計時間 平均時間
--------------------------------------------------
STOPPED 2回 5.00時間 2.50時間
STARTUP 1回 1.00時間 1.00時間
RUNNING 2回 13.50時間 6.75時間
ALARM 1回 0.50時間 0.50時間
SHUTDOWN 1回 1.00時間 1.00時間
MAINTENANCE 1回 4.00時間 4.00時間
稼働率: 56.2% (13.50時間 / 24時間)
解説: 有限状態機械(FSM)は、プロセスの運転状態を明確にモデル化し、状態遷移を管理する強力なツールです。このコードは、プロセスの起動、定常運転、停止、アラーム、メンテナンスの各状態と、それらの間の遷移ルールを実装しています。タイムラインの可視化により、オペレータがプロセスの運転履歴を直感的に把握できます。
コード例8: 完全な統合監視システム - 化学反応器ケーススタディ
目的: これまで学んだ全ての要素を統合し、化学反応器の完全な監視システムを構築する。
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from collections import deque
from datetime import datetime
class IntegratedProcessMonitoringSystem:
"""
統合プロセス監視システム
機能:
- リアルタイムデータ収集
- マルチ変数トレンド監視
- アラーム管理
- KPI計算
- 状態管理
"""
def __init__(self, process_name):
self.process_name = process_name
# データバッファ(1時間分、1秒間隔)
self.buffer_size = 3600
self.time_buffer = deque(maxlen=self.buffer_size)
self.data_buffers = {
'反応器温度': deque(maxlen=self.buffer_size),
'ジャケット温度': deque(maxlen=self.buffer_size),
'反応器圧力': deque(maxlen=self.buffer_size),
'原料流量': deque(maxlen=self.buffer_size),
'生成物濃度': deque(maxlen=self.buffer_size)
}
# アラームシステム
self.active_alarms = []
# KPIデータ
self.kpi_data = {
'production_count': 0,
'good_count': 0,
'running_time': 0
}
# プロセス状態
self.process_state = 'RUNNING'
self.iteration = 0
def generate_process_data(self):
"""プロセスデータの生成(実際はPLC/DCSから取得)"""
elapsed = self.iteration
# 反応器温度(目標175°C)
temp_base = 175.0
temp_noise = np.random.normal(0, 1.2)
temp_disturbance = 3 * np.sin(2 * np.pi * elapsed / 300) # 5分周期
reactor_temp = temp_base + temp_noise + temp_disturbance
# ジャケット温度(反応器温度より低い)
jacket_temp = reactor_temp - 10 + np.random.normal(0, 0.8)
# 圧力(目標1.5 MPa)
pressure = 1.5 + np.random.normal(0, 0.03)
# 原料流量(目標50 m³/h)
flow_rate = 50 + np.random.normal(0, 2.5)
# 生成物濃度(目標80%)
concentration = 80 + np.random.normal(0, 3) - 0.5 * (reactor_temp - 175)
return {
'反応器温度': reactor_temp,
'ジャケット温度': jacket_temp,
'反応器圧力': pressure,
'原料流量': flow_rate,
'生成物濃度': concentration
}
def check_process_alarms(self, data):
"""プロセスアラームのチェック"""
alarms = []
# 温度アラーム
if data['反応器温度'] > 180:
alarms.append({'severity': 'CRITICAL', 'variable': '反応器温度',
'message': '高温アラーム', 'value': data['反応器温度']})
elif data['反応器温度'] > 177:
alarms.append({'severity': 'WARNING', 'variable': '反応器温度',
'message': '高温警告', 'value': data['反応器温度']})
elif data['反応器温度'] < 173:
alarms.append({'severity': 'WARNING', 'variable': '反応器温度',
'message': '低温警告', 'value': data['反応器温度']})
# 圧力アラーム
if data['反応器圧力'] > 1.6 or data['反応器圧力'] < 1.4:
alarms.append({'severity': 'CRITICAL', 'variable': '反応器圧力',
'message': '圧力異常', 'value': data['反応器圧力']})
# 濃度アラーム
if data['生成物濃度'] < 75:
alarms.append({'severity': 'WARNING', 'variable': '生成物濃度',
'message': '品質低下', 'value': data['生成物濃度']})
self.active_alarms = alarms
return alarms
def update(self):
"""システムの更新(1秒ごとに呼び出される)"""
current_time = datetime.now()
data = self.generate_process_data()
# データバッファに追加
self.time_buffer.append(current_time)
for var, value in data.items():
self.data_buffers[var].append(value)
# アラームチェック
alarms = self.check_process_alarms(data)
# KPI更新
self.kpi_data['running_time'] += 1 # 秒
self.iteration += 1
return data, alarms
def create_integrated_dashboard(self):
"""統合ダッシュボードの作成"""
if len(self.time_buffer) < 10:
return None
# データをnumpy配列に変換
time_array = list(self.time_buffer)
reactor_temp = np.array(list(self.data_buffers['反応器温度']))
jacket_temp = np.array(list(self.data_buffers['ジャケット温度']))
pressure = np.array(list(self.data_buffers['反応器圧力']))
flow_rate = np.array(list(self.data_buffers['原料流量']))
concentration = np.array(list(self.data_buffers['生成物濃度']))
# サブプロットの作成
fig = make_subplots(
rows=3, cols=2,
subplot_titles=(
'反応器温度トレンド', 'ジャケット温度トレンド',
'反応器圧力トレンド', '原料流量トレンド',
'生成物濃度トレンド', '現在のプロセス状態'
),
specs=[
[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "scatter"}, {"type": "indicator"}]
],
vertical_spacing=0.12,
horizontal_spacing=0.12
)
# 反応器温度
fig.add_trace(
go.Scatter(x=time_array, y=reactor_temp, mode='lines',
name='反応器温度', line=dict(color='#11998e', width=2)),
row=1, col=1
)
fig.add_hline(y=175, line_dash="dash", line_color="red", row=1, col=1)
fig.add_hrect(y0=173, y1=177, fillcolor="green", opacity=0.1,
line_width=0, row=1, col=1)
# ジャケット温度
fig.add_trace(
go.Scatter(x=time_array, y=jacket_temp, mode='lines',
name='ジャケット温度', line=dict(color='#f59e0b', width=2)),
row=1, col=2
)
# 圧力
fig.add_trace(
go.Scatter(x=time_array, y=pressure, mode='lines',
name='圧力', line=dict(color='#7b2cbf', width=2)),
row=2, col=1
)
fig.add_hline(y=1.5, line_dash="dash", line_color="red", row=2, col=1)
# 流量
fig.add_trace(
go.Scatter(x=time_array, y=flow_rate, mode='lines',
name='流量', line=dict(color='#e63946', width=2)),
row=2, col=2
)
fig.add_hline(y=50, line_dash="dash", line_color="red", row=2, col=2)
# 濃度
fig.add_trace(
go.Scatter(x=time_array, y=concentration, mode='lines',
name='濃度', line=dict(color='#06a77d', width=2)),
row=3, col=1
)
fig.add_hline(y=80, line_dash="dash", line_color="red", row=3, col=1)
# プロセス状態インジケータ
current_temp = reactor_temp[-1]
fig.add_trace(
go.Indicator(
mode="gauge+number",
value=current_temp,
title={'text': "反応器温度 (°C)"},
gauge={
'axis': {'range': [None, 200]},
'bar': {'color': "#11998e"},
'steps': [
{'range': [0, 173], 'color': "lightblue"},
{'range': [173, 177], 'color': "lightgreen"},
{'range': [177, 200], 'color': "lightcoral"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 175
}
}
),
row=3, col=2
)
# 軸ラベル
fig.update_yaxes(title_text="温度 (°C)", row=1, col=1)
fig.update_yaxes(title_text="温度 (°C)", row=1, col=2)
fig.update_yaxes(title_text="圧力 (MPa)", row=2, col=1)
fig.update_yaxes(title_text="流量 (m³/h)", row=2, col=2)
fig.update_yaxes(title_text="濃度 (%)", row=3, col=1)
fig.update_layout(
title_text=f"{self.process_name} - 統合監視ダッシュボード",
title_font_size=20,
title_x=0.5,
height=1000,
showlegend=False,
template="plotly_white"
)
return fig
# 統合監視システムのデモンストレーション
print("=== 統合プロセス監視システム ===")
print("化学反応器 R-101 監視システム起動\n")
# システム初期化
monitoring_system = IntegratedProcessMonitoringSystem("化学反応器 R-101")
# 10分間のシミュレーション(600秒)
print("10分間のプロセス監視を開始します...\n")
alarm_count = 0
for i in range(600):
data, alarms = monitoring_system.update()
# 30秒ごとに状態表示
if (i + 1) % 30 == 0:
print(f"[{i+1:>3}秒] "
f"温度: {data['反応器温度']:>6.2f}°C | "
f"圧力: {data['反応器圧力']:>5.3f} MPa | "
f"濃度: {data['生成物濃度']:>5.2f}% | "
f"アラーム: {len(alarms)}件")
if alarms:
for alarm in alarms:
severity_icon = '🔴' if alarm['severity'] == 'CRITICAL' else '🟡'
print(f" {severity_icon} {alarm['variable']}: {alarm['message']} ({alarm['value']:.2f})")
alarm_count += len(alarms)
print("\nプロセス監視完了!\n")
# 統合ダッシュボードの生成
fig = monitoring_system.create_integrated_dashboard()
if fig:
fig.write_html("integrated_monitoring_system.html")
print("統合ダッシュボードを 'integrated_monitoring_system.html' に保存しました。\n")
# 最終統計
print("=== プロセス統計サマリー ===")
print(f"監視時間: {monitoring_system.kpi_data['running_time']} 秒 (10分)")
print(f"総アラーム数: {alarm_count}件")
# 各変数の統計
print("\n変数統計:")
for var_name, buffer in monitoring_system.data_buffers.items():
data_array = np.array(list(buffer))
print(f" {var_name:<15}: 平均 {np.mean(data_array):>7.2f}, "
f"標準偏差 {np.std(data_array):>5.2f}, "
f"最大 {np.max(data_array):>7.2f}, "
f"最小 {np.min(data_array):>7.2f}")
print("\nシステム正常終了。")
解説: このコードは、本章で学んだ全ての要素(リアルタイムデータ収集、マルチ変数監視、アラーム管理、KPI計算、ダッシュボード可視化)を統合した完全なプロセス監視システムです。化学反応器を例に、実際のプラントで使用される監視システムの構成を実装しています。実運用では、このシステムをWebアプリケーション(Dash, Streamlit等)として展開し、複数のオペレータがリアルタイムでアクセスできるようにします。
5.3 本章のまとめ
学んだこと
- リアルタイム監視システムのアーキテクチャ
- データ収集層、処理層、可視化層、保存層の4層構造
- データフロー: センサー → バッファリング → 処理 → 可視化 → 保存
- システム設計の原則とスケーラビリティ
- ダッシュボード設計と可視化
- Plotlyによるインタラクティブなダッシュボード構築
- トレンドチャート、ゲージ、インジケータの効果的な配置
- オペレータ向けUIデザインの原則
- リアルタイムデータ処理
- dequeによる効率的なデータバッファリング
- ストリーミングデータの統計処理
- リアルタイム異常検知
- アラーム管理システム
- 重要度レベル別のアラーム分類(INFO, WARNING, ALARM, CRITICAL)
- アラーム履歴管理とアクノリッジメント
- アラームフラッド防止戦略
- KPIとプロセス分析
- OEE(設備総合効率)の計算と評価
- 履歴データトレンド分析とパターン検出
- ボトルネック分析と改善提案
- 統合監視システム
- 全機能を統合した完全なシステム構築
- 化学反応器の実践的なケーススタディ
- 実運用への展開戦略
重要なポイント
- データバッファリング: dequeを使った固定長キューで、メモリ効率的なリアルタイム処理を実現
- アラーム設計: 重要度分類と適切な閾値設定で、オペレータの負担を軽減
- KPI可視化: OEEとその構成要素を視覚的に表示し、改善機会を明確化
- 統合ダッシュボード: 複数の変数を同時に監視できるインターフェースで、プロセス全体を俯瞰
- スケーラビリティ: モジュール化された設計で、変数追加や機能拡張が容易
実務での応用
本章で学んだ監視システムは、以下のような実プロセスで活用できます:
- 化学プラント: 反応器、蒸留塔、熱交換器の統合監視
- 製薬プラント: GMP準拠の記録管理とアラームシステム
- 食品プラント: HACCP対応の温度・pH管理
- 半導体製造: クリーンルーム環境の精密監視
- 発電プラント: ボイラー、タービンの効率監視
次のステップ
シリーズを完了したあなたは、以下のスキルを習得しました:
- ✅ プロセスモニタリングの基礎知識と実装能力
- ✅ 統計的プロセス管理(SPC)の実践スキル
- ✅ 機械学習・深層学習による異常検知
- ✅ PID制御の理論と実装
- ✅ リアルタイム監視システムの構築能力
さらなる学習のために:
- モデル予測制御(MPC): 多変数制御の高度な手法
- デジタルツイン: プロセスの仮想モデル構築と予測シミュレーション
- 強化学習制御: AIによる適応的プロセス制御
- ソフトセンサー開発: 機械学習による困難な変数の推定
- クラウド統合: AWS/Azure/GCPとの連携とビッグデータ分析
シリーズ完了おめでとうございます!
あなたは「プロセスモニタリング・制御入門シリーズ v1.0」の全5章を完了しました。センサーデータの取得から、統計的プロセス管理、異常検知、PID制御、そしてリアルタイム監視システムの構築まで、プロセスエンジニアリングの包括的な知識とスキルを習得しました。
この知識を活かして、次のステップへ進みましょう:
- ✅ 自社プロセスへの適用を検討する
- ✅ 学んだコードをGitHubで公開・共有する
- ✅ 実データでソフトセンサーやダッシュボードを構築する
- ✅ プロセス制御エンジニアとしてのキャリアを発展させる
あなたのフィードバックをお待ちしています!
このシリーズの改善提案、質問、成功事例などがあれば、ぜひお知らせください。
連絡先: yusuke.hashimoto.b8@tohoku.ac.jp