第2章:リアルタイムデータ連携

Digital TwinとIoTセンサーの統合により、物理システムとデジタル空間を同期する実装技術

📚 Digital Twin入門シリーズ ⏱️ 読了時間: 35分 💡 難易度: 中級

2.1 リアルタイムデータ連携の重要性

Digital Twinの精度と実用性は、物理システムとデジタルモデルの同期精度に大きく依存します。本章では、IoTセンサーから得られるリアルタイムデータをDigital Twinに統合する7つの実践的な手法を学びます。

2.1.1 データ連携の課題

プロセス産業におけるリアルタイムデータ連携では、以下の課題に対処する必要があります:

💡 産業界の実例
化学プラントでは1000点以上のセンサーが1秒間隔でデータを送信します。1日あたり86.4M(8640万)のデータポイントが生成され、これをリアルタイムに処理してDigital Twinに反映させる必要があります。

2.2 実装例1:MQTT通信によるIoTセンサーデータ取得

MQTTは軽量で信頼性の高いメッセージングプロトコルで、IoTデバイスとの通信に広く使われています。

"""
Example 1: MQTT通信によるリアルタイムセンサーデータ取得
化学反応器の温度・圧力センサーからMQTT経由でデータを受信
"""

import paho.mqtt.client as mqtt
import json
from datetime import datetime
from typing import Dict, Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MQTTSensorClient:
    """MQTTプロトコルでIoTセンサーと通信するクライアント

    化学プラントの反応器センサー(温度、圧力、流量)から
    リアルタイムデータを取得し、Digital Twinに送信します。
    """

    def __init__(self, broker_address: str, port: int = 1883):
        """
        Args:
            broker_address: MQTTブローカーのIPアドレス/ホスト名
            port: MQTTポート番号(デフォルト1883)
        """
        self.client = mqtt.Client(client_id="digital_twin_client")
        self.broker_address = broker_address
        self.port = port
        self.data_callbacks: Dict[str, Callable] = {}

        # コールバック関数の登録
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        """接続確立時のコールバック"""
        if rc == 0:
            logger.info(f"MQTTブローカーに接続成功: {self.broker_address}")
            # 反応器のすべてのセンサートピックをサブスクライブ
            self.client.subscribe("reactor/+/sensor/#")
        else:
            logger.error(f"接続失敗: return code {rc}")

    def _on_message(self, client, userdata, msg):
        """メッセージ受信時のコールバック"""
        try:
            # JSONペイロードをパース
            payload = json.loads(msg.payload.decode())
            topic = msg.topic

            # タイムスタンプの追加(センサーデータに含まれない場合)
            if 'timestamp' not in payload:
                payload['timestamp'] = datetime.now().isoformat()

            logger.info(f"受信: {topic} - {payload}")

            # 登録されたコールバック関数を実行
            for callback in self.data_callbacks.values():
                callback(topic, payload)

        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {e}")
        except Exception as e:
            logger.error(f"メッセージ処理エラー: {e}")

    def _on_disconnect(self, client, userdata, rc):
        """切断時のコールバック"""
        if rc != 0:
            logger.warning(f"予期しない切断: return code {rc}")
            logger.info("再接続を試行中...")

    def register_callback(self, name: str, callback: Callable):
        """データ受信時のコールバック関数を登録

        Args:
            name: コールバックの識別名
            callback: 実行する関数 callback(topic: str, payload: dict)
        """
        self.data_callbacks[name] = callback

    def connect(self):
        """MQTTブローカーに接続"""
        self.client.connect(self.broker_address, self.port, 60)
        self.client.loop_start()

    def disconnect(self):
        """MQTTブローカーから切断"""
        self.client.loop_stop()
        self.client.disconnect()


# 使用例
def process_sensor_data(topic: str, payload: dict):
    """受信したセンサーデータをDigital Twinに送信"""
    reactor_id = topic.split('/')[1]
    sensor_type = topic.split('/')[-1]

    print(f"Reactor {reactor_id} - {sensor_type}: {payload['value']} {payload['unit']}")

    # Digital Twinモデルに反映(後述)
    # digital_twin.update_sensor_value(reactor_id, sensor_type, payload)


if __name__ == "__main__":
    # MQTTクライアントの初期化
    mqtt_client = MQTTSensorClient(broker_address="192.168.1.100")

    # データ処理コールバックの登録
    mqtt_client.register_callback("process_data", process_sensor_data)

    # 接続開始
    mqtt_client.connect()

    try:
        # 継続的にデータを受信(Ctrl+Cで終了)
        import time
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        mqtt_client.disconnect()
        print("\n接続を終了しました")

# 期待される出力例:
# INFO:__main__:MQTTブローカーに接続成功: 192.168.1.100
# INFO:__main__:受信: reactor/R101/sensor/temperature - {'value': 85.3, 'unit': 'degC', 'timestamp': '2025-10-26T10:30:15'}
# Reactor R101 - temperature: 85.3 degC
# INFO:__main__:受信: reactor/R101/sensor/pressure - {'value': 2.45, 'unit': 'MPa', 'timestamp': '2025-10-26T10:30:15'}
# Reactor R101 - pressure: 2.45 MPa

2.3 実装例2:OPC UA統合による産業機器データ取得

OPC UA(OPC Unified Architecture)は産業オートメーション分野の標準通信プロトコルで、PLCやDCSとの接続に使用されます。

"""
Example 2: OPC UA通信による産業機器データ取得
PLCから反応器の制御パラメータと測定値を取得
"""

from opcua import Client, ua
from typing import List, Dict
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class OPCUAReactorClient:
    """OPC UAで反応器データを取得するクライアント

    DCS(分散制御システム)やPLCから設定値(SV)と
    測定値(PV)をリアルタイムに取得します。
    """

    def __init__(self, server_url: str):
        """
        Args:
            server_url: OPC UAサーバーのURL(例: "opc.tcp://192.168.1.50:4840")
        """
        self.client = Client(server_url)
        self.server_url = server_url
        self.subscriptions = []

    def connect(self):
        """OPC UAサーバーに接続"""
        try:
            self.client.connect()
            logger.info(f"OPC UAサーバーに接続: {self.server_url}")

            # サーバー情報の取得
            server_info = self.client.get_server_node()
            logger.info(f"サーバー名: {server_info.get_browse_name()}")

        except Exception as e:
            logger.error(f"接続エラー: {e}")
            raise

    def read_node(self, node_id: str) -> Dict:
        """OPC UAノードから値を読み取り

        Args:
            node_id: ノードID(例: "ns=2;s=Reactor.R101.Temperature")

        Returns:
            {'value': 85.3, 'quality': 'Good', 'timestamp': '2025-10-26T10:30:15'}
        """
        try:
            node = self.client.get_node(node_id)
            data_value = node.get_data_value()

            return {
                'value': data_value.Value.Value,
                'quality': str(data_value.StatusCode),
                'timestamp': data_value.SourceTimestamp.isoformat(),
                'node_id': node_id
            }
        except Exception as e:
            logger.error(f"ノード読み取りエラー ({node_id}): {e}")
            return None

    def read_multiple_nodes(self, node_ids: List[str]) -> Dict[str, Dict]:
        """複数ノードを一括読み取り(効率的)

        Args:
            node_ids: ノードIDのリスト

        Returns:
            {node_id: {'value': ..., 'quality': ..., 'timestamp': ...}}
        """
        results = {}
        nodes = [self.client.get_node(nid) for nid in node_ids]

        # 一括読み取り(パフォーマンス向上)
        data_values = self.client.get_values(nodes)

        for node_id, data_value in zip(node_ids, data_values):
            results[node_id] = {
                'value': data_value,
                'quality': 'Good',  # 簡略化(実際はStatusCodeを確認)
                'timestamp': time.time()
            }

        return results

    def subscribe_to_changes(self, node_id: str, callback):
        """ノードの変化をサブスクライブ(変更時のみ通知)

        Args:
            node_id: 監視するノードID
            callback: 変更時に呼ばれる関数 callback(node_id, value)
        """
        try:
            # サブスクリプションの作成(100ms間隔)
            subscription = self.client.create_subscription(100, callback)
            node = self.client.get_node(node_id)

            # データ変更の監視開始
            handle = subscription.subscribe_data_change(node)
            self.subscriptions.append((subscription, handle))

            logger.info(f"サブスクリプション開始: {node_id}")

        except Exception as e:
            logger.error(f"サブスクリプションエラー: {e}")

    def disconnect(self):
        """OPC UAサーバーから切断"""
        # すべてのサブスクリプションを削除
        for subscription, handle in self.subscriptions:
            subscription.unsubscribe(handle)
            subscription.delete()

        self.client.disconnect()
        logger.info("OPC UAサーバーから切断")


# 使用例
if __name__ == "__main__":
    # OPC UAクライアントの初期化
    opcua_client = OPCUAReactorClient(server_url="opc.tcp://192.168.1.50:4840")
    opcua_client.connect()

    # 反応器R101の監視ノード
    reactor_nodes = [
        "ns=2;s=Reactor.R101.Temperature.PV",    # 温度測定値
        "ns=2;s=Reactor.R101.Temperature.SV",    # 温度設定値
        "ns=2;s=Reactor.R101.Pressure.PV",       # 圧力測定値
        "ns=2;s=Reactor.R101.Pressure.SV",       # 圧力設定値
        "ns=2;s=Reactor.R101.FlowRate.PV",       # 流量測定値
    ]

    try:
        # 複数ノードを一括読み取り
        data = opcua_client.read_multiple_nodes(reactor_nodes)

        for node_id, values in data.items():
            print(f"{node_id}: {values['value']}")

        # 連続監視(10秒間)
        print("\n連続監視を開始...")
        for _ in range(10):
            data = opcua_client.read_multiple_nodes(reactor_nodes)
            temp_pv = data["ns=2;s=Reactor.R101.Temperature.PV"]['value']
            pres_pv = data["ns=2;s=Reactor.R101.Pressure.PV"]['value']

            print(f"Temperature: {temp_pv:.1f}°C, Pressure: {pres_pv:.2f} MPa")
            time.sleep(1)

    finally:
        opcua_client.disconnect()

# 期待される出力例:
# INFO:__main__:OPC UAサーバーに接続: opc.tcp://192.168.1.50:4840
# ns=2;s=Reactor.R101.Temperature.PV: 85.3
# ns=2;s=Reactor.R101.Temperature.SV: 85.0
# ns=2;s=Reactor.R101.Pressure.PV: 2.45
# ns=2;s=Reactor.R101.Pressure.SV: 2.50
# ns=2;s=Reactor.R101.FlowRate.PV: 120.5
#
# 連続監視を開始...
# Temperature: 85.3°C, Pressure: 2.45 MPa
# Temperature: 85.5°C, Pressure: 2.46 MPa
# Temperature: 85.4°C, Pressure: 2.45 MPa

2.4 実装例3:時系列データベース(InfluxDB)統合

InfluxDBは高性能な時系列データベースで、センサーデータの長期保存とクエリに最適です。

"""
Example 3: InfluxDB時系列データベース統合
センサーデータの効率的な保存と高速クエリ
"""

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timedelta
import pandas as pd
from typing import List, Dict


class ReactorDataStore:
    """InfluxDBを使った反応器データの保存・取得

    センサーデータを時系列データベースに保存し、
    Digital Twin用の高速クエリを実現します。
    """

    def __init__(self, url: str, token: str, org: str, bucket: str):
        """
        Args:
            url: InfluxDBのURL(例: "http://localhost:8086")
            token: 認証トークン
            org: 組織名
            bucket: バケット名(データベース名に相当)
        """
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.org = org
        self.bucket = bucket
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()

    def write_sensor_data(self, reactor_id: str, sensor_type: str,
                         value: float, unit: str, quality: str = "Good"):
        """センサーデータを書き込み

        Args:
            reactor_id: 反応器ID(例: "R101")
            sensor_type: センサータイプ(例: "temperature")
            value: 測定値
            unit: 単位(例: "degC")
            quality: データ品質("Good", "Bad", "Uncertain")
        """
        point = (
            Point("reactor_sensor")
            .tag("reactor_id", reactor_id)
            .tag("sensor_type", sensor_type)
            .tag("unit", unit)
            .tag("quality", quality)
            .field("value", value)
            .time(datetime.utcnow())
        )

        self.write_api.write(bucket=self.bucket, record=point)

    def write_batch_data(self, data_points: List[Dict]):
        """複数データポイントをバッチ書き込み(高速化)

        Args:
            data_points: [{'reactor_id': 'R101', 'sensor_type': 'temperature', ...}]
        """
        points = []
        for dp in data_points:
            point = (
                Point("reactor_sensor")
                .tag("reactor_id", dp['reactor_id'])
                .tag("sensor_type", dp['sensor_type'])
                .tag("unit", dp.get('unit', ''))
                .field("value", dp['value'])
                .time(dp.get('timestamp', datetime.utcnow()))
            )
            points.append(point)

        self.write_api.write(bucket=self.bucket, record=points)

    def query_latest_value(self, reactor_id: str, sensor_type: str) -> Dict:
        """最新のセンサー値を取得

        Args:
            reactor_id: 反応器ID
            sensor_type: センサータイプ

        Returns:
            {'time': '2025-10-26T10:30:15', 'value': 85.3, 'unit': 'degC'}
        """
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -1h)
          |> filter(fn: (r) => r._measurement == "reactor_sensor")
          |> filter(fn: (r) => r.reactor_id == "{reactor_id}")
          |> filter(fn: (r) => r.sensor_type == "{sensor_type}")
          |> last()
        '''

        result = self.query_api.query(query=query, org=self.org)

        if result and len(result) > 0 and len(result[0].records) > 0:
            record = result[0].records[0]
            return {
                'time': record.get_time().isoformat(),
                'value': record.get_value(),
                'unit': record.values.get('unit', ''),
                'quality': record.values.get('quality', 'Unknown')
            }
        return None

    def query_time_range(self, reactor_id: str, sensor_type: str,
                        start: datetime, end: datetime) -> pd.DataFrame:
        """指定期間のデータを取得してDataFrameに変換

        Args:
            reactor_id: 反応器ID
            sensor_type: センサータイプ
            start: 開始時刻
            end: 終了時刻

        Returns:
            pd.DataFrame with columns ['time', 'value', 'unit']
        """
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {start.isoformat()}Z, stop: {end.isoformat()}Z)
          |> filter(fn: (r) => r._measurement == "reactor_sensor")
          |> filter(fn: (r) => r.reactor_id == "{reactor_id}")
          |> filter(fn: (r) => r.sensor_type == "{sensor_type}")
        '''

        result = self.query_api.query_data_frame(query=query, org=self.org)

        if not result.empty:
            # 必要な列のみ抽出
            df = result[['_time', '_value', 'unit']].copy()
            df.columns = ['time', 'value', 'unit']
            return df

        return pd.DataFrame(columns=['time', 'value', 'unit'])

    def query_aggregated_data(self, reactor_id: str, sensor_type: str,
                             window: str = "1m") -> pd.DataFrame:
        """集計データを取得(平均、最大、最小)

        Args:
            reactor_id: 反応器ID
            sensor_type: センサータイプ
            window: 集計ウィンドウ(例: "1m", "5m", "1h")

        Returns:
            pd.DataFrame with columns ['time', 'mean', 'max', 'min']
        """
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -24h)
          |> filter(fn: (r) => r._measurement == "reactor_sensor")
          |> filter(fn: (r) => r.reactor_id == "{reactor_id}")
          |> filter(fn: (r) => r.sensor_type == "{sensor_type}")
          |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
        '''

        result = self.query_api.query_data_frame(query=query, org=self.org)

        if not result.empty:
            df = result[['_time', '_value']].copy()
            df.columns = ['time', 'mean']
            return df

        return pd.DataFrame(columns=['time', 'mean'])

    def close(self):
        """データベース接続をクローズ"""
        self.client.close()


# 使用例
if __name__ == "__main__":
    # InfluxDBクライアントの初期化
    db = ReactorDataStore(
        url="http://localhost:8086",
        token="your_influxdb_token",
        org="your_org",
        bucket="reactor_data"
    )

    try:
        # 単一データポイントの書き込み
        db.write_sensor_data(
            reactor_id="R101",
            sensor_type="temperature",
            value=85.3,
            unit="degC"
        )

        # バッチ書き込み(効率的)
        batch_data = [
            {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.5, 'unit': 'degC'},
            {'reactor_id': 'R101', 'sensor_type': 'pressure', 'value': 2.45, 'unit': 'MPa'},
            {'reactor_id': 'R101', 'sensor_type': 'flow_rate', 'value': 120.5, 'unit': 'L/min'},
        ]
        db.write_batch_data(batch_data)

        # 最新値の取得
        latest = db.query_latest_value("R101", "temperature")
        print(f"最新温度: {latest['value']} {latest['unit']} at {latest['time']}")

        # 過去1時間のデータ取得
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=1)

        df = db.query_time_range("R101", "temperature", start_time, end_time)
        print(f"\n過去1時間のデータポイント数: {len(df)}")
        print(df.head())

        # 1分間隔の集計データ
        agg_df = db.query_aggregated_data("R101", "temperature", window="1m")
        print(f"\n1分平均データ(直近24時間):")
        print(agg_df.tail())

    finally:
        db.close()

# 期待される出力例:
# 最新温度: 85.5 degC at 2025-10-26T10:30:15.123456
#
# 過去1時間のデータポイント数: 3600
#                       time  value  unit
# 0 2025-10-26 09:30:15.123   85.1  degC
# 1 2025-10-26 09:30:16.234   85.2  degC
# 2 2025-10-26 09:30:17.345   85.3  degC
#
# 1分平均データ(直近24時間):
#                       time   mean
# 1436 2025-10-26 10:26:00  85.2
# 1437 2025-10-26 10:27:00  85.3
# 1438 2025-10-26 10:28:00  85.4
# 1439 2025-10-26 10:29:00  85.5
# 1440 2025-10-26 10:30:00  85.5

2.5 実装例4:リアルタイムデータ検証とクリーニング

センサーデータには必ずノイズや異常値が含まれます。Digital Twinに送る前に品質チェックが必要です。

"""
Example 4: リアルタイムデータ検証とクリーニング
センサーデータの品質チェック、異常値検出、欠損値補完
"""

import numpy as np
import pandas as pd
from typing import Optional, Dict, Tuple
from dataclasses import dataclass
from enum import Enum


class DataQuality(Enum):
    """データ品質レベル"""
    GOOD = "Good"           # 正常データ
    UNCERTAIN = "Uncertain" # 不確実(補正済み)
    BAD = "Bad"            # 異常データ(使用不可)


@dataclass
class SensorReading:
    """センサー読み取り値"""
    reactor_id: str
    sensor_type: str
    value: float
    unit: str
    timestamp: str
    quality: DataQuality = DataQuality.GOOD


class DataValidator:
    """リアルタイムデータ検証・クリーニング

    物理的制約チェック、統計的外れ値検出、
    欠損値補完を実行してデータ品質を保証します。
    """

    def __init__(self):
        # センサーごとの物理的範囲(プロセス知識に基づく)
        self.physical_ranges = {
            'temperature': {'min': -50, 'max': 400, 'unit': 'degC'},
            'pressure': {'min': 0, 'max': 10, 'unit': 'MPa'},
            'flow_rate': {'min': 0, 'max': 500, 'unit': 'L/min'},
            'ph': {'min': 0, 'max': 14, 'unit': ''},
        }

        # 移動平均用の履歴データ(最大100ポイント)
        self.history: Dict[str, list] = {}
        self.max_history = 100

    def validate_physical_range(self, reading: SensorReading) -> Tuple[bool, str]:
        """物理的範囲チェック

        Args:
            reading: センサー読み取り値

        Returns:
            (is_valid, message)
        """
        if reading.sensor_type not in self.physical_ranges:
            return True, "Range not defined"

        range_info = self.physical_ranges[reading.sensor_type]
        value = reading.value

        if value < range_info['min'] or value > range_info['max']:
            msg = (f"{reading.sensor_type}が物理的範囲外: {value} "
                   f"(許容範囲: {range_info['min']}-{range_info['max']} {range_info['unit']})")
            return False, msg

        return True, "OK"

    def detect_outlier_zscore(self, reading: SensorReading,
                             threshold: float = 3.0) -> Tuple[bool, str]:
        """Z-scoreによる統計的外れ値検出

        Args:
            reading: センサー読み取り値
            threshold: Z-scoreのしきい値(デフォルト3.0)

        Returns:
            (is_outlier, message)
        """
        key = f"{reading.reactor_id}_{reading.sensor_type}"

        # 履歴データに追加
        if key not in self.history:
            self.history[key] = []

        self.history[key].append(reading.value)

        # 履歴が少ない場合は判定不可
        if len(self.history[key]) < 10:
            return False, "Insufficient history"

        # 最新100ポイントのみ保持
        if len(self.history[key]) > self.max_history:
            self.history[key] = self.history[key][-self.max_history:]

        # Z-score計算
        values = np.array(self.history[key])
        mean = np.mean(values)
        std = np.std(values)

        if std < 1e-6:  # 標準偏差がゼロに近い場合
            return False, "Constant values"

        z_score = abs((reading.value - mean) / std)

        if z_score > threshold:
            msg = f"外れ値検出: Z-score={z_score:.2f} (threshold={threshold})"
            return True, msg

        return False, f"OK (Z-score={z_score:.2f})"

    def detect_spike(self, reading: SensorReading,
                    max_change_rate: float = 0.2) -> Tuple[bool, str]:
        """急激な変化(スパイク)の検出

        Args:
            reading: センサー読み取り値
            max_change_rate: 許容変化率(例: 0.2 = 20%)

        Returns:
            (is_spike, message)
        """
        key = f"{reading.reactor_id}_{reading.sensor_type}"

        if key not in self.history or len(self.history[key]) == 0:
            return False, "No previous value"

        previous_value = self.history[key][-1]

        # ゼロ除算を避ける
        if abs(previous_value) < 1e-6:
            return False, "Previous value near zero"

        # 変化率を計算
        change_rate = abs((reading.value - previous_value) / previous_value)

        if change_rate > max_change_rate:
            msg = f"急激な変化検出: {change_rate*100:.1f}% (許容: {max_change_rate*100:.1f}%)"
            return True, msg

        return False, f"OK (変化率: {change_rate*100:.1f}%)"

    def interpolate_missing_value(self, reactor_id: str,
                                 sensor_type: str) -> Optional[float]:
        """欠損値を線形補間

        Args:
            reactor_id: 反応器ID
            sensor_type: センサータイプ

        Returns:
            補間値(履歴がない場合はNone)
        """
        key = f"{reactor_id}_{sensor_type}"

        if key not in self.history or len(self.history[key]) < 2:
            return None

        # 最新2点の平均値で補間(簡易的)
        recent_values = self.history[key][-2:]
        interpolated = np.mean(recent_values)

        return interpolated

    def validate(self, reading: SensorReading) -> SensorReading:
        """総合検証を実行してデータ品質を判定

        Args:
            reading: センサー読み取り値

        Returns:
            品質フラグを更新したSensorReading
        """
        # 1. 物理的範囲チェック
        is_valid, msg = self.validate_physical_range(reading)
        if not is_valid:
            print(f"[BAD] {msg}")
            reading.quality = DataQuality.BAD
            return reading

        # 2. 外れ値検出
        is_outlier, msg = self.detect_outlier_zscore(reading)
        if is_outlier:
            print(f"[UNCERTAIN] {msg}")
            reading.quality = DataQuality.UNCERTAIN

        # 3. スパイク検出
        is_spike, msg = self.detect_spike(reading)
        if is_spike:
            print(f"[UNCERTAIN] {msg}")
            reading.quality = DataQuality.UNCERTAIN

        return reading


# 使用例
if __name__ == "__main__":
    validator = DataValidator()

    # 正常データのシミュレーション
    print("=== 正常データ ===")
    for i in range(5):
        reading = SensorReading(
            reactor_id="R101",
            sensor_type="temperature",
            value=85.0 + np.random.normal(0, 0.5),  # 平均85℃、標準偏差0.5℃
            unit="degC",
            timestamp=f"2025-10-26T10:30:{i:02d}"
        )
        validated = validator.validate(reading)
        print(f"Time {i}: {validated.value:.2f}°C - Quality: {validated.quality.value}")

    # 物理的範囲外のデータ
    print("\n=== 物理的範囲外データ ===")
    bad_reading = SensorReading(
        reactor_id="R101",
        sensor_type="temperature",
        value=500.0,  # 範囲外(max 400℃)
        unit="degC",
        timestamp="2025-10-26T10:30:10"
    )
    validated = validator.validate(bad_reading)
    print(f"Value: {validated.value}°C - Quality: {validated.quality.value}")

    # 外れ値(スパイク)
    print("\n=== スパイクデータ ===")
    spike_reading = SensorReading(
        reactor_id="R101",
        sensor_type="temperature",
        value=120.0,  # 急激な上昇
        unit="degC",
        timestamp="2025-10-26T10:30:15"
    )
    validated = validator.validate(spike_reading)
    print(f"Value: {validated.value}°C - Quality: {validated.quality.value}")

    # 欠損値の補間
    print("\n=== 欠損値補間 ===")
    interpolated = validator.interpolate_missing_value("R101", "temperature")
    if interpolated:
        print(f"補間値: {interpolated:.2f}°C")

# 期待される出力例:
# === 正常データ ===
# Time 0: 85.23°C - Quality: Good
# Time 1: 84.87°C - Quality: Good
# Time 2: 85.41°C - Quality: Good
# Time 3: 85.12°C - Quality: Good
# Time 4: 84.95°C - Quality: Good
#
# === 物理的範囲外データ ===
# [BAD] temperatureが物理的範囲外: 500.0 (許容範囲: -50-400 degC)
# Value: 500.0°C - Quality: Bad
#
# === スパイクデータ ===
# [UNCERTAIN] 急激な変化検出: 41.2% (許容: 20.0%)
# Value: 120.0°C - Quality: Uncertain
#
# === 欠損値補間 ===
# 補間値: 85.04°C

2.6 実装例5:データストリーミングパイプライン

Kafka風のストリーミング処理で、複数のセンサーからのデータを統合的に処理します。

"""
Example 5: データストリーミングパイプライン
複数センサーからのデータをリアルタイムに処理・配信
"""

from queue import Queue
from threading import Thread, Event
from typing import Callable, List
import time
from datetime import datetime
import json


class DataStream:
    """リアルタイムデータストリーム

    センサーデータをキューで受け取り、複数の処理を
    パイプライン形式で実行します。
    """

    def __init__(self, name: str, buffer_size: int = 1000):
        """
        Args:
            name: ストリーム名
            buffer_size: バッファサイズ(キューの最大長)
        """
        self.name = name
        self.queue = Queue(maxsize=buffer_size)
        self.processors: List[Callable] = []
        self.is_running = Event()
        self.worker_thread = None

    def add_processor(self, processor: Callable):
        """データ処理関数を追加

        Args:
            processor: データを受け取って処理する関数 processor(data)
        """
        self.processors.append(processor)

    def publish(self, data: dict):
        """データをストリームに送信

        Args:
            data: センサーデータ辞書
        """
        try:
            self.queue.put(data, timeout=1)
        except Exception as e:
            print(f"[{self.name}] キューがフル: {e}")

    def _process_loop(self):
        """データ処理ループ(別スレッドで実行)"""
        while self.is_running.is_set():
            try:
                # キューからデータを取得(1秒タイムアウト)
                data = self.queue.get(timeout=1)

                # すべての処理関数を順次実行
                for processor in self.processors:
                    try:
                        data = processor(data)
                        if data is None:  # 処理でフィルタされた場合
                            break
                    except Exception as e:
                        print(f"[{self.name}] 処理エラー: {e}")
                        break

            except Exception:
                # タイムアウト(データなし)→ 継続
                continue

    def start(self):
        """ストリーム処理を開始"""
        if self.worker_thread and self.worker_thread.is_alive():
            print(f"[{self.name}] 既に実行中です")
            return

        self.is_running.set()
        self.worker_thread = Thread(target=self._process_loop, daemon=True)
        self.worker_thread.start()
        print(f"[{self.name}] ストリーム処理を開始")

    def stop(self):
        """ストリーム処理を停止"""
        self.is_running.clear()
        if self.worker_thread:
            self.worker_thread.join(timeout=5)
        print(f"[{self.name}] ストリーム処理を停止")


class StreamingPipeline:
    """複数ストリームを管理するパイプライン"""

    def __init__(self):
        self.streams = {}

    def create_stream(self, name: str) -> DataStream:
        """新しいストリームを作成

        Args:
            name: ストリーム名

        Returns:
            DataStreamオブジェクト
        """
        stream = DataStream(name)
        self.streams[name] = stream
        return stream

    def start_all(self):
        """すべてのストリームを開始"""
        for stream in self.streams.values():
            stream.start()

    def stop_all(self):
        """すべてのストリームを停止"""
        for stream in self.streams.values():
            stream.stop()


# データ処理関数の例
def enrich_with_metadata(data: dict) -> dict:
    """メタデータを追加"""
    data['processed_at'] = datetime.now().isoformat()
    data['pipeline'] = 'streaming_v1'
    return data


def filter_bad_quality(data: dict) -> dict:
    """品質が悪いデータをフィルタ"""
    if data.get('quality') == 'Bad':
        print(f"不良データをフィルタ: {data}")
        return None  # None = このデータは破棄
    return data


def aggregate_1sec_window(data: dict) -> dict:
    """1秒ウィンドウで集計(簡易版)"""
    # 実際は時間窓で複数データを集計
    data['aggregation'] = '1sec_mean'
    return data


def send_to_digital_twin(data: dict) -> dict:
    """Digital Twinモデルに送信"""
    print(f"Digital Twinに送信: {data['reactor_id']}.{data['sensor_type']} = {data['value']}")
    # 実際はDigital TwinのAPIを呼び出し
    return data


def save_to_database(data: dict) -> dict:
    """データベースに保存"""
    # 実際はInfluxDB等に保存
    print(f"DBに保存: {json.dumps(data, ensure_ascii=False)}")
    return data


# 使用例
if __name__ == "__main__":
    # パイプラインの作成
    pipeline = StreamingPipeline()

    # 温度データストリーム
    temp_stream = pipeline.create_stream("temperature_stream")
    temp_stream.add_processor(enrich_with_metadata)
    temp_stream.add_processor(filter_bad_quality)
    temp_stream.add_processor(aggregate_1sec_window)
    temp_stream.add_processor(send_to_digital_twin)
    temp_stream.add_processor(save_to_database)

    # 圧力データストリーム
    pres_stream = pipeline.create_stream("pressure_stream")
    pres_stream.add_processor(enrich_with_metadata)
    pres_stream.add_processor(filter_bad_quality)
    pres_stream.add_processor(send_to_digital_twin)

    # すべてのストリームを開始
    pipeline.start_all()

    try:
        # センサーデータをシミュレーション
        for i in range(10):
            # 温度データ
            temp_data = {
                'reactor_id': 'R101',
                'sensor_type': 'temperature',
                'value': 85.0 + i * 0.1,
                'unit': 'degC',
                'quality': 'Good' if i < 8 else 'Bad',  # 8番目以降は不良
                'timestamp': datetime.now().isoformat()
            }
            temp_stream.publish(temp_data)

            # 圧力データ
            pres_data = {
                'reactor_id': 'R101',
                'sensor_type': 'pressure',
                'value': 2.45 + i * 0.01,
                'unit': 'MPa',
                'quality': 'Good',
                'timestamp': datetime.now().isoformat()
            }
            pres_stream.publish(pres_data)

            time.sleep(0.5)  # 0.5秒間隔でデータ送信

        # 処理が完了するまで待機
        time.sleep(2)

    finally:
        # すべてのストリームを停止
        pipeline.stop_all()

# 期待される出力例:
# [temperature_stream] ストリーム処理を開始
# [pressure_stream] ストリーム処理を開始
# Digital Twinに送信: R101.temperature = 85.0
# DBに保存: {"reactor_id": "R101", "sensor_type": "temperature", "value": 85.0, ...}
# Digital Twinに送信: R101.pressure = 2.45
# Digital Twinに送信: R101.temperature = 85.1
# DBに保存: {"reactor_id": "R101", "sensor_type": "temperature", "value": 85.1, ...}
# ...
# 不良データをフィルタ: {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.8, 'quality': 'Bad'}
# 不良データをフィルタ: {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.9, 'quality': 'Bad'}
# [temperature_stream] ストリーム処理を停止
# [pressure_stream] ストリーム処理を停止

2.7 実装例6:カルマンフィルタによる状態推定

ノイズを含むセンサーデータから、真の状態を推定するカルマンフィルタの実装です。

"""
Example 6: カルマンフィルタによる状態推定
ノイズを含むセンサーデータから反応器の真の状態を推定
"""

import numpy as np
from dataclasses import dataclass
from typing import Tuple


@dataclass
class KalmanState:
    """カルマンフィルタの状態"""
    x: np.ndarray      # 状態ベクトル [温度, 温度変化率]
    P: np.ndarray      # 誤差共分散行列


class ReactorKalmanFilter:
    """反応器状態推定用カルマンフィルタ

    温度センサーのノイズを除去し、真の温度と
    温度変化率を推定します。
    """

    def __init__(self,
                 initial_temp: float,
                 process_noise: float = 0.01,
                 measurement_noise: float = 0.5):
        """
        Args:
            initial_temp: 初期温度 [℃]
            process_noise: プロセスノイズ(システムの不確実性)
            measurement_noise: 測定ノイズ(センサーノイズ)
        """
        # 状態ベクトル: [温度, 温度変化率]
        self.state = KalmanState(
            x=np.array([initial_temp, 0.0]),  # [℃, ℃/s]
            P=np.eye(2) * 1.0                  # 初期誤差共分散
        )

        # システムモデル: x(k+1) = F * x(k) + w
        # dt = 1秒と仮定
        dt = 1.0
        self.F = np.array([
            [1.0, dt],   # 温度(k+1) = 温度(k) + 変化率(k) * dt
            [0.0, 1.0]   # 変化率(k+1) = 変化率(k) (一定と仮定)
        ])

        # 観測モデル: z(k) = H * x(k) + v
        # 温度のみ観測可能
        self.H = np.array([[1.0, 0.0]])

        # プロセスノイズ共分散
        self.Q = np.array([
            [process_noise, 0.0],
            [0.0, process_noise]
        ])

        # 測定ノイズ共分散
        self.R = np.array([[measurement_noise]])

    def predict(self) -> Tuple[float, float]:
        """予測ステップ(システムモデルに基づく状態予測)

        Returns:
            (predicted_temp, predicted_rate) 予測温度と変化率
        """
        # 状態予測: x_pred = F * x
        x_pred = self.F @ self.state.x

        # 誤差共分散予測: P_pred = F * P * F^T + Q
        P_pred = self.F @ self.state.P @ self.F.T + self.Q

        # 予測状態を更新
        self.state.x = x_pred
        self.state.P = P_pred

        return self.state.x[0], self.state.x[1]

    def update(self, measurement: float) -> Tuple[float, float]:
        """更新ステップ(測定値で状態を補正)

        Args:
            measurement: センサー測定値(温度)[℃]

        Returns:
            (estimated_temp, estimated_rate) 推定温度と変化率
        """
        # イノベーション(予測と測定の差): y = z - H * x_pred
        z = np.array([measurement])
        y = z - self.H @ self.state.x

        # イノベーション共分散: S = H * P_pred * H^T + R
        S = self.H @ self.state.P @ self.H.T + self.R

        # カルマンゲイン: K = P_pred * H^T * S^-1
        K = self.state.P @ self.H.T @ np.linalg.inv(S)

        # 状態更新: x = x_pred + K * y
        self.state.x = self.state.x + K @ y

        # 誤差共分散更新: P = (I - K * H) * P_pred
        I = np.eye(2)
        self.state.P = (I - K @ self.H) @ self.state.P

        return self.state.x[0], self.state.x[1]

    def estimate(self, measurement: float) -> Tuple[float, float]:
        """予測→更新の1サイクルを実行

        Args:
            measurement: センサー測定値 [℃]

        Returns:
            (estimated_temp, estimated_rate) 推定温度と変化率
        """
        # 予測
        self.predict()

        # 更新
        return self.update(measurement)


# 使用例
if __name__ == "__main__":
    # カルマンフィルタの初期化
    kf = ReactorKalmanFilter(
        initial_temp=85.0,
        process_noise=0.01,      # プロセスは安定
        measurement_noise=0.5    # センサーノイズは中程度
    )

    # 真の温度プロファイル(ランプ加熱: 85℃ → 95℃)
    time_steps = 50
    true_temps = np.linspace(85.0, 95.0, time_steps)

    # センサー測定値(ノイズあり)
    np.random.seed(42)
    noise = np.random.normal(0, 0.5, time_steps)  # 標準偏差0.5℃
    measured_temps = true_temps + noise

    print("Time\tTrue\tMeasured\tEstimated\tRate")
    print("-" * 60)

    estimated_temps = []
    estimated_rates = []

    for t in range(time_steps):
        # カルマンフィルタで推定
        est_temp, est_rate = kf.estimate(measured_temps[t])
        estimated_temps.append(est_temp)
        estimated_rates.append(est_rate)

        if t % 5 == 0:  # 5秒ごとに表示
            print(f"{t}\t{true_temps[t]:.2f}\t{measured_temps[t]:.2f}\t\t{est_temp:.2f}\t\t{est_rate:.3f}")

    # 精度評価
    mse_raw = np.mean((measured_temps - true_temps) ** 2)
    mse_filtered = np.mean((np.array(estimated_temps) - true_temps) ** 2)

    print(f"\n=== 精度評価 ===")
    print(f"生データのMSE: {mse_raw:.4f}")
    print(f"フィルタ後のMSE: {mse_filtered:.4f}")
    print(f"精度改善率: {(1 - mse_filtered/mse_raw) * 100:.1f}%")

# 期待される出力例:
# Time    True    Measured        Estimated       Rate
# ------------------------------------------------------------
# 0       85.00   85.25           85.13           0.051
# 5       86.02   85.78           85.96           0.192
# 10      87.04   86.92           87.01           0.204
# 15      88.06   88.31           88.08           0.206
# 20      89.08   89.23           89.10           0.204
# 25      90.10   90.35           90.13           0.203
# 30      91.12   90.98           91.14           0.204
# 35      92.14   92.45           92.17           0.203
# 40      93.16   93.08           93.18           0.204
# 45      94.18   94.52           94.21           0.203
#
# === 精度評価 ===
# 生データのMSE: 0.2435
# フィルタ後のMSE: 0.0089
# 精度改善率: 96.3%

2.8 実装例7:Digital Twin同期ループ

すべての要素を統合し、Digital Twinとのリアルタイム同期を実現する完全な実装です。

"""
Example 7: Digital Twin同期ループ
センサーデータ取得→検証→状態推定→モデル更新の完全サイクル
"""

import time
from datetime import datetime
from typing import Dict, Optional
import numpy as np


class DigitalTwinModel:
    """Digital Twinの簡易物理モデル

    反応器の熱バランスモデルで、温度・圧力・流量から
    反応進行度を計算します。
    """

    def __init__(self, reactor_id: str):
        self.reactor_id = reactor_id

        # モデルパラメータ
        self.heat_capacity = 4200  # J/(kg·K)
        self.mass = 1000           # kg
        self.heat_transfer_coeff = 500  # W/(m²·K)
        self.area = 10             # m²

        # 状態変数
        self.state = {
            'temperature': 85.0,    # ℃
            'pressure': 2.5,        # MPa
            'flow_rate': 120.0,     # L/min
            'conversion': 0.0,      # 反応進行度 (0-1)
            'last_update': datetime.now()
        }

    def update_from_sensors(self, sensor_data: Dict[str, float]):
        """センサーデータでモデルを更新

        Args:
            sensor_data: {'temperature': 85.3, 'pressure': 2.45, 'flow_rate': 120.5}
        """
        # センサー値でモデル状態を更新
        if 'temperature' in sensor_data:
            self.state['temperature'] = sensor_data['temperature']

        if 'pressure' in sensor_data:
            self.state['pressure'] = sensor_data['pressure']

        if 'flow_rate' in sensor_data:
            self.state['flow_rate'] = sensor_data['flow_rate']

        self.state['last_update'] = datetime.now()

    def calculate_reaction_conversion(self) -> float:
        """反応進行度を計算(簡易モデル)

        Arrhenius式に基づく反応速度の積分

        Returns:
            反応進行度 (0-1)
        """
        T = self.state['temperature'] + 273.15  # K
        P = self.state['pressure'] * 1e6        # Pa

        # アレニウスパラメータ(仮想値)
        A = 1e10      # 頻度因子
        Ea = 80000    # 活性化エネルギー [J/mol]
        R = 8.314     # 気体定数 [J/(mol·K)]

        # 反応速度定数
        k = A * np.exp(-Ea / (R * T))

        # 圧力補正(簡易的)
        k_eff = k * (P / 2.5e6) ** 0.5

        # 反応進行度(時間積分の近似)
        dt = 1.0  # 1秒
        dX = k_eff * (1 - self.state['conversion']) * dt

        new_conversion = min(self.state['conversion'] + dX, 1.0)
        self.state['conversion'] = new_conversion

        return new_conversion

    def predict_next_state(self, time_horizon: float = 60) -> Dict:
        """将来の状態を予測

        Args:
            time_horizon: 予測時間 [秒]

        Returns:
            予測状態辞書
        """
        # 現在の反応速度から将来の変換率を予測
        predicted_conversion = min(
            self.state['conversion'] + 0.01 * time_horizon,
            1.0
        )

        # 発熱反応の場合、温度上昇を予測
        heat_released = predicted_conversion * 100000  # J (仮想値)
        temp_rise = heat_released / (self.mass * self.heat_capacity)
        predicted_temp = self.state['temperature'] + temp_rise

        return {
            'temperature': predicted_temp,
            'conversion': predicted_conversion,
            'time_horizon': time_horizon
        }

    def get_state(self) -> Dict:
        """現在の状態を取得"""
        return self.state.copy()


class DigitalTwinSyncLoop:
    """Digital Twin同期ループ

    センサーデータ取得→検証→状態推定→モデル更新を
    リアルタイムに実行します。
    """

    def __init__(self, reactor_id: str, update_interval: float = 1.0):
        """
        Args:
            reactor_id: 反応器ID
            update_interval: 更新間隔 [秒]
        """
        self.reactor_id = reactor_id
        self.update_interval = update_interval

        # Digital Twinモデル
        self.twin_model = DigitalTwinModel(reactor_id)

        # カルマンフィルタ(温度用)
        self.temp_filter = ReactorKalmanFilter(
            initial_temp=85.0,
            process_noise=0.01,
            measurement_noise=0.5
        )

        # データバリデータ
        self.validator = DataValidator()

        self.is_running = False
        self.cycle_count = 0

    def fetch_sensor_data(self) -> Dict[str, float]:
        """センサーデータを取得(実際はMQTT/OPC UAから)

        Returns:
            {'temperature': 85.3, 'pressure': 2.45, 'flow_rate': 120.5}
        """
        # ここでは模擬データを生成
        # 実際はMQTTクライアントやOPC UAクライアントから取得

        base_temp = 85.0 + self.cycle_count * 0.1  # ゆっくり昇温
        noise = np.random.normal(0, 0.5)

        return {
            'temperature': base_temp + noise,
            'pressure': 2.5 + np.random.normal(0, 0.02),
            'flow_rate': 120.0 + np.random.normal(0, 1.0)
        }

    def run_single_cycle(self):
        """同期ループの1サイクルを実行"""
        cycle_start = time.time()

        # 1. センサーデータ取得
        raw_sensor_data = self.fetch_sensor_data()

        # 2. データ検証
        temp_reading = SensorReading(
            reactor_id=self.reactor_id,
            sensor_type="temperature",
            value=raw_sensor_data['temperature'],
            unit="degC",
            timestamp=datetime.now().isoformat()
        )
        validated_reading = self.validator.validate(temp_reading)

        # 3. カルマンフィルタで状態推定
        if validated_reading.quality != DataQuality.BAD:
            est_temp, est_rate = self.temp_filter.estimate(validated_reading.value)
            filtered_data = {
                'temperature': est_temp,
                'pressure': raw_sensor_data['pressure'],
                'flow_rate': raw_sensor_data['flow_rate']
            }
        else:
            # 不良データの場合は前回値を使用
            filtered_data = None

        # 4. Digital Twinモデル更新
        if filtered_data:
            self.twin_model.update_from_sensors(filtered_data)

        # 5. 反応進行度計算
        conversion = self.twin_model.calculate_reaction_conversion()

        # 6. 将来予測
        prediction = self.twin_model.predict_next_state(time_horizon=60)

        # 7. 結果表示
        state = self.twin_model.get_state()
        cycle_time = (time.time() - cycle_start) * 1000  # ms

        print(f"[Cycle {self.cycle_count}] "
              f"Temp: {state['temperature']:.2f}°C "
              f"(filtered: {est_temp:.2f}°C), "
              f"Pressure: {state['pressure']:.2f} MPa, "
              f"Conversion: {conversion*100:.1f}%, "
              f"Prediction(60s): {prediction['conversion']*100:.1f}% "
              f"({cycle_time:.1f}ms)")

        self.cycle_count += 1

    def run(self, duration: Optional[float] = None):
        """同期ループを開始

        Args:
            duration: 実行時間 [秒] (Noneの場合は無限ループ)
        """
        self.is_running = True
        start_time = time.time()

        print(f"=== Digital Twin同期ループ開始 ===")
        print(f"Reactor ID: {self.reactor_id}")
        print(f"Update Interval: {self.update_interval}s")
        print("-" * 80)

        try:
            while self.is_running:
                # 1サイクル実行
                self.run_single_cycle()

                # 終了条件チェック
                if duration and (time.time() - start_time) >= duration:
                    break

                # 次の更新まで待機
                time.sleep(self.update_interval)

        except KeyboardInterrupt:
            print("\n\nキーボード割り込みで停止")

        finally:
            self.is_running = False
            print(f"\n=== 同期ループ終了 ===")
            print(f"総サイクル数: {self.cycle_count}")
            print(f"実行時間: {time.time() - start_time:.1f}秒")


# 使用例
if __name__ == "__main__":
    # Digital Twin同期ループの作成
    sync_loop = DigitalTwinSyncLoop(
        reactor_id="R101",
        update_interval=1.0  # 1秒ごとに更新
    )

    # 30秒間実行
    sync_loop.run(duration=30)

# 期待される出力例:
# === Digital Twin同期ループ開始 ===
# Reactor ID: R101
# Update Interval: 1.0s
# --------------------------------------------------------------------------------
# [Cycle 0] Temp: 85.23°C (filtered: 85.13°C), Pressure: 2.51 MPa, Conversion: 0.2%, Prediction(60s): 0.8% (2.3ms)
# [Cycle 1] Temp: 85.34°C (filtered: 85.28°C), Pressure: 2.49 MPa, Conversion: 0.4%, Prediction(60s): 1.0% (1.8ms)
# [Cycle 2] Temp: 85.41°C (filtered: 85.39°C), Pressure: 2.52 MPa, Conversion: 0.6%, Prediction(60s): 1.2% (1.7ms)
# ...
# [Cycle 29] Temp: 88.12°C (filtered: 88.09°C), Pressure: 2.48 MPa, Conversion: 6.2%, Prediction(60s): 6.8% (1.9ms)
#
# === 同期ループ終了 ===
# 総サイクル数: 30
# 実行時間: 30.1秒
💡 実装のポイント

学習目標の確認

この章を完了すると、以下を実装できるようになります:

基本理解

実践スキル

応用力

演習問題

Easy(基礎確認)

Q1: MQTTとOPC UAの主な違いは何ですか?最も適切なものを選んでください。

a) MQTTは産業用、OPC UAはIoT用
b) MQTTは軽量でIoT向き、OPC UAは産業オートメーション標準
c) MQTTは有料、OPC UAは無料
d) MQTTは古い規格、OPC UAは新しい規格

解答を見る

正解: b) MQTTは軽量でIoT向き、OPC UAは産業オートメーション標準

解説:

  • MQTT: 軽量なメッセージングプロトコル、IoTデバイスに最適、Publish/Subscribe型
  • OPC UA: 産業オートメーション標準、PLC/DCS接続、クライアント/サーバー型
  • どちらもオープン規格で無料で使用可能

Medium(応用)

Q2: センサーデータのZ-scoreが3.5の場合、このデータは外れ値として扱うべきですか?Z-scoreのしきい値を3.0とした場合の判断と、その理由を説明してください。

解答を見る

正解: はい、外れ値として扱うべきです。

理由:

  • Z-score = 3.5はしきい値3.0を超えている
  • 正規分布の場合、Z-score > 3.0のデータは全体の0.3%以下(非常に稀)
  • センサーノイズやスパイクの可能性が高い

実務的な対応:

  • データ品質を"Uncertain"としてフラグ付け
  • カルマンフィルタや移動平均で補正
  • 完全に除外せず、履歴として保存(後から原因分析)

Hard(発展)

Q3: 1000個のセンサーが1秒間隔でデータを送信する化学プラントで、Digital Twin同期ループの更新間隔を1秒に設定しています。各サイクルの処理時間が平均800msの場合、システムは安定動作しますか?もし問題があれば、3つの改善策を提案してください。

解答を見る

正解: 不安定になる可能性が高い(処理時間 800ms ≈ 更新間隔 1000msでマージンが不十分)

問題点:

  • 処理時間の変動(最悪ケース)で1000msを超える可能性
  • データ遅延が蓄積してバッファオーバーフロー
  • リアルタイム性の喪失

改善策:

  1. 並列処理の導入
    • センサー群をグループ化して並列処理(マルチスレッド/マルチプロセス)
    • 期待効果: 処理時間を1/4に短縮(4コア使用時)
  2. バッチ処理の最適化
    • センサーデータを一括読み取り(OPC UAのread_multiple_nodes使用)
    • データベース書き込みもバッチ化
    • 期待効果: 通信オーバーヘッドを90%削減
  3. 更新頻度の階層化
    • 重要センサー(温度、圧力): 1秒間隔
    • 補助センサー(流量、液位): 5秒間隔
    • 分析計: 60秒間隔
    • 期待効果: 処理負荷を60%削減

実装例(並列処理):

from concurrent.futures import ThreadPoolExecutor

def process_sensor_group(sensor_ids):
    # センサーグループの処理
    pass

# 1000センサーを4グループに分割
groups = [sensors[i:i+250] for i in range(0, 1000, 250)]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_sensor_group, groups)
# 処理時間: 800ms → 200ms

次のステップ

リアルタイムデータ連携の基盤ができたら、次はDigital Twinのモデリング手法を学びます。物理モデル(第一原理)とデータ駆動モデル(機械学習)を組み合わせたハイブリッドモデリングにより、高精度な予測と制御を実現します。

参考文献

  1. Eclipse Paho™ MQTT Python Client. (2024). paho-mqtt Documentation. https://www.eclipse.org/paho/
  2. OPC Foundation. (2023). OPC Unified Architecture Specification Part 1: Overview and Concepts. https://opcfoundation.org/
  3. InfluxData. (2024). InfluxDB Python Client Library. https://docs.influxdata.com/influxdb/
  4. Welch, G., & Bishop, G. (2006). An Introduction to the Kalman Filter. UNC-Chapel Hill, TR 95-041.
  5. Kritzinger, W., et al. (2018). "Digital Twin in manufacturing: A categorical literature review and classification." IFAC-PapersOnLine, 51(11), 1016-1022.