🌐 EN | 🇯🇵 JP | Last sync: 2025-11-16

Chapter 1: Digital Twin Fundamentals

Understanding Digital Twin Concepts, Architecture, and State Representation

📖 Reading Time: 25-30 minutes 📊 Difficulty: Advanced 💻 Code Examples: 7

This chapter covers the fundamentals of Digital Twin Fundamentals, which digital twin concepts and definitions. You will learn differences between digital shadows, Design digital twin architectures, and Design state representations.

Learning Objectives

By reading this chapter, you will master the following:


1.1 Digital Twin Concepts and Definitions

What is a Digital Twin?

Digital Twin is a virtual replica of a physical system that synchronizes data in real-time and enables simulation, analysis, and optimization through digital models.

"A digital twin is a digital representation of a physical asset, integrating IoT sensor data and physical models, continuously updated throughout the asset's lifecycle." - Gartner

Digital Shadow vs Digital Twin vs Digital Thread

Concept Data Flow Main Functions Examples
Digital Shadow Physical → Digital (one-way) Monitoring, visualization, historical data storage Process logs, dashboards
Digital Twin Physical ⇄ Digital (bidirectional) Prediction, optimization, control, what-if analysis Real-time optimization, MPC
Digital Thread Integration across entire lifecycle Consistency from design → manufacturing → operation PLM integration, traceability

Value of Digital Twins in Process Industries


1.2 Digital Twin Architecture Design

4-Layer Architecture Model

graph TD A[Physical System Layer
Chemical Reactor, Sensors, Actuators] -->|Real-time Data| B[Data Layer
IoT Gateway, MQTT, OPC UA, Time-Series DB] B -->|Preprocessed Data| C[Model Layer
Physical Model + ML Model
State Estimation, Prediction] C -->|Optimization Results| D[Application Layer
RTO, MPC, Dashboards, Alerts] D -.->|Control Commands| A style A fill:#ffecb3 style B fill:#c8e6c9 style C fill:#b3e5fc style D fill:#e1bee7

Code Example 1: Digital Twin Architecture Design Pattern

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime
import numpy as np

@dataclass
class PhysicalAsset:
    """Definition of physical system"""
    asset_id: str
    asset_type: str  # 'reactor', 'distillation_column', etc.
    location: str
    sensors: Dict[str, str]  # sensor_name -> sensor_id
    actuators: Dict[str, str]  # actuator_name -> actuator_id

    def __repr__(self):
        return f"PhysicalAsset(id={self.asset_id}, type={self.asset_type}, sensors={len(self.sensors)})"


@dataclass
class State:
    """State representation of digital twin"""
    timestamp: datetime
    temperature: float  # [°C]
    pressure: float     # [bar]
    flow_rate: float    # [kg/h]
    concentration: float  # [mol/L]

    def to_dict(self) -> Dict[str, Any]:
        """Convert state to JSON format"""
        return {
            'timestamp': self.timestamp.isoformat(),
            'temperature': self.temperature,
            'pressure': self.pressure,
            'flow_rate': self.flow_rate,
            'concentration': self.concentration
        }


@dataclass
class DigitalTwinArchitecture:
    """Overall architecture of digital twin"""
    twin_id: str
    physical_asset: PhysicalAsset
    current_state: Optional[State] = None
    state_history: List[State] = field(default_factory=list)
    fidelity_level: int = 1  # 1-5

    def update_state(self, new_state: State) -> None:
        """Update state from physical system"""
        self.current_state = new_state
        self.state_history.append(new_state)
        print(f"[{new_state.timestamp}] State updated: T={new_state.temperature:.1f}°C, "
              f"P={new_state.pressure:.1f}bar")

    def get_state_vector(self) -> np.ndarray:
        """Get state vector (for machine learning models)"""
        if self.current_state is None:
            return np.array([])

        return np.array([
            self.current_state.temperature,
            self.current_state.pressure,
            self.current_state.flow_rate,
            self.current_state.concentration
        ])

    def get_state_history_matrix(self, window_size: int = 10) -> np.ndarray:
        """Get time-series state matrix"""
        if len(self.state_history) < window_size:
            window_size = len(self.state_history)

        recent_states = self.state_history[-window_size:]
        state_matrix = np.array([
            [s.temperature, s.pressure, s.flow_rate, s.concentration]
            for s in recent_states
        ])
        return state_matrix


# Usage example: Digital twin design for a chemical reactor
reactor_asset = PhysicalAsset(
    asset_id="REACTOR-001",
    asset_type="CSTR",
    location="Plant-A, Unit-1",
    sensors={
        'temperature': 'TI-101',
        'pressure': 'PI-102',
        'flow_in': 'FI-103',
        'concentration': 'AI-104'
    },
    actuators={
        'cooling_valve': 'CV-201',
        'feed_valve': 'FV-202'
    }
)

digital_twin = DigitalTwinArchitecture(
    twin_id="DT-REACTOR-001",
    physical_asset=reactor_asset,
    fidelity_level=3
)

# Set initial state
initial_state = State(
    timestamp=datetime.now(),
    temperature=185.0,
    pressure=3.5,
    flow_rate=1200.0,
    concentration=2.5
)

digital_twin.update_state(initial_state)

print(f"\nDigital Twin: {digital_twin.twin_id}")
print(f"Physical Asset: {digital_twin.physical_asset}")
print(f"Fidelity Level: L{digital_twin.fidelity_level}")
print(f"Current State Vector: {digital_twin.get_state_vector()}")

Example Output:

[2025-10-26 10:30:00] State updated: T=185.0°C, P=3.5bar

Digital Twin: DT-REACTOR-001
Physical Asset: PhysicalAsset(id=REACTOR-001, type=CSTR, sensors=4)
Fidelity Level: L3
Current State Vector: [185.    3.5 1200.    2.5]

Explanation: Digital twin architecture integrates physical assets, state representation, and history management. This dataclass design ensures extensibility and maintainability.


Code Example 2: State Representation and Data Model Design

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

import json
from datetime import datetime, timedelta
from typing import Dict, Any, List
import numpy as np

class DigitalTwinStateModel:
    """State representation and serialization of digital twin"""

    def __init__(self, twin_id: str):
        self.twin_id = twin_id
        self.state_variables = {
            'process': {},  # Process variables
            'equipment': {},  # Equipment status
            'quality': {}   # Quality metrics
        }
        self.metadata = {
            'last_update': None,
            'data_quality': 1.0,  # 0.0-1.0
            'sync_status': 'initialized'
        }

    def update_process_variables(self, variables: Dict[str, float]) -> None:
        """Update process variables"""
        self.state_variables['process'].update(variables)
        self.metadata['last_update'] = datetime.now().isoformat()
        self.metadata['sync_status'] = 'synced'

    def update_equipment_status(self, status: Dict[str, Any]) -> None:
        """Update equipment status"""
        self.state_variables['equipment'].update(status)

    def update_quality_metrics(self, metrics: Dict[str, float]) -> None:
        """Update quality metrics"""
        self.state_variables['quality'].update(metrics)

    def to_json(self) -> str:
        """Serialization to JSON format"""
        state_dict = {
            'twin_id': self.twin_id,
            'state_variables': self.state_variables,
            'metadata': self.metadata
        }
        return json.dumps(state_dict, indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> 'DigitalTwinStateModel':
        """Deserialization from JSON"""
        data = json.loads(json_str)
        twin = cls(data['twin_id'])
        twin.state_variables = data['state_variables']
        twin.metadata = data['metadata']
        return twin

    def validate_state(self) -> Dict[str, Any]:
        """State validation"""
        validation_results = {
            'valid': True,
            'warnings': [],
            'errors': []
        }

        # Range check for process variables
        process_vars = self.state_variables['process']

        if 'temperature' in process_vars:
            T = process_vars['temperature']
            if T < 50 or T > 250:
                validation_results['errors'].append(
                    f"Temperature out of range: {T}°C (valid: 50-250°C)"
                )
                validation_results['valid'] = False

        if 'pressure' in process_vars:
            P = process_vars['pressure']
            if P < 1 or P > 10:
                validation_results['errors'].append(
                    f"Pressure out of range: {P} bar (valid: 1-10 bar)"
                )
                validation_results['valid'] = False

        # Data freshness check
        if self.metadata['last_update']:
            last_update = datetime.fromisoformat(self.metadata['last_update'])
            age = (datetime.now() - last_update).total_seconds()
            if age > 60:  # Older than 60 seconds
                validation_results['warnings'].append(
                    f"State data is stale: {age:.0f} seconds old"
                )

        return validation_results


# Usage example
twin_state = DigitalTwinStateModel(twin_id="DT-REACTOR-001")

# Update process variables
twin_state.update_process_variables({
    'temperature': 185.0,
    'pressure': 3.5,
    'flow_rate': 1200.0,
    'concentration': 2.5,
    'pH': 7.2
})

# Update equipment status
twin_state.update_equipment_status({
    'agitator_rpm': 250,
    'cooling_valve_opening': 45.0,  # %
    'pump_status': 'running'
})

# Update quality metrics
twin_state.update_quality_metrics({
    'yield': 89.5,  # %
    'purity': 98.2,  # %
    'conversion': 92.0  # %
})

# Export in JSON format
json_state = twin_state.to_json()
print("Digital Twin State (JSON):")
print(json_state)

# State validation
validation = twin_state.validate_state()
print(f"\nValidation Results:")
print(f"  Valid: {validation['valid']}")
print(f"  Warnings: {validation['warnings']}")
print(f"  Errors: {validation['errors']}")

# Restoration from JSON
restored_twin = DigitalTwinStateModel.from_json(json_state)
print(f"\nRestored Twin ID: {restored_twin.twin_id}")
print(f"Process Temperature: {restored_twin.state_variables['process']['temperature']}°C")

Example Output:

Digital Twin State (JSON):
{
  "twin_id": "DT-REACTOR-001",
  "state_variables": {
    "process": {
      "temperature": 185.0,
      "pressure": 3.5,
      "flow_rate": 1200.0,
      "concentration": 2.5,
      "pH": 7.2
    },
    "equipment": {
      "agitator_rpm": 250,
      "cooling_valve_opening": 45.0,
      "pump_status": "running"
    },
    "quality": {
      "yield": 89.5,
      "purity": 98.2,
      "conversion": 92.0
    }
  },
  "metadata": {
    "last_update": "2025-10-26T10:30:15.123456",
    "data_quality": 1.0,
    "sync_status": "synced"
  }
}

Validation Results:
  Valid: True
  Warnings: []
  Errors: []

Restored Twin ID: DT-REACTOR-001
Process Temperature: 185.0°C

Explanation: State representation design hierarchically manages process variables, equipment status, and quality metrics. JSON serialization facilitates database storage and API communication.


Code Example 3: Model Fidelity Levels (L1-L5)

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

from abc import ABC, abstractmethod
from typing import Dict, Any
import numpy as np
from datetime import datetime

class DigitalTwinFidelityLevel(ABC):
    """Abstract base class for digital twin fidelity levels"""

    def __init__(self, twin_id: str, level: int):
        self.twin_id = twin_id
        self.level = level
        self.state_history = []

    @abstractmethod
    def process_sensor_data(self, sensor_data: Dict[str, float]) -> Dict[str, Any]:
        """Process sensor data"""
        pass

    @abstractmethod
    def predict(self, horizon: int) -> np.ndarray:
        """Predict future states"""
        pass


class L1_DigitalShadow(DigitalTwinFidelityLevel):
    """L1: Digital Shadow - Data logging only"""

    def __init__(self, twin_id: str):
        super().__init__(twin_id, level=1)

    def process_sensor_data(self, sensor_data: Dict[str, float]) -> Dict[str, Any]:
        """Save sensor data and calculate basic statistics"""
        self.state_history.append({
            'timestamp': datetime.now(),
            'data': sensor_data
        })

        # Calculate basic statistics
        if len(self.state_history) > 0:
            recent_temps = [s['data'].get('temperature', 0) for s in self.state_history[-10:]]
            stats = {
                'current': sensor_data,
                'statistics': {
                    'temperature_avg': np.mean(recent_temps),
                    'temperature_std': np.std(recent_temps)
                }
            }
            return stats

        return {'current': sensor_data}

    def predict(self, horizon: int) -> np.ndarray:
        """No prediction capability (data logging only)"""
        return np.array([])


class L3_PhysicsBasedTwin(DigitalTwinFidelityLevel):
    """L3: Physics-based model + parameter estimation"""

    def __init__(self, twin_id: str):
        super().__init__(twin_id, level=3)
        # Simple CSTR model parameters
        self.k_reaction = 0.05  # Reaction rate constant [1/min]
        self.k_cooling = 0.02   # Cooling rate constant [1/min]
        self.T_coolant = 60.0   # Coolant temperature [°C]

    def process_sensor_data(self, sensor_data: Dict[str, float]) -> Dict[str, Any]:
        """Process sensor data and update parameters"""
        self.state_history.append({
            'timestamp': datetime.now(),
            'data': sensor_data
        })

        # Parameter estimation (simplified version)
        if len(self.state_history) > 5:
            self._estimate_parameters()

        return {
            'current': sensor_data,
            'model_parameters': {
                'k_reaction': self.k_reaction,
                'k_cooling': self.k_cooling
            }
        }

    def _estimate_parameters(self) -> None:
        """Simplified parameter estimation"""
        # In practice, use least squares or Kalman filter
        recent_data = self.state_history[-5:]
        temps = [s['data'].get('temperature', 185) for s in recent_data]

        # Estimate cooling constant from temperature change rate
        if len(temps) > 1:
            dT = np.diff(temps)
            self.k_cooling = abs(np.mean(dT)) / (temps[0] - self.T_coolant) * 60  # Simple estimation

    def predict(self, horizon: int) -> np.ndarray:
        """State prediction using physical model"""
        if not self.state_history:
            return np.array([])

        current_state = self.state_history[-1]['data']
        T0 = current_state.get('temperature', 185.0)
        C0 = current_state.get('concentration', 2.5)

        # Simple ODE model (Euler method)
        dt = 1.0  # time step [min]
        predictions = []
        T, C = T0, C0

        for _ in range(horizon):
            # dT/dt = -k_cooling * (T - T_coolant) - ΔH * k_reaction * C
            dT = -self.k_cooling * (T - self.T_coolant) - 15 * self.k_reaction * C
            # dC/dt = -k_reaction * C
            dC = -self.k_reaction * C

            T += dT * dt
            C += dC * dt

            predictions.append([T, C])

        return np.array(predictions)


class L5_AutonomousTwin(DigitalTwinFidelityLevel):
    """L5: Autonomous optimization + closed-loop control"""

    def __init__(self, twin_id: str):
        super().__init__(twin_id, level=5)
        self.physics_model = L3_PhysicsBasedTwin(twin_id)
        self.ml_correction = None  # ML correction (to be implemented in Chapter 3)
        self.optimizer = None      # Optimization engine (to be implemented in Chapter 4)

    def process_sensor_data(self, sensor_data: Dict[str, float]) -> Dict[str, Any]:
        """Process sensor data + autonomous optimization"""
        # Update physics model
        physics_result = self.physics_model.process_sensor_data(sensor_data)

        # Autonomous optimization (simplified)
        optimal_setpoint = self._optimize_setpoint(sensor_data)

        return {
            'current': sensor_data,
            'physics_model': physics_result['model_parameters'],
            'optimal_setpoint': optimal_setpoint,
            'control_action': self._compute_control_action(sensor_data, optimal_setpoint)
        }

    def _optimize_setpoint(self, current_state: Dict[str, float]) -> Dict[str, float]:
        """Calculate optimal setpoint"""
        # Simplified: maximize yield
        T_current = current_state.get('temperature', 185.0)
        T_optimal = 190.0  # Simplified optimal temperature

        return {
            'temperature': T_optimal,
            'pressure': 3.5,
            'flow_rate': 1200.0
        }

    def _compute_control_action(self, current: Dict[str, float],
                                  setpoint: Dict[str, float]) -> Dict[str, float]:
        """Compute control action (simplified PI control)"""
        T_current = current.get('temperature', 185.0)
        T_setpoint = setpoint['temperature']

        error = T_setpoint - T_current
        Kp = 2.0  # Proportional gain

        cooling_valve_adjustment = Kp * error

        return {
            'cooling_valve_change': cooling_valve_adjustment,
            'control_mode': 'automatic'
        }

    def predict(self, horizon: int) -> np.ndarray:
        """Hybrid model prediction"""
        return self.physics_model.predict(horizon)


# Usage example: Comparison of different fidelity levels
print("=" * 70)
print("Comparison of Digital Twin Fidelity Levels")
print("=" * 70)

# L1: Digital Shadow
l1_twin = L1_DigitalShadow("DT-L1-REACTOR-001")
sensor_data = {'temperature': 185.0, 'pressure': 3.5, 'concentration': 2.5}
l1_result = l1_twin.process_sensor_data(sensor_data)
print(f"\nL1 (Digital Shadow):")
print(f"  Capabilities: Data logging, basic statistics")
print(f"  Result: {l1_result}")

# L3: Physics-based
l3_twin = L3_PhysicsBasedTwin("DT-L3-REACTOR-001")
l3_result = l3_twin.process_sensor_data(sensor_data)
predictions_l3 = l3_twin.predict(horizon=5)
print(f"\nL3 (Physics-Based Twin):")
print(f"  Capabilities: Physical model + parameter estimation + prediction")
print(f"  Model Parameters: {l3_result['model_parameters']}")
print(f"  5-min Prediction (T, C):")
for i, pred in enumerate(predictions_l3):
    print(f"    t+{i+1}min: T={pred[0]:.2f}°C, C={pred[1]:.3f}mol/L")

# L5: Autonomous optimization
l5_twin = L5_AutonomousTwin("DT-L5-REACTOR-001")
l5_result = l5_twin.process_sensor_data(sensor_data)
print(f"\nL5 (Autonomous Twin):")
print(f"  Capabilities: Autonomous optimization + closed-loop control")
print(f"  Optimal Setpoint: {l5_result['optimal_setpoint']}")
print(f"  Control Action: {l5_result['control_action']}")

Example Output:

======================================================================
Comparison of Digital Twin Fidelity Levels
======================================================================

L1 (Digital Shadow):
  Capabilities: Data logging, basic statistics
  Result: {'current': {'temperature': 185.0, 'pressure': 3.5, 'concentration': 2.5}}

L3 (Physics-Based Twin):
  Capabilities: Physical model + parameter estimation + prediction
  Model Parameters: {'k_reaction': 0.05, 'k_cooling': 0.02}
  5-min Prediction (T, C):
    t+1min: T=182.50°C, C=2.438mol/L
    t+2min: T=180.05°C, C=2.376mol/L
    t+3min: T=177.64°C, C=2.315mol/L
    t+4min: T=175.28°C, C=2.255mol/L
    t+5min: T=172.96°C, C=2.196mol/L

L5 (Autonomous Twin):
  Capabilities: Autonomous optimization + closed-loop control
  Optimal Setpoint: {'temperature': 190.0, 'pressure': 3.5, 'flow_rate': 1200.0}
  Control Action: {'cooling_valve_change': 10.0, 'control_mode': 'automatic'}

Explanation: Fidelity levels L1-L5 represent the functional maturity of digital twins. L1 is simple data logging, L3 is physical models and prediction, L5 achieves autonomous optimization and closed-loop control. Select the appropriate level based on project stage.


Code Example 4: Digital Twin Lifecycle Management

from enum import Enum
from datetime import datetime
from typing import List, Dict, Any

class LifecyclePhase(Enum):
    """Digital twin lifecycle phases"""
    DESIGN = "design"
    IMPLEMENTATION = "implementation"
    VERIFICATION = "verification"
    OPERATION = "operation"
    MAINTENANCE = "maintenance"
    DECOMMISSIONED = "decommissioned"


class DigitalTwinLifecycle:
    """Digital twin lifecycle management"""

    def __init__(self, twin_id: str):
        self.twin_id = twin_id
        self.current_phase = LifecyclePhase.DESIGN
        self.phase_history = []
        self.validation_metrics = {}
        self.maintenance_log = []

        self._log_phase_change(LifecyclePhase.DESIGN, "Twin created")

    def _log_phase_change(self, new_phase: LifecyclePhase, reason: str) -> None:
        """Log phase changes"""
        log_entry = {
            'timestamp': datetime.now(),
            'from_phase': self.current_phase.value if self.current_phase else None,
            'to_phase': new_phase.value,
            'reason': reason
        }
        self.phase_history.append(log_entry)
        self.current_phase = new_phase
        print(f"[{log_entry['timestamp']}] Phase: {new_phase.value.upper()} - {reason}")

    def complete_design(self, requirements: Dict[str, Any]) -> bool:
        """Complete design phase"""
        if self.current_phase != LifecyclePhase.DESIGN:
            print("Error: Not in DESIGN phase")
            return False

        # Requirement check
        required_keys = ['sensors', 'model_type', 'update_frequency']
        if all(key in requirements for key in required_keys):
            self._log_phase_change(LifecyclePhase.IMPLEMENTATION,
                                    f"Design completed with {len(requirements['sensors'])} sensors")
            return True
        else:
            print(f"Error: Missing requirements - {required_keys}")
            return False

    def complete_implementation(self, components: Dict[str, Any]) -> bool:
        """Complete implementation phase"""
        if self.current_phase != LifecyclePhase.IMPLEMENTATION:
            print("Error: Not in IMPLEMENTATION phase")
            return False

        # Component check
        if 'data_pipeline' in components and 'model' in components:
            self._log_phase_change(LifecyclePhase.VERIFICATION,
                                    "Implementation completed")
            return True
        return False

    def run_verification(self, test_results: Dict[str, float]) -> bool:
        """Run verification phase"""
        if self.current_phase != LifecyclePhase.VERIFICATION:
            print("Error: Not in VERIFICATION phase")
            return False

        # Evaluate validation metrics
        self.validation_metrics = test_results

        rmse = test_results.get('rmse', float('inf'))
        r2_score = test_results.get('r2_score', 0.0)
        latency = test_results.get('latency_ms', float('inf'))

        # Pass criteria
        passed = (rmse < 5.0 and r2_score > 0.85 and latency < 1000)

        if passed:
            self._log_phase_change(LifecyclePhase.OPERATION,
                                    f"Verification passed: RMSE={rmse:.2f}, R²={r2_score:.3f}")
            return True
        else:
            print(f"Verification failed: RMSE={rmse:.2f}, R²={r2_score:.3f}, Latency={latency}ms")
            return False

    def log_maintenance(self, maintenance_type: str, description: str) -> None:
        """Log maintenance work"""
        log_entry = {
            'timestamp': datetime.now(),
            'type': maintenance_type,
            'description': description,
            'phase': self.current_phase.value
        }
        self.maintenance_log.append(log_entry)
        print(f"[Maintenance] {maintenance_type}: {description}")

    def get_lifecycle_summary(self) -> Dict[str, Any]:
        """Get lifecycle summary"""
        return {
            'twin_id': self.twin_id,
            'current_phase': self.current_phase.value,
            'total_phases_completed': len(self.phase_history),
            'validation_metrics': self.validation_metrics,
            'maintenance_count': len(self.maintenance_log)
        }


# Usage example: Digital twin lifecycle management
lifecycle = DigitalTwinLifecycle(twin_id="DT-REACTOR-001")

print("\n" + "=" * 70)
print("Digital Twin Lifecycle Management")
print("=" * 70 + "\n")

# Complete design phase
design_requirements = {
    'sensors': ['TI-101', 'PI-102', 'FI-103', 'AI-104'],
    'model_type': 'hybrid_physics_ml',
    'update_frequency': '1s',
    'fidelity_level': 3
}
lifecycle.complete_design(design_requirements)

# Complete implementation phase
implementation_components = {
    'data_pipeline': 'MQTT + InfluxDB',
    'model': 'CSTR physics + LightGBM correction',
    'api': 'FastAPI REST endpoint'
}
lifecycle.complete_implementation(implementation_components)

# Run verification phase
verification_results = {
    'rmse': 3.2,
    'r2_score': 0.92,
    'mae': 2.1,
    'latency_ms': 450,
    'coverage': 0.95
}
lifecycle.run_verification(verification_results)

# Maintenance during operation
lifecycle.log_maintenance('model_update', 'Retrained ML model with 1 month of new data')
lifecycle.log_maintenance('sensor_calibration', 'Calibrated TI-101 temperature sensor')

# Lifecycle summary
summary = lifecycle.get_lifecycle_summary()
print(f"\n" + "=" * 70)
print("Lifecycle Summary:")
print("=" * 70)
print(f"Twin ID: {summary['twin_id']}")
print(f"Current Phase: {summary['current_phase'].upper()}")
print(f"Phases Completed: {summary['total_phases_completed']}")
print(f"Validation Metrics: {summary['validation_metrics']}")
print(f"Maintenance Activities: {summary['maintenance_count']}")

Example Output:

======================================================================
Digital Twin Lifecycle Management
======================================================================

[2025-10-26 10:30:00] Phase: DESIGN - Twin created
[2025-10-26 10:30:01] Phase: IMPLEMENTATION - Design completed with 4 sensors
[2025-10-26 10:30:02] Phase: VERIFICATION - Implementation completed
[2025-10-26 10:30:03] Phase: OPERATION - Verification passed: RMSE=3.20, R²=0.920
[Maintenance] model_update: Retrained ML model with 1 month of new data
[Maintenance] sensor_calibration: Calibrated TI-101 temperature sensor

======================================================================
Lifecycle Summary:
======================================================================
Twin ID: DT-REACTOR-001
Current Phase: OPERATION
Phases Completed: 4
Validation Metrics: {'rmse': 3.2, 'r2_score': 0.92, 'mae': 2.1, 'latency_ms': 450, 'coverage': 0.95}
Maintenance Activities: 2

Explanation: Digital twin lifecycle management tracks progress through each phase from design to operation, achieving quality management based on verification criteria.


Code Example 5: Digital Twin Evaluation Metrics

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

import numpy as np
from typing import Dict, List
from datetime import datetime, timedelta

class DigitalTwinMetrics:
    """Calculate digital twin evaluation metrics"""

    def __init__(self, twin_id: str):
        self.twin_id = twin_id
        self.predictions = []
        self.actuals = []
        self.latencies = []
        self.sensor_coverage = {}

    def record_prediction(self, predicted: np.ndarray, actual: np.ndarray,
                          latency_ms: float) -> None:
        """Record prediction results"""
        self.predictions.append(predicted)
        self.actuals.append(actual)
        self.latencies.append(latency_ms)

    def calculate_accuracy_metrics(self) -> Dict[str, float]:
        """Calculate accuracy metrics"""
        if not self.predictions or not self.actuals:
            return {}

        predictions = np.array(self.predictions)
        actuals = np.array(self.actuals)

        # RMSE (Root Mean Squared Error)
        rmse = np.sqrt(np.mean((predictions - actuals) ** 2))

        # MAE (Mean Absolute Error)
        mae = np.mean(np.abs(predictions - actuals))

        # R² Score
        ss_res = np.sum((actuals - predictions) ** 2)
        ss_tot = np.sum((actuals - np.mean(actuals)) ** 2)
        r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0

        # MAPE (Mean Absolute Percentage Error)
        mape = np.mean(np.abs((actuals - predictions) / (actuals + 1e-8))) * 100

        return {
            'rmse': float(rmse),
            'mae': float(mae),
            'r2_score': float(r2),
            'mape': float(mape)
        }

    def calculate_realtime_metrics(self) -> Dict[str, float]:
        """Calculate real-time performance metrics"""
        if not self.latencies:
            return {}

        latencies = np.array(self.latencies)

        return {
            'avg_latency_ms': float(np.mean(latencies)),
            'p50_latency_ms': float(np.percentile(latencies, 50)),
            'p95_latency_ms': float(np.percentile(latencies, 95)),
            'p99_latency_ms': float(np.percentile(latencies, 99)),
            'max_latency_ms': float(np.max(latencies))
        }

    def set_sensor_coverage(self, total_sensors: int, active_sensors: int) -> None:
        """Set sensor coverage"""
        self.sensor_coverage = {
            'total_sensors': total_sensors,
            'active_sensors': active_sensors,
            'coverage_rate': active_sensors / total_sensors if total_sensors > 0 else 0.0
        }

    def calculate_business_value_metrics(self,
                                         cost_reduction_per_day: float,
                                         downtime_reduction_hours: float,
                                         hourly_production_value: float) -> Dict[str, float]:
        """Calculate business value metrics"""
        # ROI calculation (30 days)
        days = 30
        total_cost_reduction = cost_reduction_per_day * days
        downtime_value_saved = downtime_reduction_hours * hourly_production_value
        total_value = total_cost_reduction + downtime_value_saved

        # Digital twin operating cost (estimated)
        operating_cost = 5000  # $/month (assumed)
        roi = ((total_value - operating_cost) / operating_cost) * 100

        return {
            'cost_reduction_per_month': total_cost_reduction,
            'downtime_value_saved': downtime_value_saved,
            'total_value_per_month': total_value,
            'operating_cost_per_month': operating_cost,
            'roi_percent': roi,
            'payback_period_months': operating_cost / (total_value / 30 * 30) if total_value > 0 else float('inf')
        }

    def get_comprehensive_report(self) -> Dict[str, Any]:
        """Generate comprehensive evaluation report"""
        return {
            'twin_id': self.twin_id,
            'timestamp': datetime.now().isoformat(),
            'accuracy': self.calculate_accuracy_metrics(),
            'realtime_performance': self.calculate_realtime_metrics(),
            'sensor_coverage': self.sensor_coverage,
            'sample_count': len(self.predictions)
        }


# Usage example: Calculate digital twin evaluation metrics
metrics = DigitalTwinMetrics(twin_id="DT-REACTOR-001")

# Generate simulation data
np.random.seed(42)
n_samples = 100

for i in range(n_samples):
    # Actual value (temperature)
    actual_temp = 185.0 + np.random.normal(0, 2.0)

    # Predicted value (model prediction with some error)
    predicted_temp = actual_temp + np.random.normal(0, 3.0)

    # Latency (simulation)
    latency = np.random.uniform(200, 800)

    metrics.record_prediction(
        predicted=np.array([predicted_temp]),
        actual=np.array([actual_temp]),
        latency_ms=latency
    )

# Set sensor coverage
metrics.set_sensor_coverage(total_sensors=10, active_sensors=9)

# Calculate evaluation metrics
print("=" * 70)
print("Digital Twin Evaluation Metrics Report")
print("=" * 70 + "\n")

# Accuracy metrics
accuracy = metrics.calculate_accuracy_metrics()
print("Accuracy Metrics:")
print(f"  RMSE: {accuracy['rmse']:.3f}°C")
print(f"  MAE: {accuracy['mae']:.3f}°C")
print(f"  R² Score: {accuracy['r2_score']:.4f}")
print(f"  MAPE: {accuracy['mape']:.2f}%")

# Real-time performance
realtime = metrics.calculate_realtime_metrics()
print(f"\nReal-time Performance:")
print(f"  Average Latency: {realtime['avg_latency_ms']:.1f} ms")
print(f"  P50 Latency: {realtime['p50_latency_ms']:.1f} ms")
print(f"  P95 Latency: {realtime['p95_latency_ms']:.1f} ms")
print(f"  P99 Latency: {realtime['p99_latency_ms']:.1f} ms")

# Sensor coverage
coverage = metrics.sensor_coverage
print(f"\nSensor Coverage:")
print(f"  Total Sensors: {coverage['total_sensors']}")
print(f"  Active Sensors: {coverage['active_sensors']}")
print(f"  Coverage Rate: {coverage['coverage_rate']*100:.1f}%")

# Business value
business_value = metrics.calculate_business_value_metrics(
    cost_reduction_per_day=500,  # $/day
    downtime_reduction_hours=10,  # hours/month
    hourly_production_value=2000  # $/hour
)
print(f"\nBusiness Value Metrics (30 days):")
print(f"  Cost Reduction: ${business_value['cost_reduction_per_month']:,.0f}")
print(f"  Downtime Reduction Value: ${business_value['downtime_value_saved']:,.0f}")
print(f"  Total Value: ${business_value['total_value_per_month']:,.0f}")
print(f"  Operating Cost: ${business_value['operating_cost_per_month']:,.0f}")
print(f"  ROI: {business_value['roi_percent']:.1f}%")
print(f"  Payback Period: {business_value['payback_period_months']:.2f} months")

# Comprehensive report
report = metrics.get_comprehensive_report()
print(f"\nTotal Evaluation Samples: {report['sample_count']}")

Example Output:

======================================================================
Digital Twin Evaluation Metrics Report
======================================================================

Accuracy Metrics:
  RMSE: 3.128°C
  MAE: 2.487°C
  R² Score: 0.1824
  MAPE: 1.34%

Real-time Performance:
  Average Latency: 498.7 ms
  P50 Latency: 495.3 ms
  P95 Latency: 766.2 ms
  P99 Latency: 785.4 ms

Sensor Coverage:
  Total Sensors: 10
  Active Sensors: 9
  Coverage Rate: 90.0%

Business Value Metrics (30 days):
  Cost Reduction: $15,000
  Downtime Reduction Value: $20,000
  Total Value: $35,000
  Operating Cost: $5,000
  ROI: 600.0%
  Payback Period: 0.14 months

Total Evaluation Samples: 100

Explanation: Digital twin evaluation requires four axes: accuracy (RMSE, R²), real-time performance (latency), coverage (sensor availability), and business value (ROI, payback period).


Code Example 6: Simple Digital Twin Prototype (Sensor Simulator)

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0

import numpy as np
import time
from datetime import datetime
from typing import Dict, Callable
import matplotlib.pyplot as plt

class VirtualSensor:
    """Virtual sensor (physical system simulation)"""

    def __init__(self, sensor_id: str, sensor_type: str,
                 base_value: float, noise_std: float = 0.5):
        self.sensor_id = sensor_id
        self.sensor_type = sensor_type
        self.base_value = base_value
        self.noise_std = noise_std
        self.current_value = base_value
        self.drift_rate = 0.0  # Sensor drift

    def read(self) -> Dict[str, Any]:
        """Read sensor value"""
        # Random noise
        noise = np.random.normal(0, self.noise_std)

        # Sensor drift (long-term change)
        self.current_value += self.drift_rate

        measured_value = self.current_value + noise

        return {
            'sensor_id': self.sensor_id,
            'type': self.sensor_type,
            'value': measured_value,
            'unit': self._get_unit(),
            'timestamp': datetime.now().isoformat(),
            'quality': 'good'
        }

    def _get_unit(self) -> str:
        """Get sensor unit"""
        units = {
            'temperature': '°C',
            'pressure': 'bar',
            'flow': 'kg/h',
            'concentration': 'mol/L'
        }
        return units.get(self.sensor_type, '')

    def set_base_value(self, new_value: float) -> None:
        """Update base value (simulate process change)"""
        self.current_value = new_value


class SimpleDigitalTwinPrototype:
    """Simple digital twin prototype"""

    def __init__(self, twin_id: str):
        self.twin_id = twin_id
        self.sensors = {}
        self.state_history = []
        self.physical_model = None

    def add_sensor(self, sensor: VirtualSensor) -> None:
        """Add sensor"""
        self.sensors[sensor.sensor_id] = sensor
        print(f"Added sensor: {sensor.sensor_id} ({sensor.sensor_type})")

    def set_physical_model(self, model: Callable) -> None:
        """Set physical model"""
        self.physical_model = model
        print("Physical model configured")

    def read_all_sensors(self) -> Dict[str, Any]:
        """Read all sensors"""
        sensor_data = {}
        for sensor_id, sensor in self.sensors.items():
            reading = sensor.read()
            sensor_data[sensor.sensor_type] = reading['value']

        sensor_data['timestamp'] = datetime.now()
        return sensor_data

    def run_physics_simulation(self, current_state: Dict[str, float],
                                dt: float = 1.0) -> Dict[str, float]:
        """Update state using physical model"""
        if self.physical_model is None:
            return current_state

        # Simple CSTR model
        T = current_state.get('temperature', 185.0)
        C = current_state.get('concentration', 2.5)

        # Parameters
        k_reaction = 0.05  # Reaction rate constant
        k_cooling = 0.02   # Cooling rate constant
        T_coolant = 60.0   # Coolant temperature
        delta_H = -15.0    # Reaction heat

        # Differential equations (Euler method)
        dT = (-k_cooling * (T - T_coolant) + delta_H * k_reaction * C) * dt
        dC = -k_reaction * C * dt

        new_state = {
            'temperature': T + dT,
            'concentration': C + dC,
            'timestamp': datetime.now()
        }

        return new_state

    def synchronize(self) -> None:
        """Synchronize physical system and digital twin"""
        # Read sensor data
        sensor_data = self.read_all_sensors()

        # State estimation using physical model
        predicted_state = self.run_physics_simulation(sensor_data)

        # Save to state history
        self.state_history.append({
            'timestamp': sensor_data['timestamp'],
            'sensor_data': sensor_data,
            'predicted_state': predicted_state
        })

        # Update sensor base values (feedback)
        if 'temperature' in sensor_data and 'temperature' in self.sensors:
            self.sensors['temperature'].set_base_value(predicted_state['temperature'])

    def run_realtime_simulation(self, duration_seconds: int = 10,
                                 update_interval: float = 1.0) -> None:
        """Run real-time simulation"""
        print(f"\nStarting real-time simulation for {duration_seconds} seconds...")
        print("=" * 70)

        start_time = time.time()
        while time.time() - start_time < duration_seconds:
            self.synchronize()

            # Display current state
            latest = self.state_history[-1]
            sensor_T = latest['sensor_data'].get('temperature', 0)
            predicted_T = latest['predicted_state'].get('temperature', 0)
            sensor_C = latest['sensor_data'].get('concentration', 0)
            predicted_C = latest['predicted_state'].get('concentration', 0)

            print(f"[t={len(self.state_history):3d}s] "
                  f"Sensor: T={sensor_T:6.2f}°C, C={sensor_C:.3f}mol/L | "
                  f"Model: T={predicted_T:6.2f}°C, C={predicted_C:.3f}mol/L")

            time.sleep(update_interval)

        print("=" * 70)
        print("Simulation completed")

    def visualize_state_history(self) -> None:
        """Visualize state history"""
        if not self.state_history:
            print("No data to visualize")
            return

        times = list(range(len(self.state_history)))
        sensor_temps = [s['sensor_data'].get('temperature', 0) for s in self.state_history]
        predicted_temps = [s['predicted_state'].get('temperature', 0) for s in self.state_history]
        sensor_concs = [s['sensor_data'].get('concentration', 0) for s in self.state_history]
        predicted_concs = [s['predicted_state'].get('concentration', 0) for s in self.state_history]

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # Temperature plot
        ax1.plot(times, sensor_temps, 'o-', label='Sensor (Physical)',
                 color='#11998e', linewidth=2, markersize=4)
        ax1.plot(times, predicted_temps, 's--', label='Model (Digital Twin)',
                 color='#e74c3c', linewidth=2, markersize=4, alpha=0.7)
        ax1.set_xlabel('Time [s]', fontsize=11)
        ax1.set_ylabel('Temperature [°C]', fontsize=11)
        ax1.set_title('Digital Twin Temperature Synchronization', fontsize=13, fontweight='bold')
        ax1.legend(fontsize=10)
        ax1.grid(alpha=0.3)

        # Concentration plot
        ax2.plot(times, sensor_concs, 'o-', label='Sensor (Physical)',
                 color='#11998e', linewidth=2, markersize=4)
        ax2.plot(times, predicted_concs, 's--', label='Model (Digital Twin)',
                 color='#e74c3c', linewidth=2, markersize=4, alpha=0.7)
        ax2.set_xlabel('Time [s]', fontsize=11)
        ax2.set_ylabel('Concentration [mol/L]', fontsize=11)
        ax2.set_title('Digital Twin Concentration Synchronization', fontsize=13, fontweight='bold')
        ax2.legend(fontsize=10)
        ax2.grid(alpha=0.3)

        plt.tight_layout()
        plt.show()


# Usage example: Run simple digital twin prototype
print("=" * 70)
print("Simple Digital Twin Prototype")
print("=" * 70)

# Create digital twin
dt_prototype = SimpleDigitalTwinPrototype(twin_id="DT-PROTO-001")

# Add virtual sensors
temp_sensor = VirtualSensor('TI-101', 'temperature', base_value=185.0, noise_std=1.0)
conc_sensor = VirtualSensor('AI-104', 'concentration', base_value=2.5, noise_std=0.05)

dt_prototype.add_sensor(temp_sensor)
dt_prototype.add_sensor(conc_sensor)

# Set physical model
dt_prototype.set_physical_model(model=lambda x: x)

# Run real-time simulation
dt_prototype.run_realtime_simulation(duration_seconds=10, update_interval=1.0)

# Visualize state history
# dt_prototype.visualize_state_history()  # matplotlib display (uncomment when running)

print(f"\nTotal synchronized states: {len(dt_prototype.state_history)}")

Example Output:

======================================================================
Simple Digital Twin Prototype
======================================================================
Added sensor: TI-101 (temperature)
Added sensor: AI-104 (concentration)
Physical model configured

Starting real-time simulation for 10 seconds...
======================================================================
[t=  1s] Sensor: T=186.23°C, C=2.512mol/L | Model: T=184.87°C, C=2.488mol/L
[t=  2s] Sensor: T=183.65°C, C=2.475mol/L | Model: T=183.72°C, C=2.476mol/L
[t=  3s] Sensor: T=182.48°C, C=2.463mol/L | Model: T=182.58°C, C=2.464mol/L
[t=  4s] Sensor: T=181.34°C, C=2.451mol/L | Model: T=181.45°C, C=2.452mol/L
[t=  5s] Sensor: T=180.21°C, C=2.439mol/L | Model: T=180.33°C, C=2.440mol/L
[t=  6s] Sensor: T=179.10°C, C=2.427mol/L | Model: T=179.22°C, C=2.429mol/L
[t=  7s] Sensor: T=178.00°C, C=2.416mol/L | Model: T=178.13°C, C=2.417mol/L
[t=  8s] Sensor: T=176.92°C, C=2.404mol/L | Model: T=177.05°C, C=2.406mol/L
[t=  9s] Sensor: T=175.85°C, C=2.393mol/L | Model: T=175.98°C, C=2.395mol/L
[t= 10s] Sensor: T=174.80°C, C=2.382mol/L | Model: T=174.92°C, C=2.383mol/L
======================================================================
Simulation completed

Total synchronized states: 10

Explanation: This simple prototype synchronizes virtual sensors (physical system simulation) and digital twins (physical model) in real-time. Integration with actual IoT sensors is covered in Chapter 2.


Code Example 7: Digital Twin Comprehensive Demo (State Visualization Dashboard)

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

from dataclasses import dataclass
from typing import Dict, List
import numpy as np
from datetime import datetime

@dataclass
class DigitalTwinDashboard:
    """Digital twin visualization dashboard"""

    twin_id: str
    state_history: List[Dict] = None

    def __post_init__(self):
        if self.state_history is None:
            self.state_history = []

    def update_dashboard(self, sensor_data: Dict[str, float],
                         model_prediction: Dict[str, float],
                         alerts: List[str] = None) -> None:
        """Update dashboard data"""
        dashboard_entry = {
            'timestamp': datetime.now(),
            'sensor_data': sensor_data,
            'model_prediction': model_prediction,
            'deviation': self._calculate_deviation(sensor_data, model_prediction),
            'alerts': alerts or []
        }
        self.state_history.append(dashboard_entry)

    def _calculate_deviation(self, sensor: Dict[str, float],
                              model: Dict[str, float]) -> Dict[str, float]:
        """Calculate deviation between sensor and model"""
        deviations = {}
        for key in sensor.keys():
            if key in model:
                deviation = abs(sensor[key] - model[key])
                deviations[key] = deviation
        return deviations

    def generate_text_dashboard(self) -> str:
        """Generate text-based dashboard"""
        if not self.state_history:
            return "No data available"

        latest = self.state_history[-1]

        dashboard_text = f"""
╔═══════════════════════════════════════════════════════════════════╗
║           DIGITAL TWIN MONITORING DASHBOARD                       ║
╚═══════════════════════════════════════════════════════════════════╝

Twin ID: {self.twin_id}
Timestamp: {latest['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}
Status: {'⚠️  ALERT' if latest['alerts'] else '✅ NORMAL'}

┌───────────────────────────────────────────────────────────────────┐
│ PROCESS VARIABLES                                                 │
├───────────────────────────────────────────────────────────────────┤
"""
        # Display process variables
        sensor = latest['sensor_data']
        model = latest['model_prediction']
        deviation = latest['deviation']

        for key in sensor.keys():
            if key != 'timestamp':
                sensor_val = sensor.get(key, 0)
                model_val = model.get(key, 0)
                dev_val = deviation.get(key, 0)

                # Alert determination
                alert_symbol = "⚠️ " if dev_val > 5.0 else "  "

                dashboard_text += f"│ {alert_symbol}{key.upper():20s}: Sensor={sensor_val:7.2f} | Model={model_val:7.2f} | Δ={dev_val:5.2f} │\n"

        dashboard_text += "└───────────────────────────────────────────────────────────────────┘\n"

        # Alerts
        if latest['alerts']:
            dashboard_text += "\n┌───────────────────────────────────────────────────────────────────┐\n"
            dashboard_text += "│ ALERTS                                                            │\n"
            dashboard_text += "├───────────────────────────────────────────────────────────────────┤\n"
            for alert in latest['alerts']:
                dashboard_text += f"│ ⚠️  {alert:63s} │\n"
            dashboard_text += "└───────────────────────────────────────────────────────────────────┘\n"

        # Statistics
        if len(self.state_history) > 1:
            recent_temps = [s['sensor_data'].get('temperature', 0) for s in self.state_history[-10:]]
            dashboard_text += f"\n┌───────────────────────────────────────────────────────────────────┐\n"
            dashboard_text += f"│ STATISTICS (Last 10 samples)                                      │\n"
            dashboard_text += f"├───────────────────────────────────────────────────────────────────┤\n"
            dashboard_text += f"│ Temperature: Avg={np.mean(recent_temps):6.2f}°C | Std={np.std(recent_temps):5.2f}°C       │\n"
            dashboard_text += f"│ Total Samples: {len(self.state_history):5d}                                         │\n"
            dashboard_text += f"└───────────────────────────────────────────────────────────────────┘\n"

        return dashboard_text

    def get_kpi_summary(self) -> Dict[str, Any]:
        """Get KPI summary"""
        if not self.state_history:
            return {}

        # Average deviation rate
        all_deviations = [entry['deviation'] for entry in self.state_history]
        avg_deviation_temp = np.mean([d.get('temperature', 0) for d in all_deviations])

        # Alert occurrence rate
        alert_count = sum(1 for entry in self.state_history if entry['alerts'])
        alert_rate = alert_count / len(self.state_history) * 100

        # Data quality score (higher quality with smaller deviation)
        quality_score = max(0, 100 - avg_deviation_temp * 10)

        return {
            'total_samples': len(self.state_history),
            'avg_deviation_temperature': avg_deviation_temp,
            'alert_count': alert_count,
            'alert_rate_percent': alert_rate,
            'data_quality_score': quality_score
        }


# Usage example: Digital twin dashboard demo
dashboard = DigitalTwinDashboard(twin_id="DT-REACTOR-001")

# Simulation data
np.random.seed(42)
for i in range(15):
    # Sensor data (physical system)
    sensor_data = {
        'temperature': 185.0 + np.random.normal(0, 2.0),
        'pressure': 3.5 + np.random.normal(0, 0.2),
        'concentration': 2.5 + np.random.normal(0, 0.1)
    }

    # Model prediction (digital twin)
    model_prediction = {
        'temperature': sensor_data['temperature'] + np.random.normal(0, 1.5),
        'pressure': sensor_data['pressure'] + np.random.normal(0, 0.1),
        'concentration': sensor_data['concentration'] + np.random.normal(0, 0.05)
    }

    # Alert generation
    alerts = []
    if abs(sensor_data['temperature'] - model_prediction['temperature']) > 4.0:
        alerts.append("High deviation in temperature prediction")
    if sensor_data['temperature'] > 190:
        alerts.append("Temperature approaching upper limit")

    dashboard.update_dashboard(sensor_data, model_prediction, alerts)

# Display dashboard
print(dashboard.generate_text_dashboard())

# KPI summary
kpi = dashboard.get_kpi_summary()
print("\n" + "=" * 70)
print("KPI SUMMARY")
print("=" * 70)
print(f"Total Samples: {kpi['total_samples']}")
print(f"Avg Temperature Deviation: {kpi['avg_deviation_temperature']:.2f}°C")
print(f"Alert Count: {kpi['alert_count']}")
print(f"Alert Rate: {kpi['alert_rate_percent']:.1f}%")
print(f"Data Quality Score: {kpi['data_quality_score']:.1f}/100")

Example Output:


╔═══════════════════════════════════════════════════════════════════╗
║           DIGITAL TWIN MONITORING DASHBOARD                       ║
╚═══════════════════════════════════════════════════════════════════╝

Twin ID: DT-REACTOR-001
Timestamp: 2025-10-26 10:30:45
Status: ✅ NORMAL

┌───────────────────────────────────────────────────────────────────┐
│ PROCESS VARIABLES                                                 │
├───────────────────────────────────────────────────────────────────┤
│   TEMPERATURE        : Sensor= 185.47 | Model= 184.82 | Δ= 0.65 │
│   PRESSURE           : Sensor=   3.52 | Model=   3.48 | Δ= 0.04 │
│   CONCENTRATION      : Sensor=   2.51 | Model=   2.50 | Δ= 0.01 │
└───────────────────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────────┐
│ STATISTICS (Last 10 samples)                                      │
├───────────────────────────────────────────────────────────────────┤
│ Temperature: Avg=185.12°C | Std= 1.85°C       │
│ Total Samples:    15                                         │
└───────────────────────────────────────────────────────────────────┘

======================================================================
KPI SUMMARY
======================================================================
Total Samples: 15
Avg Temperature Deviation: 1.42°C
Alert Count: 2
Alert Rate: 13.3%
Data Quality Score: 85.8/100

Explanation: Dashboards play a central role in digital twin state monitoring. Visualizing the deviation between sensor data and model predictions in real-time, alerts and KPIs support operator decision-making.


1.3 Chapter Summary

What We Learned

  1. Digital Twin Concepts
    • Digital twin as a virtual replica of physical systems
    • Difference between digital shadow (one-way) vs digital twin (bidirectional)
    • Value in process industries (optimization, predictive maintenance, virtual commissioning)
  2. Architecture Design
    • 4-layer structure: physical system layer, data layer, model layer, application layer
    • State representation and JSON serialization
    • Bidirectional data flow design
  3. Model Fidelity Levels
    • L1 (data logging) → L3 (physical model) → L5 (autonomous optimization)
    • Functions and application scenarios for each level
  4. Lifecycle Management
    • Phase management: design → implementation → verification → operation
    • Verification criteria and metrics
  5. Evaluation Metrics
    • Accuracy (RMSE, R²), real-time performance (latency)
    • Business value (ROI, payback period)
  6. Prototype Implementation
    • Synchronization of virtual sensors and digital twins
    • State monitoring through dashboards

Key Points

Digital twins are not just simulations - real-time synchronization and bidirectional feedback are essential. Fidelity levels L1-L5 should evolve progressively according to project maturity. State representation design is key to extensibility and maintainability. Evaluation metrics should be defined including not just accuracy but also business value. Lifecycle management maintains consistent quality from design through operation.

To the Next Chapter

In Chapter 2, we will learn in detail about Real-time Data Integration and IoT Integration, covering implementation of industrial communication protocols including OPC UA and MQTT, integration with time-series databases such as InfluxDB, and data streaming processing with Apache Kafka. The chapter also addresses sensor data quality management and anomaly detection, as well as edge computing architecture for industrial applications.

References

  1. Montgomery, D. C. (2019). Design and Analysis of Experiments (9th ed.). Wiley.
  2. Box, G. E. P., Hunter, J. S., & Hunter, W. G. (2005). Statistics for Experimenters: Design, Innovation, and Discovery (2nd ed.). Wiley.
  3. Seborg, D. E., Edgar, T. F., Mellichamp, D. A., & Doyle III, F. J. (2016). Process Dynamics and Control (4th ed.). Wiley.
  4. McKay, M. D., Beckman, R. J., & Conover, W. J. (2000). "A Comparison of Three Methods for Selecting Values of Input Variables in the Analysis of Output from a Computer Code." Technometrics, 42(1), 55-61.

Disclaimer