This chapter covers the fundamentals of digital twins, starting with core concepts and definitions. You will learn the differences between digital shadows, digital twins, and digital threads, and how to design digital twin architectures and state representations.
Learning Objectives
By reading this chapter, you will master the following:
- ✅ Understand the concept and definition of digital twins
- ✅ Explain the differences between digital shadows, digital twins, and digital threads
- ✅ Design digital twin architectures
- ✅ Design state representations and data models
- ✅ Understand model fidelity levels (L1-L5) and select appropriate levels
- ✅ Build simple digital twin prototypes in Python
1.1 Digital Twin Concepts and Definitions
What is a Digital Twin?
A digital twin is a virtual replica of a physical system that stays synchronized with that system through real-time data and enables simulation, analysis, and optimization through digital models.
"A digital twin is a digital representation of a physical asset, integrating IoT sensor data and physical models, continuously updated throughout the asset's lifecycle." - Gartner
Digital Shadow vs Digital Twin vs Digital Thread
| Concept | Data Flow | Main Functions | Examples |
|---|---|---|---|
| Digital Shadow | Physical → Digital (one-way) | Monitoring, visualization, historical data storage | Process logs, dashboards |
| Digital Twin | Physical ⇄ Digital (bidirectional) | Prediction, optimization, control, what-if analysis | Real-time optimization, MPC |
| Digital Thread | Integration across entire lifecycle | Consistency from design → manufacturing → operation | PLM integration, traceability |
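The data-flow column is the practical difference between a shadow and a twin, and a few lines of code can make it concrete. The following is a minimal sketch, not a reference design: the class names `ShadowSketch` and `TwinSketch`, the 190 °C setpoint, and the gain of 2.0 are all hypothetical choices made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ShadowSketch:
    """One-way flow (physical -> digital): stores readings, never sends commands."""
    history: List[Dict[str, float]] = field(default_factory=list)

    def ingest(self, reading: Dict[str, float]) -> None:
        self.history.append(dict(reading))


@dataclass
class TwinSketch(ShadowSketch):
    """Bidirectional flow: ingests a reading and returns a command for the plant."""
    temperature_setpoint: float = 190.0  # hypothetical target [°C]
    gain: float = 2.0                    # hypothetical proportional gain

    def ingest_and_command(self, reading: Dict[str, float]) -> Dict[str, float]:
        self.ingest(reading)
        error = self.temperature_setpoint - reading["temperature"]
        return {"cooling_valve_change": self.gain * error}


shadow = ShadowSketch()
shadow.ingest({"temperature": 185.0})

twin = TwinSketch()
command = twin.ingest_and_command({"temperature": 185.0})
print(len(shadow.history), command)
```

The shadow only accumulates history, while the twin produces a value that is meant to travel back to the physical side.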
Value of Digital Twins in Process Industries
- Operational Optimization: Calculate optimal operating conditions in real-time to reduce energy costs
- Predictive Maintenance: Planned maintenance based on failure prediction to reduce downtime
- Virtual Commissioning: Virtual testing of new equipment to reduce risks and costs
- Operator Training: Training in a safe virtual environment
- Process Improvement: Discover improvement opportunities through what-if scenario analysis
1.2 Digital Twin Architecture Design
4-Layer Architecture Model
The four layers and the data passed between them:
- Physical Layer: chemical reactor, sensors, actuators. Sends real-time data to the Data Layer.
- Data Layer: IoT gateway, MQTT, OPC UA, time-series DB. Passes preprocessed data to the Model Layer.
- Model Layer: physical model + ML model, state estimation, prediction. Passes optimization results to the Application Layer.
- Application Layer: RTO, MPC, dashboards, alerts. Sends control commands back to the Physical Layer.
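One way to read the four layers is as a chain of functions, each consuming the output of the previous one. The sketch below assumes that reading; every function name, the hard-coded sensor values, and the setpoint rule are placeholders invented for illustration, not part of any real gateway or optimizer API.

```python
from typing import Dict

Reading = Dict[str, float]


def physical_layer() -> Reading:
    """Stand-in for the sensors: returns one raw reading (made-up values)."""
    return {"temperature": 185.04, "pressure": 3.52}


def data_layer(raw: Reading) -> Reading:
    """Stand-in for preprocessing: rounding plays the role of data cleaning."""
    return {name: round(value, 1) for name, value in raw.items()}


def model_layer(clean: Reading) -> Reading:
    """Stand-in for the model: proposes a fixed, hypothetical setpoint."""
    return {"temperature": clean["temperature"], "temperature_setpoint": 190.0}


def application_layer(result: Reading) -> Reading:
    """Stand-in for the application: converts the result into a control command."""
    error = result["temperature_setpoint"] - result["temperature"]
    return {"cooling_valve_change": 2.0 * error}


def run_cycle() -> Reading:
    """One pass through the layers; the returned command closes the loop."""
    return application_layer(model_layer(data_layer(physical_layer())))


print(run_cycle())
```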
Code Example 1: Digital Twin Architecture Design Pattern
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime
import numpy as np


@dataclass
class PhysicalAsset:
    """Definition of physical system"""
    asset_id: str
    asset_type: str  # 'reactor', 'distillation_column', etc.
    location: str
    sensors: Dict[str, str]  # sensor_name -> sensor_id
    actuators: Dict[str, str]  # actuator_name -> actuator_id

    def __repr__(self):
        return f"PhysicalAsset(id={self.asset_id}, type={self.asset_type}, sensors={len(self.sensors)})"


@dataclass
class State:
    """State representation of digital twin"""
    timestamp: datetime
    temperature: float  # [°C]
    pressure: float  # [bar]
    flow_rate: float  # [kg/h]
    concentration: float  # [mol/L]

    def to_dict(self) -> Dict[str, Any]:
        """Convert state to JSON format"""
        return {
            'timestamp': self.timestamp.isoformat(),
            'temperature': self.temperature,
            'pressure': self.pressure,
            'flow_rate': self.flow_rate,
            'concentration': self.concentration
        }


@dataclass
class DigitalTwinArchitecture:
    """Overall architecture of digital twin"""
    twin_id: str
    physical_asset: PhysicalAsset
    current_state: Optional[State] = None
    state_history: List[State] = field(default_factory=list)
    fidelity_level: int = 1  # 1-5

    def update_state(self, new_state: State) -> None:
        """Update state from physical system"""
        self.current_state = new_state
        self.state_history.append(new_state)
        print(f"[{new_state.timestamp}] State updated: T={new_state.temperature:.1f}°C, "
              f"P={new_state.pressure:.1f}bar")

    def get_state_vector(self) -> np.ndarray:
        """Get state vector (for machine learning models)"""
        if self.current_state is None:
            return np.array([])
        return np.array([
            self.current_state.temperature,
            self.current_state.pressure,
            self.current_state.flow_rate,
            self.current_state.concentration
        ])

    def get_state_history_matrix(self, window_size: int = 10) -> np.ndarray:
        """Get time-series state matrix"""
        if len(self.state_history) < window_size:
            window_size = len(self.state_history)
        recent_states = self.state_history[-window_size:]
        state_matrix = np.array([
            [s.temperature, s.pressure, s.flow_rate, s.concentration]
            for s in recent_states
        ])
        return state_matrix


# Usage example: Digital twin design for a chemical reactor
reactor_asset = PhysicalAsset(
    asset_id="REACTOR-001",
    asset_type="CSTR",
    location="Plant-A, Unit-1",
    sensors={
        'temperature': 'TI-101',
        'pressure': 'PI-102',
        'flow_in': 'FI-103',
        'concentration': 'AI-104'
    },
    actuators={
        'cooling_valve': 'CV-201',
        'feed_valve': 'FV-202'
    }
)

digital_twin = DigitalTwinArchitecture(
    twin_id="DT-REACTOR-001",
    physical_asset=reactor_asset,
    fidelity_level=3
)

# Set initial state
initial_state = State(
    timestamp=datetime.now(),
    temperature=185.0,
    pressure=3.5,
    flow_rate=1200.0,
    concentration=2.5
)
digital_twin.update_state(initial_state)

print(f"\nDigital Twin: {digital_twin.twin_id}")
print(f"Physical Asset: {digital_twin.physical_asset}")
print(f"Fidelity Level: L{digital_twin.fidelity_level}")
print(f"Current State Vector: {digital_twin.get_state_vector()}")
Example Output:
[2025-10-26 10:30:00] State updated: T=185.0°C, P=3.5bar
Digital Twin: DT-REACTOR-001
Physical Asset: PhysicalAsset(id=REACTOR-001, type=CSTR, sensors=4)
Fidelity Level: L3
Current State Vector: [ 185.     3.5 1200.     2.5]
Explanation: The digital twin architecture ties together the physical asset, the state representation, and history management. Modeling each part as a dataclass keeps the fields explicit and makes it straightforward to add new state variables or metadata later.
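In Code Example 1, `state_history` grows by one entry on every update, which can become a memory concern for a twin that runs continuously. One possible way to cap it, sketched here under the assumption that only a recent window is needed, is a `collections.deque` with `maxlen`. The `BoundedStateHistory` class is hypothetical and is not part of the example above.

```python
from collections import deque
from typing import Deque, List

import numpy as np


class BoundedStateHistory:
    """Hypothetical helper: keeps only the most recent `maxlen` state rows."""

    def __init__(self, maxlen: int = 1000):
        # deque drops the oldest row automatically once maxlen is reached
        self._rows: Deque[List[float]] = deque(maxlen=maxlen)

    def append(self, temperature: float, pressure: float,
               flow_rate: float, concentration: float) -> None:
        self._rows.append([temperature, pressure, flow_rate, concentration])

    def as_matrix(self, window_size: int = 10) -> np.ndarray:
        """Return up to `window_size` most recent rows as a (rows, 4) array."""
        rows = list(self._rows)[-window_size:]
        return np.array(rows, dtype=float).reshape(-1, 4)


history = BoundedStateHistory(maxlen=3)
for step in range(5):
    history.append(185.0 + step, 3.5, 1200.0, 2.5)

window = history.as_matrix(window_size=10)
print(window.shape)   # only the last 3 of the 5 appended rows remain
print(window[:, 0])   # temperature column of the retained rows
```

The trade-off is that older states are discarded, so long-term history would still need to live in the time-series database of the data layer.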
Code Example 2: State Representation and Data Model Design
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
import json
from datetime import datetime, timedelta
from typing import Dict, Any, List
import numpy as np


class DigitalTwinStateModel:
    """State representation and serialization of digital twin"""

    def __init__(self, twin_id: str):
        self.twin_id = twin_id
        self.state_variables = {
            'process': {},    # Process variables
            'equipment': {},  # Equipment status
            'quality': {}     # Quality metrics
        }
        self.metadata = {
            'last_update': None,
            'data_quality': 1.0,  # 0.0-1.0
            'sync_status': 'initialized'
        }

    def update_process_variables(self, variables: Dict[str, float]) -> None:
        """Update process variables"""
        self.state_variables['process'].update(variables)
        self.metadata['last_update'] = datetime.now().isoformat()
        self.metadata['sync_status'] = 'synced'

    def update_equipment_status(self, status: Dict[str, Any]) -> None:
        """Update equipment status"""
        self.state_variables['equipment'].update(status)

    def update_quality_metrics(self, metrics: Dict[str, float]) -> None:
        """Update quality metrics"""
        self.state_variables['quality'].update(metrics)

    def to_json(self) -> str:
        """Serialization to JSON format"""
        state_dict = {
            'twin_id': self.twin_id,
            'state_variables': self.state_variables,
            'metadata': self.metadata
        }
        return json.dumps(state_dict, indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> 'DigitalTwinStateModel':
        """Deserialization from JSON"""
        data = json.loads(json_str)
        twin = cls(data['twin_id'])
        twin.state_variables = data['state_variables']
        twin.metadata = data['metadata']
        return twin

    def validate_state(self) -> Dict[str, Any]:
        """State validation"""
        validation_results = {
            'valid': True,
            'warnings': [],
            'errors': []
        }
        # Range check for process variables
        process_vars = self.state_variables['process']
        if 'temperature' in process_vars:
            T = process_vars['temperature']
            if T < 50 or T > 250:
                validation_results['errors'].append(
                    f"Temperature out of range: {T}°C (valid: 50-250°C)"
                )
                validation_results['valid'] = False
        if 'pressure' in process_vars:
            P = process_vars['pressure']
            if P < 1 or P > 10:
                validation_results['errors'].append(
                    f"Pressure out of range: {P} bar (valid: 1-10 bar)"
                )
                validation_results['valid'] = False
        # Data freshness check
        if self.metadata['last_update']:
            last_update = datetime.fromisoformat(self.metadata['last_update'])
            age = (datetime.now() - last_update).total_seconds()
            if age > 60:  # Older than 60 seconds
                validation_results['warnings'].append(
                    f"State data is stale: {age:.0f} seconds old"
                )
        return validation_results


# Usage example
twin_state = DigitalTwinStateModel(twin_id="DT-REACTOR-001")

# Update process variables
twin_state.update_process_variables({
    'temperature': 185.0,
    'pressure': 3.5,
    'flow_rate': 1200.0,
    'concentration': 2.5,
    'pH': 7.2
})

# Update equipment status
twin_state.update_equipment_status({
    'agitator_rpm': 250,
    'cooling_valve_opening': 45.0,  # %
    'pump_status': 'running'
})

# Update quality metrics
twin_state.update_quality_metrics({
    'yield': 89.5,      # %
    'purity': 98.2,     # %
    'conversion': 92.0  # %
})

# Export in JSON format
json_state = twin_state.to_json()
print("Digital Twin State (JSON):")
print(json_state)

# State validation
validation = twin_state.validate_state()
print(f"\nValidation Results:")
print(f" Valid: {validation['valid']}")
print(f" Warnings: {validation['warnings']}")
print(f" Errors: {validation['errors']}")

# Restoration from JSON
restored_twin = DigitalTwinStateModel.from_json(json_state)
print(f"\nRestored Twin ID: {restored_twin.twin_id}")
print(f"Process Temperature: {restored_twin.state_variables['process']['temperature']}°C")
Example Output:
Digital Twin State (JSON):
{
"twin_id": "DT-REACTOR-001",
"state_variables": {
"process": {
"temperature": 185.0,
"pressure": 3.5,
"flow_rate": 1200.0,
"concentration": 2.5,
"pH": 7.2
},
"equipment": {
"agitator_rpm": 250,
"cooling_valve_opening": 45.0,
"pump_status": "running"
},
"quality": {
"yield": 89.5,
"purity": 98.2,
"conversion": 92.0
}
},
"metadata": {
"last_update": "2025-10-26T10:30:15.123456",
"data_quality": 1.0,
"sync_status": "synced"
}
}
Validation Results:
Valid: True
Warnings: []
Errors: []
Restored Twin ID: DT-REACTOR-001
Process Temperature: 185.0°C
Explanation: State representation design hierarchically manages process variables, equipment status, and quality metrics. JSON serialization facilitates database storage and API communication.
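The range checks in `validate_state` repeat the same pattern for each variable, so adding a new variable means adding another `if` block. A table-driven variant is one possible alternative, sketched below. The `LIMITS` table and `check_ranges` function are hypothetical; the temperature and pressure limits are taken from Code Example 2, while the pH range is an invented illustration.

```python
from typing import Dict, List, Tuple

# Hypothetical limits table: variable name -> (low, high).
# 50-250 (°C) and 1-10 (bar) mirror validate_state; the pH range is assumed.
LIMITS: Dict[str, Tuple[float, float]] = {
    "temperature": (50.0, 250.0),
    "pressure": (1.0, 10.0),
    "pH": (0.0, 14.0),
}


def check_ranges(process_vars: Dict[str, float]) -> List[str]:
    """Return one error message per variable that falls outside its limits."""
    errors = []
    for name, (low, high) in LIMITS.items():
        if name not in process_vars:
            continue  # variables that were not reported are skipped, as in validate_state
        value = process_vars[name]
        if not low <= value <= high:
            errors.append(f"{name} out of range: {value} (valid: {low}-{high})")
    return errors


print(check_ranges({"temperature": 185.0, "pressure": 3.5, "pH": 7.2}))
print(check_ranges({"temperature": 300.0, "pressure": 0.5}))
```

Keeping the limits in data rather than code also makes it possible to load them from the same JSON document as the state, if that suits the deployment.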
Code Example 3: Model Fidelity Levels (L1-L5)
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
from abc import ABC, abstractmethod
from typing import Dict, Any
import numpy as np
from datetime import datetime


class DigitalTwinFidelityLevel(ABC):
    """Abstract base class for digital twin fidelity levels"""

    def __init__(self, twin_id: str, level: int):
        self.twin_id = twin_id
        self.level = level
        self.state_history = []

    @abstractmethod
    def process_sensor_data(self, sensor_data: Dict[str, float]) -> Dict[str, Any]:
        """Process sensor data"""
        pass

    @abstractmethod
    def predict(self, horizon: int) -> np.ndarray:
        """Predict future states"""
        pass


class L1_DigitalShadow(DigitalTwinFidelityLevel):
    """L1: Digital Shadow - Data logging only"""

    def __init__(self, twin_id: str):
        super().__init__(twin_id, level=1)

    def process_sensor_data(self, sensor_data: Dict[str, float]) -> Dict[str, Any]:
        """Save sensor data and calculate basic statistics"""
        self.state_history.append({
            'timestamp': datetime.now(),
            'data': sensor_data
        })
        # Calculate basic statistics over the last 10 readings
        if len(self.state_history) > 0:
            recent_temps = [s['data'].get('temperature', 0) for s in self.state_history[-10:]]
            stats = {
                'current': sensor_data,
                'statistics': {
                    'temperature_avg': np.mean(recent_temps),
                    'temperature_std': np.std(recent_temps)
                }
            }
            return stats
        return {'current': sensor_data}

    def predict(self, horizon: int) -> np.ndarray:
        """No prediction capability (data logging only)"""
        return np.array([])


class L3_PhysicsBasedTwin(DigitalTwinFidelityLevel):
    """L3: Physics-based model + parameter estimation"""

    def __init__(self, twin_id: str):
        super().__init__(twin_id, level=3)
        # Simple CSTR model parameters
        self.k_reaction = 0.05  # Reaction rate constant [1/min]
        self.k_cooling = 0.02   # Cooling rate constant [1/min]
        self.T_coolant = 60.0   # Coolant temperature [°C]

    def process_sensor_data(self, sensor_data: Dict[str, float]) -> Dict[str, Any]:
        """Process sensor data and update parameters"""
        self.state_history.append({
            'timestamp': datetime.now(),
            'data': sensor_data
        })
        # Parameter estimation (simplified version)
        if len(self.state_history) > 5:
            self._estimate_parameters()
        return {
            'current': sensor_data,
            'model_parameters': {
                'k_reaction': self.k_reaction,
                'k_cooling': self.k_cooling
            }
        }

    def _estimate_parameters(self) -> None:
        """Simplified parameter estimation"""
        # In practice, use least squares or Kalman filter
        recent_data = self.state_history[-5:]
        temps = [s['data'].get('temperature', 185) for s in recent_data]
        # Estimate cooling constant from temperature change rate
        # (skipped when the denominator would be zero)
        if len(temps) > 1 and temps[0] != self.T_coolant:
            dT = np.diff(temps)
            self.k_cooling = abs(np.mean(dT)) / (temps[0] - self.T_coolant) * 60  # Simple estimation

    def predict(self, horizon: int) -> np.ndarray:
        """State prediction using physical model"""
        if not self.state_history:
            return np.array([])
        current_state = self.state_history[-1]['data']
        T0 = current_state.get('temperature', 185.0)
        C0 = current_state.get('concentration', 2.5)
        # Simple ODE model (Euler method)
        dt = 1.0  # time step [min]
        predictions = []
        T, C = T0, C0
        for _ in range(horizon):
            # dT/dt = -k_cooling * (T - T_coolant) - ΔH * k_reaction * C
            dT = -self.k_cooling * (T - self.T_coolant) - 15 * self.k_reaction * C
            # dC/dt = -k_reaction * C
            dC = -self.k_reaction * C
            T += dT * dt
            C += dC * dt
            predictions.append([T, C])
        return np.array(predictions)


class L5_AutonomousTwin(DigitalTwinFidelityLevel):
    """L5: Autonomous optimization + closed-loop control"""

    def __init__(self, twin_id: str):
        super().__init__(twin_id, level=5)
        self.physics_model = L3_PhysicsBasedTwin(twin_id)
        self.ml_correction = None  # ML correction (to be implemented in Chapter 3)
        self.optimizer = None      # Optimization engine (to be implemented in Chapter 4)

    def process_sensor_data(self, sensor_data: Dict[str, float]) -> Dict[str, Any]:
        """Process sensor data + autonomous optimization"""
        # Update physics model
        physics_result = self.physics_model.process_sensor_data(sensor_data)
        # Autonomous optimization (simplified)
        optimal_setpoint = self._optimize_setpoint(sensor_data)
        return {
            'current': sensor_data,
            'physics_model': physics_result['model_parameters'],
            'optimal_setpoint': optimal_setpoint,
            'control_action': self._compute_control_action(sensor_data, optimal_setpoint)
        }

    def _optimize_setpoint(self, current_state: Dict[str, float]) -> Dict[str, float]:
        """Calculate optimal setpoint"""
        # Simplified: fixed setpoints stand in for a yield-maximizing optimizer
        T_current = current_state.get('temperature', 185.0)
        T_optimal = 190.0  # Simplified optimal temperature
        return {
            'temperature': T_optimal,
            'pressure': 3.5,
            'flow_rate': 1200.0
        }

    def _compute_control_action(self, current: Dict[str, float],
                                setpoint: Dict[str, float]) -> Dict[str, float]:
        """Compute control action (simplified proportional-only control)"""
        T_current = current.get('temperature', 185.0)
        T_setpoint = setpoint['temperature']
        error = T_setpoint - T_current
        Kp = 2.0  # Proportional gain
        cooling_valve_adjustment = Kp * error
        return {
            'cooling_valve_change': cooling_valve_adjustment,
            'control_mode': 'automatic'
        }

    def predict(self, horizon: int) -> np.ndarray:
        """Hybrid model prediction (currently delegates to the physics model)"""
        return self.physics_model.predict(horizon)


# Usage example: Comparison of different fidelity levels
print("=" * 70)
print("Comparison of Digital Twin Fidelity Levels")
print("=" * 70)

# L1: Digital Shadow
l1_twin = L1_DigitalShadow("DT-L1-REACTOR-001")
sensor_data = {'temperature': 185.0, 'pressure': 3.5, 'concentration': 2.5}
l1_result = l1_twin.process_sensor_data(sensor_data)
print(f"\nL1 (Digital Shadow):")
print(f" Capabilities: Data logging, basic statistics")
print(f" Result: {l1_result}")

# L3: Physics-based
l3_twin = L3_PhysicsBasedTwin("DT-L3-REACTOR-001")
l3_result = l3_twin.process_sensor_data(sensor_data)
predictions_l3 = l3_twin.predict(horizon=5)
print(f"\nL3 (Physics-Based Twin):")
print(f" Capabilities: Physical model + parameter estimation + prediction")
print(f" Model Parameters: {l3_result['model_parameters']}")
print(f" 5-min Prediction (T, C):")
for i, pred in enumerate(predictions_l3):
    print(f" t+{i+1}min: T={pred[0]:.2f}°C, C={pred[1]:.3f}mol/L")

# L5: Autonomous optimization
l5_twin = L5_AutonomousTwin("DT-L5-REACTOR-001")
l5_result = l5_twin.process_sensor_data(sensor_data)
print(f"\nL5 (Autonomous Twin):")
print(f" Capabilities: Autonomous optimization + closed-loop control")
print(f" Optimal Setpoint: {l5_result['optimal_setpoint']}")
print(f" Control Action: {l5_result['control_action']}")
Example Output:
======================================================================
Comparison of Digital Twin Fidelity Levels
======================================================================
L1 (Digital Shadow):
Capabilities: Data logging, basic statistics
Result: {'current': {'temperature': 185.0, 'pressure': 3.5, 'concentration': 2.5}, 'statistics': {'temperature_avg': 185.0, 'temperature_std': 0.0}}
L3 (Physics-Based Twin):
Capabilities: Physical model + parameter estimation + prediction
Model Parameters: {'k_reaction': 0.05, 'k_cooling': 0.02}
5-min Prediction (T, C):
t+1min: T=180.62°C, C=2.375mol/L
t+2min: T=176.43°C, C=2.256mol/L
t+3min: T=172.41°C, C=2.143mol/L
t+4min: T=168.55°C, C=2.036mol/L
t+5min: T=164.86°C, C=1.934mol/L
L5 (Autonomous Twin):
Capabilities: Autonomous optimization + closed-loop control
Optimal Setpoint: {'temperature': 190.0, 'pressure': 3.5, 'flow_rate': 1200.0}
Control Action: {'cooling_valve_change': 10.0, 'control_mode': 'automatic'}
Explanation: Fidelity levels L1-L5 describe the functional maturity of a digital twin. This example implements three of them: L1 only logs data and computes basic statistics, L3 adds a physics-based model with parameter estimation and prediction, and L5 adds autonomous optimization and closed-loop control on top of the L3 model. Select the level that matches the project stage.
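The comment in `_estimate_parameters` notes that a real implementation would use least squares or a Kalman filter. As a rough illustration of the least-squares option, the sketch below fits the cooling constant of the cooling-only model dT/dt = -k (T - T_coolant). Ignoring the reaction heat term and assuming a known, fixed sampling interval are both simplifications made here, and `estimate_k_cooling` is a hypothetical helper rather than part of the classes above.

```python
import numpy as np


def estimate_k_cooling(temps, T_coolant: float, dt_min: float = 1.0) -> float:
    """Least-squares fit of k in dT/dt = -k * (T - T_coolant), through the origin."""
    temps = np.asarray(temps, dtype=float)
    x = temps[:-1] - T_coolant        # driving force at each sample
    y = np.diff(temps) / dt_min       # finite-difference estimate of dT/dt
    denom = float(np.dot(x, x))
    if denom == 0.0:
        raise ValueError("no temperature difference to the coolant; k is not identifiable")
    # Minimizing sum((y + k * x)^2) over k gives k = -sum(x * y) / sum(x * x)
    return -float(np.dot(x, y)) / denom


# Synthetic check: generate Euler steps with a known k, then recover it
true_k, T_coolant = 0.02, 60.0
temps = [185.0]
for _ in range(10):
    temps.append(temps[-1] - true_k * (temps[-1] - T_coolant))

k_hat = estimate_k_cooling(temps, T_coolant)
print(f"{k_hat:.4f}")
```

Because all samples contribute to the fit, this is less sensitive to a single noisy reading than dividing the mean temperature change by one reference temperature.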
Code Example 4: Digital Twin Lifecycle Management
from enum import Enum
from datetime import datetime
from typing import List, Dict, Any


class LifecyclePhase(Enum):
    """Digital twin lifecycle phases"""
    DESIGN = "design"
    IMPLEMENTATION = "implementation"
    VERIFICATION = "verification"
    OPERATION = "operation"
    MAINTENANCE = "maintenance"
    DECOMMISSIONED = "decommissioned"


class DigitalTwinLifecycle:
    """Digital twin lifecycle management"""

    def __init__(self, twin_id: str):
        self.twin_id = twin_id
        self.current_phase = LifecyclePhase.DESIGN
        self.phase_history = []
        self.validation_metrics = {}
        self.maintenance_log = []
        self._log_phase_change(LifecyclePhase.DESIGN, "Twin created")

    def _log_phase_change(self, new_phase: LifecyclePhase, reason: str) -> None:
        """Log phase changes"""
        log_entry = {
            'timestamp': datetime.now(),
            'from_phase': self.current_phase.value if self.current_phase else None,
            'to_phase': new_phase.value,
            'reason': reason
        }
        self.phase_history.append(log_entry)
        self.current_phase = new_phase
        print(f"[{log_entry['timestamp']}] Phase: {new_phase.value.upper()} - {reason}")

    def complete_design(self, requirements: Dict[str, Any]) -> bool:
        """Complete design phase"""
        if self.current_phase != LifecyclePhase.DESIGN:
            print("Error: Not in DESIGN phase")
            return False
        # Requirement check: report only the keys that are actually missing
        required_keys = ['sensors', 'model_type', 'update_frequency']
        missing = [key for key in required_keys if key not in requirements]
        if missing:
            print(f"Error: Missing requirements - {missing}")
            return False
        self._log_phase_change(LifecyclePhase.IMPLEMENTATION,
                               f"Design completed with {len(requirements['sensors'])} sensors")
        return True

    def complete_implementation(self, components: Dict[str, Any]) -> bool:
        """Complete implementation phase"""
        if self.current_phase != LifecyclePhase.IMPLEMENTATION:
            print("Error: Not in IMPLEMENTATION phase")
            return False
        # Component check
        if 'data_pipeline' in components and 'model' in components:
            self._log_phase_change(LifecyclePhase.VERIFICATION,
                                   "Implementation completed")
            return True
        return False

    def run_verification(self, test_results: Dict[str, float]) -> bool:
        """Run verification phase"""
        if self.current_phase != LifecyclePhase.VERIFICATION:
            print("Error: Not in VERIFICATION phase")
            return False
        # Evaluate validation metrics
        self.validation_metrics = test_results
        rmse = test_results.get('rmse', float('inf'))
        r2_score = test_results.get('r2_score', 0.0)
        latency = test_results.get('latency_ms', float('inf'))
        # Pass criteria
        passed = (rmse < 5.0 and r2_score > 0.85 and latency < 1000)
        if passed:
            self._log_phase_change(LifecyclePhase.OPERATION,
                                   f"Verification passed: RMSE={rmse:.2f}, R²={r2_score:.3f}")
            return True
        else:
            print(f"Verification failed: RMSE={rmse:.2f}, R²={r2_score:.3f}, Latency={latency}ms")
            return False

    def log_maintenance(self, maintenance_type: str, description: str) -> None:
        """Log maintenance work"""
        log_entry = {
            'timestamp': datetime.now(),
            'type': maintenance_type,
            'description': description,
            'phase': self.current_phase.value
        }
        self.maintenance_log.append(log_entry)
        print(f"[Maintenance] {maintenance_type}: {description}")

    def get_lifecycle_summary(self) -> Dict[str, Any]:
        """Get lifecycle summary"""
        return {
            'twin_id': self.twin_id,
            'current_phase': self.current_phase.value,
            # Counts logged phase entries, including the initial DESIGN entry
            'total_phases_completed': len(self.phase_history),
            'validation_metrics': self.validation_metrics,
            'maintenance_count': len(self.maintenance_log)
        }


# Usage example: Digital twin lifecycle management
lifecycle = DigitalTwinLifecycle(twin_id="DT-REACTOR-001")
print("\n" + "=" * 70)
print("Digital Twin Lifecycle Management")
print("=" * 70 + "\n")

# Complete design phase
design_requirements = {
    'sensors': ['TI-101', 'PI-102', 'FI-103', 'AI-104'],
    'model_type': 'hybrid_physics_ml',
    'update_frequency': '1s',
    'fidelity_level': 3
}
lifecycle.complete_design(design_requirements)

# Complete implementation phase
implementation_components = {
    'data_pipeline': 'MQTT + InfluxDB',
    'model': 'CSTR physics + LightGBM correction',
    'api': 'FastAPI REST endpoint'
}
lifecycle.complete_implementation(implementation_components)

# Run verification phase
verification_results = {
    'rmse': 3.2,
    'r2_score': 0.92,
    'mae': 2.1,
    'latency_ms': 450,
    'coverage': 0.95
}
lifecycle.run_verification(verification_results)

# Maintenance during operation
lifecycle.log_maintenance('model_update', 'Retrained ML model with 1 month of new data')
lifecycle.log_maintenance('sensor_calibration', 'Calibrated TI-101 temperature sensor')

# Lifecycle summary
summary = lifecycle.get_lifecycle_summary()
print(f"\n" + "=" * 70)
print("Lifecycle Summary:")
print("=" * 70)
print(f"Twin ID: {summary['twin_id']}")
print(f"Current Phase: {summary['current_phase'].upper()}")
print(f"Phases Completed: {summary['total_phases_completed']}")
print(f"Validation Metrics: {summary['validation_metrics']}")
print(f"Maintenance Activities: {summary['maintenance_count']}")
Example Output:
======================================================================
Digital Twin Lifecycle Management
======================================================================
[2025-10-26 10:30:00] Phase: DESIGN - Twin created
[2025-10-26 10:30:01] Phase: IMPLEMENTATION - Design completed with 4 sensors
[2025-10-26 10:30:02] Phase: VERIFICATION - Implementation completed
[2025-10-26 10:30:03] Phase: OPERATION - Verification passed: RMSE=3.20, R²=0.920
[Maintenance] model_update: Retrained ML model with 1 month of new data
[Maintenance] sensor_calibration: Calibrated TI-101 temperature sensor
======================================================================
Lifecycle Summary:
======================================================================
Twin ID: DT-REACTOR-001
Current Phase: OPERATION
Phases Completed: 4
Validation Metrics: {'rmse': 3.2, 'r2_score': 0.92, 'mae': 2.1, 'latency_ms': 450, 'coverage': 0.95}
Maintenance Activities: 2
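Explanation: Digital twin lifecycle management tracks progress through each phase from design to operation. The move into operation is gated by explicit verification criteria (in this example RMSE < 5.0, R² > 0.85, and latency < 1000 ms), which gives the project a concrete quality gate.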
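Code Example 4 enforces the phase order with a separate check at the top of each method. The same rule could be expressed once as a transition table, as in the sketch below. The phase names are taken from the example; the transitions involving MAINTENANCE and DECOMMISSIONED are assumptions, since the example never moves into those phases, and `ALLOWED` and `can_transition` are hypothetical names.

```python
from enum import Enum
from typing import Dict, Set


class LifecyclePhase(Enum):
    DESIGN = "design"
    IMPLEMENTATION = "implementation"
    VERIFICATION = "verification"
    OPERATION = "operation"
    MAINTENANCE = "maintenance"
    DECOMMISSIONED = "decommissioned"


P = LifecyclePhase

# Allowed next phases for each phase.
# The first three rows follow Code Example 4; the rest are assumed for illustration.
ALLOWED: Dict[LifecyclePhase, Set[LifecyclePhase]] = {
    P.DESIGN: {P.IMPLEMENTATION},
    P.IMPLEMENTATION: {P.VERIFICATION},
    P.VERIFICATION: {P.OPERATION},
    P.OPERATION: {P.MAINTENANCE, P.DECOMMISSIONED},  # assumed
    P.MAINTENANCE: {P.OPERATION, P.DECOMMISSIONED},  # assumed
    P.DECOMMISSIONED: set(),                         # terminal phase
}


def can_transition(current: LifecyclePhase, target: LifecyclePhase) -> bool:
    """True if moving from `current` to `target` is listed in ALLOWED."""
    return target in ALLOWED.get(current, set())


print(can_transition(P.DESIGN, P.IMPLEMENTATION))
print(can_transition(P.DESIGN, P.OPERATION))
```

With a table like this, a single guard in `_log_phase_change` could reject invalid moves instead of each method repeating its own phase check.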
Code Example 5: Digital Twin Evaluation Metrics
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
import numpy as np
from typing import Any, Dict, List  # Any is needed by get_comprehensive_report
from datetime import datetime, timedelta


class DigitalTwinMetrics:
    """Calculate digital twin evaluation metrics"""

    def __init__(self, twin_id: str):
        self.twin_id = twin_id
        self.predictions = []
        self.actuals = []
        self.latencies = []
        self.sensor_coverage = {}

    def record_prediction(self, predicted: np.ndarray, actual: np.ndarray,
                          latency_ms: float) -> None:
        """Record prediction results"""
        self.predictions.append(predicted)
        self.actuals.append(actual)
        self.latencies.append(latency_ms)

    def calculate_accuracy_metrics(self) -> Dict[str, float]:
        """Calculate accuracy metrics"""
        if not self.predictions or not self.actuals:
            return {}
        predictions = np.array(self.predictions)
        actuals = np.array(self.actuals)
        # RMSE (Root Mean Squared Error)
        rmse = np.sqrt(np.mean((predictions - actuals) ** 2))
        # MAE (Mean Absolute Error)
        mae = np.mean(np.abs(predictions - actuals))
        # R² Score
        ss_res = np.sum((actuals - predictions) ** 2)
        ss_tot = np.sum((actuals - np.mean(actuals)) ** 2)
        r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0
        # MAPE (Mean Absolute Percentage Error)
        mape = np.mean(np.abs((actuals - predictions) / (actuals + 1e-8))) * 100
        return {
            'rmse': float(rmse),
            'mae': float(mae),
            'r2_score': float(r2),
            'mape': float(mape)
        }

    def calculate_realtime_metrics(self) -> Dict[str, float]:
        """Calculate real-time performance metrics"""
        if not self.latencies:
            return {}
        latencies = np.array(self.latencies)
        return {
            'avg_latency_ms': float(np.mean(latencies)),
            'p50_latency_ms': float(np.percentile(latencies, 50)),
            'p95_latency_ms': float(np.percentile(latencies, 95)),
            'p99_latency_ms': float(np.percentile(latencies, 99)),
            'max_latency_ms': float(np.max(latencies))
        }

    def set_sensor_coverage(self, total_sensors: int, active_sensors: int) -> None:
        """Set sensor coverage"""
        self.sensor_coverage = {
            'total_sensors': total_sensors,
            'active_sensors': active_sensors,
            'coverage_rate': active_sensors / total_sensors if total_sensors > 0 else 0.0
        }

    def calculate_business_value_metrics(self,
                                         cost_reduction_per_day: float,
                                         downtime_reduction_hours: float,
                                         hourly_production_value: float) -> Dict[str, float]:
        """Calculate business value metrics"""
        # ROI calculation (30 days)
        days = 30
        total_cost_reduction = cost_reduction_per_day * days
        downtime_value_saved = downtime_reduction_hours * hourly_production_value
        total_value = total_cost_reduction + downtime_value_saved
        # Digital twin operating cost (estimated)
        operating_cost = 5000  # $/month (assumed)
        roi = ((total_value - operating_cost) / operating_cost) * 100
        return {
            'cost_reduction_per_month': total_cost_reduction,
            'downtime_value_saved': downtime_value_saved,
            'total_value_per_month': total_value,
            'operating_cost_per_month': operating_cost,
            'roi_percent': roi,
            # Months of value needed to cover one month of operating cost
            'payback_period_months': operating_cost / total_value if total_value > 0 else float('inf')
        }

    def get_comprehensive_report(self) -> Dict[str, Any]:
        """Generate comprehensive evaluation report"""
        return {
            'twin_id': self.twin_id,
            'timestamp': datetime.now().isoformat(),
            'accuracy': self.calculate_accuracy_metrics(),
            'realtime_performance': self.calculate_realtime_metrics(),
            'sensor_coverage': self.sensor_coverage,
            'sample_count': len(self.predictions)
        }


# Usage example: Calculate digital twin evaluation metrics
metrics = DigitalTwinMetrics(twin_id="DT-REACTOR-001")

# Generate simulation data
np.random.seed(42)
n_samples = 100
for i in range(n_samples):
    # Actual value (temperature)
    actual_temp = 185.0 + np.random.normal(0, 2.0)
    # Predicted value (model prediction with some error)
    predicted_temp = actual_temp + np.random.normal(0, 3.0)
    # Latency (simulation)
    latency = np.random.uniform(200, 800)
    metrics.record_prediction(
        predicted=np.array([predicted_temp]),
        actual=np.array([actual_temp]),
        latency_ms=latency
    )

# Set sensor coverage
metrics.set_sensor_coverage(total_sensors=10, active_sensors=9)

# Calculate evaluation metrics
print("=" * 70)
print("Digital Twin Evaluation Metrics Report")
print("=" * 70 + "\n")

# Accuracy metrics
accuracy = metrics.calculate_accuracy_metrics()
print("Accuracy Metrics:")
print(f" RMSE: {accuracy['rmse']:.3f}°C")
print(f" MAE: {accuracy['mae']:.3f}°C")
print(f" R² Score: {accuracy['r2_score']:.4f}")
print(f" MAPE: {accuracy['mape']:.2f}%")

# Real-time performance
realtime = metrics.calculate_realtime_metrics()
print(f"\nReal-time Performance:")
print(f" Average Latency: {realtime['avg_latency_ms']:.1f} ms")
print(f" P50 Latency: {realtime['p50_latency_ms']:.1f} ms")
print(f" P95 Latency: {realtime['p95_latency_ms']:.1f} ms")
print(f" P99 Latency: {realtime['p99_latency_ms']:.1f} ms")

# Sensor coverage
coverage = metrics.sensor_coverage
print(f"\nSensor Coverage:")
print(f" Total Sensors: {coverage['total_sensors']}")
print(f" Active Sensors: {coverage['active_sensors']}")
print(f" Coverage Rate: {coverage['coverage_rate']*100:.1f}%")

# Business value
business_value = metrics.calculate_business_value_metrics(
    cost_reduction_per_day=500,    # $/day
    downtime_reduction_hours=10,   # hours/month
    hourly_production_value=2000   # $/hour
)
print(f"\nBusiness Value Metrics (30 days):")
print(f" Cost Reduction: ${business_value['cost_reduction_per_month']:,.0f}")
print(f" Downtime Reduction Value: ${business_value['downtime_value_saved']:,.0f}")
print(f" Total Value: ${business_value['total_value_per_month']:,.0f}")
print(f" Operating Cost: ${business_value['operating_cost_per_month']:,.0f}")
print(f" ROI: {business_value['roi_percent']:.1f}%")
print(f" Payback Period: {business_value['payback_period_months']:.2f} months")

# Comprehensive report
report = metrics.get_comprehensive_report()
print(f"\nTotal Evaluation Samples: {report['sample_count']}")
Example Output:
======================================================================
Digital Twin Evaluation Metrics Report
======================================================================
Accuracy Metrics:
RMSE: 3.128°C
MAE: 2.487°C
R² Score: 0.1824
MAPE: 1.34%
Real-time Performance:
Average Latency: 498.7 ms
P50 Latency: 495.3 ms
P95 Latency: 766.2 ms
P99 Latency: 785.4 ms
Sensor Coverage:
Total Sensors: 10
Active Sensors: 9
Coverage Rate: 90.0%
Business Value Metrics (30 days):
Cost Reduction: $15,000
Downtime Reduction Value: $20,000
Total Value: $35,000
Operating Cost: $5,000
ROI: 600.0%
Payback Period: 0.14 months
Total Evaluation Samples: 100
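Explanation: Digital twin evaluation uses four axes: accuracy (RMSE, R²), real-time performance (latency), coverage (sensor availability), and business value (ROI, payback period). The accuracy and latency figures in the example output are illustrative. In this synthetic setup the prediction noise (σ = 3.0) is larger than the spread of the actual temperatures (σ = 2.0), so the R² formula in the code yields a negative value (about 1 − 9/4 = −1.25 in expectation) rather than the positive score shown. A model that meets the R² > 0.85 criterion from Code Example 4 needs prediction error well below the variation in the data.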
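Because R² compares the model against simply predicting the mean, it can fall below zero when predictions are worse than that baseline. The small hand-checkable sketch below uses the same R² formula as `calculate_accuracy_metrics`; the three-point data sets are invented purely so the arithmetic can be verified by hand.

```python
import numpy as np


def r2_score(actual, predicted) -> float:
    """R² = 1 - SS_res / SS_tot, with the same zero-variance fallback as above."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    ss_res = float(np.sum((actual - predicted) ** 2))
    ss_tot = float(np.sum((actual - actual.mean()) ** 2))
    return 1.0 - ss_res / ss_tot if ss_tot != 0 else 0.0


actual = [184.0, 185.0, 186.0]             # SS_tot = 1 + 0 + 1 = 2

close_predictions = [184.5, 185.0, 185.5]  # SS_res = 0.25 + 0 + 0.25 = 0.5
poor_predictions = [188.0, 185.0, 182.0]   # SS_res = 16 + 0 + 16 = 32

r2_close = r2_score(actual, close_predictions)  # 1 - 0.5 / 2 = 0.75
r2_poor = r2_score(actual, poor_predictions)    # 1 - 32 / 2 = -15.0
print(r2_close, r2_poor)
```

When the measured variable barely moves, as with a tightly controlled reactor temperature, even small absolute errors can produce a low or negative R², so RMSE and MAE are worth reading alongside it.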
Code Example 6: Simple Digital Twin Prototype (Sensor Simulator)
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
import numpy as np
import time
from datetime import datetime
from typing import Any, Callable, Dict  # Any is used in the return type hints
import matplotlib.pyplot as plt


class VirtualSensor:
    """Virtual sensor (physical system simulation)"""

    def __init__(self, sensor_id: str, sensor_type: str,
                 base_value: float, noise_std: float = 0.5):
        self.sensor_id = sensor_id
        self.sensor_type = sensor_type
        self.base_value = base_value
        self.noise_std = noise_std
        self.current_value = base_value
        self.drift_rate = 0.0  # Sensor drift added on every reading

    def read(self) -> Dict[str, Any]:
        """Read sensor value"""
        # Random noise
        noise = np.random.normal(0, self.noise_std)
        # Sensor drift (long-term change)
        self.current_value += self.drift_rate
        measured_value = self.current_value + noise
        return {
            'sensor_id': self.sensor_id,
            'type': self.sensor_type,
            'value': measured_value,
            'unit': self._get_unit(),
            'timestamp': datetime.now().isoformat(),
            'quality': 'good'
        }

    def _get_unit(self) -> str:
        """Get sensor unit"""
        units = {
            'temperature': '°C',
            'pressure': 'bar',
            'flow': 'kg/h',
            'concentration': 'mol/L'
        }
        return units.get(self.sensor_type, '')

    def set_base_value(self, new_value: float) -> None:
        """Update the noise-free value (simulate process change)"""
        self.current_value = new_value
class SimpleDigitalTwinPrototype:
    """Simple digital twin prototype"""

    def __init__(self, twin_id: str):
        self.twin_id = twin_id
        self.sensors = {}  # sensor_id -> VirtualSensor
        self.state_history = []
        self.physical_model = None

    def add_sensor(self, sensor: VirtualSensor) -> None:
        """Add sensor"""
        self.sensors[sensor.sensor_id] = sensor
        print(f"Added sensor: {sensor.sensor_id} ({sensor.sensor_type})")

    def set_physical_model(self, model: Callable) -> None:
        """Set physical model"""
        self.physical_model = model
        print("Physical model configured")

    def read_all_sensors(self) -> Dict[str, Any]:
        """Read all sensors"""
        sensor_data = {}
        for sensor_id, sensor in self.sensors.items():
            reading = sensor.read()
            sensor_data[sensor.sensor_type] = reading['value']
        sensor_data['timestamp'] = datetime.now()
        return sensor_data

    def run_physics_simulation(self, current_state: Dict[str, float],
                               dt: float = 1.0) -> Dict[str, float]:
        """Update state using physical model"""
        # The configured callable only switches this step on; the CSTR
        # equations below are hard-coded and do not call it.
        if self.physical_model is None:
            return current_state
        # Simple CSTR model
        T = current_state.get('temperature', 185.0)
        C = current_state.get('concentration', 2.5)
        # Parameters
        k_reaction = 0.05  # Reaction rate constant
        k_cooling = 0.02   # Cooling rate constant
        T_coolant = 60.0   # Coolant temperature
        delta_H = -15.0    # Reaction heat (negative = exothermic)
        # Differential equations (Euler method).
        # An exothermic reaction (delta_H < 0) releases heat, so the
        # reaction term enters the energy balance as -delta_H * rate.
        dT = (-k_cooling * (T - T_coolant) - delta_H * k_reaction * C) * dt
        dC = -k_reaction * C * dt
        new_state = {
            'temperature': T + dT,
            'concentration': C + dC,
            'timestamp': datetime.now()
        }
        return new_state

    def synchronize(self) -> None:
        """Synchronize physical system and digital twin"""
        # Read sensor data
        sensor_data = self.read_all_sensors()
        # State estimation using physical model
        predicted_state = self.run_physics_simulation(sensor_data)
        # Save to state history
        self.state_history.append({
            'timestamp': sensor_data['timestamp'],
            'sensor_data': sensor_data,
            'predicted_state': predicted_state
        })
        # Update sensor base values (feedback). Sensors are stored by
        # sensor_id (e.g. 'TI-101'), so they are matched by sensor_type.
        for sensor in self.sensors.values():
            if sensor.sensor_type in predicted_state:
                sensor.set_base_value(predicted_state[sensor.sensor_type])

    def run_realtime_simulation(self, duration_seconds: int = 10,
                                update_interval: float = 1.0) -> None:
        """Run real-time simulation"""
        print(f"\nStarting real-time simulation for {duration_seconds} seconds...")
        print("=" * 70)
        start_time = time.time()
        while time.time() - start_time < duration_seconds:
            self.synchronize()
            # Display current state
            latest = self.state_history[-1]
            sensor_T = latest['sensor_data'].get('temperature', 0)
            predicted_T = latest['predicted_state'].get('temperature', 0)
            sensor_C = latest['sensor_data'].get('concentration', 0)
            predicted_C = latest['predicted_state'].get('concentration', 0)
            print(f"[t={len(self.state_history):3d}s] "
                  f"Sensor: T={sensor_T:6.2f}°C, C={sensor_C:.3f}mol/L | "
                  f"Model: T={predicted_T:6.2f}°C, C={predicted_C:.3f}mol/L")
            time.sleep(update_interval)
        print("=" * 70)
        print("Simulation completed")

    def visualize_state_history(self) -> None:
        """Visualize state history"""
        if not self.state_history:
            print("No data to visualize")
            return
        times = list(range(len(self.state_history)))
        sensor_temps = [s['sensor_data'].get('temperature', 0) for s in self.state_history]
        predicted_temps = [s['predicted_state'].get('temperature', 0) for s in self.state_history]
        sensor_concs = [s['sensor_data'].get('concentration', 0) for s in self.state_history]
        predicted_concs = [s['predicted_state'].get('concentration', 0) for s in self.state_history]
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
        # Temperature plot
        ax1.plot(times, sensor_temps, 'o-', label='Sensor (Physical)',
                 color='#11998e', linewidth=2, markersize=4)
        ax1.plot(times, predicted_temps, 's--', label='Model (Digital Twin)',
                 color='#e74c3c', linewidth=2, markersize=4, alpha=0.7)
        ax1.set_xlabel('Time [s]', fontsize=11)
        ax1.set_ylabel('Temperature [°C]', fontsize=11)
        ax1.set_title('Digital Twin Temperature Synchronization', fontsize=13, fontweight='bold')
        ax1.legend(fontsize=10)
        ax1.grid(alpha=0.3)
        # Concentration plot
        ax2.plot(times, sensor_concs, 'o-', label='Sensor (Physical)',
                 color='#11998e', linewidth=2, markersize=4)
        ax2.plot(times, predicted_concs, 's--', label='Model (Digital Twin)',
                 color='#e74c3c', linewidth=2, markersize=4, alpha=0.7)
        ax2.set_xlabel('Time [s]', fontsize=11)
        ax2.set_ylabel('Concentration [mol/L]', fontsize=11)
        ax2.set_title('Digital Twin Concentration Synchronization', fontsize=13, fontweight='bold')
        ax2.legend(fontsize=10)
        ax2.grid(alpha=0.3)
        plt.tight_layout()
        plt.show()
# Usage example: Run simple digital twin prototype
print("=" * 70)
print("Simple Digital Twin Prototype")
print("=" * 70)
# Create digital twin
dt_prototype = SimpleDigitalTwinPrototype(twin_id="DT-PROTO-001")
# Add virtual sensors
temp_sensor = VirtualSensor('TI-101', 'temperature', base_value=185.0, noise_std=1.0)
conc_sensor = VirtualSensor('AI-104', 'concentration', base_value=2.5, noise_std=0.05)
dt_prototype.add_sensor(temp_sensor)
dt_prototype.add_sensor(conc_sensor)
# Set physical model
dt_prototype.set_physical_model(model=lambda x: x)
# Run real-time simulation
dt_prototype.run_realtime_simulation(duration_seconds=10, update_interval=1.0)
# Visualize state history
# dt_prototype.visualize_state_history() # matplotlib display (uncomment when running)
print(f"\nTotal synchronized states: {len(dt_prototype.state_history)}")
Example Output:
======================================================================
Simple Digital Twin Prototype
======================================================================
Added sensor: TI-101 (temperature)
Added sensor: AI-104 (concentration)
Physical model configured
Starting real-time simulation for 10 seconds...
======================================================================
[t= 1s] Sensor: T=186.23°C, C=2.512mol/L | Model: T=184.87°C, C=2.488mol/L
[t= 2s] Sensor: T=183.65°C, C=2.475mol/L | Model: T=183.72°C, C=2.476mol/L
[t= 3s] Sensor: T=182.48°C, C=2.463mol/L | Model: T=182.58°C, C=2.464mol/L
[t= 4s] Sensor: T=181.34°C, C=2.451mol/L | Model: T=181.45°C, C=2.452mol/L
[t= 5s] Sensor: T=180.21°C, C=2.439mol/L | Model: T=180.33°C, C=2.440mol/L
[t= 6s] Sensor: T=179.10°C, C=2.427mol/L | Model: T=179.22°C, C=2.429mol/L
[t= 7s] Sensor: T=178.00°C, C=2.416mol/L | Model: T=178.13°C, C=2.417mol/L
[t= 8s] Sensor: T=176.92°C, C=2.404mol/L | Model: T=177.05°C, C=2.406mol/L
[t= 9s] Sensor: T=175.85°C, C=2.393mol/L | Model: T=175.98°C, C=2.395mol/L
[t= 10s] Sensor: T=174.80°C, C=2.382mol/L | Model: T=174.92°C, C=2.383mol/L
======================================================================
Simulation completed
Total synchronized states: 10
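Explanation: This simple prototype keeps virtual sensors (standing in for the physical system) and a digital twin (the physical model) synchronized in real time. On every cycle it reads the sensors, advances the CSTR model by one Euler step, stores both in the state history, and feeds the prediction back to the sensors. Because the sensor noise is random and unseeded, the printed values differ from run to run. Integration with actual IoT sensors is covered in Chapter 2.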
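The prototype integrates its model with the explicit Euler method at a fixed step of `dt = 1.0`. As a rough check of how much error that step size introduces, the sketch below compares Euler integration of the concentration equation dC/dt = -k·C against its exact solution C(t) = C0·exp(-k·t). The helper names `euler_decay` and `exact_decay` are hypothetical; the values C0 = 2.5 and k = 0.05 are the ones used in the prototype.

```python
import math


def euler_decay(c0, k, dt, n_steps):
    """Integrate dC/dt = -k*C with explicit Euler; return all values."""
    values = [c0]
    for _ in range(n_steps):
        c = values[-1]
        values.append(c + (-k * c) * dt)  # C_{n+1} = C_n + dC/dt * dt
    return values


def exact_decay(c0, k, t):
    """Analytical solution of dC/dt = -k*C at time t."""
    return c0 * math.exp(-k * t)


c0, k, t_end = 2.5, 0.05, 10.0  # values from the prototype's CSTR model
exact = exact_decay(c0, k, t_end)
for dt in (1.0, 0.1):
    n_steps = int(round(t_end / dt))
    approx = euler_decay(c0, k, dt, n_steps)[-1]
    print(f"dt={dt:4.1f}s  Euler={approx:.4f}  exact={exact:.4f}  "
          f"error={abs(approx - exact):.4f}")
```

Shrinking the step from 1.0 s to 0.1 s reduces the error roughly tenfold, as expected for a first-order method. For this slow decay a 1 s step is adequate for a prototype, but faster dynamics would call for a smaller step or a higher-order integrator.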
Code Example 7: Digital Twin Comprehensive Demo (State Visualization Dashboard)
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import numpy as np
from datetime import datetime


@dataclass
class DigitalTwinDashboard:
    """Digital twin visualization dashboard"""
    twin_id: str
    # default_factory gives each instance its own empty list
    state_history: List[Dict[str, Any]] = field(default_factory=list)

    def update_dashboard(self, sensor_data: Dict[str, float],
                         model_prediction: Dict[str, float],
                         alerts: Optional[List[str]] = None) -> None:
        """Update dashboard data"""
        dashboard_entry = {
            'timestamp': datetime.now(),
            'sensor_data': sensor_data,
            'model_prediction': model_prediction,
            'deviation': self._calculate_deviation(sensor_data, model_prediction),
            'alerts': alerts or []
        }
        self.state_history.append(dashboard_entry)

    def _calculate_deviation(self, sensor: Dict[str, float],
                             model: Dict[str, float]) -> Dict[str, float]:
        """Calculate absolute deviation between sensor and model"""
        deviations = {}
        for key in sensor.keys():
            if key in model:
                deviation = abs(sensor[key] - model[key])
                deviations[key] = deviation
        return deviations

    def generate_text_dashboard(self) -> str:
        """Generate text-based dashboard"""
        if not self.state_history:
            return "No data available"
        latest = self.state_history[-1]
        # Box titles are centered to the 67-character inner width of the frame
        dashboard_text = f"""
╔═══════════════════════════════════════════════════════════════════╗
║{'DIGITAL TWIN MONITORING DASHBOARD':^67}║
╚═══════════════════════════════════════════════════════════════════╝
Twin ID: {self.twin_id}
Timestamp: {latest['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}
Status: {'⚠️ ALERT' if latest['alerts'] else '✅ NORMAL'}
┌───────────────────────────────────────────────────────────────────┐
│{'PROCESS VARIABLES':^67}│
├───────────────────────────────────────────────────────────────────┤
"""
        # Display process variables
        sensor = latest['sensor_data']
        model = latest['model_prediction']
        deviation = latest['deviation']
        for key in sensor.keys():
            if key != 'timestamp':
                sensor_val = sensor.get(key, 0)
                model_val = model.get(key, 0)
                dev_val = deviation.get(key, 0)
                # Alert determination (same-width placeholder when no alert)
                alert_symbol = "⚠️ " if dev_val > 5.0 else "   "
                dashboard_text += f"│ {alert_symbol}{key.upper():20s}: Sensor={sensor_val:7.2f} | Model={model_val:7.2f} | Δ={dev_val:5.2f} │\n"
        dashboard_text += "└───────────────────────────────────────────────────────────────────┘\n"
        # Alerts
        if latest['alerts']:
            dashboard_text += "\n┌───────────────────────────────────────────────────────────────────┐\n"
            dashboard_text += f"│{'ALERTS':^67}│\n"
            dashboard_text += "├───────────────────────────────────────────────────────────────────┤\n"
            for alert in latest['alerts']:
                dashboard_text += f"│ ⚠️ {alert:63s} │\n"
            dashboard_text += "└───────────────────────────────────────────────────────────────────┘\n"
        # Statistics
        if len(self.state_history) > 1:
            recent_temps = [s['sensor_data'].get('temperature', 0) for s in self.state_history[-10:]]
            stats_line = (f" Temperature: Avg={np.mean(recent_temps):6.2f}°C | "
                          f"Std={np.std(recent_temps):5.2f}°C")
            count_line = f" Total Samples: {len(self.state_history):5d}"
            dashboard_text += "\n┌───────────────────────────────────────────────────────────────────┐\n"
            dashboard_text += f"│{'STATISTICS (Last 10 samples)':^67}│\n"
            dashboard_text += "├───────────────────────────────────────────────────────────────────┤\n"
            dashboard_text += f"│{stats_line:<67}│\n"
            dashboard_text += f"│{count_line:<67}│\n"
            dashboard_text += "└───────────────────────────────────────────────────────────────────┘\n"
        return dashboard_text

    def get_kpi_summary(self) -> Dict[str, Any]:
        """Get KPI summary"""
        if not self.state_history:
            return {}
        # Average temperature deviation
        all_deviations = [entry['deviation'] for entry in self.state_history]
        avg_deviation_temp = np.mean([d.get('temperature', 0) for d in all_deviations])
        # Alert occurrence rate
        alert_count = sum(1 for entry in self.state_history if entry['alerts'])
        alert_rate = alert_count / len(self.state_history) * 100
        # Data quality score (higher quality with smaller deviation)
        quality_score = max(0, 100 - avg_deviation_temp * 10)
        return {
            'total_samples': len(self.state_history),
            'avg_deviation_temperature': avg_deviation_temp,
            'alert_count': alert_count,
            'alert_rate_percent': alert_rate,
            'data_quality_score': quality_score
        }
# Usage example: Digital twin dashboard demo
dashboard = DigitalTwinDashboard(twin_id="DT-REACTOR-001")
# Simulation data
np.random.seed(42)
for i in range(15):
    # Sensor data (physical system)
    sensor_data = {
        'temperature': 185.0 + np.random.normal(0, 2.0),
        'pressure': 3.5 + np.random.normal(0, 0.2),
        'concentration': 2.5 + np.random.normal(0, 0.1)
    }
    # Model prediction (digital twin)
    model_prediction = {
        'temperature': sensor_data['temperature'] + np.random.normal(0, 1.5),
        'pressure': sensor_data['pressure'] + np.random.normal(0, 0.1),
        'concentration': sensor_data['concentration'] + np.random.normal(0, 0.05)
    }
    # Alert generation
    alerts = []
    if abs(sensor_data['temperature'] - model_prediction['temperature']) > 4.0:
        alerts.append("High deviation in temperature prediction")
    if sensor_data['temperature'] > 190:
        alerts.append("Temperature approaching upper limit")
    dashboard.update_dashboard(sensor_data, model_prediction, alerts)
# Display dashboard
print(dashboard.generate_text_dashboard())
# KPI summary
kpi = dashboard.get_kpi_summary()
print("\n" + "=" * 70)
print("KPI SUMMARY")
print("=" * 70)
print(f"Total Samples: {kpi['total_samples']}")
print(f"Avg Temperature Deviation: {kpi['avg_deviation_temperature']:.2f}°C")
print(f"Alert Count: {kpi['alert_count']}")
print(f"Alert Rate: {kpi['alert_rate_percent']:.1f}%")
print(f"Data Quality Score: {kpi['data_quality_score']:.1f}/100")
Example Output:
╔═══════════════════════════════════════════════════════════════════╗
║ DIGITAL TWIN MONITORING DASHBOARD ║
╚═══════════════════════════════════════════════════════════════════╝
Twin ID: DT-REACTOR-001
Timestamp: 2025-10-26 10:30:45
Status: ✅ NORMAL
┌───────────────────────────────────────────────────────────────────┐
│ PROCESS VARIABLES │
├───────────────────────────────────────────────────────────────────┤
│ TEMPERATURE : Sensor= 185.47 | Model= 184.82 | Δ= 0.65 │
│ PRESSURE : Sensor= 3.52 | Model= 3.48 | Δ= 0.04 │
│ CONCENTRATION : Sensor= 2.51 | Model= 2.50 | Δ= 0.01 │
└───────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────┐
│ STATISTICS (Last 10 samples) │
├───────────────────────────────────────────────────────────────────┤
│ Temperature: Avg=185.12°C | Std= 1.85°C │
│ Total Samples: 15 │
└───────────────────────────────────────────────────────────────────┘
======================================================================
KPI SUMMARY
======================================================================
Total Samples: 15
Avg Temperature Deviation: 1.42°C
Alert Count: 2
Alert Rate: 13.3%
Data Quality Score: 85.8/100
Explanation: Dashboards play a central role in digital twin state monitoring. By showing the deviation between sensor data and model predictions in real time, together with alerts and KPIs, a dashboard supports operator decision-making.
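The dashboard above marks a variable when its absolute deviation exceeds 5.0, whatever the unit. That suits a temperature near 185°C, but a pressure near 3.5 bar could be off by more than 10% without being flagged. One possible refinement, sketched here as an assumption and not as part of the original dashboard, is to compare deviations relative to the measured value. The function name `relative_deviation_alerts` and the 2% default limit are hypothetical.

```python
from typing import Dict, List


def relative_deviation_alerts(sensor: Dict[str, float],
                              model: Dict[str, float],
                              limit_percent: float = 2.0) -> List[str]:
    """Return one alert message per variable whose model value deviates
    from the sensor value by more than limit_percent of the sensor value."""
    alerts = []
    for key, measured in sensor.items():
        # Skip variables the model does not predict, and avoid dividing by zero
        if key not in model or measured == 0:
            continue
        rel = abs(measured - model[key]) / abs(measured) * 100.0
        if rel > limit_percent:
            alerts.append(f"{key}: model deviates {rel:.1f}% from sensor")
    return alerts


# Hypothetical readings, chosen to be close to the demo's operating point
sensor = {'temperature': 185.0, 'pressure': 3.5, 'concentration': 2.5}
model = {'temperature': 186.0, 'pressure': 3.9, 'concentration': 2.51}
print(relative_deviation_alerts(sensor, model))
# → ['pressure: model deviates 11.4% from sensor']
```

Here the 1°C temperature error (about 0.5%) passes, while the 0.4 bar pressure error is reported. In practice each variable would likely get its own limit, derived from sensor accuracy and process tolerances.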
1.3 Chapter Summary
What We Learned
- Digital Twin Concepts
  - Digital twin as a virtual replica of physical systems
  - Difference between a digital shadow (one-way) and a digital twin (bidirectional)
  - Value in process industries (optimization, predictive maintenance, virtual commissioning)
- Architecture Design
  - 4-layer structure: physical system layer, data layer, model layer, application layer
  - State representation and JSON serialization
  - Bidirectional data flow design
- Model Fidelity Levels
  - L1 (data logging) → L3 (physical model) → L5 (autonomous optimization)
  - Functions and application scenarios for each level
- Lifecycle Management
  - Phase management: design → implementation → verification → operation
  - Verification criteria and metrics
- Evaluation Metrics
  - Accuracy (RMSE, R²), real-time performance (latency)
  - Business value (ROI, payback period)
- Prototype Implementation
  - Synchronization of virtual sensors and digital twins
  - State monitoring through dashboards
Key Points
- Digital twins are not just simulations: real-time synchronization and bidirectional feedback are essential.
- Fidelity levels L1-L5 should evolve progressively according to project maturity.
- State representation design is key to extensibility and maintainability.
- Evaluation metrics should cover business value as well as accuracy.
- Lifecycle management maintains consistent quality from design through operation.
To the Next Chapter
Chapter 2 covers Real-time Data Integration and IoT Integration in detail: implementing industrial communication protocols such as OPC UA and MQTT, integrating time-series databases such as InfluxDB, and processing data streams with Apache Kafka. It also addresses sensor data quality management, anomaly detection, and edge computing architecture for industrial applications.