Digital TwinとIoTセンサーの統合により、物理システムとデジタル空間を同期する実装技術
Digital Twinの精度と実用性は、物理システムとデジタルモデルの同期精度に大きく依存します。本章では、IoTセンサーから得られるリアルタイムデータをDigital Twinに統合する7つの実践的な手法を学びます。
プロセス産業におけるリアルタイムデータ連携では、以下の課題に対処する必要があります:
MQTTは軽量で信頼性の高いメッセージングプロトコルで、IoTデバイスとの通信に広く使われています。
"""
Example 1: MQTT通信によるリアルタイムセンサーデータ取得
化学反応器の温度・圧力センサーからMQTT経由でデータを受信
"""
import paho.mqtt.client as mqtt
import json
from datetime import datetime
from typing import Dict, Callable
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MQTTSensorClient:
"""MQTTプロトコルでIoTセンサーと通信するクライアント
化学プラントの反応器センサー(温度、圧力、流量)から
リアルタイムデータを取得し、Digital Twinに送信します。
"""
def __init__(self, broker_address: str, port: int = 1883):
"""
Args:
broker_address: MQTTブローカーのIPアドレス/ホスト名
port: MQTTポート番号(デフォルト1883)
"""
self.client = mqtt.Client(client_id="digital_twin_client")
self.broker_address = broker_address
self.port = port
self.data_callbacks: Dict[str, Callable] = {}
# コールバック関数の登録
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
def _on_connect(self, client, userdata, flags, rc):
"""接続確立時のコールバック"""
if rc == 0:
logger.info(f"MQTTブローカーに接続成功: {self.broker_address}")
# 反応器のすべてのセンサートピックをサブスクライブ
self.client.subscribe("reactor/+/sensor/#")
else:
logger.error(f"接続失敗: return code {rc}")
def _on_message(self, client, userdata, msg):
"""メッセージ受信時のコールバック"""
try:
# JSONペイロードをパース
payload = json.loads(msg.payload.decode())
topic = msg.topic
# タイムスタンプの追加(センサーデータに含まれない場合)
if 'timestamp' not in payload:
payload['timestamp'] = datetime.now().isoformat()
logger.info(f"受信: {topic} - {payload}")
# 登録されたコールバック関数を実行
for callback in self.data_callbacks.values():
callback(topic, payload)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error: {e}")
except Exception as e:
logger.error(f"メッセージ処理エラー: {e}")
def _on_disconnect(self, client, userdata, rc):
"""切断時のコールバック"""
if rc != 0:
logger.warning(f"予期しない切断: return code {rc}")
logger.info("再接続を試行中...")
def register_callback(self, name: str, callback: Callable):
"""データ受信時のコールバック関数を登録
Args:
name: コールバックの識別名
callback: 実行する関数 callback(topic: str, payload: dict)
"""
self.data_callbacks[name] = callback
def connect(self):
"""MQTTブローカーに接続"""
self.client.connect(self.broker_address, self.port, 60)
self.client.loop_start()
def disconnect(self):
"""MQTTブローカーから切断"""
self.client.loop_stop()
self.client.disconnect()
# 使用例
def process_sensor_data(topic: str, payload: dict):
"""受信したセンサーデータをDigital Twinに送信"""
reactor_id = topic.split('/')[1]
sensor_type = topic.split('/')[-1]
print(f"Reactor {reactor_id} - {sensor_type}: {payload['value']} {payload['unit']}")
# Digital Twinモデルに反映(後述)
# digital_twin.update_sensor_value(reactor_id, sensor_type, payload)
if __name__ == "__main__":
# MQTTクライアントの初期化
mqtt_client = MQTTSensorClient(broker_address="192.168.1.100")
# データ処理コールバックの登録
mqtt_client.register_callback("process_data", process_sensor_data)
# 接続開始
mqtt_client.connect()
try:
# 継続的にデータを受信(Ctrl+Cで終了)
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
mqtt_client.disconnect()
print("\n接続を終了しました")
# 期待される出力例:
# INFO:__main__:MQTTブローカーに接続成功: 192.168.1.100
# INFO:__main__:受信: reactor/R101/sensor/temperature - {'value': 85.3, 'unit': 'degC', 'timestamp': '2025-10-26T10:30:15'}
# Reactor R101 - temperature: 85.3 degC
# INFO:__main__:受信: reactor/R101/sensor/pressure - {'value': 2.45, 'unit': 'MPa', 'timestamp': '2025-10-26T10:30:15'}
# Reactor R101 - pressure: 2.45 MPa
OPC UA(OPC Unified Architecture)は産業オートメーション分野の標準通信プロトコルで、PLCやDCSとの接続に使用されます。
"""
Example 2: OPC UA通信による産業機器データ取得
PLCから反応器の制御パラメータと測定値を取得
"""
from opcua import Client, ua
from typing import List, Dict
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class OPCUAReactorClient:
"""OPC UAで反応器データを取得するクライアント
DCS(分散制御システム)やPLCから設定値(SV)と
測定値(PV)をリアルタイムに取得します。
"""
def __init__(self, server_url: str):
"""
Args:
server_url: OPC UAサーバーのURL(例: "opc.tcp://192.168.1.50:4840")
"""
self.client = Client(server_url)
self.server_url = server_url
self.subscriptions = []
def connect(self):
"""OPC UAサーバーに接続"""
try:
self.client.connect()
logger.info(f"OPC UAサーバーに接続: {self.server_url}")
# サーバー情報の取得
server_info = self.client.get_server_node()
logger.info(f"サーバー名: {server_info.get_browse_name()}")
except Exception as e:
logger.error(f"接続エラー: {e}")
raise
def read_node(self, node_id: str) -> Dict:
"""OPC UAノードから値を読み取り
Args:
node_id: ノードID(例: "ns=2;s=Reactor.R101.Temperature")
Returns:
{'value': 85.3, 'quality': 'Good', 'timestamp': '2025-10-26T10:30:15'}
"""
try:
node = self.client.get_node(node_id)
data_value = node.get_data_value()
return {
'value': data_value.Value.Value,
'quality': str(data_value.StatusCode),
'timestamp': data_value.SourceTimestamp.isoformat(),
'node_id': node_id
}
except Exception as e:
logger.error(f"ノード読み取りエラー ({node_id}): {e}")
return None
def read_multiple_nodes(self, node_ids: List[str]) -> Dict[str, Dict]:
"""複数ノードを一括読み取り(効率的)
Args:
node_ids: ノードIDのリスト
Returns:
{node_id: {'value': ..., 'quality': ..., 'timestamp': ...}}
"""
results = {}
nodes = [self.client.get_node(nid) for nid in node_ids]
# 一括読み取り(パフォーマンス向上)
data_values = self.client.get_values(nodes)
for node_id, data_value in zip(node_ids, data_values):
results[node_id] = {
'value': data_value,
'quality': 'Good', # 簡略化(実際はStatusCodeを確認)
'timestamp': time.time()
}
return results
def subscribe_to_changes(self, node_id: str, callback):
"""ノードの変化をサブスクライブ(変更時のみ通知)
Args:
node_id: 監視するノードID
callback: 変更時に呼ばれる関数 callback(node_id, value)
"""
try:
# サブスクリプションの作成(100ms間隔)
subscription = self.client.create_subscription(100, callback)
node = self.client.get_node(node_id)
# データ変更の監視開始
handle = subscription.subscribe_data_change(node)
self.subscriptions.append((subscription, handle))
logger.info(f"サブスクリプション開始: {node_id}")
except Exception as e:
logger.error(f"サブスクリプションエラー: {e}")
def disconnect(self):
"""OPC UAサーバーから切断"""
# すべてのサブスクリプションを削除
for subscription, handle in self.subscriptions:
subscription.unsubscribe(handle)
subscription.delete()
self.client.disconnect()
logger.info("OPC UAサーバーから切断")
# 使用例
if __name__ == "__main__":
# OPC UAクライアントの初期化
opcua_client = OPCUAReactorClient(server_url="opc.tcp://192.168.1.50:4840")
opcua_client.connect()
# 反応器R101の監視ノード
reactor_nodes = [
"ns=2;s=Reactor.R101.Temperature.PV", # 温度測定値
"ns=2;s=Reactor.R101.Temperature.SV", # 温度設定値
"ns=2;s=Reactor.R101.Pressure.PV", # 圧力測定値
"ns=2;s=Reactor.R101.Pressure.SV", # 圧力設定値
"ns=2;s=Reactor.R101.FlowRate.PV", # 流量測定値
]
try:
# 複数ノードを一括読み取り
data = opcua_client.read_multiple_nodes(reactor_nodes)
for node_id, values in data.items():
print(f"{node_id}: {values['value']}")
# 連続監視(10秒間)
print("\n連続監視を開始...")
for _ in range(10):
data = opcua_client.read_multiple_nodes(reactor_nodes)
temp_pv = data["ns=2;s=Reactor.R101.Temperature.PV"]['value']
pres_pv = data["ns=2;s=Reactor.R101.Pressure.PV"]['value']
print(f"Temperature: {temp_pv:.1f}°C, Pressure: {pres_pv:.2f} MPa")
time.sleep(1)
finally:
opcua_client.disconnect()
# 期待される出力例:
# INFO:__main__:OPC UAサーバーに接続: opc.tcp://192.168.1.50:4840
# ns=2;s=Reactor.R101.Temperature.PV: 85.3
# ns=2;s=Reactor.R101.Temperature.SV: 85.0
# ns=2;s=Reactor.R101.Pressure.PV: 2.45
# ns=2;s=Reactor.R101.Pressure.SV: 2.50
# ns=2;s=Reactor.R101.FlowRate.PV: 120.5
#
# 連続監視を開始...
# Temperature: 85.3°C, Pressure: 2.45 MPa
# Temperature: 85.5°C, Pressure: 2.46 MPa
# Temperature: 85.4°C, Pressure: 2.45 MPa
InfluxDBは高性能な時系列データベースで、センサーデータの長期保存とクエリに最適です。
"""
Example 3: InfluxDB時系列データベース統合
センサーデータの効率的な保存と高速クエリ
"""
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timedelta
import pandas as pd
from typing import List, Dict
class ReactorDataStore:
"""InfluxDBを使った反応器データの保存・取得
センサーデータを時系列データベースに保存し、
Digital Twin用の高速クエリを実現します。
"""
def __init__(self, url: str, token: str, org: str, bucket: str):
"""
Args:
url: InfluxDBのURL(例: "http://localhost:8086")
token: 認証トークン
org: 組織名
bucket: バケット名(データベース名に相当)
"""
self.client = InfluxDBClient(url=url, token=token, org=org)
self.org = org
self.bucket = bucket
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
def write_sensor_data(self, reactor_id: str, sensor_type: str,
value: float, unit: str, quality: str = "Good"):
"""センサーデータを書き込み
Args:
reactor_id: 反応器ID(例: "R101")
sensor_type: センサータイプ(例: "temperature")
value: 測定値
unit: 単位(例: "degC")
quality: データ品質("Good", "Bad", "Uncertain")
"""
point = (
Point("reactor_sensor")
.tag("reactor_id", reactor_id)
.tag("sensor_type", sensor_type)
.tag("unit", unit)
.tag("quality", quality)
.field("value", value)
.time(datetime.utcnow())
)
self.write_api.write(bucket=self.bucket, record=point)
def write_batch_data(self, data_points: List[Dict]):
"""複数データポイントをバッチ書き込み(高速化)
Args:
data_points: [{'reactor_id': 'R101', 'sensor_type': 'temperature', ...}]
"""
points = []
for dp in data_points:
point = (
Point("reactor_sensor")
.tag("reactor_id", dp['reactor_id'])
.tag("sensor_type", dp['sensor_type'])
.tag("unit", dp.get('unit', ''))
.field("value", dp['value'])
.time(dp.get('timestamp', datetime.utcnow()))
)
points.append(point)
self.write_api.write(bucket=self.bucket, record=points)
def query_latest_value(self, reactor_id: str, sensor_type: str) -> Dict:
"""最新のセンサー値を取得
Args:
reactor_id: 反応器ID
sensor_type: センサータイプ
Returns:
{'time': '2025-10-26T10:30:15', 'value': 85.3, 'unit': 'degC'}
"""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "reactor_sensor")
|> filter(fn: (r) => r.reactor_id == "{reactor_id}")
|> filter(fn: (r) => r.sensor_type == "{sensor_type}")
|> last()
'''
result = self.query_api.query(query=query, org=self.org)
if result and len(result) > 0 and len(result[0].records) > 0:
record = result[0].records[0]
return {
'time': record.get_time().isoformat(),
'value': record.get_value(),
'unit': record.values.get('unit', ''),
'quality': record.values.get('quality', 'Unknown')
}
return None
def query_time_range(self, reactor_id: str, sensor_type: str,
start: datetime, end: datetime) -> pd.DataFrame:
"""指定期間のデータを取得してDataFrameに変換
Args:
reactor_id: 反応器ID
sensor_type: センサータイプ
start: 開始時刻
end: 終了時刻
Returns:
pd.DataFrame with columns ['time', 'value', 'unit']
"""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: {start.isoformat()}Z, stop: {end.isoformat()}Z)
|> filter(fn: (r) => r._measurement == "reactor_sensor")
|> filter(fn: (r) => r.reactor_id == "{reactor_id}")
|> filter(fn: (r) => r.sensor_type == "{sensor_type}")
'''
result = self.query_api.query_data_frame(query=query, org=self.org)
if not result.empty:
# 必要な列のみ抽出
df = result[['_time', '_value', 'unit']].copy()
df.columns = ['time', 'value', 'unit']
return df
return pd.DataFrame(columns=['time', 'value', 'unit'])
def query_aggregated_data(self, reactor_id: str, sensor_type: str,
window: str = "1m") -> pd.DataFrame:
"""集計データを取得(平均、最大、最小)
Args:
reactor_id: 反応器ID
sensor_type: センサータイプ
window: 集計ウィンドウ(例: "1m", "5m", "1h")
Returns:
pd.DataFrame with columns ['time', 'mean', 'max', 'min']
"""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "reactor_sensor")
|> filter(fn: (r) => r.reactor_id == "{reactor_id}")
|> filter(fn: (r) => r.sensor_type == "{sensor_type}")
|> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
'''
result = self.query_api.query_data_frame(query=query, org=self.org)
if not result.empty:
df = result[['_time', '_value']].copy()
df.columns = ['time', 'mean']
return df
return pd.DataFrame(columns=['time', 'mean'])
def close(self):
"""データベース接続をクローズ"""
self.client.close()
# 使用例
if __name__ == "__main__":
# InfluxDBクライアントの初期化
db = ReactorDataStore(
url="http://localhost:8086",
token="your_influxdb_token",
org="your_org",
bucket="reactor_data"
)
try:
# 単一データポイントの書き込み
db.write_sensor_data(
reactor_id="R101",
sensor_type="temperature",
value=85.3,
unit="degC"
)
# バッチ書き込み(効率的)
batch_data = [
{'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.5, 'unit': 'degC'},
{'reactor_id': 'R101', 'sensor_type': 'pressure', 'value': 2.45, 'unit': 'MPa'},
{'reactor_id': 'R101', 'sensor_type': 'flow_rate', 'value': 120.5, 'unit': 'L/min'},
]
db.write_batch_data(batch_data)
# 最新値の取得
latest = db.query_latest_value("R101", "temperature")
print(f"最新温度: {latest['value']} {latest['unit']} at {latest['time']}")
# 過去1時間のデータ取得
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)
df = db.query_time_range("R101", "temperature", start_time, end_time)
print(f"\n過去1時間のデータポイント数: {len(df)}")
print(df.head())
# 1分間隔の集計データ
agg_df = db.query_aggregated_data("R101", "temperature", window="1m")
print(f"\n1分平均データ(直近24時間):")
print(agg_df.tail())
finally:
db.close()
# 期待される出力例:
# 最新温度: 85.5 degC at 2025-10-26T10:30:15.123456
#
# 過去1時間のデータポイント数: 3600
# time value unit
# 0 2025-10-26 09:30:15.123 85.1 degC
# 1 2025-10-26 09:30:16.234 85.2 degC
# 2 2025-10-26 09:30:17.345 85.3 degC
#
# 1分平均データ(直近24時間):
# time mean
# 1436 2025-10-26 10:26:00 85.2
# 1437 2025-10-26 10:27:00 85.3
# 1438 2025-10-26 10:28:00 85.4
# 1439 2025-10-26 10:29:00 85.5
# 1440 2025-10-26 10:30:00 85.5
センサーデータには必ずノイズや異常値が含まれます。Digital Twinに送る前に品質チェックが必要です。
"""
Example 4: リアルタイムデータ検証とクリーニング
センサーデータの品質チェック、異常値検出、欠損値補完
"""
import numpy as np
import pandas as pd
from typing import Optional, Dict, Tuple
from dataclasses import dataclass
from enum import Enum
class DataQuality(Enum):
"""データ品質レベル"""
GOOD = "Good" # 正常データ
UNCERTAIN = "Uncertain" # 不確実(補正済み)
BAD = "Bad" # 異常データ(使用不可)
@dataclass
class SensorReading:
"""センサー読み取り値"""
reactor_id: str
sensor_type: str
value: float
unit: str
timestamp: str
quality: DataQuality = DataQuality.GOOD
class DataValidator:
"""リアルタイムデータ検証・クリーニング
物理的制約チェック、統計的外れ値検出、
欠損値補完を実行してデータ品質を保証します。
"""
def __init__(self):
# センサーごとの物理的範囲(プロセス知識に基づく)
self.physical_ranges = {
'temperature': {'min': -50, 'max': 400, 'unit': 'degC'},
'pressure': {'min': 0, 'max': 10, 'unit': 'MPa'},
'flow_rate': {'min': 0, 'max': 500, 'unit': 'L/min'},
'ph': {'min': 0, 'max': 14, 'unit': ''},
}
# 移動平均用の履歴データ(最大100ポイント)
self.history: Dict[str, list] = {}
self.max_history = 100
def validate_physical_range(self, reading: SensorReading) -> Tuple[bool, str]:
"""物理的範囲チェック
Args:
reading: センサー読み取り値
Returns:
(is_valid, message)
"""
if reading.sensor_type not in self.physical_ranges:
return True, "Range not defined"
range_info = self.physical_ranges[reading.sensor_type]
value = reading.value
if value < range_info['min'] or value > range_info['max']:
msg = (f"{reading.sensor_type}が物理的範囲外: {value} "
f"(許容範囲: {range_info['min']}-{range_info['max']} {range_info['unit']})")
return False, msg
return True, "OK"
def detect_outlier_zscore(self, reading: SensorReading,
threshold: float = 3.0) -> Tuple[bool, str]:
"""Z-scoreによる統計的外れ値検出
Args:
reading: センサー読み取り値
threshold: Z-scoreのしきい値(デフォルト3.0)
Returns:
(is_outlier, message)
"""
key = f"{reading.reactor_id}_{reading.sensor_type}"
# 履歴データに追加
if key not in self.history:
self.history[key] = []
self.history[key].append(reading.value)
# 履歴が少ない場合は判定不可
if len(self.history[key]) < 10:
return False, "Insufficient history"
# 最新100ポイントのみ保持
if len(self.history[key]) > self.max_history:
self.history[key] = self.history[key][-self.max_history:]
# Z-score計算
values = np.array(self.history[key])
mean = np.mean(values)
std = np.std(values)
if std < 1e-6: # 標準偏差がゼロに近い場合
return False, "Constant values"
z_score = abs((reading.value - mean) / std)
if z_score > threshold:
msg = f"外れ値検出: Z-score={z_score:.2f} (threshold={threshold})"
return True, msg
return False, f"OK (Z-score={z_score:.2f})"
def detect_spike(self, reading: SensorReading,
max_change_rate: float = 0.2) -> Tuple[bool, str]:
"""急激な変化(スパイク)の検出
Args:
reading: センサー読み取り値
max_change_rate: 許容変化率(例: 0.2 = 20%)
Returns:
(is_spike, message)
"""
key = f"{reading.reactor_id}_{reading.sensor_type}"
if key not in self.history or len(self.history[key]) == 0:
return False, "No previous value"
previous_value = self.history[key][-1]
# ゼロ除算を避ける
if abs(previous_value) < 1e-6:
return False, "Previous value near zero"
# 変化率を計算
change_rate = abs((reading.value - previous_value) / previous_value)
if change_rate > max_change_rate:
msg = f"急激な変化検出: {change_rate*100:.1f}% (許容: {max_change_rate*100:.1f}%)"
return True, msg
return False, f"OK (変化率: {change_rate*100:.1f}%)"
def interpolate_missing_value(self, reactor_id: str,
sensor_type: str) -> Optional[float]:
"""欠損値を線形補間
Args:
reactor_id: 反応器ID
sensor_type: センサータイプ
Returns:
補間値(履歴がない場合はNone)
"""
key = f"{reactor_id}_{sensor_type}"
if key not in self.history or len(self.history[key]) < 2:
return None
# 最新2点の平均値で補間(簡易的)
recent_values = self.history[key][-2:]
interpolated = np.mean(recent_values)
return interpolated
def validate(self, reading: SensorReading) -> SensorReading:
"""総合検証を実行してデータ品質を判定
Args:
reading: センサー読み取り値
Returns:
品質フラグを更新したSensorReading
"""
# 1. 物理的範囲チェック
is_valid, msg = self.validate_physical_range(reading)
if not is_valid:
print(f"[BAD] {msg}")
reading.quality = DataQuality.BAD
return reading
# 2. 外れ値検出
is_outlier, msg = self.detect_outlier_zscore(reading)
if is_outlier:
print(f"[UNCERTAIN] {msg}")
reading.quality = DataQuality.UNCERTAIN
# 3. スパイク検出
is_spike, msg = self.detect_spike(reading)
if is_spike:
print(f"[UNCERTAIN] {msg}")
reading.quality = DataQuality.UNCERTAIN
return reading
# 使用例
if __name__ == "__main__":
validator = DataValidator()
# 正常データのシミュレーション
print("=== 正常データ ===")
for i in range(5):
reading = SensorReading(
reactor_id="R101",
sensor_type="temperature",
value=85.0 + np.random.normal(0, 0.5), # 平均85℃、標準偏差0.5℃
unit="degC",
timestamp=f"2025-10-26T10:30:{i:02d}"
)
validated = validator.validate(reading)
print(f"Time {i}: {validated.value:.2f}°C - Quality: {validated.quality.value}")
# 物理的範囲外のデータ
print("\n=== 物理的範囲外データ ===")
bad_reading = SensorReading(
reactor_id="R101",
sensor_type="temperature",
value=500.0, # 範囲外(max 400℃)
unit="degC",
timestamp="2025-10-26T10:30:10"
)
validated = validator.validate(bad_reading)
print(f"Value: {validated.value}°C - Quality: {validated.quality.value}")
# 外れ値(スパイク)
print("\n=== スパイクデータ ===")
spike_reading = SensorReading(
reactor_id="R101",
sensor_type="temperature",
value=120.0, # 急激な上昇
unit="degC",
timestamp="2025-10-26T10:30:15"
)
validated = validator.validate(spike_reading)
print(f"Value: {validated.value}°C - Quality: {validated.quality.value}")
# 欠損値の補間
print("\n=== 欠損値補間 ===")
interpolated = validator.interpolate_missing_value("R101", "temperature")
if interpolated:
print(f"補間値: {interpolated:.2f}°C")
# 期待される出力例:
# === 正常データ ===
# Time 0: 85.23°C - Quality: Good
# Time 1: 84.87°C - Quality: Good
# Time 2: 85.41°C - Quality: Good
# Time 3: 85.12°C - Quality: Good
# Time 4: 84.95°C - Quality: Good
#
# === 物理的範囲外データ ===
# [BAD] temperatureが物理的範囲外: 500.0 (許容範囲: -50-400 degC)
# Value: 500.0°C - Quality: Bad
#
# === スパイクデータ ===
# [UNCERTAIN] 急激な変化検出: 41.2% (許容: 20.0%)
# Value: 120.0°C - Quality: Uncertain
#
# === 欠損値補間 ===
# 補間値: 85.04°C
Kafka風のストリーミング処理で、複数のセンサーからのデータを統合的に処理します。
"""
Example 5: データストリーミングパイプライン
複数センサーからのデータをリアルタイムに処理・配信
"""
from queue import Queue
from threading import Thread, Event
from typing import Callable, List
import time
from datetime import datetime
import json
class DataStream:
"""リアルタイムデータストリーム
センサーデータをキューで受け取り、複数の処理を
パイプライン形式で実行します。
"""
def __init__(self, name: str, buffer_size: int = 1000):
"""
Args:
name: ストリーム名
buffer_size: バッファサイズ(キューの最大長)
"""
self.name = name
self.queue = Queue(maxsize=buffer_size)
self.processors: List[Callable] = []
self.is_running = Event()
self.worker_thread = None
def add_processor(self, processor: Callable):
"""データ処理関数を追加
Args:
processor: データを受け取って処理する関数 processor(data)
"""
self.processors.append(processor)
def publish(self, data: dict):
"""データをストリームに送信
Args:
data: センサーデータ辞書
"""
try:
self.queue.put(data, timeout=1)
except Exception as e:
print(f"[{self.name}] キューがフル: {e}")
def _process_loop(self):
"""データ処理ループ(別スレッドで実行)"""
while self.is_running.is_set():
try:
# キューからデータを取得(1秒タイムアウト)
data = self.queue.get(timeout=1)
# すべての処理関数を順次実行
for processor in self.processors:
try:
data = processor(data)
if data is None: # 処理でフィルタされた場合
break
except Exception as e:
print(f"[{self.name}] 処理エラー: {e}")
break
except Exception:
# タイムアウト(データなし)→ 継続
continue
def start(self):
"""ストリーム処理を開始"""
if self.worker_thread and self.worker_thread.is_alive():
print(f"[{self.name}] 既に実行中です")
return
self.is_running.set()
self.worker_thread = Thread(target=self._process_loop, daemon=True)
self.worker_thread.start()
print(f"[{self.name}] ストリーム処理を開始")
def stop(self):
"""ストリーム処理を停止"""
self.is_running.clear()
if self.worker_thread:
self.worker_thread.join(timeout=5)
print(f"[{self.name}] ストリーム処理を停止")
class StreamingPipeline:
"""複数ストリームを管理するパイプライン"""
def __init__(self):
self.streams = {}
def create_stream(self, name: str) -> DataStream:
"""新しいストリームを作成
Args:
name: ストリーム名
Returns:
DataStreamオブジェクト
"""
stream = DataStream(name)
self.streams[name] = stream
return stream
def start_all(self):
"""すべてのストリームを開始"""
for stream in self.streams.values():
stream.start()
def stop_all(self):
"""すべてのストリームを停止"""
for stream in self.streams.values():
stream.stop()
# データ処理関数の例
def enrich_with_metadata(data: dict) -> dict:
"""メタデータを追加"""
data['processed_at'] = datetime.now().isoformat()
data['pipeline'] = 'streaming_v1'
return data
def filter_bad_quality(data: dict) -> dict:
"""品質が悪いデータをフィルタ"""
if data.get('quality') == 'Bad':
print(f"不良データをフィルタ: {data}")
return None # None = このデータは破棄
return data
def aggregate_1sec_window(data: dict) -> dict:
"""1秒ウィンドウで集計(簡易版)"""
# 実際は時間窓で複数データを集計
data['aggregation'] = '1sec_mean'
return data
def send_to_digital_twin(data: dict) -> dict:
"""Digital Twinモデルに送信"""
print(f"Digital Twinに送信: {data['reactor_id']}.{data['sensor_type']} = {data['value']}")
# 実際はDigital TwinのAPIを呼び出し
return data
def save_to_database(data: dict) -> dict:
"""データベースに保存"""
# 実際はInfluxDB等に保存
print(f"DBに保存: {json.dumps(data, ensure_ascii=False)}")
return data
# 使用例
if __name__ == "__main__":
# パイプラインの作成
pipeline = StreamingPipeline()
# 温度データストリーム
temp_stream = pipeline.create_stream("temperature_stream")
temp_stream.add_processor(enrich_with_metadata)
temp_stream.add_processor(filter_bad_quality)
temp_stream.add_processor(aggregate_1sec_window)
temp_stream.add_processor(send_to_digital_twin)
temp_stream.add_processor(save_to_database)
# 圧力データストリーム
pres_stream = pipeline.create_stream("pressure_stream")
pres_stream.add_processor(enrich_with_metadata)
pres_stream.add_processor(filter_bad_quality)
pres_stream.add_processor(send_to_digital_twin)
# すべてのストリームを開始
pipeline.start_all()
try:
# センサーデータをシミュレーション
for i in range(10):
# 温度データ
temp_data = {
'reactor_id': 'R101',
'sensor_type': 'temperature',
'value': 85.0 + i * 0.1,
'unit': 'degC',
'quality': 'Good' if i < 8 else 'Bad', # 8番目以降は不良
'timestamp': datetime.now().isoformat()
}
temp_stream.publish(temp_data)
# 圧力データ
pres_data = {
'reactor_id': 'R101',
'sensor_type': 'pressure',
'value': 2.45 + i * 0.01,
'unit': 'MPa',
'quality': 'Good',
'timestamp': datetime.now().isoformat()
}
pres_stream.publish(pres_data)
time.sleep(0.5) # 0.5秒間隔でデータ送信
# 処理が完了するまで待機
time.sleep(2)
finally:
# すべてのストリームを停止
pipeline.stop_all()
# 期待される出力例:
# [temperature_stream] ストリーム処理を開始
# [pressure_stream] ストリーム処理を開始
# Digital Twinに送信: R101.temperature = 85.0
# DBに保存: {"reactor_id": "R101", "sensor_type": "temperature", "value": 85.0, ...}
# Digital Twinに送信: R101.pressure = 2.45
# Digital Twinに送信: R101.temperature = 85.1
# DBに保存: {"reactor_id": "R101", "sensor_type": "temperature", "value": 85.1, ...}
# ...
# 不良データをフィルタ: {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.8, 'quality': 'Bad'}
# 不良データをフィルタ: {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.9, 'quality': 'Bad'}
# [temperature_stream] ストリーム処理を停止
# [pressure_stream] ストリーム処理を停止
ノイズを含むセンサーデータから、真の状態を推定するカルマンフィルタの実装です。
"""
Example 6: カルマンフィルタによる状態推定
ノイズを含むセンサーデータから反応器の真の状態を推定
"""
import numpy as np
from dataclasses import dataclass
from typing import Tuple
@dataclass
class KalmanState:
"""カルマンフィルタの状態"""
x: np.ndarray # 状態ベクトル [温度, 温度変化率]
P: np.ndarray # 誤差共分散行列
class ReactorKalmanFilter:
"""反応器状態推定用カルマンフィルタ
温度センサーのノイズを除去し、真の温度と
温度変化率を推定します。
"""
def __init__(self,
initial_temp: float,
process_noise: float = 0.01,
measurement_noise: float = 0.5):
"""
Args:
initial_temp: 初期温度 [℃]
process_noise: プロセスノイズ(システムの不確実性)
measurement_noise: 測定ノイズ(センサーノイズ)
"""
# 状態ベクトル: [温度, 温度変化率]
self.state = KalmanState(
x=np.array([initial_temp, 0.0]), # [℃, ℃/s]
P=np.eye(2) * 1.0 # 初期誤差共分散
)
# システムモデル: x(k+1) = F * x(k) + w
# dt = 1秒と仮定
dt = 1.0
self.F = np.array([
[1.0, dt], # 温度(k+1) = 温度(k) + 変化率(k) * dt
[0.0, 1.0] # 変化率(k+1) = 変化率(k) (一定と仮定)
])
# 観測モデル: z(k) = H * x(k) + v
# 温度のみ観測可能
self.H = np.array([[1.0, 0.0]])
# プロセスノイズ共分散
self.Q = np.array([
[process_noise, 0.0],
[0.0, process_noise]
])
# 測定ノイズ共分散
self.R = np.array([[measurement_noise]])
def predict(self) -> Tuple[float, float]:
"""予測ステップ(システムモデルに基づく状態予測)
Returns:
(predicted_temp, predicted_rate) 予測温度と変化率
"""
# 状態予測: x_pred = F * x
x_pred = self.F @ self.state.x
# 誤差共分散予測: P_pred = F * P * F^T + Q
P_pred = self.F @ self.state.P @ self.F.T + self.Q
# 予測状態を更新
self.state.x = x_pred
self.state.P = P_pred
return self.state.x[0], self.state.x[1]
def update(self, measurement: float) -> Tuple[float, float]:
"""更新ステップ(測定値で状態を補正)
Args:
measurement: センサー測定値(温度)[℃]
Returns:
(estimated_temp, estimated_rate) 推定温度と変化率
"""
# イノベーション(予測と測定の差): y = z - H * x_pred
z = np.array([measurement])
y = z - self.H @ self.state.x
# イノベーション共分散: S = H * P_pred * H^T + R
S = self.H @ self.state.P @ self.H.T + self.R
# カルマンゲイン: K = P_pred * H^T * S^-1
K = self.state.P @ self.H.T @ np.linalg.inv(S)
# 状態更新: x = x_pred + K * y
self.state.x = self.state.x + K @ y
# 誤差共分散更新: P = (I - K * H) * P_pred
I = np.eye(2)
self.state.P = (I - K @ self.H) @ self.state.P
return self.state.x[0], self.state.x[1]
def estimate(self, measurement: float) -> Tuple[float, float]:
"""予測→更新の1サイクルを実行
Args:
measurement: センサー測定値 [℃]
Returns:
(estimated_temp, estimated_rate) 推定温度と変化率
"""
# 予測
self.predict()
# 更新
return self.update(measurement)
# 使用例
if __name__ == "__main__":
# カルマンフィルタの初期化
kf = ReactorKalmanFilter(
initial_temp=85.0,
process_noise=0.01, # プロセスは安定
measurement_noise=0.5 # センサーノイズは中程度
)
# 真の温度プロファイル(ランプ加熱: 85℃ → 95℃)
time_steps = 50
true_temps = np.linspace(85.0, 95.0, time_steps)
# センサー測定値(ノイズあり)
np.random.seed(42)
noise = np.random.normal(0, 0.5, time_steps) # 標準偏差0.5℃
measured_temps = true_temps + noise
print("Time\tTrue\tMeasured\tEstimated\tRate")
print("-" * 60)
estimated_temps = []
estimated_rates = []
for t in range(time_steps):
# カルマンフィルタで推定
est_temp, est_rate = kf.estimate(measured_temps[t])
estimated_temps.append(est_temp)
estimated_rates.append(est_rate)
if t % 5 == 0: # 5秒ごとに表示
print(f"{t}\t{true_temps[t]:.2f}\t{measured_temps[t]:.2f}\t\t{est_temp:.2f}\t\t{est_rate:.3f}")
# 精度評価
mse_raw = np.mean((measured_temps - true_temps) ** 2)
mse_filtered = np.mean((np.array(estimated_temps) - true_temps) ** 2)
print(f"\n=== 精度評価 ===")
print(f"生データのMSE: {mse_raw:.4f}")
print(f"フィルタ後のMSE: {mse_filtered:.4f}")
print(f"精度改善率: {(1 - mse_filtered/mse_raw) * 100:.1f}%")
# 期待される出力例:
# Time True Measured Estimated Rate
# ------------------------------------------------------------
# 0 85.00 85.25 85.13 0.051
# 5 86.02 85.78 85.96 0.192
# 10 87.04 86.92 87.01 0.204
# 15 88.06 88.31 88.08 0.206
# 20 89.08 89.23 89.10 0.204
# 25 90.10 90.35 90.13 0.203
# 30 91.12 90.98 91.14 0.204
# 35 92.14 92.45 92.17 0.203
# 40 93.16 93.08 93.18 0.204
# 45 94.18 94.52 94.21 0.203
#
# === 精度評価 ===
# 生データのMSE: 0.2435
# フィルタ後のMSE: 0.0089
# 精度改善率: 96.3%
すべての要素を統合し、Digital Twinとのリアルタイム同期を実現する完全な実装です。
"""
Example 7: Digital Twin同期ループ
センサーデータ取得→検証→状態推定→モデル更新の完全サイクル
"""
import time
from datetime import datetime
from typing import Dict, Optional
import numpy as np
class DigitalTwinModel:
"""Digital Twinの簡易物理モデル
反応器の熱バランスモデルで、温度・圧力・流量から
反応進行度を計算します。
"""
def __init__(self, reactor_id: str):
self.reactor_id = reactor_id
# モデルパラメータ
self.heat_capacity = 4200 # J/(kg·K)
self.mass = 1000 # kg
self.heat_transfer_coeff = 500 # W/(m²·K)
self.area = 10 # m²
# 状態変数
self.state = {
'temperature': 85.0, # ℃
'pressure': 2.5, # MPa
'flow_rate': 120.0, # L/min
'conversion': 0.0, # 反応進行度 (0-1)
'last_update': datetime.now()
}
def update_from_sensors(self, sensor_data: Dict[str, float]):
"""センサーデータでモデルを更新
Args:
sensor_data: {'temperature': 85.3, 'pressure': 2.45, 'flow_rate': 120.5}
"""
# センサー値でモデル状態を更新
if 'temperature' in sensor_data:
self.state['temperature'] = sensor_data['temperature']
if 'pressure' in sensor_data:
self.state['pressure'] = sensor_data['pressure']
if 'flow_rate' in sensor_data:
self.state['flow_rate'] = sensor_data['flow_rate']
self.state['last_update'] = datetime.now()
def calculate_reaction_conversion(self) -> float:
"""反応進行度を計算(簡易モデル)
Arrhenius式に基づく反応速度の積分
Returns:
反応進行度 (0-1)
"""
T = self.state['temperature'] + 273.15 # K
P = self.state['pressure'] * 1e6 # Pa
# アレニウスパラメータ(仮想値)
A = 1e10 # 頻度因子
Ea = 80000 # 活性化エネルギー [J/mol]
R = 8.314 # 気体定数 [J/(mol·K)]
# 反応速度定数
k = A * np.exp(-Ea / (R * T))
# 圧力補正(簡易的)
k_eff = k * (P / 2.5e6) ** 0.5
# 反応進行度(時間積分の近似)
dt = 1.0 # 1秒
dX = k_eff * (1 - self.state['conversion']) * dt
new_conversion = min(self.state['conversion'] + dX, 1.0)
self.state['conversion'] = new_conversion
return new_conversion
def predict_next_state(self, time_horizon: float = 60) -> Dict:
"""将来の状態を予測
Args:
time_horizon: 予測時間 [秒]
Returns:
予測状態辞書
"""
# 現在の反応速度から将来の変換率を予測
predicted_conversion = min(
self.state['conversion'] + 0.01 * time_horizon,
1.0
)
# 発熱反応の場合、温度上昇を予測
heat_released = predicted_conversion * 100000 # J (仮想値)
temp_rise = heat_released / (self.mass * self.heat_capacity)
predicted_temp = self.state['temperature'] + temp_rise
return {
'temperature': predicted_temp,
'conversion': predicted_conversion,
'time_horizon': time_horizon
}
def get_state(self) -> Dict:
"""現在の状態を取得"""
return self.state.copy()
class DigitalTwinSyncLoop:
"""Digital Twin同期ループ
センサーデータ取得→検証→状態推定→モデル更新を
リアルタイムに実行します。
"""
def __init__(self, reactor_id: str, update_interval: float = 1.0):
"""
Args:
reactor_id: 反応器ID
update_interval: 更新間隔 [秒]
"""
self.reactor_id = reactor_id
self.update_interval = update_interval
# Digital Twinモデル
self.twin_model = DigitalTwinModel(reactor_id)
# カルマンフィルタ(温度用)
self.temp_filter = ReactorKalmanFilter(
initial_temp=85.0,
process_noise=0.01,
measurement_noise=0.5
)
# データバリデータ
self.validator = DataValidator()
self.is_running = False
self.cycle_count = 0
def fetch_sensor_data(self) -> Dict[str, float]:
"""センサーデータを取得(実際はMQTT/OPC UAから)
Returns:
{'temperature': 85.3, 'pressure': 2.45, 'flow_rate': 120.5}
"""
# ここでは模擬データを生成
# 実際はMQTTクライアントやOPC UAクライアントから取得
base_temp = 85.0 + self.cycle_count * 0.1 # ゆっくり昇温
noise = np.random.normal(0, 0.5)
return {
'temperature': base_temp + noise,
'pressure': 2.5 + np.random.normal(0, 0.02),
'flow_rate': 120.0 + np.random.normal(0, 1.0)
}
def run_single_cycle(self):
"""同期ループの1サイクルを実行"""
cycle_start = time.time()
# 1. センサーデータ取得
raw_sensor_data = self.fetch_sensor_data()
# 2. データ検証
temp_reading = SensorReading(
reactor_id=self.reactor_id,
sensor_type="temperature",
value=raw_sensor_data['temperature'],
unit="degC",
timestamp=datetime.now().isoformat()
)
validated_reading = self.validator.validate(temp_reading)
# 3. カルマンフィルタで状態推定
if validated_reading.quality != DataQuality.BAD:
est_temp, est_rate = self.temp_filter.estimate(validated_reading.value)
filtered_data = {
'temperature': est_temp,
'pressure': raw_sensor_data['pressure'],
'flow_rate': raw_sensor_data['flow_rate']
}
else:
# 不良データの場合は前回値を使用
filtered_data = None
# 4. Digital Twinモデル更新
if filtered_data:
self.twin_model.update_from_sensors(filtered_data)
# 5. 反応進行度計算
conversion = self.twin_model.calculate_reaction_conversion()
# 6. 将来予測
prediction = self.twin_model.predict_next_state(time_horizon=60)
# 7. 結果表示
state = self.twin_model.get_state()
cycle_time = (time.time() - cycle_start) * 1000 # ms
print(f"[Cycle {self.cycle_count}] "
f"Temp: {state['temperature']:.2f}°C "
f"(filtered: {est_temp:.2f}°C), "
f"Pressure: {state['pressure']:.2f} MPa, "
f"Conversion: {conversion*100:.1f}%, "
f"Prediction(60s): {prediction['conversion']*100:.1f}% "
f"({cycle_time:.1f}ms)")
self.cycle_count += 1
def run(self, duration: Optional[float] = None):
"""同期ループを開始
Args:
duration: 実行時間 [秒] (Noneの場合は無限ループ)
"""
self.is_running = True
start_time = time.time()
print(f"=== Digital Twin同期ループ開始 ===")
print(f"Reactor ID: {self.reactor_id}")
print(f"Update Interval: {self.update_interval}s")
print("-" * 80)
try:
while self.is_running:
# 1サイクル実行
self.run_single_cycle()
# 終了条件チェック
if duration and (time.time() - start_time) >= duration:
break
# 次の更新まで待機
time.sleep(self.update_interval)
except KeyboardInterrupt:
print("\n\nキーボード割り込みで停止")
finally:
self.is_running = False
print(f"\n=== 同期ループ終了 ===")
print(f"総サイクル数: {self.cycle_count}")
print(f"実行時間: {time.time() - start_time:.1f}秒")
# 使用例
if __name__ == "__main__":
# Digital Twin同期ループの作成
sync_loop = DigitalTwinSyncLoop(
reactor_id="R101",
update_interval=1.0 # 1秒ごとに更新
)
# 30秒間実行
sync_loop.run(duration=30)
# 期待される出力例:
# === Digital Twin同期ループ開始 ===
# Reactor ID: R101
# Update Interval: 1.0s
# --------------------------------------------------------------------------------
# [Cycle 0] Temp: 85.23°C (filtered: 85.13°C), Pressure: 2.51 MPa, Conversion: 0.2%, Prediction(60s): 0.8% (2.3ms)
# [Cycle 1] Temp: 85.34°C (filtered: 85.28°C), Pressure: 2.49 MPa, Conversion: 0.4%, Prediction(60s): 1.0% (1.8ms)
# [Cycle 2] Temp: 85.41°C (filtered: 85.39°C), Pressure: 2.52 MPa, Conversion: 0.6%, Prediction(60s): 1.2% (1.7ms)
# ...
# [Cycle 29] Temp: 88.12°C (filtered: 88.09°C), Pressure: 2.48 MPa, Conversion: 6.2%, Prediction(60s): 6.8% (1.9ms)
#
# === 同期ループ終了 ===
# 総サイクル数: 30
# 実行時間: 30.1秒
この章を完了すると、以下を実装できるようになります:
Q1: MQTTとOPC UAの主な違いは何ですか?最も適切なものを選んでください。
a) MQTTは産業用、OPC UAはIoT用
b) MQTTは軽量でIoT向き、OPC UAは産業オートメーション標準
c) MQTTは有料、OPC UAは無料
d) MQTTは古い規格、OPC UAは新しい規格
正解: b) MQTTは軽量でIoT向き、OPC UAは産業オートメーション標準
解説:
Q2: センサーデータのZ-scoreが3.5の場合、このデータは外れ値として扱うべきですか?Z-scoreのしきい値を3.0とした場合の判断と、その理由を説明してください。
正解: はい、外れ値として扱うべきです。
理由:
実務的な対応:
Q3: 1000個のセンサーが1秒間隔でデータを送信する化学プラントで、Digital Twin同期ループの更新間隔を1秒に設定しています。各サイクルの処理時間が平均800msの場合、システムは安定動作しますか?もし問題があれば、3つの改善策を提案してください。
正解: 不安定になる可能性が高い(処理時間 800ms ≈ 更新間隔 1000msでマージンが不十分)
問題点:
改善策:
実装例(並列処理):
from concurrent.futures import ThreadPoolExecutor
def process_sensor_group(sensor_ids):
# センサーグループの処理
pass
# 1000センサーを4グループに分割
groups = [sensors[i:i+250] for i in range(0, 1000, 250)]
with ThreadPoolExecutor(max_workers=4) as executor:
results = executor.map(process_sensor_group, groups)
# 処理時間: 800ms → 200ms
リアルタイムデータ連携の基盤ができたら、次はDigital Twinのモデリング手法を学びます。物理モデル(第一原理)とデータ駆動モデル(機械学習)を組み合わせたハイブリッドモデリングにより、高精度な予測と制御を実現します。