Digital Twin and IoT sensor integration: implementation techniques for synchronizing physical systems with digital space
The accuracy and practicality of a Digital Twin depend heavily on how closely it stays synchronized with the physical system. In this chapter, we work through seven practical methods for integrating real-time IoT sensor data into a Digital Twin.
Real-time data integration in the process industry must address several challenges: heterogeneous protocols (MQTT for IoT devices, OPC UA for PLCs and DCS), the volume of high-frequency time series data, data quality problems such as noise, outliers, and missing values, and the latency budget of the synchronization loop itself. The seven examples below address these challenges in turn.
MQTT is a lightweight publish/subscribe messaging protocol widely used for communicating with IoT devices.
"""
Example 1: Real-time sensor data acquisition via MQTT communication
Receive data via MQTT from temperature and pressure sensors of chemical reactor
"""
import paho.mqtt.client as mqtt
import json
from datetime import datetime
from typing import Dict, Callable
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MQTTSensorClient:
"""Client for communicating with IoT sensors via MQTT protocol
Acquires real-time data from chemical plant reactor sensors
(temperature, pressure, flow rate) and sends to Digital Twin.
"""
def __init__(self, broker_address: str, port: int = 1883):
"""
Args:
broker_address: IP address/hostname of MQTT broker
port: MQTT port number (default 1883)
"""
        # paho-mqtt 1.x API; paho-mqtt >= 2.0 additionally requires a
        # callback_api_version argument (mqtt.CallbackAPIVersion.VERSION1)
        self.client = mqtt.Client(client_id="digital_twin_client")
self.broker_address = broker_address
self.port = port
self.data_callbacks: Dict[str, Callable] = {}
# Register callback functions
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
def _on_connect(self, client, userdata, flags, rc):
"""Callback on connection establishment"""
if rc == 0:
logger.info(f"Successfully connected to MQTT broker: {self.broker_address}")
# Subscribe to all reactor sensor topics
self.client.subscribe("reactor/+/sensor/#")
else:
logger.error(f"Connection failed: return code {rc}")
def _on_message(self, client, userdata, msg):
"""Callback on message reception"""
try:
# Parse JSON payload
payload = json.loads(msg.payload.decode())
topic = msg.topic
# Add timestamp (if not included in sensor data)
if 'timestamp' not in payload:
payload['timestamp'] = datetime.now().isoformat()
logger.info(f"Received: {topic} - {payload}")
# Execute registered callback functions
for callback in self.data_callbacks.values():
callback(topic, payload)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error: {e}")
except Exception as e:
logger.error(f"Message processing error: {e}")
def _on_disconnect(self, client, userdata, rc):
"""Callback on disconnection"""
if rc != 0:
logger.warning(f"Unexpected disconnection: return code {rc}")
logger.info("Attempting to reconnect...")
def register_callback(self, name: str, callback: Callable):
"""Register callback function for data reception
Args:
name: Callback identifier name
callback: Function to execute callback(topic: str, payload: dict)
"""
self.data_callbacks[name] = callback
def connect(self):
"""Connect to MQTT broker"""
self.client.connect(self.broker_address, self.port, 60)
self.client.loop_start()
def disconnect(self):
"""Disconnect from MQTT broker"""
self.client.loop_stop()
self.client.disconnect()
# Usage example
def process_sensor_data(topic: str, payload: dict):
"""Send received sensor data to Digital Twin"""
reactor_id = topic.split('/')[1]
sensor_type = topic.split('/')[-1]
print(f"Reactor {reactor_id} - {sensor_type}: {payload['value']} {payload['unit']}")
# Reflect in Digital Twin model (described later)
# digital_twin.update_sensor_value(reactor_id, sensor_type, payload)
if __name__ == "__main__":
# Initialize MQTT client
mqtt_client = MQTTSensorClient(broker_address="192.168.1.100")
# Register data processing callback
mqtt_client.register_callback("process_data", process_sensor_data)
# Start connection
mqtt_client.connect()
try:
# Continuously receive data (exit with Ctrl+C)
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
mqtt_client.disconnect()
print("\nConnection terminated")
# Expected output example:
# INFO:__main__:Successfully connected to MQTT broker: 192.168.1.100
# INFO:__main__:Received: reactor/R101/sensor/temperature - {'value': 85.3, 'unit': 'degC', 'timestamp': '2025-10-26T10:30:15'}
# Reactor R101 - temperature: 85.3 degC
# INFO:__main__:Received: reactor/R101/sensor/pressure - {'value': 2.45, 'unit': 'MPa', 'timestamp': '2025-10-26T10:30:15'}
# Reactor R101 - pressure: 2.45 MPa
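To exercise Example 1 without physical sensors, a small publisher can simulate the reactor. The sketch below is illustrative: the broker address, topic layout, and payload fields simply mirror what Example 1 assumes.

"""Test publisher: simulates reactor sensors for Example 1 (illustrative)."""
import json
import random
import time
from datetime import datetime
import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="sensor_simulator")
client.connect("192.168.1.100", 1883, 60)
client.loop_start()
try:
    while True:
        # Publish a noisy temperature reading on the topic structure
        # that Example 1 subscribes to (reactor/+/sensor/#)
        payload = {
            "value": round(85.0 + random.gauss(0, 0.5), 2),
            "unit": "degC",
            "timestamp": datetime.now().isoformat(),
        }
        client.publish("reactor/R101/sensor/temperature", json.dumps(payload))
        time.sleep(1)
except KeyboardInterrupt:
    client.loop_stop()
    client.disconnect()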
OPC UA (OPC Unified Architecture) is the standard communication protocol of the industrial automation field, used to connect to PLCs and DCS systems.
"""
Example 2: Industrial equipment data acquisition via OPC UA communication
Acquire control parameters and measured values from PLC for reactor
"""
from opcua import Client
from typing import Dict, List, Optional
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class OPCUAReactorClient:
"""Client for acquiring reactor data via OPC UA
Acquires set values (SV) and process values (PV) in real-time
from DCS (Distributed Control System) or PLC.
"""
def __init__(self, server_url: str):
"""
Args:
server_url: OPC UA server URL (e.g., "opc.tcp://192.168.1.50:4840")
"""
self.client = Client(server_url)
self.server_url = server_url
self.subscriptions = []
def connect(self):
"""Connect to OPC UA server"""
try:
self.client.connect()
logger.info(f"Connected to OPC UA server: {self.server_url}")
# Retrieve server information
server_info = self.client.get_server_node()
logger.info(f"Server name: {server_info.get_browse_name()}")
except Exception as e:
logger.error(f"Connection error: {e}")
raise
    def read_node(self, node_id: str) -> Optional[Dict]:
"""Read value from OPC UA node
Args:
node_id: Node ID (e.g., "ns=2;s=Reactor.R101.Temperature")
Returns:
{'value': 85.3, 'quality': 'Good', 'timestamp': '2025-10-26T10:30:15'}
"""
try:
node = self.client.get_node(node_id)
data_value = node.get_data_value()
return {
'value': data_value.Value.Value,
'quality': str(data_value.StatusCode),
'timestamp': data_value.SourceTimestamp.isoformat(),
'node_id': node_id
}
except Exception as e:
logger.error(f"Node read error ({node_id}): {e}")
return None
def read_multiple_nodes(self, node_ids: List[str]) -> Dict[str, Dict]:
"""Batch read multiple nodes (efficient)
Args:
node_ids: List of node IDs
Returns:
{node_id: {'value': ..., 'quality': ..., 'timestamp': ...}}
"""
results = {}
nodes = [self.client.get_node(nid) for nid in node_ids]
# Batch read (performance improvement)
data_values = self.client.get_values(nodes)
for node_id, data_value in zip(node_ids, data_values):
results[node_id] = {
'value': data_value,
'quality': 'Good', # Simplified (actually check StatusCode)
'timestamp': time.time()
}
return results
    def subscribe_to_changes(self, node_id: str, callback):
        """Subscribe to node changes (notification only on change)
        Args:
            node_id: Node ID to monitor
            callback: Function called on change callback(node_id, value)
        """
        # python-opcua expects a handler object that implements
        # datachange_notification(), not a bare function, so wrap it
        class _SubHandler:
            def datachange_notification(self, node, val, data):
                callback(node_id, val)
        try:
            # Create subscription (100 ms publishing interval)
            subscription = self.client.create_subscription(100, _SubHandler())
            node = self.client.get_node(node_id)
            # Start monitoring data changes
            handle = subscription.subscribe_data_change(node)
            self.subscriptions.append((subscription, handle))
            logger.info(f"Subscription started: {node_id}")
        except Exception as e:
            logger.error(f"Subscription error: {e}")
def disconnect(self):
"""Disconnect from OPC UA server"""
# Remove all subscriptions
for subscription, handle in self.subscriptions:
subscription.unsubscribe(handle)
subscription.delete()
self.client.disconnect()
logger.info("Disconnected from OPC UA server")
# Usage example
if __name__ == "__main__":
# Initialize OPC UA client
opcua_client = OPCUAReactorClient(server_url="opc.tcp://192.168.1.50:4840")
opcua_client.connect()
# Monitoring nodes for reactor R101
reactor_nodes = [
"ns=2;s=Reactor.R101.Temperature.PV", # Temperature process value
"ns=2;s=Reactor.R101.Temperature.SV", # Temperature set value
"ns=2;s=Reactor.R101.Pressure.PV", # Pressure process value
"ns=2;s=Reactor.R101.Pressure.SV", # Pressure set value
"ns=2;s=Reactor.R101.FlowRate.PV", # Flow rate process value
]
try:
# Batch read multiple nodes
data = opcua_client.read_multiple_nodes(reactor_nodes)
for node_id, values in data.items():
print(f"{node_id}: {values['value']}")
# Continuous monitoring (10 seconds)
print("\nStarting continuous monitoring...")
for _ in range(10):
data = opcua_client.read_multiple_nodes(reactor_nodes)
temp_pv = data["ns=2;s=Reactor.R101.Temperature.PV"]['value']
pres_pv = data["ns=2;s=Reactor.R101.Pressure.PV"]['value']
print(f"Temperature: {temp_pv:.1f}°C, Pressure: {pres_pv:.2f} MPa")
time.sleep(1)
finally:
opcua_client.disconnect()
# Expected output example:
# INFO:__main__:Connected to OPC UA server: opc.tcp://192.168.1.50:4840
# ns=2;s=Reactor.R101.Temperature.PV: 85.3
# ns=2;s=Reactor.R101.Temperature.SV: 85.0
# ns=2;s=Reactor.R101.Pressure.PV: 2.45
# ns=2;s=Reactor.R101.Pressure.SV: 2.50
# ns=2;s=Reactor.R101.FlowRate.PV: 120.5
#
# Starting continuous monitoring...
# Temperature: 85.3°C, Pressure: 2.45 MPa
# Temperature: 85.5°C, Pressure: 2.46 MPa
# Temperature: 85.4°C, Pressure: 2.45 MPa
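If no DCS or PLC is available, the same python-opcua package can stand up a mock server that exposes the node IDs Example 2 reads. This is a test-only sketch; it assumes that the first registered namespace receives index 2, which is the library's usual behavior.

"""Mock OPC UA server exposing the Example 2 node IDs (test use only)."""
import random
import time
from opcua import Server

server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840")
# The first registered namespace typically becomes index 2
server.register_namespace("http://example.org/reactor")

objects = server.get_objects_node()
reactor = objects.add_object("ns=2;s=Reactor.R101", "R101")
temp_pv = reactor.add_variable("ns=2;s=Reactor.R101.Temperature.PV",
                               "Temperature.PV", 85.3)
pres_pv = reactor.add_variable("ns=2;s=Reactor.R101.Pressure.PV",
                               "Pressure.PV", 2.45)

server.start()
try:
    while True:
        # Drift the mock process values so clients and subscriptions see changes
        temp_pv.set_value(85.3 + random.gauss(0, 0.2))
        pres_pv.set_value(2.45 + random.gauss(0, 0.01))
        time.sleep(1)
finally:
    server.stop()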
InfluxDB is a high-performance time series database well suited to long-term storage and querying of sensor data.
"""
Example 3: InfluxDB time series database integration
Efficient storage and high-speed queries for sensor data
"""
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timedelta
import pandas as pd
from typing import List, Dict
class ReactorDataStore:
"""Storage and retrieval of reactor data using InfluxDB
Stores sensor data in time series database and realizes
high-speed queries for Digital Twin.
"""
def __init__(self, url: str, token: str, org: str, bucket: str):
"""
Args:
url: InfluxDB URL (e.g., "http://localhost:8086")
token: Authentication token
org: Organization name
bucket: Bucket name (equivalent to database name)
"""
self.client = InfluxDBClient(url=url, token=token, org=org)
self.org = org
self.bucket = bucket
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
def write_sensor_data(self, reactor_id: str, sensor_type: str,
value: float, unit: str, quality: str = "Good"):
"""Write sensor data
Args:
reactor_id: Reactor ID (e.g., "R101")
sensor_type: Sensor type (e.g., "temperature")
value: Measured value
unit: Unit (e.g., "degC")
quality: Data quality ("Good", "Bad", "Uncertain")
"""
point = (
Point("reactor_sensor")
.tag("reactor_id", reactor_id)
.tag("sensor_type", sensor_type)
.tag("unit", unit)
.tag("quality", quality)
.field("value", value)
.time(datetime.utcnow())
)
self.write_api.write(bucket=self.bucket, record=point)
def write_batch_data(self, data_points: List[Dict]):
"""Batch write multiple data points (faster)
Args:
data_points: [{'reactor_id': 'R101', 'sensor_type': 'temperature', ...}]
"""
points = []
for dp in data_points:
point = (
Point("reactor_sensor")
.tag("reactor_id", dp['reactor_id'])
.tag("sensor_type", dp['sensor_type'])
.tag("unit", dp.get('unit', ''))
.field("value", dp['value'])
.time(dp.get('timestamp', datetime.utcnow()))
)
points.append(point)
self.write_api.write(bucket=self.bucket, record=points)
def query_latest_value(self, reactor_id: str, sensor_type: str) -> Dict:
"""Retrieve latest sensor value
Args:
reactor_id: Reactor ID
sensor_type: Sensor type
Returns:
{'time': '2025-10-26T10:30:15', 'value': 85.3, 'unit': 'degC'}
"""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "reactor_sensor")
|> filter(fn: (r) => r.reactor_id == "{reactor_id}")
|> filter(fn: (r) => r.sensor_type == "{sensor_type}")
|> last()
'''
result = self.query_api.query(query=query, org=self.org)
if result and len(result) > 0 and len(result[0].records) > 0:
record = result[0].records[0]
return {
'time': record.get_time().isoformat(),
'value': record.get_value(),
'unit': record.values.get('unit', ''),
'quality': record.values.get('quality', 'Unknown')
}
return None
def query_time_range(self, reactor_id: str, sensor_type: str,
start: datetime, end: datetime) -> pd.DataFrame:
"""Retrieve data for specified period and convert to DataFrame
Args:
reactor_id: Reactor ID
sensor_type: Sensor type
start: Start time
end: End time
Returns:
pd.DataFrame with columns ['time', 'value', 'unit']
"""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: {start.isoformat()}Z, stop: {end.isoformat()}Z)
|> filter(fn: (r) => r._measurement == "reactor_sensor")
|> filter(fn: (r) => r.reactor_id == "{reactor_id}")
|> filter(fn: (r) => r.sensor_type == "{sensor_type}")
'''
result = self.query_api.query_data_frame(query=query, org=self.org)
if not result.empty:
# Extract only necessary columns
df = result[['_time', '_value', 'unit']].copy()
df.columns = ['time', 'value', 'unit']
return df
return pd.DataFrame(columns=['time', 'value', 'unit'])
def query_aggregated_data(self, reactor_id: str, sensor_type: str,
window: str = "1m") -> pd.DataFrame:
"""Retrieve aggregated data (mean, max, min)
Args:
reactor_id: Reactor ID
sensor_type: Sensor type
window: Aggregation window (e.g., "1m", "5m", "1h")
Returns:
pd.DataFrame with columns ['time', 'mean', 'max', 'min']
"""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "reactor_sensor")
|> filter(fn: (r) => r.reactor_id == "{reactor_id}")
|> filter(fn: (r) => r.sensor_type == "{sensor_type}")
|> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
'''
result = self.query_api.query_data_frame(query=query, org=self.org)
if not result.empty:
df = result[['_time', '_value']].copy()
df.columns = ['time', 'mean']
return df
return pd.DataFrame(columns=['time', 'mean'])
def close(self):
"""Close database connection"""
self.client.close()
# Usage example
if __name__ == "__main__":
# Initialize InfluxDB client
db = ReactorDataStore(
url="http://localhost:8086",
token="your_influxdb_token",
org="your_org",
bucket="reactor_data"
)
try:
# Write single data point
db.write_sensor_data(
reactor_id="R101",
sensor_type="temperature",
value=85.3,
unit="degC"
)
# Batch write (efficient)
batch_data = [
{'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.5, 'unit': 'degC'},
{'reactor_id': 'R101', 'sensor_type': 'pressure', 'value': 2.45, 'unit': 'MPa'},
{'reactor_id': 'R101', 'sensor_type': 'flow_rate', 'value': 120.5, 'unit': 'L/min'},
]
db.write_batch_data(batch_data)
# Retrieve latest value
latest = db.query_latest_value("R101", "temperature")
print(f"Latest temperature: {latest['value']} {latest['unit']} at {latest['time']}")
# Retrieve past 1 hour data
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)
df = db.query_time_range("R101", "temperature", start_time, end_time)
print(f"\nNumber of data points in past 1 hour: {len(df)}")
print(df.head())
# Aggregated data at 1-minute intervals
agg_df = db.query_aggregated_data("R101", "temperature", window="1m")
print(f"\n1-minute average data (last 24 hours):")
print(agg_df.tail())
finally:
db.close()
# Expected output example:
# Latest temperature: 85.5 degC at 2025-10-26T10:30:15.123456
#
# Number of data points in past 1 hour: 3600
# time value unit
# 0 2025-10-26 09:30:15.123 85.1 degC
# 1 2025-10-26 09:30:16.234 85.2 degC
# 2 2025-10-26 09:30:17.345 85.3 degC
#
# 1-minute average data (last 24 hours):
# time mean
# 1436 2025-10-26 10:26:00 85.2
# 1437 2025-10-26 10:27:00 85.3
# 1438 2025-10-26 10:28:00 85.4
# 1439 2025-10-26 10:29:00 85.5
# 1440 2025-10-26 10:30:00 85.5
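Examples 1 and 3 combine naturally: the MQTT callback can forward each reading straight into InfluxDB. A minimal glue sketch, assuming the topic layout and payload fields from Example 1 and an already-initialized ReactorDataStore named db:

def store_mqtt_message(topic: str, payload: dict):
    """MQTT callback that persists each reading to InfluxDB."""
    parts = topic.split('/')          # e.g. reactor/R101/sensor/temperature
    reactor_id, sensor_type = parts[1], parts[-1]
    db.write_sensor_data(
        reactor_id=reactor_id,
        sensor_type=sensor_type,
        value=payload['value'],
        unit=payload.get('unit', ''),
    )

# Registration with the Example 1 client:
# mqtt_client.register_callback("influx_writer", store_mqtt_message)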
Sensor data inevitably contains noise and abnormal values, so quality checks are necessary before the data reaches the Digital Twin.
"""
Example 4: Real-time data validation and cleaning
Quality check of sensor data, outlier detection, missing value imputation
"""
import numpy as np
import pandas as pd
from typing import Optional, Dict, Tuple
from dataclasses import dataclass
from enum import Enum
class DataQuality(Enum):
"""Data quality level"""
GOOD = "Good" # Normal data
UNCERTAIN = "Uncertain" # Uncertain (corrected)
BAD = "Bad" # Abnormal data (unusable)
@dataclass
class SensorReading:
"""Sensor reading value"""
reactor_id: str
sensor_type: str
value: float
unit: str
timestamp: str
quality: DataQuality = DataQuality.GOOD
class DataValidator:
"""Real-time data validation and cleaning
Executes physical constraint checks, statistical outlier detection,
and missing value imputation to ensure data quality.
"""
def __init__(self):
# Physical ranges for each sensor (based on process knowledge)
self.physical_ranges = {
'temperature': {'min': -50, 'max': 400, 'unit': 'degC'},
'pressure': {'min': 0, 'max': 10, 'unit': 'MPa'},
'flow_rate': {'min': 0, 'max': 500, 'unit': 'L/min'},
'ph': {'min': 0, 'max': 14, 'unit': ''},
}
# Historical data for moving average (max 100 points)
self.history: Dict[str, list] = {}
self.max_history = 100
def validate_physical_range(self, reading: SensorReading) -> Tuple[bool, str]:
"""Physical range check
Args:
reading: Sensor reading value
Returns:
(is_valid, message)
"""
if reading.sensor_type not in self.physical_ranges:
return True, "Range not defined"
range_info = self.physical_ranges[reading.sensor_type]
value = reading.value
if value < range_info['min'] or value > range_info['max']:
msg = (f"{reading.sensor_type} outside physical range: {value} "
f"(allowed range: {range_info['min']}-{range_info['max']} {range_info['unit']})")
return False, msg
return True, "OK"
def detect_outlier_zscore(self, reading: SensorReading,
threshold: float = 3.0) -> Tuple[bool, str]:
"""Statistical outlier detection by Z-score
Args:
reading: Sensor reading value
threshold: Z-score threshold (default 3.0)
Returns:
(is_outlier, message)
"""
key = f"{reading.reactor_id}_{reading.sensor_type}"
# Add to historical data
if key not in self.history:
self.history[key] = []
self.history[key].append(reading.value)
# Cannot determine with insufficient history
if len(self.history[key]) < 10:
return False, "Insufficient history"
# Retain only latest 100 points
if len(self.history[key]) > self.max_history:
self.history[key] = self.history[key][-self.max_history:]
# Calculate Z-score
values = np.array(self.history[key])
mean = np.mean(values)
std = np.std(values)
if std < 1e-6: # Standard deviation close to zero
return False, "Constant values"
z_score = abs((reading.value - mean) / std)
if z_score > threshold:
msg = f"Outlier detected: Z-score={z_score:.2f} (threshold={threshold})"
return True, msg
return False, f"OK (Z-score={z_score:.2f})"
def detect_spike(self, reading: SensorReading,
max_change_rate: float = 0.2) -> Tuple[bool, str]:
"""Detect rapid changes (spikes)
Args:
reading: Sensor reading value
max_change_rate: Allowable change rate (e.g., 0.2 = 20%)
Returns:
(is_spike, message)
"""
key = f"{reading.reactor_id}_{reading.sensor_type}"
if key not in self.history or len(self.history[key]) == 0:
return False, "No previous value"
previous_value = self.history[key][-1]
# Avoid division by zero
if abs(previous_value) < 1e-6:
return False, "Previous value near zero"
# Calculate change rate
change_rate = abs((reading.value - previous_value) / previous_value)
if change_rate > max_change_rate:
msg = f"Rapid change detected: {change_rate*100:.1f}% (allowed: {max_change_rate*100:.1f}%)"
return True, msg
return False, f"OK (change rate: {change_rate*100:.1f}%)"
def interpolate_missing_value(self, reactor_id: str,
sensor_type: str) -> Optional[float]:
"""Linear interpolation of missing value
Args:
reactor_id: Reactor ID
sensor_type: Sensor type
Returns:
Interpolated value (None if no history)
"""
key = f"{reactor_id}_{sensor_type}"
if key not in self.history or len(self.history[key]) < 2:
return None
# Interpolate with mean of latest 2 points (simplified)
recent_values = self.history[key][-2:]
interpolated = np.mean(recent_values)
return interpolated
    def validate(self, reading: SensorReading) -> SensorReading:
        """Execute comprehensive validation and determine data quality
        Args:
            reading: Sensor reading value
        Returns:
            SensorReading with updated quality flag
        """
        # 1. Physical range check
        is_valid, msg = self.validate_physical_range(reading)
        if not is_valid:
            print(f"[BAD] {msg}")
            reading.quality = DataQuality.BAD
            return reading
        # 2. Spike detection (must run before the Z-score check, because
        #    detect_outlier_zscore appends the current value to the
        #    history, which would mask the jump)
        is_spike, msg = self.detect_spike(reading)
        if is_spike:
            print(f"[UNCERTAIN] {msg}")
            reading.quality = DataQuality.UNCERTAIN
        # 3. Statistical outlier detection (also records the value in history)
        is_outlier, msg = self.detect_outlier_zscore(reading)
        if is_outlier:
            print(f"[UNCERTAIN] {msg}")
            reading.quality = DataQuality.UNCERTAIN
        return reading
# Usage example
if __name__ == "__main__":
validator = DataValidator()
# Simulate normal data
print("=== Normal Data ===")
for i in range(5):
reading = SensorReading(
reactor_id="R101",
sensor_type="temperature",
value=85.0 + np.random.normal(0, 0.5), # Mean 85°C, Std 0.5°C
unit="degC",
timestamp=f"2025-10-26T10:30:{i:02d}"
)
validated = validator.validate(reading)
print(f"Time {i}: {validated.value:.2f}°C - Quality: {validated.quality.value}")
# Data outside physical range
print("\n=== Data Outside Physical Range ===")
bad_reading = SensorReading(
reactor_id="R101",
sensor_type="temperature",
value=500.0, # Outside range (max 400°C)
unit="degC",
timestamp="2025-10-26T10:30:10"
)
validated = validator.validate(bad_reading)
print(f"Value: {validated.value}°C - Quality: {validated.quality.value}")
# Outlier (spike)
print("\n=== Spike Data ===")
spike_reading = SensorReading(
reactor_id="R101",
sensor_type="temperature",
value=120.0, # Rapid increase
unit="degC",
timestamp="2025-10-26T10:30:15"
)
validated = validator.validate(spike_reading)
print(f"Value: {validated.value}°C - Quality: {validated.quality.value}")
# Missing value interpolation
print("\n=== Missing Value Interpolation ===")
interpolated = validator.interpolate_missing_value("R101", "temperature")
if interpolated:
print(f"Interpolated value: {interpolated:.2f}°C")
# Expected output example:
# === Normal Data ===
# Time 0: 85.23°C - Quality: Good
# Time 1: 84.87°C - Quality: Good
# Time 2: 85.41°C - Quality: Good
# Time 3: 85.12°C - Quality: Good
# Time 4: 84.95°C - Quality: Good
#
# === Data Outside Physical Range ===
# [BAD] temperature outside physical range: 500.0 (allowed range: -50-400 degC)
# Value: 500.0°C - Quality: Bad
#
# === Spike Data ===
# [UNCERTAIN] Rapid change detected: 41.2% (allowed: 20.0%)
# Value: 120.0°C - Quality: Uncertain
#
# === Missing Value Interpolation ===
# Interpolated value: 85.04°C
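A common pattern is to pair validate() with interpolate_missing_value(): when a reading comes back Bad, an interpolated value is substituted so downstream consumers never see a gap. A minimal sketch using the classes above (the helper name clean_reading is ours):

def clean_reading(validator: DataValidator, reading: SensorReading) -> SensorReading:
    """Validate a reading; replace Bad values with an interpolated substitute."""
    validated = validator.validate(reading)
    if validated.quality == DataQuality.BAD:
        substitute = validator.interpolate_missing_value(
            reading.reactor_id, reading.sensor_type)
        if substitute is not None:
            # Downgrade to Uncertain so consumers know the value is synthetic
            validated.value = substitute
            validated.quality = DataQuality.UNCERTAIN
    return validated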
Next, we process data from multiple sensors in an integrated fashion using Kafka-style stream processing, implemented here with in-process queues and worker threads.
"""
Example 5: Data streaming pipeline
Real-time processing and distribution of data from multiple sensors
"""
from queue import Queue, Empty, Full
from threading import Thread, Event
from typing import Callable, List, Optional
import time
from datetime import datetime
import json
class DataStream:
"""Real-time data stream
Receives sensor data via queue and executes multiple
processes in pipeline format.
"""
def __init__(self, name: str, buffer_size: int = 1000):
"""
Args:
name: Stream name
buffer_size: Buffer size (maximum queue length)
"""
self.name = name
self.queue = Queue(maxsize=buffer_size)
self.processors: List[Callable] = []
self.is_running = Event()
self.worker_thread = None
def add_processor(self, processor: Callable):
"""Add data processing function
Args:
processor: Function that receives and processes data processor(data)
"""
self.processors.append(processor)
def publish(self, data: dict):
"""Send data to stream
Args:
data: Sensor data dictionary
"""
        try:
            self.queue.put(data, timeout=1)
        except Full:
            print(f"[{self.name}] Queue full, dropping data")
def _process_loop(self):
"""Data processing loop (executed in separate thread)"""
while self.is_running.is_set():
try:
# Retrieve data from queue (1 second timeout)
data = self.queue.get(timeout=1)
# Execute all processing functions sequentially
for processor in self.processors:
try:
data = processor(data)
if data is None: # Filtered in processing
break
except Exception as e:
print(f"[{self.name}] Processing error: {e}")
break
            except Empty:
                # Queue empty within timeout → keep polling
                continue
def start(self):
"""Start stream processing"""
if self.worker_thread and self.worker_thread.is_alive():
print(f"[{self.name}] Already running")
return
self.is_running.set()
self.worker_thread = Thread(target=self._process_loop, daemon=True)
self.worker_thread.start()
print(f"[{self.name}] Stream processing started")
def stop(self):
"""Stop stream processing"""
self.is_running.clear()
if self.worker_thread:
self.worker_thread.join(timeout=5)
print(f"[{self.name}] Stream processing stopped")
class StreamingPipeline:
"""Pipeline managing multiple streams"""
def __init__(self):
self.streams = {}
def create_stream(self, name: str) -> DataStream:
"""Create new stream
Args:
name: Stream name
Returns:
DataStream object
"""
stream = DataStream(name)
self.streams[name] = stream
return stream
def start_all(self):
"""Start all streams"""
for stream in self.streams.values():
stream.start()
def stop_all(self):
"""Stop all streams"""
for stream in self.streams.values():
stream.stop()
# Examples of data processing functions
def enrich_with_metadata(data: dict) -> dict:
"""Add metadata"""
data['processed_at'] = datetime.now().isoformat()
data['pipeline'] = 'streaming_v1'
return data
def filter_bad_quality(data: dict) -> Optional[dict]:
"""Filter bad quality data"""
if data.get('quality') == 'Bad':
print(f"Filtering bad data: {data}")
return None # None = discard this data
return data
def aggregate_1sec_window(data: dict) -> dict:
"""Aggregate in 1-second window (simplified)"""
# Actually aggregate multiple data in time window
data['aggregation'] = '1sec_mean'
return data
def send_to_digital_twin(data: dict) -> dict:
"""Send to Digital Twin model"""
print(f"Send to Digital Twin: {data['reactor_id']}.{data['sensor_type']} = {data['value']}")
# Actually call Digital Twin API
return data
def save_to_database(data: dict) -> dict:
"""Save to database"""
# Actually save to InfluxDB, etc.
print(f"Save to DB: {json.dumps(data, ensure_ascii=False)}")
return data
# Usage example
if __name__ == "__main__":
# Create pipeline
pipeline = StreamingPipeline()
# Temperature data stream
temp_stream = pipeline.create_stream("temperature_stream")
temp_stream.add_processor(enrich_with_metadata)
temp_stream.add_processor(filter_bad_quality)
temp_stream.add_processor(aggregate_1sec_window)
temp_stream.add_processor(send_to_digital_twin)
temp_stream.add_processor(save_to_database)
# Pressure data stream
pres_stream = pipeline.create_stream("pressure_stream")
pres_stream.add_processor(enrich_with_metadata)
pres_stream.add_processor(filter_bad_quality)
pres_stream.add_processor(send_to_digital_twin)
# Start all streams
pipeline.start_all()
try:
# Simulate sensor data
for i in range(10):
# Temperature data
temp_data = {
'reactor_id': 'R101',
'sensor_type': 'temperature',
'value': 85.0 + i * 0.1,
'unit': 'degC',
'quality': 'Good' if i < 8 else 'Bad', # After 8th is bad
'timestamp': datetime.now().isoformat()
}
temp_stream.publish(temp_data)
# Pressure data
pres_data = {
'reactor_id': 'R101',
'sensor_type': 'pressure',
'value': 2.45 + i * 0.01,
'unit': 'MPa',
'quality': 'Good',
'timestamp': datetime.now().isoformat()
}
pres_stream.publish(pres_data)
time.sleep(0.5) # Send data at 0.5 second intervals
# Wait for processing to complete
time.sleep(2)
finally:
# Stop all streams
pipeline.stop_all()
# Expected output example:
# [temperature_stream] Stream processing started
# [pressure_stream] Stream processing started
# Send to Digital Twin: R101.temperature = 85.0
# Save to DB: {"reactor_id": "R101", "sensor_type": "temperature", "value": 85.0, ...}
# Send to Digital Twin: R101.pressure = 2.45
# Send to Digital Twin: R101.temperature = 85.1
# Save to DB: {"reactor_id": "R101", "sensor_type": "temperature", "value": 85.1, ...}
# ...
# Filtering bad data: {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.8, 'quality': 'Bad'}
# Filtering bad data: {'reactor_id': 'R101', 'sensor_type': 'temperature', 'value': 85.9, 'quality': 'Bad'}
# [temperature_stream] Stream processing stopped
# [pressure_stream] Stream processing stopped
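The aggregate_1sec_window processor above is only a stub. A stateful processor that actually averages values over a time window might look like the following sketch: it buffers readings per sensor, emits one mean per elapsed window, and returns None (discard) while a window is still filling. Note that non-value fields of the emitted record are taken from the last reading in the window.

import time
from collections import defaultdict

class WindowAggregator:
    """Stateful stream processor: emits one per-sensor mean per time window."""

    def __init__(self, window_seconds: float = 1.0):
        self.window_seconds = window_seconds
        self.buffers = defaultdict(list)            # key -> buffered values
        self.window_start = defaultdict(time.monotonic)  # key -> window start

    def __call__(self, data: dict):
        key = f"{data['reactor_id']}_{data['sensor_type']}"
        self.buffers[key].append(data['value'])
        if time.monotonic() - self.window_start[key] < self.window_seconds:
            return None                              # window still filling
        # Window elapsed: emit the mean and reset the buffer
        values = self.buffers.pop(key)
        self.window_start[key] = time.monotonic()
        data['value'] = sum(values) / len(values)
        data['aggregation'] = f'{self.window_seconds}s_mean'
        return data

# Usage: temp_stream.add_processor(WindowAggregator(window_seconds=1.0))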
Next we implement a Kalman filter, which estimates the true state from noisy sensor data.
"""
Example 6: State estimation using Kalman filter
Estimate true state of reactor from sensor data containing noise
"""
import numpy as np
from dataclasses import dataclass
from typing import Tuple
@dataclass
class KalmanState:
"""State of Kalman filter"""
x: np.ndarray # State vector [temperature, temperature change rate]
P: np.ndarray # Error covariance matrix
class ReactorKalmanFilter:
"""Kalman filter for reactor state estimation
Removes noise from temperature sensor and estimates true
temperature and temperature change rate.
"""
def __init__(self,
initial_temp: float,
process_noise: float = 0.01,
measurement_noise: float = 0.5):
"""
Args:
initial_temp: Initial temperature [°C]
process_noise: Process noise (system uncertainty)
measurement_noise: Measurement noise (sensor noise)
"""
# State vector: [temperature, temperature change rate]
self.state = KalmanState(
x=np.array([initial_temp, 0.0]), # [°C, °C/s]
P=np.eye(2) * 1.0 # Initial error covariance
)
# System model: x(k+1) = F * x(k) + w
# Assume dt = 1 second
dt = 1.0
self.F = np.array([
[1.0, dt], # temp(k+1) = temp(k) + rate(k) * dt
[0.0, 1.0] # rate(k+1) = rate(k) (assume constant)
])
# Observation model: z(k) = H * x(k) + v
# Only temperature is observable
self.H = np.array([[1.0, 0.0]])
# Process noise covariance
self.Q = np.array([
[process_noise, 0.0],
[0.0, process_noise]
])
# Measurement noise covariance
self.R = np.array([[measurement_noise]])
def predict(self) -> Tuple[float, float]:
"""Prediction step (state prediction based on system model)
Returns:
(predicted_temp, predicted_rate) Predicted temperature and rate
"""
# State prediction: x_pred = F * x
x_pred = self.F @ self.state.x
# Error covariance prediction: P_pred = F * P * F^T + Q
P_pred = self.F @ self.state.P @ self.F.T + self.Q
# Update predicted state
self.state.x = x_pred
self.state.P = P_pred
return self.state.x[0], self.state.x[1]
def update(self, measurement: float) -> Tuple[float, float]:
"""Update step (correct state with measurement value)
Args:
measurement: Sensor measurement value (temperature) [°C]
Returns:
(estimated_temp, estimated_rate) Estimated temperature and rate
"""
# Innovation (difference between prediction and measurement): y = z - H * x_pred
z = np.array([measurement])
y = z - self.H @ self.state.x
# Innovation covariance: S = H * P_pred * H^T + R
S = self.H @ self.state.P @ self.H.T + self.R
# Kalman gain: K = P_pred * H^T * S^-1
K = self.state.P @ self.H.T @ np.linalg.inv(S)
# State update: x = x_pred + K * y
self.state.x = self.state.x + K @ y
# Error covariance update: P = (I - K * H) * P_pred
I = np.eye(2)
self.state.P = (I - K @ self.H) @ self.state.P
return self.state.x[0], self.state.x[1]
def estimate(self, measurement: float) -> Tuple[float, float]:
"""Execute one cycle of prediction → update
Args:
measurement: Sensor measurement value [°C]
Returns:
(estimated_temp, estimated_rate) Estimated temperature and rate
"""
# Predict
self.predict()
# Update
return self.update(measurement)
# Usage example
if __name__ == "__main__":
# Initialize Kalman filter
kf = ReactorKalmanFilter(
initial_temp=85.0,
process_noise=0.01, # Stable process
measurement_noise=0.5 # Moderate sensor noise
)
# True temperature profile (ramp heating: 85°C → 95°C)
time_steps = 50
true_temps = np.linspace(85.0, 95.0, time_steps)
# Sensor measurements (with noise)
np.random.seed(42)
noise = np.random.normal(0, 0.5, time_steps) # Standard deviation 0.5°C
measured_temps = true_temps + noise
print("Time\tTrue\tMeasured\tEstimated\tRate")
print("-" * 60)
estimated_temps = []
estimated_rates = []
for t in range(time_steps):
# Estimate with Kalman filter
est_temp, est_rate = kf.estimate(measured_temps[t])
estimated_temps.append(est_temp)
estimated_rates.append(est_rate)
if t % 5 == 0: # Display every 5 seconds
print(f"{t}\t{true_temps[t]:.2f}\t{measured_temps[t]:.2f}\t\t{est_temp:.2f}\t\t{est_rate:.3f}")
# Accuracy evaluation
mse_raw = np.mean((measured_temps - true_temps) ** 2)
mse_filtered = np.mean((np.array(estimated_temps) - true_temps) ** 2)
print(f"\n=== Accuracy Evaluation ===")
print(f"MSE of raw data: {mse_raw:.4f}")
print(f"MSE after filtering: {mse_filtered:.4f}")
print(f"Accuracy improvement rate: {(1 - mse_filtered/mse_raw) * 100:.1f}%")
# Expected output example:
# Time True Measured Estimated Rate
# ------------------------------------------------------------
# 0 85.00 85.25 85.13 0.051
# 5 86.02 85.78 85.96 0.192
# 10 87.04 86.92 87.01 0.204
# 15 88.06 88.31 88.08 0.206
# 20 89.08 89.23 89.10 0.204
# 25 90.10 90.35 90.13 0.203
# 30 91.12 90.98 91.14 0.204
# 35 92.14 92.45 92.17 0.203
# 40 93.16 93.08 93.18 0.204
# 45 94.18 94.52 94.21 0.203
#
# === Accuracy Evaluation ===
# MSE of raw data: 0.2435
# MSE after filtering: 0.0089
# Accuracy improvement rate: 96.3%
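The filter's behavior hinges on the ratio of process noise Q to measurement noise R: a larger measurement_noise smooths more but lags the ramp. A quick sweep, reusing true_temps and measured_temps from the example above, makes the trade-off visible (the printed MSE values will vary with the random seed):

# Sweep measurement_noise and compare MSE against the true ramp
for r in [0.1, 0.5, 2.0, 10.0]:
    kf = ReactorKalmanFilter(initial_temp=85.0,
                             process_noise=0.01,
                             measurement_noise=r)
    estimates = [kf.estimate(z)[0] for z in measured_temps]
    mse = np.mean((np.array(estimates) - true_temps) ** 2)
    print(f"measurement_noise={r:5.1f}  MSE={mse:.4f}")
# Too small an R trusts the noisy sensor; too large an R lags the
# 0.2 °C/s ramp. The best value sits near the sensor's true noise variance.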
Finally, a complete implementation integrates all of these elements to realize real-time synchronization with the Digital Twin.
"""
Example 7: Digital Twin synchronization loop
Complete cycle of sensor data acquisition → validation → state estimation → model update
"""
import time
from datetime import datetime
from typing import Dict, Optional
import numpy as np
# Reuses SensorReading, DataQuality, DataValidator (Example 4) and
# ReactorKalmanFilter (Example 6); run them in the same session or
# import them from their modules.
class DigitalTwinModel:
"""Simplified physical model of Digital Twin
Calculates reaction progress from temperature, pressure, flow rate
using reactor heat balance model.
"""
def __init__(self, reactor_id: str):
self.reactor_id = reactor_id
# Model parameters
self.heat_capacity = 4200 # J/(kg·K)
self.mass = 1000 # kg
self.heat_transfer_coeff = 500 # W/(m²·K)
self.area = 10 # m²
# State variables
self.state = {
'temperature': 85.0, # °C
'pressure': 2.5, # MPa
'flow_rate': 120.0, # L/min
'conversion': 0.0, # Reaction progress (0-1)
'last_update': datetime.now()
}
def update_from_sensors(self, sensor_data: Dict[str, float]):
"""Update model with sensor data
Args:
sensor_data: {'temperature': 85.3, 'pressure': 2.45, 'flow_rate': 120.5}
"""
# Update model state with sensor values
if 'temperature' in sensor_data:
self.state['temperature'] = sensor_data['temperature']
if 'pressure' in sensor_data:
self.state['pressure'] = sensor_data['pressure']
if 'flow_rate' in sensor_data:
self.state['flow_rate'] = sensor_data['flow_rate']
self.state['last_update'] = datetime.now()
def calculate_reaction_conversion(self) -> float:
"""Calculate reaction progress (simplified model)
Integration of reaction rate based on Arrhenius equation
Returns:
Reaction progress (0-1)
"""
T = self.state['temperature'] + 273.15 # K
P = self.state['pressure'] * 1e6 # Pa
# Arrhenius parameters (hypothetical values)
A = 1e10 # Frequency factor
Ea = 80000 # Activation energy [J/mol]
R = 8.314 # Gas constant [J/(mol·K)]
# Reaction rate constant
k = A * np.exp(-Ea / (R * T))
# Pressure correction (simplified)
k_eff = k * (P / 2.5e6) ** 0.5
# Reaction progress (approximation of time integration)
dt = 1.0 # 1 second
dX = k_eff * (1 - self.state['conversion']) * dt
new_conversion = min(self.state['conversion'] + dX, 1.0)
self.state['conversion'] = new_conversion
return new_conversion
def predict_next_state(self, time_horizon: float = 60) -> Dict:
"""Predict future state
Args:
time_horizon: Prediction time [seconds]
Returns:
Predicted state dictionary
"""
# Predict future conversion from current reaction rate
predicted_conversion = min(
self.state['conversion'] + 0.01 * time_horizon,
1.0
)
# For exothermic reaction, predict temperature rise
heat_released = predicted_conversion * 100000 # J (hypothetical value)
temp_rise = heat_released / (self.mass * self.heat_capacity)
predicted_temp = self.state['temperature'] + temp_rise
return {
'temperature': predicted_temp,
'conversion': predicted_conversion,
'time_horizon': time_horizon
}
def get_state(self) -> Dict:
"""Get current state"""
return self.state.copy()
class DigitalTwinSyncLoop:
"""Digital Twin synchronization loop
Executes sensor data acquisition → validation → state estimation → model update
in real-time.
"""
def __init__(self, reactor_id: str, update_interval: float = 1.0):
"""
Args:
reactor_id: Reactor ID
update_interval: Update interval [seconds]
"""
self.reactor_id = reactor_id
self.update_interval = update_interval
# Digital Twin model
self.twin_model = DigitalTwinModel(reactor_id)
# Kalman filter (for temperature)
self.temp_filter = ReactorKalmanFilter(
initial_temp=85.0,
process_noise=0.01,
measurement_noise=0.5
)
# Data validator
self.validator = DataValidator()
self.is_running = False
self.cycle_count = 0
def fetch_sensor_data(self) -> Dict[str, float]:
"""Fetch sensor data (actually from MQTT/OPC UA)
Returns:
{'temperature': 85.3, 'pressure': 2.45, 'flow_rate': 120.5}
"""
# Here we generate mock data
# Actually fetch from MQTT client or OPC UA client
base_temp = 85.0 + self.cycle_count * 0.1 # Slowly heat up
noise = np.random.normal(0, 0.5)
return {
'temperature': base_temp + noise,
'pressure': 2.5 + np.random.normal(0, 0.02),
'flow_rate': 120.0 + np.random.normal(0, 1.0)
}
def run_single_cycle(self):
"""Execute one cycle of synchronization loop"""
cycle_start = time.time()
# 1. Fetch sensor data
raw_sensor_data = self.fetch_sensor_data()
# 2. Data validation
temp_reading = SensorReading(
reactor_id=self.reactor_id,
sensor_type="temperature",
value=raw_sensor_data['temperature'],
unit="degC",
timestamp=datetime.now().isoformat()
)
validated_reading = self.validator.validate(temp_reading)
        # 3. State estimation with Kalman filter
        if validated_reading.quality != DataQuality.BAD:
            est_temp, est_rate = self.temp_filter.estimate(validated_reading.value)
            filtered_data = {
                'temperature': est_temp,
                'pressure': raw_sensor_data['pressure'],
                'flow_rate': raw_sensor_data['flow_rate']
            }
        else:
            # Bad reading: skip the model update and fall back to the last
            # model temperature so the status line below still prints
            filtered_data = None
            est_temp = self.twin_model.get_state()['temperature']
# 4. Update Digital Twin model
if filtered_data:
self.twin_model.update_from_sensors(filtered_data)
# 5. Calculate reaction progress
conversion = self.twin_model.calculate_reaction_conversion()
# 6. Future prediction
prediction = self.twin_model.predict_next_state(time_horizon=60)
# 7. Display results
state = self.twin_model.get_state()
cycle_time = (time.time() - cycle_start) * 1000 # ms
print(f"[Cycle {self.cycle_count}] "
f"Temp: {state['temperature']:.2f}°C "
f"(filtered: {est_temp:.2f}°C), "
f"Pressure: {state['pressure']:.2f} MPa, "
f"Conversion: {conversion*100:.1f}%, "
f"Prediction(60s): {prediction['conversion']*100:.1f}% "
f"({cycle_time:.1f}ms)")
self.cycle_count += 1
def run(self, duration: Optional[float] = None):
"""Start synchronization loop
Args:
duration: Execution time [seconds] (infinite loop if None)
"""
self.is_running = True
start_time = time.time()
print(f"=== Digital Twin Synchronization Loop Started ===")
print(f"Reactor ID: {self.reactor_id}")
print(f"Update Interval: {self.update_interval}s")
print("-" * 80)
try:
while self.is_running:
# Execute one cycle
self.run_single_cycle()
# Check termination condition
if duration and (time.time() - start_time) >= duration:
break
# Wait until next update
time.sleep(self.update_interval)
except KeyboardInterrupt:
print("\n\nStopped by keyboard interrupt")
finally:
self.is_running = False
print(f"\n=== Synchronization Loop Ended ===")
print(f"Total cycles: {self.cycle_count}")
print(f"Execution time: {time.time() - start_time:.1f} seconds")
# Usage example
if __name__ == "__main__":
# Create Digital Twin synchronization loop
sync_loop = DigitalTwinSyncLoop(
reactor_id="R101",
update_interval=1.0 # Update every 1 second
)
# Execute for 30 seconds
sync_loop.run(duration=30)
# Expected output example:
# === Digital Twin Synchronization Loop Started ===
# Reactor ID: R101
# Update Interval: 1.0s
# --------------------------------------------------------------------------------
# [Cycle 0] Temp: 85.23°C (filtered: 85.13°C), Pressure: 2.51 MPa, Conversion: 0.2%, Prediction(60s): 0.8% (2.3ms)
# [Cycle 1] Temp: 85.34°C (filtered: 85.28°C), Pressure: 2.49 MPa, Conversion: 0.4%, Prediction(60s): 1.0% (1.8ms)
# [Cycle 2] Temp: 85.41°C (filtered: 85.39°C), Pressure: 2.52 MPa, Conversion: 0.6%, Prediction(60s): 1.2% (1.7ms)
# ...
# [Cycle 29] Temp: 88.12°C (filtered: 88.09°C), Pressure: 2.48 MPa, Conversion: 6.2%, Prediction(60s): 6.8% (1.9ms)
#
# === Synchronization Loop Ended ===
# Total cycles: 30
# Execution time: 30.1 seconds
Upon completing this chapter, you will be able to implement each building block of a synchronized Digital Twin: sensor data acquisition over MQTT and OPC UA, time series storage in InfluxDB, real-time validation and cleaning, streaming pipelines, Kalman-filter state estimation, and the complete synchronization loop.
Q1: What is the main difference between MQTT and OPC UA? Choose the most appropriate answer.
a) MQTT is for industrial use, OPC UA is for IoT
b) MQTT is lightweight and suitable for IoT, OPC UA is industrial automation standard
c) MQTT is paid, OPC UA is free
d) MQTT is old standard, OPC UA is new standard
Correct answer: b) MQTT is lightweight and suitable for IoT, OPC UA is industrial automation standard
Explanation: MQTT is a lightweight publish/subscribe protocol designed for resource-constrained devices and unreliable networks, which makes it well suited to IoT sensors. OPC UA (IEC 62541) is the standard protocol of industrial automation: it provides a rich information model, built-in security, and native connectivity to PLCs and DCS systems. Both are open, actively maintained standards, so options a), c), and d) are incorrect.
Q2: If sensor data has a Z-score of 3.5, should this data be treated as an outlier? Explain the judgment and reasons when the Z-score threshold is set to 3.0.
Correct answer: Yes, it should be treated as an outlier.
Reasons: a Z-score of 3.5 exceeds the threshold of 3.0, meaning the value lies 3.5 standard deviations from the recent mean. For normally distributed noise, |Z| > 3 occurs with a probability of roughly 0.3%, so such a reading is far more likely to be a sensor fault or a disturbance than ordinary noise.
Practical response: do not silently discard the value. Flag it as Uncertain (as the DataValidator in Example 4 does), exclude it from model updates or substitute an interpolated value, and investigate if readings keep exceeding the threshold, since a sustained shift may indicate a real process change rather than an outlier.
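For reference, the tail probability behind this judgment can be computed directly with Python's standard library:

import math

def two_sided_tail(z: float) -> float:
    """P(|Z| > z) for a standard normal distribution."""
    return math.erfc(z / math.sqrt(2))

print(f"P(|Z| > 3.0) = {two_sided_tail(3.0):.5f}")  # ≈ 0.00270
print(f"P(|Z| > 3.5) = {two_sided_tail(3.5):.5f}")  # ≈ 0.00047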
Q3: In a chemical plant where 1000 sensors send data at 1-second intervals, the Digital Twin synchronization loop update interval is set to 1 second. If the average processing time per cycle is 800ms, will the system operate stably? If there are problems, propose three improvement measures.
Correct answer: High possibility of instability (an 800 ms processing time leaves only a 200 ms margin against the 1000 ms update interval)
Problems: any jitter in processing time (network latency, garbage collection, load spikes) overruns the 1-second budget; overruns cause cycles to back up, end-to-end latency to grow without bound, and eventually buffer overflow and data loss.
Improvement measures: (1) parallelize the per-sensor work across threads or processes, as sketched below; (2) reduce the data rate at the source, e.g. report-by-exception via OPC UA subscriptions (Example 2) or change-based MQTT publishing; (3) split sensors into priority tiers and update non-critical ones at longer intervals.
Implementation example (parallel processing):
from concurrent.futures import ThreadPoolExecutor

def process_sensor_group(sensor_ids):
    """Process one group of sensors (fetch, validate, update the twin)."""
    pass  # placeholder for the actual per-sensor pipeline

# Hypothetical sensor IDs for illustration
sensors = [f"S{i:04d}" for i in range(1000)]
# Divide 1000 sensors into 4 groups of 250
groups = [sensors[i:i + 250] for i in range(0, 1000, 250)]
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_sensor_group, groups))
# Ideal processing time: 800 ms → ~200 ms with 4 workers, assuming the
# work is I/O-bound (threads do not bypass the GIL for CPU-bound work;
# use a process pool in that case)
Once the foundation of real-time data integration is in place, the next step is Digital Twin modeling methods. Through hybrid modeling that combines physical (first-principles) models with data-driven (machine learning) models, we will realize high-precision prediction and control.