
Chapter 2: Real-time Data Integration

Implementation technology for synchronizing physical systems and digital space through Digital Twin and IoT sensor integration

📚 Digital Twin Introduction Series ⏱️ Reading time: 35 minutes 💡 Difficulty: Intermediate

2.1 Importance of Real-time Data Integration

The accuracy and practicality of a Digital Twin depend heavily on how precisely the digital model is synchronized with the physical system. In this chapter, we will work through seven practical implementation examples for integrating real-time data from IoT sensors into a Digital Twin.

2.1.1 Challenges in Data Integration

Real-time data integration in the process industry must address challenges such as very high data volumes, strict real-time processing requirements, and variable sensor data quality, as the following example illustrates:

💡 Industrial Example
In chemical plants, over 1000 sensors transmit data at 1-second intervals. 86.4M (86.4 million) data points are generated per day, which need to be processed in real-time and reflected in Digital Twin.
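
As a rough sizing exercise, the figure above can be reproduced with a few lines of Python. This is a back-of-the-envelope sketch; the 16 bytes-per-point storage figure is an assumption, not a measured value.

# Back-of-the-envelope sizing for the plant described above
# (16 bytes per stored point is an assumed figure)
sensors = 1000                  # number of sensors
samples_per_day = 24 * 60 * 60  # one sample per second
points_per_day = sensors * samples_per_day
print(f"Data points per day: {points_per_day:,}")          # 86,400,000

bytes_per_point = 16
gb_per_day = points_per_day * bytes_per_point / 1e9
print(f"Approx. raw storage per day: {gb_per_day:.1f} GB")  # ~1.4 GB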

2.2 Implementation Example 1: IoT Sensor Data Acquisition via MQTT Communication

MQTT is a lightweight publish/subscribe messaging protocol widely used for communication with IoT devices; delivery guarantees can be tuned through its QoS levels.

"""
Example 1: Real-time sensor data acquisition via MQTT communication
Receive data via MQTT from temperature and pressure sensors of chemical reactor
"""

import paho.mqtt.client as mqtt
import json
from datetime import datetime
from typing import Dict, Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MQTTSensorClient:
    """Client for communicating with IoT sensors via MQTT protocol

    Acquires real-time data from chemical plant reactor sensors
    (temperature, pressure, flow rate) and sends to Digital Twin.
    """

    def __init__(self, broker_address: str, port: int = 1883):
        """
        Args:
            broker_address: IP address/hostname of MQTT broker
            port: MQTT port number (default 1883)
        """
        self.client = mqtt.Client(client_id="digital_twin_client")
        self.broker_address = broker_address
        self.port = port
        self.data_callbacks: Dict[str, Callable] = {}

        # Register callback functions
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        """Callback on connection establishment"""
        if rc == 0:
            logger.info(f"Successfully connected to MQTT broker: {self.broker_address}")
            # Subscribe to all reactor sensor topics
            self.client.subscribe("reactor/+/sensor/#")
        else:
            logger.error(f"Connection failed: return code {rc}")

    def _on_message(self, client, userdata, msg):
        """Callback on message reception"""
        try:
            # Parse JSON payload
            payload = json.loads(msg.payload.decode())
            topic = msg.topic

            # Add timestamp (if not included in sensor data)
            if 'timestamp' not in payload:
                payload['timestamp'] = datetime.now().isoformat()

            logger.info(f"Received: {topic} - {payload}")

            # Execute registered callback functions
            for callback in self.data_callbacks.values():
                callback(topic, payload)

        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {e}")
        except Exception as e:
            logger.error(f"Message processing error: {e}")

    def _on_disconnect(self, client, userdata, rc):
        """Callback on disconnection"""
        if rc != 0:
            logger.warning(f"Unexpected disconnection: return code {rc}")
            logger.info("Attempting to reconnect...")

    def register_callback(self, name: str, callback: Callable):
        """Register callback function for data reception

        Args:
            name: Callback identifier name
            callback: Function to execute callback(topic: str, payload: dict)
        """
        self.data_callbacks[name] = callback

    def connect(self):
        """Connect to MQTT broker"""
        self.client.connect(self.broker_address, self.port, 60)
        self.client.loop_start()

    def disconnect(self):
        """Disconnect from MQTT broker"""
        self.client.loop_stop()
        self.client.disconnect()


# Usage example
def process_sensor_data(topic: str, payload: dict):
    """Send received sensor data to Digital Twin"""
    reactor_id = topic.split('/')[1]
    sensor_type = topic.split('/')[-1]

    print(f"Reactor {reactor_id} - {sensor_type}: {payload['value']} {payload['unit']}")

    # Reflect in Digital Twin model (described later)
    # digital_twin.update_sensor_value(reactor_id, sensor_type, payload)


if __name__ == "__main__":
    # Initialize MQTT client
    mqtt_client = MQTTSensorClient(broker_address="192.168.1.100")

    # Register data processing callback
    mqtt_client.register_callback("process_data", process_sensor_data)

    # Start connection
    mqtt_client.connect()

    try:
        # Continuously receive data (exit with Ctrl+C)
        import time
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        mqtt_client.disconnect()
        print("\nConnection terminated")

# Expected output example:
# INFO:__main__:Successfully connected to MQTT broker: 192.168.1.100
# INFO:__main__:Received: reactor/R101/sensor/temperature - {'value': 85.3, 'unit': 'degC', 'timestamp': '2025-10-26T10:30:15'}
# Reactor R101 - temperature: 85.3 degC
# INFO:__main__:Received: reactor/R101/sensor/pressure - {'value': 2.45, 'unit': 'MPa', 'timestamp': '2025-10-26T10:30:15'}
# Reactor R101 - pressure: 2.45 MPa
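
For reference, the topic layout (reactor/<id>/sensor/<type>) and JSON payload assumed by the client above could be produced on the device side roughly as follows. This is a minimal sketch of a hypothetical edge publisher, not part of Example 1; the broker address and the fixed reading are placeholders.

# Supplementary sketch: sensor-side publisher matching the topic/payload format above
# (hypothetical edge device; broker address and the reading value are placeholders)
import json
import time
from datetime import datetime

import paho.mqtt.client as mqtt

publisher = mqtt.Client(client_id="edge_sensor_R101")
publisher.connect("192.168.1.100", 1883, 60)
publisher.loop_start()

while True:
    payload = {
        "value": 85.3,                           # replace with a real sensor reading
        "unit": "degC",
        "timestamp": datetime.now().isoformat()
    }
    # QoS 1 = at-least-once delivery, a common choice for process data
    publisher.publish("reactor/R101/sensor/temperature", json.dumps(payload), qos=1)
    time.sleep(1)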

2.3 Implementation Example 2: Industrial Equipment Data Acquisition via OPC UA Integration

OPC UA (OPC Unified Architecture) is a standard communication protocol in the industrial automation field, used to connect to PLCs and DCS (distributed control systems).

"""
Example 2: Industrial equipment data acquisition via OPC UA communication
Acquire control parameters and measured values from PLC for reactor
"""

from opcua import Client, ua
from typing import List, Dict
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class OPCUAReactorClient:
    """Client for acquiring reactor data via OPC UA

    Acquires set values (SV) and process values (PV) in real-time
    from DCS (Distributed Control System) or PLC.
    """

    def __init__(self, server_url: str):
        """
        Args:
            server_url: OPC UA server URL (e.g., "opc.tcp://192.168.1.50:4840")
        """
        self.client = Client(server_url)
        self.server_url = server_url
        self.subscriptions = []

    def connect(self):
        """Connect to OPC UA server"""
        try:
            self.client.connect()
            logger.info(f"Connected to OPC UA server: {self.server_url}")

            # Retrieve server information
            server_info = self.client.get_server_node()
            logger.info(f"Server name: {server_info.get_browse_name()}")

        except Exception as e:
            logger.error(f"Connection error: {e}")
            raise

    def read_node(self, node_id: str) -> Dict:
        """Read value from OPC UA node

        Args:
            node_id: Node ID (e.g., "ns=2;s=Reactor.R101.Temperature")

        Returns:
            {'value': 85.3, 'quality': 'Good', 'timestamp': '2025-10-26T10:30:15'}
        """
        try:
            node = self.client.get_node(node_id)
            data_value = node.get_data_value()

            return {
                'value': data_value.Value.Value,
                'quality': str(data_value.StatusCode),
                'timestamp': data_value.SourceTimestamp.isoformat(),
                'node_id': node_id
            }
        except Exception as e:
            logger.error(f"Node read error ({node_id}): {e}")
            return None

    def read_multiple_nodes(self, node_ids: List[str]) -> Dict[str, Dict]:
        """Batch read multiple nodes (efficient)

        Args:
            node_ids: List of node IDs

        Returns:
            {node_id: {'value': ..., 'quality': ..., 'timestamp': ...}}
        """
        results = {}
        nodes = [self.client.get_node(nid) for nid in node_ids]

        # Batch read (performance improvement)
        data_values = self.client.get_values(nodes)

        for node_id, data_value in zip(node_ids, data_values):
            results[node_id] = {
                'value': data_value,
                'quality': 'Good',  # Simplified (actually check StatusCode)
                'timestamp': time.time()
            }

        return results

    def subscribe_to_changes(self, node_id: str, callback):
        """Subscribe to node changes (notification only on change)

        Args:
            node_id: Node ID to monitor
            callback: Function called on change callback(node, value)
        """

        class _SubHandler:
            """python-opcua delivers data-change notifications to a handler
            object, so wrap the plain callback in the expected interface."""

            def datachange_notification(self, node, val, data):
                callback(node, val)

        try:
            # Create subscription (100 ms publishing interval)
            subscription = self.client.create_subscription(100, _SubHandler())
            node = self.client.get_node(node_id)

            # Start monitoring data changes
            handle = subscription.subscribe_data_change(node)
            self.subscriptions.append((subscription, handle))

            logger.info(f"Subscription started: {node_id}")

        except Exception as e:
            logger.error(f"Subscription error: {e}")

    def disconnect(self):
        """Disconnect from OPC UA server"""
        # Remove all subscriptions
        for subscription, handle in self.subscriptions:
            subscription.unsubscribe(handle)
            subscription.delete()

        self.client.disconnect()
        logger.info("Disconnected from OPC UA server")


# Usage example
if __name__ == "__main__":
    # Initialize OPC UA client
    opcua_client = OPCUAReactorClient(server_url="opc.tcp://192.168.1.50:4840")
    opcua_client.connect()

    # Monitoring nodes for reactor R101
    reactor_nodes = [
        "ns=2;s=Reactor.R101.Temperature.PV",    # Temperature process value
        "ns=2;s=Reactor.R101.Temperature.SV",    # Temperature set value
        "ns=2;s=Reactor.R101.Pressure.PV",       # Pressure process value
        "ns=2;s=Reactor.R101.Pressure.SV",       # Pressure set value
        "ns=2;s=Reactor.R101.FlowRate.PV",       # Flow rate process value
    ]

    try:
        # Batch read multiple nodes
        data = opcua_client.read_multiple_nodes(reactor_nodes)

        for node_id, values in data.items():
            print(f"{node_id}: {values['value']}")

        # Continuous monitoring (10 seconds)
        print("\nStarting continuous monitoring...")
        for _ in range(10):
            data = opcua_client.read_multiple_nodes(reactor_nodes)
            temp_pv = data["ns=2;s=Reactor.R101.Temperature.PV"]['value']
            pres_pv = data["ns=2;s=Reactor.R101.Pressure.PV"]['value']

            print(f"Temperature: {temp_pv:.1f}°C, Pressure: {pres_pv:.2f} MPa")
            time.sleep(1)

    finally:
        opcua_client.disconnect()

# Expected output example:
# INFO:__main__:Connected to OPC UA server: opc.tcp://192.168.1.50:4840
# ns=2;s=Reactor.R101.Temperature.PV: 85.3
# ns=2;s=Reactor.R101.Temperature.SV: 85.0
# ns=2;s=Reactor.R101.Pressure.PV: 2.45
# ns=2;s=Reactor.R101.Pressure.SV: 2.50
# ns=2;s=Reactor.R101.FlowRate.PV: 120.5
#
# Starting continuous monitoring...
# Temperature: 85.3°C, Pressure: 2.45 MPa
# Temperature: 85.5°C, Pressure: 2.46 MPa
# Temperature: 85.4°C, Pressure: 2.45 MPa
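
The subscribe_to_changes() method is not exercised in the usage example above; a minimal sketch of change-driven monitoring, reusing the same hypothetical server URL and node ID, might look like this:

# Supplementary sketch: change-driven monitoring with subscribe_to_changes()
# (same hypothetical server URL and node ID as in the usage example above)
import time

def on_temperature_change(node, value):
    """Invoked by the subscription only when the monitored value changes"""
    print(f"Temperature changed: {value}")

client = OPCUAReactorClient(server_url="opc.tcp://192.168.1.50:4840")
client.connect()
client.subscribe_to_changes("ns=2;s=Reactor.R101.Temperature.PV", on_temperature_change)

try:
    time.sleep(30)   # receive change notifications for 30 seconds
finally:
    client.disconnect()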

2.4 Implementation Example 3: Time Series Database (InfluxDB) Integration

InfluxDB is a high-performance time series database well suited to long-term storage and fast querying of sensor data.

"""
Example 3: InfluxDB time series database integration
Efficient storage and high-speed queries for sensor data
"""

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timedelta
import pandas as pd
from typing import List, Dict


class ReactorDataStore:
    """Storage and retrieval of reactor data using InfluxDB

    Stores sensor data in time series database and realizes
    high-speed queries for Digital Twin.
    """

    def __init__(self, url: str, token: str, org: str, bucket: str):
        """
        Args:
            url: InfluxDB URL (e.g., "http://localhost:8086")
            token: Authentication token
            org: Organization name
            bucket: Bucket name (equivalent to database name)
        """
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.org = org
        self.bucket = bucket
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()

    def write_sensor_data(self, reactor_id: str, sensor_type: str,
                         value: float, unit: str, quality: str = "Good"):
        """Write sensor data

        Args:
            reactor_id: Reactor ID (e.g., "R101")
            sensor_type: Sensor type (e.g., "temperature")
            value: Measured value
            unit: Unit (e.g., "degC")
            quality: Data quality ("Good", "Bad", "Uncertain")
        """
        point = (
            Point("reactor_sensor")
            .tag("reactor_id", reactor_id)
            .tag("sensor_type", sensor_type)
            .tag("unit", unit)
            .tag("quality", quality)
            .field("value", value)
            .time(datetime.utcnow())
        )

        self.write_api.write(bucket=self.bucket, record=point)

    def write_batch_data(self, data_points: List[Dict]):
        """Batch write multiple data points (faster)

        Args:
            data_points: [{'reactor_id': 'R101', 'sensor_type': 'temperature', ...}]
        """
        points = []
        for dp in data_points:
            point = (
                Point("reactor_sensor")
                .tag("reactor_id", dp['reactor_id'])
                .tag("sensor_type", dp['sensor_type'])
                .tag("unit", dp.get('unit', ''))
                .field("value", dp['value'])
                .time(dp.get('timestamp', datetime.utcnow()))
            )
            points.append(point)

        self.write_api.write(bucket=self.bucket, record=points)

    def query_latest_value(self, reactor_id: str, sensor_type: str) -> Dict:
        """Retrieve latest sensor value

        Args:
            reactor_id: Reactor ID
            sensor_type: Sensor type

        Returns:
            {'time': '2025-10-26T10:30:15', 'value': 85.3, 'unit': 'degC'}
        """
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -1h)
          |> filter(fn: (r) => r._measurement == "reactor_sensor")
          |> filter(fn: (r) => r.reactor_id == "{reactor_id}")
          |> filter(fn: (r) => r.sensor_type == "{sensor_type}")
          |> last()
        '''

        result = self.query_api.query(query=query, org=self.org)

        if result and len(result) > 0 and len(result[0].records) > 0:
            record = result[0].records[0]
            return {
                'time': record.get_time().isoformat(),
                'value': record.get_value(),
                'unit': record.values.get('unit', ''),
                'quality': record.values.get('quality', 'Unknown')
            }
        return None

    def query_time_range(self, reactor_id: str, sensor_type: str,
                        start: datetime, end: datetime) -> pd.DataFrame:
        """Retrieve data for specified period and convert to DataFrame

        Args:
            reactor_id: Reactor ID
            sensor_type: Sensor type
            start: Start time
            end: End time

        Returns:
            pd.DataFrame with columns ['time', 'value', 'unit']
        """
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {start.isoformat()}Z, stop: {end.isoformat()}Z)
          |> filter(fn: (r) => r._measurement == "reactor_sensor")
          |> filter(fn: (r) => r.reactor_id == "{reactor_id}")
          |> filter(fn: (r) => r.sensor_type == "{sensor_type}")
        '''

        result = self.query_api.query_data_frame(query=query, org=self.org)

        if not result.empty:
            # Extract only necessary columns
            df = result[['_time', '_value', 'unit']].copy()
            df.columns = ['time', 'value', 'unit']
            return df

        return pd.DataFrame(columns=['time', 'value', 'unit'])

    def query_aggregated_data(self, reactor_id: str, sensor_type: str,
                             window: str = "1m") -> pd.DataFrame:
        """Retrieve aggregated data (mean, max, min)

        Args:
            reactor_id: Reactor ID
            sensor_type: Sensor type
            window: Aggregation window (e.g., "1m", "5m", "1h")

        Returns:
            pd.DataFrame with columns ['time', 'mean', 'max', 'min']
        """
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -24h)
          |> filter(fn: (r) => r._measurement == "reactor_sensor")
          |> filter(fn: (r) => r.reactor_id == "{reactor_id}")
          |> filter(fn: (r) => r.sensor_type == "{sensor_type}")
          |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
        '''

        result = self.query_api.query_data_frame(query=query, org=self.org)

        if not result.empty:
            df = result[['_time', '_value']].copy()
            df.columns = ['time', 'mean']
            return df

        return pd.DataFrame(columns=['time', 'mean'])

    def close(self):
        """Close database connection"""
        self.client.close()


# Usage example
if __name__ == "__main__":
    # Initialize InfluxDB client
    db = ReactorDataStore(
        url="http://localhost:8086",
        token="your_influxdb_token",
        org="your_org",
        bucket="reactor_data"
    )

    try:
        # Write single data point
        db.write_sensor_data(
            reactor_id="R101",
            sensor_type="temperature",
            value=85.3,
            unit="degC"
        )

        # Batch write (efficient)
        batch_data = [
            {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.5, 'unit': 'degC'},
            {'reactor_id': 'R101', 'sensor_type': 'pressure', 'value': 2.45, 'unit': 'MPa'},
            {'reactor_id': 'R101', 'sensor_type': 'flow_rate', 'value': 120.5, 'unit': 'L/min'},
        ]
        db.write_batch_data(batch_data)

        # Retrieve latest value
        latest = db.query_latest_value("R101", "temperature")
        print(f"Latest temperature: {latest['value']} {latest['unit']} at {latest['time']}")

        # Retrieve past 1 hour data
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=1)

        df = db.query_time_range("R101", "temperature", start_time, end_time)
        print(f"\nNumber of data points in past 1 hour: {len(df)}")
        print(df.head())

        # Aggregated data at 1-minute intervals
        agg_df = db.query_aggregated_data("R101", "temperature", window="1m")
        print(f"\n1-minute average data (last 24 hours):")
        print(agg_df.tail())

    finally:
        db.close()

# Expected output example:
# Latest temperature: 85.5 degC at 2025-10-26T10:30:15.123456
#
# Number of data points in past 1 hour: 3600
#                       time  value  unit
# 0 2025-10-26 09:30:15.123   85.1  degC
# 1 2025-10-26 09:30:16.234   85.2  degC
# 2 2025-10-26 09:30:17.345   85.3  degC
#
# 1-minute average data (last 24 hours):
#                       time   mean
# 1436 2025-10-26 10:26:00  85.2
# 1437 2025-10-26 10:27:00  85.3
# 1438 2025-10-26 10:28:00  85.4
# 1439 2025-10-26 10:29:00  85.5
# 1440 2025-10-26 10:30:00  85.5
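
ReactorDataStore can be wired directly to the MQTT client from Example 1 so that every received message is persisted. The sketch below assumes both classes are importable in the same module and reuses the placeholder broker address, credentials, and topic layout from the earlier examples.

# Supplementary sketch: persisting MQTT sensor messages with ReactorDataStore
# (placeholder broker address, InfluxDB credentials, and topic layout from earlier examples)

db = ReactorDataStore(url="http://localhost:8086", token="your_influxdb_token",
                      org="your_org", bucket="reactor_data")

def store_sensor_data(topic: str, payload: dict):
    """MQTT callback: map reactor/<id>/sensor/<type> messages to InfluxDB points"""
    parts = topic.split('/')
    db.write_sensor_data(
        reactor_id=parts[1],
        sensor_type=parts[-1],
        value=float(payload['value']),
        unit=payload.get('unit', '')
    )

mqtt_client = MQTTSensorClient(broker_address="192.168.1.100")
mqtt_client.register_callback("store_to_influxdb", store_sensor_data)
mqtt_client.connect()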

2.5 Implementation Example 4: Real-time Data Validation and Cleaning

Sensor data inevitably contains noise and occasional abnormal values, so quality checks are necessary before the data is fed into the Digital Twin.

"""
Example 4: Real-time data validation and cleaning
Quality check of sensor data, outlier detection, missing value imputation
"""

import numpy as np
import pandas as pd
from typing import Optional, Dict, Tuple
from dataclasses import dataclass
from enum import Enum


class DataQuality(Enum):
    """Data quality level"""
    GOOD = "Good"           # Normal data
    UNCERTAIN = "Uncertain" # Uncertain (corrected)
    BAD = "Bad"            # Abnormal data (unusable)


@dataclass
class SensorReading:
    """Sensor reading value"""
    reactor_id: str
    sensor_type: str
    value: float
    unit: str
    timestamp: str
    quality: DataQuality = DataQuality.GOOD


class DataValidator:
    """Real-time data validation and cleaning

    Executes physical constraint checks, statistical outlier detection,
    and missing value imputation to ensure data quality.
    """

    def __init__(self):
        # Physical ranges for each sensor (based on process knowledge)
        self.physical_ranges = {
            'temperature': {'min': -50, 'max': 400, 'unit': 'degC'},
            'pressure': {'min': 0, 'max': 10, 'unit': 'MPa'},
            'flow_rate': {'min': 0, 'max': 500, 'unit': 'L/min'},
            'ph': {'min': 0, 'max': 14, 'unit': ''},
        }

        # Historical data for moving average (max 100 points)
        self.history: Dict[str, list] = {}
        self.max_history = 100

    def validate_physical_range(self, reading: SensorReading) -> Tuple[bool, str]:
        """Physical range check

        Args:
            reading: Sensor reading value

        Returns:
            (is_valid, message)
        """
        if reading.sensor_type not in self.physical_ranges:
            return True, "Range not defined"

        range_info = self.physical_ranges[reading.sensor_type]
        value = reading.value

        if value < range_info['min'] or value > range_info['max']:
            msg = (f"{reading.sensor_type} outside physical range: {value} "
                   f"(allowed range: {range_info['min']}-{range_info['max']} {range_info['unit']})")
            return False, msg

        return True, "OK"

    def detect_outlier_zscore(self, reading: SensorReading,
                             threshold: float = 3.0) -> Tuple[bool, str]:
        """Statistical outlier detection by Z-score

        Args:
            reading: Sensor reading value
            threshold: Z-score threshold (default 3.0)

        Returns:
            (is_outlier, message)
        """
        key = f"{reading.reactor_id}_{reading.sensor_type}"

        # Add to historical data
        if key not in self.history:
            self.history[key] = []

        self.history[key].append(reading.value)

        # Cannot determine with insufficient history
        if len(self.history[key]) < 10:
            return False, "Insufficient history"

        # Retain only latest 100 points
        if len(self.history[key]) > self.max_history:
            self.history[key] = self.history[key][-self.max_history:]

        # Calculate Z-score
        values = np.array(self.history[key])
        mean = np.mean(values)
        std = np.std(values)

        if std < 1e-6:  # Standard deviation close to zero
            return False, "Constant values"

        z_score = abs((reading.value - mean) / std)

        if z_score > threshold:
            msg = f"Outlier detected: Z-score={z_score:.2f} (threshold={threshold})"
            return True, msg

        return False, f"OK (Z-score={z_score:.2f})"

    def detect_spike(self, reading: SensorReading,
                    max_change_rate: float = 0.2) -> Tuple[bool, str]:
        """Detect rapid changes (spikes)

        Args:
            reading: Sensor reading value
            max_change_rate: Allowable change rate (e.g., 0.2 = 20%)

        Returns:
            (is_spike, message)
        """
        key = f"{reading.reactor_id}_{reading.sensor_type}"

        if key not in self.history or len(self.history[key]) == 0:
            return False, "No previous value"

        previous_value = self.history[key][-1]

        # Avoid division by zero
        if abs(previous_value) < 1e-6:
            return False, "Previous value near zero"

        # Calculate change rate
        change_rate = abs((reading.value - previous_value) / previous_value)

        if change_rate > max_change_rate:
            msg = f"Rapid change detected: {change_rate*100:.1f}% (allowed: {max_change_rate*100:.1f}%)"
            return True, msg

        return False, f"OK (change rate: {change_rate*100:.1f}%)"

    def interpolate_missing_value(self, reactor_id: str,
                                 sensor_type: str) -> Optional[float]:
        """Linear interpolation of missing value

        Args:
            reactor_id: Reactor ID
            sensor_type: Sensor type

        Returns:
            Interpolated value (None if no history)
        """
        key = f"{reactor_id}_{sensor_type}"

        if key not in self.history or len(self.history[key]) < 2:
            return None

        # Interpolate with mean of latest 2 points (simplified)
        recent_values = self.history[key][-2:]
        interpolated = np.mean(recent_values)

        return interpolated

    def validate(self, reading: SensorReading) -> SensorReading:
        """Execute comprehensive validation and determine data quality

        Args:
            reading: Sensor reading value

        Returns:
            SensorReading with updated quality flag
        """
        # 1. Physical range check
        is_valid, msg = self.validate_physical_range(reading)
        if not is_valid:
            print(f"[BAD] {msg}")
            reading.quality = DataQuality.BAD
            return reading

        # 2. Spike detection (must run before the Z-score check, because
        #    detect_outlier_zscore appends the current value to the history
        #    that detect_spike uses as the "previous" value)
        is_spike, msg = self.detect_spike(reading)
        if is_spike:
            print(f"[UNCERTAIN] {msg}")
            reading.quality = DataQuality.UNCERTAIN

        # 3. Statistical outlier detection (appends the value to history)
        is_outlier, msg = self.detect_outlier_zscore(reading)
        if is_outlier:
            print(f"[UNCERTAIN] {msg}")
            reading.quality = DataQuality.UNCERTAIN

        return reading


# Usage example
if __name__ == "__main__":
    validator = DataValidator()

    # Simulate normal data
    print("=== Normal Data ===")
    for i in range(5):
        reading = SensorReading(
            reactor_id="R101",
            sensor_type="temperature",
            value=85.0 + np.random.normal(0, 0.5),  # Mean 85°C, Std 0.5°C
            unit="degC",
            timestamp=f"2025-10-26T10:30:{i:02d}"
        )
        validated = validator.validate(reading)
        print(f"Time {i}: {validated.value:.2f}°C - Quality: {validated.quality.value}")

    # Data outside physical range
    print("\n=== Data Outside Physical Range ===")
    bad_reading = SensorReading(
        reactor_id="R101",
        sensor_type="temperature",
        value=500.0,  # Outside range (max 400°C)
        unit="degC",
        timestamp="2025-10-26T10:30:10"
    )
    validated = validator.validate(bad_reading)
    print(f"Value: {validated.value}°C - Quality: {validated.quality.value}")

    # Outlier (spike)
    print("\n=== Spike Data ===")
    spike_reading = SensorReading(
        reactor_id="R101",
        sensor_type="temperature",
        value=120.0,  # Rapid increase
        unit="degC",
        timestamp="2025-10-26T10:30:15"
    )
    validated = validator.validate(spike_reading)
    print(f"Value: {validated.value}°C - Quality: {validated.quality.value}")

    # Missing value interpolation
    print("\n=== Missing Value Interpolation ===")
    interpolated = validator.interpolate_missing_value("R101", "temperature")
    if interpolated:
        print(f"Interpolated value: {interpolated:.2f}°C")

# Expected output example:
# === Normal Data ===
# Time 0: 85.23°C - Quality: Good
# Time 1: 84.87°C - Quality: Good
# Time 2: 85.41°C - Quality: Good
# Time 3: 85.12°C - Quality: Good
# Time 4: 84.95°C - Quality: Good
#
# === Data Outside Physical Range ===
# [BAD] temperature outside physical range: 500.0 (allowed range: -50-400 degC)
# Value: 500.0°C - Quality: Bad
#
# === Spike Data ===
# [UNCERTAIN] Rapid change detected: 41.2% (allowed: 20.0%)
# Value: 120.0°C - Quality: Uncertain
#
# === Missing Value Interpolation ===
# Interpolated value: 102.48°C (mean of the last two history values, including the 120.0°C spike)
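
DataValidator can also be reused as a processing step in the streaming pipeline of the next section. The adapter below is a sketch; it assumes the dict payload fields used throughout this chapter (reactor_id, sensor_type, value, unit, timestamp).

# Supplementary sketch: wrapping DataValidator as a stream-processing step
# (assumes the dict payload fields used throughout this chapter)

validator = DataValidator()

def validate_processor(data: dict) -> dict:
    """Validate a raw payload and tag it with the resulting quality flag"""
    reading = SensorReading(
        reactor_id=data['reactor_id'],
        sensor_type=data['sensor_type'],
        value=data['value'],
        unit=data.get('unit', ''),
        timestamp=data.get('timestamp', '')
    )
    validated = validator.validate(reading)
    data['quality'] = validated.quality.value   # "Good" / "Uncertain" / "Bad"
    return data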

2.6 Implementation Example 5: Data Streaming Pipeline

This example processes data from multiple sensors in an integrated manner using a lightweight, Kafka-style streaming pipeline built on Python queues and worker threads.

"""
Example 5: Data streaming pipeline
Real-time processing and distribution of data from multiple sensors
"""

from queue import Queue, Empty
from threading import Thread, Event
from typing import Callable, List
import time
from datetime import datetime
import json


class DataStream:
    """Real-time data stream

    Receives sensor data via queue and executes multiple
    processes in pipeline format.
    """

    def __init__(self, name: str, buffer_size: int = 1000):
        """
        Args:
            name: Stream name
            buffer_size: Buffer size (maximum queue length)
        """
        self.name = name
        self.queue = Queue(maxsize=buffer_size)
        self.processors: List[Callable] = []
        self.is_running = Event()
        self.worker_thread = None

    def add_processor(self, processor: Callable):
        """Add data processing function

        Args:
            processor: Function that receives and processes data processor(data)
        """
        self.processors.append(processor)

    def publish(self, data: dict):
        """Send data to stream

        Args:
            data: Sensor data dictionary
        """
        try:
            self.queue.put(data, timeout=1)
        except Exception as e:
            print(f"[{self.name}] Queue full: {e}")

    def _process_loop(self):
        """Data processing loop (executed in separate thread)"""
        while self.is_running.is_set():
            try:
                # Retrieve data from queue (1 second timeout)
                data = self.queue.get(timeout=1)

                # Execute all processing functions sequentially
                for processor in self.processors:
                    try:
                        data = processor(data)
                        if data is None:  # Filtered in processing
                            break
                    except Exception as e:
                        print(f"[{self.name}] Processing error: {e}")
                        break

            except Empty:
                # Queue read timed out (no data) → keep polling
                continue

    def start(self):
        """Start stream processing"""
        if self.worker_thread and self.worker_thread.is_alive():
            print(f"[{self.name}] Already running")
            return

        self.is_running.set()
        self.worker_thread = Thread(target=self._process_loop, daemon=True)
        self.worker_thread.start()
        print(f"[{self.name}] Stream processing started")

    def stop(self):
        """Stop stream processing"""
        self.is_running.clear()
        if self.worker_thread:
            self.worker_thread.join(timeout=5)
        print(f"[{self.name}] Stream processing stopped")


class StreamingPipeline:
    """Pipeline managing multiple streams"""

    def __init__(self):
        self.streams = {}

    def create_stream(self, name: str) -> DataStream:
        """Create new stream

        Args:
            name: Stream name

        Returns:
            DataStream object
        """
        stream = DataStream(name)
        self.streams[name] = stream
        return stream

    def start_all(self):
        """Start all streams"""
        for stream in self.streams.values():
            stream.start()

    def stop_all(self):
        """Stop all streams"""
        for stream in self.streams.values():
            stream.stop()


# Examples of data processing functions
def enrich_with_metadata(data: dict) -> dict:
    """Add metadata"""
    data['processed_at'] = datetime.now().isoformat()
    data['pipeline'] = 'streaming_v1'
    return data


def filter_bad_quality(data: dict) -> dict:
    """Filter bad quality data"""
    if data.get('quality') == 'Bad':
        print(f"Filtering bad data: {data}")
        return None  # None = discard this data
    return data


def aggregate_1sec_window(data: dict) -> dict:
    """Aggregate in 1-second window (simplified)"""
    # Actually aggregate multiple data in time window
    data['aggregation'] = '1sec_mean'
    return data


def send_to_digital_twin(data: dict) -> dict:
    """Send to Digital Twin model"""
    print(f"Send to Digital Twin: {data['reactor_id']}.{data['sensor_type']} = {data['value']}")
    # Actually call Digital Twin API
    return data


def save_to_database(data: dict) -> dict:
    """Save to database"""
    # Actually save to InfluxDB, etc.
    print(f"Save to DB: {json.dumps(data, ensure_ascii=False)}")
    return data


# Usage example
if __name__ == "__main__":
    # Create pipeline
    pipeline = StreamingPipeline()

    # Temperature data stream
    temp_stream = pipeline.create_stream("temperature_stream")
    temp_stream.add_processor(enrich_with_metadata)
    temp_stream.add_processor(filter_bad_quality)
    temp_stream.add_processor(aggregate_1sec_window)
    temp_stream.add_processor(send_to_digital_twin)
    temp_stream.add_processor(save_to_database)

    # Pressure data stream
    pres_stream = pipeline.create_stream("pressure_stream")
    pres_stream.add_processor(enrich_with_metadata)
    pres_stream.add_processor(filter_bad_quality)
    pres_stream.add_processor(send_to_digital_twin)

    # Start all streams
    pipeline.start_all()

    try:
        # Simulate sensor data
        for i in range(10):
            # Temperature data
            temp_data = {
                'reactor_id': 'R101',
                'sensor_type': 'temperature',
                'value': 85.0 + i * 0.1,
                'unit': 'degC',
                'quality': 'Good' if i < 8 else 'Bad',  # After 8th is bad
                'timestamp': datetime.now().isoformat()
            }
            temp_stream.publish(temp_data)

            # Pressure data
            pres_data = {
                'reactor_id': 'R101',
                'sensor_type': 'pressure',
                'value': 2.45 + i * 0.01,
                'unit': 'MPa',
                'quality': 'Good',
                'timestamp': datetime.now().isoformat()
            }
            pres_stream.publish(pres_data)

            time.sleep(0.5)  # Send data at 0.5 second intervals

        # Wait for processing to complete
        time.sleep(2)

    finally:
        # Stop all streams
        pipeline.stop_all()

# Expected output example:
# [temperature_stream] Stream processing started
# [pressure_stream] Stream processing started
# Send to Digital Twin: R101.temperature = 85.0
# Save to DB: {"reactor_id": "R101", "sensor_type": "temperature", "value": 85.0, ...}
# Send to Digital Twin: R101.pressure = 2.45
# Send to Digital Twin: R101.temperature = 85.1
# Save to DB: {"reactor_id": "R101", "sensor_type": "temperature", "value": 85.1, ...}
# ...
# Filtering bad data: {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.8, 'quality': 'Bad'}
# Filtering bad data: {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.9, 'quality': 'Bad'}
# [temperature_stream] Stream processing stopped
# [pressure_stream] Stream processing stopped

2.7 Implementation Example 6: State Estimation using Kalman Filter

This example implements a Kalman filter that estimates the true state of the reactor from noisy sensor measurements.

"""
Example 6: State estimation using Kalman filter
Estimate true state of reactor from sensor data containing noise
"""

import numpy as np
from dataclasses import dataclass
from typing import Tuple


@dataclass
class KalmanState:
    """State of Kalman filter"""
    x: np.ndarray      # State vector [temperature, temperature change rate]
    P: np.ndarray      # Error covariance matrix


class ReactorKalmanFilter:
    """Kalman filter for reactor state estimation

    Removes noise from temperature sensor and estimates true
    temperature and temperature change rate.
    """

    def __init__(self,
                 initial_temp: float,
                 process_noise: float = 0.01,
                 measurement_noise: float = 0.5):
        """
        Args:
            initial_temp: Initial temperature [°C]
            process_noise: Process noise (system uncertainty)
            measurement_noise: Measurement noise (sensor noise)
        """
        # State vector: [temperature, temperature change rate]
        self.state = KalmanState(
            x=np.array([initial_temp, 0.0]),  # [°C, °C/s]
            P=np.eye(2) * 1.0                  # Initial error covariance
        )

        # System model: x(k+1) = F * x(k) + w
        # Assume dt = 1 second
        dt = 1.0
        self.F = np.array([
            [1.0, dt],   # temp(k+1) = temp(k) + rate(k) * dt
            [0.0, 1.0]   # rate(k+1) = rate(k) (assume constant)
        ])

        # Observation model: z(k) = H * x(k) + v
        # Only temperature is observable
        self.H = np.array([[1.0, 0.0]])

        # Process noise covariance
        self.Q = np.array([
            [process_noise, 0.0],
            [0.0, process_noise]
        ])

        # Measurement noise covariance
        self.R = np.array([[measurement_noise]])

    def predict(self) -> Tuple[float, float]:
        """Prediction step (state prediction based on system model)

        Returns:
            (predicted_temp, predicted_rate) Predicted temperature and rate
        """
        # State prediction: x_pred = F * x
        x_pred = self.F @ self.state.x

        # Error covariance prediction: P_pred = F * P * F^T + Q
        P_pred = self.F @ self.state.P @ self.F.T + self.Q

        # Update predicted state
        self.state.x = x_pred
        self.state.P = P_pred

        return self.state.x[0], self.state.x[1]

    def update(self, measurement: float) -> Tuple[float, float]:
        """Update step (correct state with measurement value)

        Args:
            measurement: Sensor measurement value (temperature) [°C]

        Returns:
            (estimated_temp, estimated_rate) Estimated temperature and rate
        """
        # Innovation (difference between prediction and measurement): y = z - H * x_pred
        z = np.array([measurement])
        y = z - self.H @ self.state.x

        # Innovation covariance: S = H * P_pred * H^T + R
        S = self.H @ self.state.P @ self.H.T + self.R

        # Kalman gain: K = P_pred * H^T * S^-1
        K = self.state.P @ self.H.T @ np.linalg.inv(S)

        # State update: x = x_pred + K * y
        self.state.x = self.state.x + K @ y

        # Error covariance update: P = (I - K * H) * P_pred
        I = np.eye(2)
        self.state.P = (I - K @ self.H) @ self.state.P

        return self.state.x[0], self.state.x[1]

    def estimate(self, measurement: float) -> Tuple[float, float]:
        """Execute one cycle of prediction → update

        Args:
            measurement: Sensor measurement value [°C]

        Returns:
            (estimated_temp, estimated_rate) Estimated temperature and rate
        """
        # Predict
        self.predict()

        # Update
        return self.update(measurement)


# Usage example
if __name__ == "__main__":
    # Initialize Kalman filter
    kf = ReactorKalmanFilter(
        initial_temp=85.0,
        process_noise=0.01,      # Stable process
        measurement_noise=0.5    # Moderate sensor noise
    )

    # True temperature profile (ramp heating: 85°C → 95°C)
    time_steps = 50
    true_temps = np.linspace(85.0, 95.0, time_steps)

    # Sensor measurements (with noise)
    np.random.seed(42)
    noise = np.random.normal(0, 0.5, time_steps)  # Standard deviation 0.5°C
    measured_temps = true_temps + noise

    print("Time\tTrue\tMeasured\tEstimated\tRate")
    print("-" * 60)

    estimated_temps = []
    estimated_rates = []

    for t in range(time_steps):
        # Estimate with Kalman filter
        est_temp, est_rate = kf.estimate(measured_temps[t])
        estimated_temps.append(est_temp)
        estimated_rates.append(est_rate)

        if t % 5 == 0:  # Display every 5 seconds
            print(f"{t}\t{true_temps[t]:.2f}\t{measured_temps[t]:.2f}\t\t{est_temp:.2f}\t\t{est_rate:.3f}")

    # Accuracy evaluation
    mse_raw = np.mean((measured_temps - true_temps) ** 2)
    mse_filtered = np.mean((np.array(estimated_temps) - true_temps) ** 2)

    print(f"\n=== Accuracy Evaluation ===")
    print(f"MSE of raw data: {mse_raw:.4f}")
    print(f"MSE after filtering: {mse_filtered:.4f}")
    print(f"Accuracy improvement rate: {(1 - mse_filtered/mse_raw) * 100:.1f}%")

# Expected output example:
# Time    True    Measured        Estimated       Rate
# ------------------------------------------------------------
# 0       85.00   85.25           85.13           0.051
# 5       86.02   85.78           85.96           0.192
# 10      87.04   86.92           87.01           0.204
# 15      88.06   88.31           88.08           0.206
# 20      89.08   89.23           89.10           0.204
# 25      90.10   90.35           90.13           0.203
# 30      91.12   90.98           91.14           0.204
# 35      92.14   92.45           92.17           0.203
# 40      93.16   93.08           93.18           0.204
# 45      94.18   94.52           94.21           0.203
#
# === Accuracy Evaluation ===
# MSE of raw data: 0.2435
# MSE after filtering: 0.0089
# Accuracy improvement rate: 96.3%
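
The filter's behaviour is governed by the noise covariances Q and R. One practical way to choose R is to compute the variance of readings logged while the process was approximately steady; Q is then tuned by trial and error. The readings below are hypothetical, used only to illustrate the calculation.

# Supplementary sketch: estimating the measurement noise R from steady-state readings
# (the readings below are hypothetical; use a logged window with a constant setpoint)
import numpy as np

steady_readings = np.array([85.1, 84.9, 85.3, 85.0, 84.8, 85.2, 85.1, 84.9])
measurement_noise = float(np.var(steady_readings, ddof=1))   # sample variance ≈ R

kf_tuned = ReactorKalmanFilter(
    initial_temp=float(steady_readings.mean()),
    process_noise=0.01,                # Q: left as a tuning parameter
    measurement_noise=measurement_noise
)
print(f"Estimated measurement noise R: {measurement_noise:.3f}")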

2.8 Implementation Example 7: Digital Twin Synchronization Loop

A complete implementation that integrates the previous elements (data validation, Kalman filtering, and a simplified physical model) to realize real-time synchronization between the plant and its Digital Twin.

"""
Example 7: Digital Twin synchronization loop
Complete cycle of sensor data acquisition → validation → state estimation → model update
"""

import time
from datetime import datetime
from typing import Dict, Optional
import numpy as np

# Note: this example reuses SensorReading, DataQuality and DataValidator from
# Example 4 and ReactorKalmanFilter from Example 6; import or include those
# definitions when running this file standalone.


class DigitalTwinModel:
    """Simplified physical model of Digital Twin

    Calculates reaction progress from temperature, pressure, flow rate
    using reactor heat balance model.
    """

    def __init__(self, reactor_id: str):
        self.reactor_id = reactor_id

        # Model parameters
        self.heat_capacity = 4200  # J/(kg·K)
        self.mass = 1000           # kg
        self.heat_transfer_coeff = 500  # W/(m²·K)
        self.area = 10             # m²

        # State variables
        self.state = {
            'temperature': 85.0,    # °C
            'pressure': 2.5,        # MPa
            'flow_rate': 120.0,     # L/min
            'conversion': 0.0,      # Reaction progress (0-1)
            'last_update': datetime.now()
        }

    def update_from_sensors(self, sensor_data: Dict[str, float]):
        """Update model with sensor data

        Args:
            sensor_data: {'temperature': 85.3, 'pressure': 2.45, 'flow_rate': 120.5}
        """
        # Update model state with sensor values
        if 'temperature' in sensor_data:
            self.state['temperature'] = sensor_data['temperature']

        if 'pressure' in sensor_data:
            self.state['pressure'] = sensor_data['pressure']

        if 'flow_rate' in sensor_data:
            self.state['flow_rate'] = sensor_data['flow_rate']

        self.state['last_update'] = datetime.now()

    def calculate_reaction_conversion(self) -> float:
        """Calculate reaction progress (simplified model)

        Integration of reaction rate based on Arrhenius equation

        Returns:
            Reaction progress (0-1)
        """
        T = self.state['temperature'] + 273.15  # K
        P = self.state['pressure'] * 1e6        # Pa

        # Arrhenius parameters (hypothetical values)
        A = 1e10      # Frequency factor
        Ea = 80000    # Activation energy [J/mol]
        R = 8.314     # Gas constant [J/(mol·K)]

        # Reaction rate constant
        k = A * np.exp(-Ea / (R * T))

        # Pressure correction (simplified)
        k_eff = k * (P / 2.5e6) ** 0.5

        # Reaction progress (approximation of time integration)
        dt = 1.0  # 1 second
        dX = k_eff * (1 - self.state['conversion']) * dt

        new_conversion = min(self.state['conversion'] + dX, 1.0)
        self.state['conversion'] = new_conversion

        return new_conversion

    def predict_next_state(self, time_horizon: float = 60) -> Dict:
        """Predict future state

        Args:
            time_horizon: Prediction time [seconds]

        Returns:
            Predicted state dictionary
        """
        # Predict future conversion from current reaction rate
        predicted_conversion = min(
            self.state['conversion'] + 0.01 * time_horizon,
            1.0
        )

        # For exothermic reaction, predict temperature rise
        heat_released = predicted_conversion * 100000  # J (hypothetical value)
        temp_rise = heat_released / (self.mass * self.heat_capacity)
        predicted_temp = self.state['temperature'] + temp_rise

        return {
            'temperature': predicted_temp,
            'conversion': predicted_conversion,
            'time_horizon': time_horizon
        }

    def get_state(self) -> Dict:
        """Get current state"""
        return self.state.copy()


class DigitalTwinSyncLoop:
    """Digital Twin synchronization loop

    Executes sensor data acquisition → validation → state estimation → model update
    in real-time.
    """

    def __init__(self, reactor_id: str, update_interval: float = 1.0):
        """
        Args:
            reactor_id: Reactor ID
            update_interval: Update interval [seconds]
        """
        self.reactor_id = reactor_id
        self.update_interval = update_interval

        # Digital Twin model
        self.twin_model = DigitalTwinModel(reactor_id)

        # Kalman filter (for temperature)
        self.temp_filter = ReactorKalmanFilter(
            initial_temp=85.0,
            process_noise=0.01,
            measurement_noise=0.5
        )

        # Data validator
        self.validator = DataValidator()

        self.is_running = False
        self.cycle_count = 0

    def fetch_sensor_data(self) -> Dict[str, float]:
        """Fetch sensor data (actually from MQTT/OPC UA)

        Returns:
            {'temperature': 85.3, 'pressure': 2.45, 'flow_rate': 120.5}
        """
        # Here we generate mock data
        # Actually fetch from MQTT client or OPC UA client

        base_temp = 85.0 + self.cycle_count * 0.1  # Slowly heat up
        noise = np.random.normal(0, 0.5)

        return {
            'temperature': base_temp + noise,
            'pressure': 2.5 + np.random.normal(0, 0.02),
            'flow_rate': 120.0 + np.random.normal(0, 1.0)
        }

    def run_single_cycle(self):
        """Execute one cycle of synchronization loop"""
        cycle_start = time.time()

        # 1. Fetch sensor data
        raw_sensor_data = self.fetch_sensor_data()

        # 2. Data validation
        temp_reading = SensorReading(
            reactor_id=self.reactor_id,
            sensor_type="temperature",
            value=raw_sensor_data['temperature'],
            unit="degC",
            timestamp=datetime.now().isoformat()
        )
        validated_reading = self.validator.validate(temp_reading)

        # 3. State estimation with Kalman filter
        if validated_reading.quality != DataQuality.BAD:
            est_temp, est_rate = self.temp_filter.estimate(validated_reading.value)
            filtered_data = {
                'temperature': est_temp,
                'pressure': raw_sensor_data['pressure'],
                'flow_rate': raw_sensor_data['flow_rate']
            }
        else:
            # Bad data: keep the previous model temperature and skip the update
            est_temp = self.twin_model.get_state()['temperature']
            filtered_data = None

        # 4. Update Digital Twin model
        if filtered_data:
            self.twin_model.update_from_sensors(filtered_data)

        # 5. Calculate reaction progress
        conversion = self.twin_model.calculate_reaction_conversion()

        # 6. Future prediction
        prediction = self.twin_model.predict_next_state(time_horizon=60)

        # 7. Display results
        state = self.twin_model.get_state()
        cycle_time = (time.time() - cycle_start) * 1000  # ms

        print(f"[Cycle {self.cycle_count}] "
              f"Temp: {state['temperature']:.2f}°C "
              f"(filtered: {est_temp:.2f}°C), "
              f"Pressure: {state['pressure']:.2f} MPa, "
              f"Conversion: {conversion*100:.1f}%, "
              f"Prediction(60s): {prediction['conversion']*100:.1f}% "
              f"({cycle_time:.1f}ms)")

        self.cycle_count += 1

    def run(self, duration: Optional[float] = None):
        """Start synchronization loop

        Args:
            duration: Execution time [seconds] (infinite loop if None)
        """
        self.is_running = True
        start_time = time.time()

        print(f"=== Digital Twin Synchronization Loop Started ===")
        print(f"Reactor ID: {self.reactor_id}")
        print(f"Update Interval: {self.update_interval}s")
        print("-" * 80)

        try:
            while self.is_running:
                # Execute one cycle
                self.run_single_cycle()

                # Check termination condition
                if duration and (time.time() - start_time) >= duration:
                    break

                # Wait until next update
                time.sleep(self.update_interval)

        except KeyboardInterrupt:
            print("\n\nStopped by keyboard interrupt")

        finally:
            self.is_running = False
            print(f"\n=== Synchronization Loop Ended ===")
            print(f"Total cycles: {self.cycle_count}")
            print(f"Execution time: {time.time() - start_time:.1f} seconds")


# Usage example
if __name__ == "__main__":
    # Create Digital Twin synchronization loop
    sync_loop = DigitalTwinSyncLoop(
        reactor_id="R101",
        update_interval=1.0  # Update every 1 second
    )

    # Execute for 30 seconds
    sync_loop.run(duration=30)

# Expected output example:
# === Digital Twin Synchronization Loop Started ===
# Reactor ID: R101
# Update Interval: 1.0s
# --------------------------------------------------------------------------------
# [Cycle 0] Temp: 85.23°C (filtered: 85.13°C), Pressure: 2.51 MPa, Conversion: 0.2%, Prediction(60s): 0.8% (2.3ms)
# [Cycle 1] Temp: 85.34°C (filtered: 85.28°C), Pressure: 2.49 MPa, Conversion: 0.4%, Prediction(60s): 1.0% (1.8ms)
# [Cycle 2] Temp: 85.41°C (filtered: 85.39°C), Pressure: 2.52 MPa, Conversion: 0.6%, Prediction(60s): 1.2% (1.7ms)
# ...
# [Cycle 29] Temp: 88.12°C (filtered: 88.09°C), Pressure: 2.48 MPa, Conversion: 6.2%, Prediction(60s): 6.8% (1.9ms)
#
# === Synchronization Loop Ended ===
# Total cycles: 30
# Execution time: 30.1 seconds

Verification of Learning Objectives

Upon completing this chapter, you will be able to demonstrate the following at three levels: basic understanding, practical skills, and application ability.

Exercise Problems

Easy (Basic Verification)

Q1: What is the main difference between MQTT and OPC UA? Choose the most appropriate answer.

a) MQTT is for industrial use, and OPC UA is for IoT
b) MQTT is lightweight and suited to IoT, while OPC UA is an industrial automation standard
c) MQTT is paid software, while OPC UA is free
d) MQTT is an old standard, while OPC UA is a new standard


Correct answer: b) MQTT is lightweight and suited to IoT, while OPC UA is an industrial automation standard

Explanation:

  • MQTT: a lightweight messaging protocol, optimal for IoT devices, publish/subscribe model
  • OPC UA: an industrial automation standard for PLC/DCS connectivity, client/server model
  • Both are open standards and can be used free of charge

Medium (Application)

Q2: If a sensor reading has a Z-score of 3.5, should it be treated as an outlier? Explain your judgment and reasoning, assuming the Z-score threshold is set to 3.0.


Correct answer: Yes, it should be treated as an outlier.

Reasons:

  • Z-score = 3.5 exceeds threshold 3.0
  • For a normal distribution, data with |Z-score| > 3.0 accounts for less than 0.3% of the total (very rare; see the quick check below)
  • High possibility of sensor noise or spike

Practical response:

  • Flag data quality as "Uncertain"
  • Correct with Kalman filter or moving average
  • Do not completely exclude, save as history (for later cause analysis)
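
The "< 0.3%" figure quoted above can be verified directly from the standard normal distribution; a quick sketch (requires SciPy):

# Quick check of the two-sided tail probability beyond |Z| = 3.0
from scipy.stats import norm

p_outside = 2 * (1 - norm.cdf(3.0))
print(f"P(|Z| > 3.0) = {p_outside:.4f}")   # ≈ 0.0027, i.e. about 0.27%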

Hard (Advanced)

Q3: In a chemical plant where 1000 sensors send data at 1-second intervals, the Digital Twin synchronization loop update interval is set to 1 second. If the average processing time per cycle is 800ms, will the system operate stably? If there are problems, propose three improvement measures.


Correct answer: The system is likely to become unstable (an average processing time of 800 ms leaves too little margin against the 1000 ms update interval).

Problems:

  • Fluctuations in processing time can push the worst case beyond 1000 ms
  • Data delay accumulation causes buffer overflow
  • Loss of real-time performance

Improvement measures:

  1. Introduction of parallel processing
    • Group sensors and process in parallel (multi-thread/multi-process)
    • Expected effect: Reduce processing time to 1/4 (with 4 cores)
  2. Batch processing optimization
    • Batch read sensor data (use OPC UA read_multiple_nodes)
    • Also batch database writes
    • Expected effect: 90% reduction in communication overhead
  3. Hierarchical update frequency (see the scheduler sketch at the end of this answer)
    • Critical sensors (temperature, pressure): 1 second interval
    • Auxiliary sensors (flow rate, level): 5 second interval
    • Analyzers: 60 second interval
    • Expected effect: 60% reduction in processing load

Implementation example (parallel processing):

from concurrent.futures import ThreadPoolExecutor

def process_sensor_group(sensor_ids):
    # Process sensor group
    pass

# Divide 1000 sensors into 4 groups of 250 (sensors: a list of the 1000 sensor IDs)
groups = [sensors[i:i+250] for i in range(0, 1000, 250)]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_sensor_group, groups)
# Processing time: 800ms → 200ms
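
Implementation example (hierarchical update frequency, measure 3): the sketch below schedules reads per sensor group at the intervals listed above; the read_group() helper and the grouping itself are hypothetical.

import time

# Intervals per sensor group (from improvement measure 3)
update_intervals = {'critical': 1, 'auxiliary': 5, 'analyzer': 60}   # seconds
last_update = {group: 0.0 for group in update_intervals}

def read_group(group: str):
    """Hypothetical helper: read only the sensors belonging to this group"""
    pass

while True:
    now = time.time()
    for group, interval in update_intervals.items():
        if now - last_update[group] >= interval:
            read_group(group)
            last_update[group] = now
    time.sleep(0.1)   # scheduler tick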

Next Steps

Once the foundation of real-time data integration is established, we will next learn Digital Twin modeling methods. Through hybrid modeling combining physical models (first principles) and data-driven models (machine learning), we will realize high-precision prediction and control.

References

  1. Eclipse Paho™ MQTT Python Client. (2024). paho-mqtt Documentation. https://www.eclipse.org/paho/
  2. OPC Foundation. (2023). OPC Unified Architecture Specification Part 1: Overview and Concepts. https://opcfoundation.org/
  3. InfluxData. (2024). InfluxDB Python Client Library. https://docs.influxdata.com/influxdb/
  4. Welch, G., & Bishop, G. (2006). An Introduction to the Kalman Filter. UNC-Chapel Hill, TR 95-041.
  5. Kritzinger, W., et al. (2018). "Digital Twin in manufacturing: A categorical literature review and classification." IFAC-PapersOnLine, 51(11), 1016-1022.
