Last sync: 2025-11-16

Chapter 5: Deploying Digital Twins

From Research to Production: Practical Deployment Strategies

📚 Difficulty: Intermediate to Advanced ⏱️ Reading Time: 35-40 min 🔧 Python Examples: 7

This chapter covers the deployment of digital twins, from research prototypes to production systems. You will learn the differences between edge and cloud deployment, the benefits of containerization, and RESTful API design principles.

5.1 Edge Computing Deployment

To achieve low-latency real-time inference at plant sites, we deploy digital twins on edge devices.

5.1.1 Lightweight Model Deployment

import pickle
import numpy as np
from pathlib import Path
import json
import time

class EdgeDigitalTwin:
    """Lightweight Digital Twin for Edge Devices"""

    def __init__(self, model_path):
        self.model = None
        self.scaler = None
        self.config = {}
        self.load_model(model_path)

    def load_model(self, model_path):
        """Load model"""
        model_dir = Path(model_path)

        # Model file
        with open(model_dir / 'model.pkl', 'rb') as f:
            self.model = pickle.load(f)

        # Scaler
        with open(model_dir / 'scaler.pkl', 'rb') as f:
            self.scaler = pickle.load(f)

        # Configuration file
        with open(model_dir / 'config.json', 'r') as f:
            self.config = json.load(f)

        print(f"Model loaded: {self.config['model_type']}")
        print(f"Features: {self.config['features']}")

    def predict(self, sensor_data):
        """Real-time inference

        Args:
            sensor_data: Dictionary of {feature_name: value}

        Returns:
            Prediction result and confidence interval
        """
        start_time = time.perf_counter()  # monotonic clock, suited to timing

        # Feature extraction (column order must match the training data)
        X = np.array([[sensor_data[f] for f in self.config['features']]])

        # Preprocessing
        X_scaled = self.scaler.transform(X)

        # Inference (cast to a built-in float so the result is JSON-serializable)
        prediction = float(self.model.predict(X_scaled)[0])

        # Spread of per-estimator predictions (5th to 95th percentile),
        # available only for ensembles that expose `estimators_`
        if hasattr(self.model, 'estimators_'):
            predictions = [est.predict(X_scaled)[0]
                          for est in self.model.estimators_]
            confidence_interval = (
                float(np.percentile(predictions, 5)),
                float(np.percentile(predictions, 95))
            )
        else:
            confidence_interval = None

        inference_time = (time.perf_counter() - start_time) * 1000  # ms

        return {
            'prediction': prediction,
            'confidence_interval': confidence_interval,
            'inference_time_ms': inference_time,
            'timestamp': time.time()
        }

    def validate_input(self, sensor_data):
        """Validate input data"""
        missing_features = [f for f in self.config['features']
                           if f not in sensor_data]

        if missing_features:
            return False, f"Missing features: {missing_features}"

        # Range check
        for feature, value in sensor_data.items():
            if feature in self.config.get('valid_ranges', {}):
                min_val, max_val = self.config['valid_ranges'][feature]
                if not (min_val <= value <= max_val):
                    return False, f"{feature}={value} out of range [{min_val}, {max_val}]"

        return True, "OK"

    def get_model_info(self):
        """Get model information"""
        return {
            'type': self.config['model_type'],
            'version': self.config.get('version', 'unknown'),
            'features': self.config['features'],
            'trained_date': self.config.get('trained_date', 'unknown')
        }

# Model packaging function
def package_model_for_edge(model, scaler, features, output_dir,
                           valid_ranges=None):
    """Package model for edge deployment"""
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Save model
    with open(output_path / 'model.pkl', 'wb') as f:
        pickle.dump(model, f)

    # Save scaler
    with open(output_path / 'scaler.pkl', 'wb') as f:
        pickle.dump(scaler, f)

    # Configuration file
    config = {
        'model_type': type(model).__name__,
        'features': features,
        'version': '1.0.0',
        'trained_date': time.strftime('%Y-%m-%d'),
        'valid_ranges': valid_ranges or {}
    }

    with open(output_path / 'config.json', 'w') as f:
        json.dump(config, f, indent=2)

    print(f"Model packaged to {output_path}")

# Usage example
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler

# Model training (simple example)
np.random.seed(42)
X_train = np.random.rand(100, 3)
y_train = X_train[:, 0] * 10 + X_train[:, 1] * 5 + np.random.randn(100)

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)

model = RandomForestRegressor(n_estimators=10, max_depth=5, random_state=42)
model.fit(X_scaled, y_train)

# Packaging
package_model_for_edge(
    model, scaler,
    features=['temp', 'pressure', 'flow'],
    output_dir='./edge_model',
    valid_ranges={'temp': (300, 400), 'pressure': (1, 10), 'flow': (50, 150)}
)

# Usage on edge device
edge_twin = EdgeDigitalTwin('./edge_model')

# Real-time inference
sensor_reading = {'temp': 350, 'pressure': 5.5, 'flow': 100}
result = edge_twin.predict(sensor_reading)

print(f"Prediction: {result['prediction']:.2f}")
print(f"Inference Time: {result['inference_time_ms']:.2f} ms")
if result['confidence_interval'] is not None:
    print(f"90% CI: [{result['confidence_interval'][0]:.2f}, "
          f"{result['confidence_interval'][1]:.2f}]")
💡 Edge Deployment Best Practices:
  • Minimize model size (quantization, pruning)
  • Keep inference time under 1 second (real-time constraints)
  • Always implement input validation (anomaly prevention)
  • Provide fail-safe mode
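
The last item above, a fail-safe mode, can be sketched as a thin wrapper around any prediction function. This is a minimal sketch under assumptions: `FailSafePredictor`, `predict_fn`, `default_value`, and `max_failures` are hypothetical names that are not part of the chapter's `EdgeDigitalTwin` class, and holding the last good value is only one of several possible fallback policies.

```python
class FailSafePredictor:
    """Wrap a prediction callable and fall back to a safe value on failure.

    Hypothetical helper for illustration; not part of EdgeDigitalTwin.
    """

    def __init__(self, predict_fn, default_value, max_failures=3):
        self.predict_fn = predict_fn        # callable: sensor_data -> number
        self.default_value = default_value  # used before any success
        self.max_failures = max_failures    # consecutive failures tolerated
        self.last_good = None
        self.consecutive_failures = 0

    def predict(self, sensor_data):
        try:
            value = float(self.predict_fn(sensor_data))
        except Exception:
            self.consecutive_failures += 1
            fallback = (self.last_good if self.last_good is not None
                        else self.default_value)
            return {
                'prediction': fallback,
                'degraded': True,
                # Raise an alarm once failures persist
                'alarm': self.consecutive_failures >= self.max_failures,
            }
        self.last_good = value
        self.consecutive_failures = 0
        return {'prediction': value, 'degraded': False, 'alarm': False}


def flaky_model(sensor_data):
    """Toy model that raises KeyError when 'temp' is missing."""
    return sensor_data['temp'] / 10


safe = FailSafePredictor(flaky_model, default_value=0.0, max_failures=2)
ok = safe.predict({'temp': 350})
bad1 = safe.predict({})   # falls back to the last good value
bad2 = safe.predict({})   # second consecutive failure sets the alarm flag
print(ok, bad1, bad2)
```

On a real device, `predict_fn` could be a function that runs `validate_input` and then `predict`, raising when validation fails.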

5.2 Cloud Deployment Architecture

Host digital twins in a scalable cloud environment to enable access from multiple plants.

5.2.1 Microservice Architecture

from flask import Flask, request, jsonify
from functools import wraps
import logging
from datetime import datetime

class DigitalTwinMicroservice:
    """Digital Twin Microservice"""

    def __init__(self, model_path, service_name='digital-twin'):
        self.app = Flask(service_name)
        self.model = EdgeDigitalTwin(model_path)
        self.request_count = 0

        # Logging setup
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(service_name)

        self._setup_routes()

    def _setup_routes(self):
        """Setup API endpoints"""

        @self.app.route('/health', methods=['GET'])
        def health_check():
            """Health check"""
            return jsonify({
                'status': 'healthy',
                'service': 'digital-twin',
                'model_version': self.model.config.get('version', 'unknown'),
                'timestamp': datetime.now().isoformat()
            })

        @self.app.route('/predict', methods=['POST'])
        @self._log_request
        def predict():
            """Prediction endpoint"""
            try:
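                # silent=True returns None instead of raising on a bad body
                data = request.get_json(silent=True)
                if not isinstance(data, dict):
                    return jsonify({'error': 'Request body must be a JSON object'}), 400

                # Input validation
                valid, message = self.model.validate_input(data)
                if not valid:
                    return jsonify({'error': message}), 400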

                # Execute inference
                result = self.model.predict(data)

                self.request_count += 1

                return jsonify({
                    'success': True,
                    'result': result,
                    'request_id': self.request_count
                })

            except Exception:
                # Log the full traceback server-side; do not leak internals to clients
                self.logger.exception("Prediction error")
                return jsonify({'error': 'Internal server error'}), 500

        @self.app.route('/model/info', methods=['GET'])
        def model_info():
            """Model information"""
            return jsonify(self.model.get_model_info())

        @self.app.route('/metrics', methods=['GET'])
        def metrics():
            """Metrics"""
            return jsonify({
                'total_requests': self.request_count,
                'model_version': self.model.config.get('version'),
                'uptime_seconds': self._get_uptime()
            })

    def _log_request(self, f):
        """Request logging decorator"""
        @wraps(f)
        def decorated_function(*args, **kwargs):
            self.logger.info(f"Request: {request.method} {request.path}")
            response = f(*args, **kwargs)
            self.logger.info(f"Response: {response[1] if isinstance(response, tuple) else 200}")
            return response
        return decorated_function

    def _get_uptime(self):
        """Uptime (simplified version)"""
        return 0  # Implementation omitted

    def run(self, host='0.0.0.0', port=5000, debug=False):
        """Start service"""
        self.logger.info(f"Starting Digital Twin Microservice on {host}:{port}")
        self.app.run(host=host, port=port, debug=debug)

# Usage example (at runtime)
# service = DigitalTwinMicroservice('./edge_model')
# service.run(port=5000)

# Client-side usage example
import requests

def call_digital_twin_api(sensor_data, api_url='http://localhost:5000'):
    """Call Digital Twin API"""
    try:
        # Inference request
        response = requests.post(
            f"{api_url}/predict",
            json=sensor_data,
            timeout=5.0
        )

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error: {response.status_code}")
            return None

    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None

# API call example
sensor_data = {'temp': 360, 'pressure': 6.0, 'flow': 100}
result = call_digital_twin_api(sensor_data)

if result and result['success']:
    print(f"Prediction: {result['result']['prediction']:.2f}")
    print(f"Inference time: {result['result']['inference_time_ms']:.2f} ms")
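
Network calls from plant sites to a cloud service can fail transiently, so a client such as `call_digital_twin_api` is often wrapped in a retry loop. The sketch below shows exponential backoff using only the standard library. `retry_with_backoff`, its parameters, and the `flaky_call` stand-in are hypothetical names introduced for illustration, and the delay values are assumptions to tune per deployment.

```python
import time


def retry_with_backoff(func, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call func() until it returns a non-None result or attempts run out.

    The delay doubles after each failed attempt: base_delay, 2*base_delay, ...
    `sleep` is injectable so the function can be exercised without waiting.
    """
    for attempt in range(max_attempts):
        result = func()
        if result is not None:
            return result
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return None


# Stand-in for an API call that fails twice, then succeeds
attempts = {'count': 0}


def flaky_call():
    attempts['count'] += 1
    if attempts['count'] < 3:
        return None
    return {'success': True, 'result': {'prediction': 42.0}}


recorded_delays = []
response = retry_with_backoff(flaky_call, sleep=recorded_delays.append)
print(response, recorded_delays)
```

With the client above, the call could look like `retry_with_backoff(lambda: call_digital_twin_api(sensor_data))`, since that function returns `None` on failure.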

5.3 Docker Containerization

Containerizing the digital twin makes deployment simple and independent of the host environment.

# Dockerfile example (displayed as text within Python code)

DOCKERFILE_CONTENT = """
# Base image
FROM python:3.9-slim

# Working directory
WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \\
    gcc \\
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY ./digital_twin /app/digital_twin
COPY ./edge_model /app/edge_model

# Run as non-root user
RUN useradd -m -u 1000 dtuser && chown -R dtuser:dtuser /app
USER dtuser

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
    CMD python -c "import requests; requests.get('http://localhost:5000/health', timeout=2).raise_for_status()"

# Expose port
EXPOSE 5000

# Startup command
CMD ["python", "-m", "digital_twin.service"]
"""

# requirements.txt content
REQUIREMENTS_CONTENT = """
flask==2.3.0
numpy==1.24.0
scikit-learn==1.3.0
requests==2.31.0
gunicorn==21.0.0
"""

# Docker compose configuration
DOCKER_COMPOSE_CONTENT = """
version: '3.8'

services:
  digital-twin:
    build: .
    ports:
      - "5000:5000"
    environment:
      - MODEL_PATH=/app/edge_model
      - LOG_LEVEL=INFO
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
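      # python:3.9-slim does not ship curl, so probe with Python instead
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5000/health', timeout=3)"]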
      interval: 30s
      timeout: 5s
      retries: 3

  # Load balancer (for multiple instances)
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - digital-twin
    restart: unless-stopped
"""

# Docker build and run script
class DockerDeployment:
    """Docker Deployment Management"""

    @staticmethod
    def create_dockerfile(output_dir='.'):
        """Create Dockerfile"""
        with open(f"{output_dir}/Dockerfile", 'w') as f:
            f.write(DOCKERFILE_CONTENT)
        print("Dockerfile created")

    @staticmethod
    def create_requirements(output_dir='.'):
        """Create requirements.txt"""
        with open(f"{output_dir}/requirements.txt", 'w') as f:
            f.write(REQUIREMENTS_CONTENT)
        print("requirements.txt created")

    @staticmethod
    def create_docker_compose(output_dir='.'):
        """Create docker-compose.yml"""
        with open(f"{output_dir}/docker-compose.yml", 'w') as f:
            f.write(DOCKER_COMPOSE_CONTENT)
        print("docker-compose.yml created")

    @staticmethod
    def build_image(tag='digital-twin:latest'):
        """Build Docker image"""
        import subprocess
        # Argument list (no shell) avoids injection via `tag`;
        # check=True raises CalledProcessError if the build fails
        subprocess.run(["docker", "build", "-t", tag, "."], check=True)
        print(f"Image built: {tag}")

    @staticmethod
    def run_container(tag='digital-twin:latest', port=5000):
        """Start container"""
        import subprocess
        subprocess.run(
            ["docker", "run", "-d", "-p", f"{port}:5000",
             "--name", "dt-service", tag],
            check=True
        )
        print(f"Container started on port {port}")

# Usage example
# deployment = DockerDeployment()
# deployment.create_dockerfile()
# deployment.create_requirements()
# deployment.create_docker_compose()
# deployment.build_image()
# deployment.run_container()

print("Docker deployment files ready")
print("To build: docker build -t digital-twin .")
print("To run: docker-compose up -d")
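
The compose file above passes `MODEL_PATH` and `LOG_LEVEL` to the container as environment variables. One way the service could read them at startup is sketched below. `load_settings`, the `PORT` variable, and the default values are assumptions for illustration, since the chapter does not show the contents of `digital_twin.service`.

```python
import logging
import os


def load_settings(environ=None):
    """Read service settings from environment variables, with defaults."""
    env = os.environ if environ is None else environ
    level_name = env.get('LOG_LEVEL', 'INFO').upper()
    # Fall back to INFO if the variable holds an unknown level name
    level = getattr(logging, level_name, None)
    if not isinstance(level, int):
        level = logging.INFO
    return {
        'model_path': env.get('MODEL_PATH', './edge_model'),
        'log_level': level,
        'port': int(env.get('PORT', '5000')),  # PORT is a hypothetical variable
    }


settings = load_settings({'MODEL_PATH': '/app/edge_model', 'LOG_LEVEL': 'debug'})
print(settings)
```

Passing a plain dictionary instead of `os.environ` keeps the function easy to test outside a container.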

5.4 RESTful API Design

Enable access to digital twins through standard RESTful APIs.

from flask import Flask, request, jsonify, make_response
from functools import wraps
import jwt
import datetime

class SecureDigitalTwinAPI:
    """Secure Digital Twin API"""

    def __init__(self, model_path, secret_key):
        self.app = Flask(__name__)
        self.app.config['SECRET_KEY'] = secret_key
        self.model = EdgeDigitalTwin(model_path)
        self._setup_secure_routes()

    def _token_required(self, f):
        """JWT authentication decorator"""
        @wraps(f)
        def decorated(*args, **kwargs):
            token = request.headers.get('Authorization')

            if not token:
                return jsonify({'message': 'Token is missing'}), 401

            try:
                # "Bearer " format
                token = token.split()[1] if ' ' in token else token
                data = jwt.decode(token, self.app.config['SECRET_KEY'],
                                 algorithms=['HS256'])
                current_user = data['user']
            except (jwt.InvalidTokenError, KeyError, IndexError):
                return jsonify({'message': 'Token is invalid'}), 401

            return f(current_user, *args, **kwargs)

        return decorated

    def _setup_secure_routes(self):
        """Setup secure API endpoints"""

        @self.app.route('/api/v1/auth/login', methods=['POST'])
        def login():
            """Authentication (token issuance)"""
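            auth = request.get_json(silent=True) or {}

            # Demo only: hard-coded credentials. In practice, verify the
            # user against a database of hashed passwords
            if auth.get('username') == 'admin' and auth.get('password') == 'secret':
                token = jwt.encode({
                    'user': auth['username'],
                    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
                    'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=24)
                }, self.app.config['SECRET_KEY'], algorithm='HS256')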

                return jsonify({'token': token})

            return jsonify({'message': 'Invalid credentials'}), 401

        @self.app.route('/api/v1/twin/predict', methods=['POST'])
        @self._token_required
        def predict(current_user):
            """Prediction (authentication required)"""
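            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                return jsonify({'error': 'Request body must be a JSON object'}), 400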

            valid, message = self.model.validate_input(data)
            if not valid:
                return jsonify({'error': message}), 400

            result = self.model.predict(data)

            return jsonify({
                'user': current_user,
                'prediction': result['prediction'],
                'confidence_interval': result['confidence_interval'],
                'timestamp': result['timestamp']
            })

        @self.app.route('/api/v1/twin/batch', methods=['POST'])
        @self._token_required
        def batch_predict(current_user):
            """Batch prediction"""
            data = request.get_json(silent=True) or {}
            inputs = data.get('inputs', [])

            if len(inputs) > 100:
                return jsonify({'error': 'Batch size limit: 100'}), 400

            results = []
            for inp in inputs:
                valid, _ = self.model.validate_input(inp)
                if valid:
                    result = self.model.predict(inp)
                    results.append({
                        'input': inp,
                        'prediction': result['prediction']
                    })

            return jsonify({
                'user': current_user,
                'total': len(inputs),
                'successful': len(results),
                'results': results
            })

        @self.app.route('/api/v1/twin/status', methods=['GET'])
        @self._token_required
        def status(current_user):
            """Status check"""
            return jsonify({
                'status': 'operational',
                'model_info': self.model.get_model_info(),
                'user': current_user
            })

# Usage example (client side)
import requests
class DigitalTwinClient:
    """Digital Twin API Client"""

    def __init__(self, api_url, username, password):
        self.api_url = api_url
        self.token = None
        self._authenticate(username, password)

    def _authenticate(self, username, password):
        """Authentication"""
        response = requests.post(
            f"{self.api_url}/api/v1/auth/login",
            json={'username': username, 'password': password}
        )

        if response.status_code == 200:
            self.token = response.json()['token']
            print("Authenticated successfully")
        else:
            raise Exception("Authentication failed")

    def predict(self, sensor_data):
        """Prediction request"""
        headers = {'Authorization': f'Bearer {self.token}'}

        response = requests.post(
            f"{self.api_url}/api/v1/twin/predict",
            json=sensor_data,
            headers=headers
        )

        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Prediction failed: {response.status_code}")

    def batch_predict(self, inputs_list):
        """Batch prediction"""
        headers = {'Authorization': f'Bearer {self.token}'}

        response = requests.post(
            f"{self.api_url}/api/v1/twin/batch",
            json={'inputs': inputs_list},
            headers=headers
        )

        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Batch prediction failed: {response.status_code}")

# Usage example
# client = DigitalTwinClient('http://localhost:5000', 'admin', 'secret')
# result = client.predict({'temp': 360, 'pressure': 6.0, 'flow': 100})
# print(f"Prediction: {result['prediction']}")

print("Secure API implementation ready")
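
To make the token flow less opaque, the sketch below builds and verifies an HS256-signed token using only the standard library. It is meant as an aid to understanding what `jwt.encode` and `jwt.decode` do in the code above, not as a replacement for PyJWT. The helper names (`sign_token`, `verify_token`, `_b64url`, `_b64url_decode`) are hypothetical, and only the signature and the `exp` claim are checked; the header's `alg` field is not validated here.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def sign_token(payload: dict, secret: str) -> str:
    """Return header.payload.signature, signed with HMAC-SHA256."""
    header = {'alg': 'HS256', 'typ': 'JWT'}
    signing_input = '.'.join(
        _b64url(json.dumps(part, separators=(',', ':')).encode('utf-8'))
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode('utf-8'),
                         signing_input.encode('ascii'),
                         hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_token(token: str, secret: str, now=None):
    """Return the payload if signature and expiry are valid, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split('.')
        expected = hmac.new(secret.encode('utf-8'),
                            f"{header_b64}.{payload_b64}".encode('ascii'),
                            hashlib.sha256).digest()
        # Constant-time comparison of the two signatures
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except (ValueError, UnicodeError):
        return None
    if not isinstance(payload, dict):
        return None
    current = time.time() if now is None else now
    if payload.get('exp', 0) <= current:
        return None
    return payload


token = sign_token({'user': 'admin', 'exp': 2000}, 'demo-secret')
print(verify_token(token, 'demo-secret', now=1000))   # correct secret, not expired
print(verify_token(token, 'wrong-secret', now=1000))  # signature mismatch
print(verify_token(token, 'demo-secret', now=3000))   # past the exp claim
```

The `now` argument exists only so that expiry can be demonstrated with fixed numbers; in a service it would be left at its default.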

5.5 Visualization Dashboard

Build a real-time monitoring dashboard using Plotly Dash.

import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.graph_objects as go
from collections import deque
import time

class DigitalTwinDashboard:
    """Digital Twin Dashboard"""

    def __init__(self, digital_twin_api_url):
        self.api_url = digital_twin_api_url
        self.app = dash.Dash(__name__)

        # Data buffers (latest 100 points)
        self.time_buffer = deque(maxlen=100)
        self.prediction_buffer = deque(maxlen=100)
        self.temp_buffer = deque(maxlen=100)
        self.pressure_buffer = deque(maxlen=100)

        self._setup_layout()
        self._setup_callbacks()

    def _setup_layout(self):
        """Layout setup"""
        self.app.layout = html.Div([
            html.H1("Digital Twin Dashboard",
                   style={'textAlign': 'center', 'color': '#2c3e50'}),

            html.Div([
                html.Div([
                    html.H3("Current Status"),
                    html.Div(id='status-display',
                            style={'fontSize': '24px', 'fontWeight': 'bold'})
                ], style={'width': '48%', 'display': 'inline-block'}),

                html.Div([
                    html.H3("Model Info"),
                    html.Div(id='model-info')
                ], style={'width': '48%', 'float': 'right',
                         'display': 'inline-block'})
            ]),

            html.Div([
                html.H3("Input Controls"),
                html.Label("Temperature (°C):"),
                dcc.Slider(id='temp-slider', min=340, max=380, value=360,
                          marks={i: str(i) for i in range(340, 381, 10)}),

                html.Label("Pressure (bar):"),
                dcc.Slider(id='pressure-slider', min=4, max=8, value=6,
                          step=0.5,
                          marks={i: str(i) for i in range(4, 9)}),

                html.Label("Flow (kg/h):"),
                dcc.Slider(id='flow-slider', min=80, max=120, value=100,
                          marks={i: str(i) for i in range(80, 121, 10)}),

                html.Button('Predict', id='predict-button', n_clicks=0)
            ], style={'margin': '20px'}),

            html.Div([
                dcc.Graph(id='prediction-time-series'),
                dcc.Graph(id='process-conditions')
            ]),

            dcc.Interval(id='interval-component', interval=2000, n_intervals=0)
        ])

    def _setup_callbacks(self):
        """Callback setup"""

        @self.app.callback(
            Output('status-display', 'children'),
            Output('prediction-time-series', 'figure'),
            Output('process-conditions', 'figure'),
            Input('predict-button', 'n_clicks'),
            Input('interval-component', 'n_intervals'),
            State('temp-slider', 'value'),
            State('pressure-slider', 'value'),
            State('flow-slider', 'value')
        )
        def update_dashboard(n_clicks, n_intervals, temp, pressure, flow):
            # Execute inference (on button click or interval)
            if n_clicks > 0 or n_intervals > 0:
                sensor_data = {
                    'temp': temp,
                    'pressure': pressure,
                    'flow': flow
                }

                # API call (actual implementation)
                # result = call_digital_twin_api(sensor_data, self.api_url)
                # Simplified version: local calculation
                prediction = 85 - 0.05*(temp-360)**2 - 2*(pressure-6)**2

                # Update buffers
                self.time_buffer.append(time.time())
                self.prediction_buffer.append(prediction)
                self.temp_buffer.append(temp)
                self.pressure_buffer.append(pressure)

            # Status display
            if len(self.prediction_buffer) > 0:
                status = f"Latest Prediction: {self.prediction_buffer[-1]:.2f}%"
            else:
                status = "No data"

            # Prediction time series graph
            time_series_fig = go.Figure()
            if len(self.time_buffer) > 0:
                time_series_fig.add_trace(go.Scatter(
                    x=list(range(len(self.prediction_buffer))),
                    y=list(self.prediction_buffer),
                    mode='lines+markers',
                    name='Yield Prediction',
                    line=dict(color='#11998e', width=2)
                ))

            time_series_fig.update_layout(
                title='Yield Prediction Over Time',
                xaxis_title='Sample',
                yaxis_title='Yield (%)',
                template='plotly_white',
                height=400
            )

            # Process conditions graph
            conditions_fig = go.Figure()
            if len(self.temp_buffer) > 0:
                conditions_fig.add_trace(go.Scatter(
                    x=list(range(len(self.temp_buffer))),
                    y=list(self.temp_buffer),
                    mode='lines',
                    name='Temperature',
                    yaxis='y1'
                ))
                conditions_fig.add_trace(go.Scatter(
                    x=list(range(len(self.pressure_buffer))),
                    y=list(self.pressure_buffer),
                    mode='lines',
                    name='Pressure',
                    yaxis='y2'
                ))

            conditions_fig.update_layout(
                title='Process Conditions',
                xaxis=dict(title='Sample'),
                yaxis=dict(title='Temperature (°C)', side='left'),
                yaxis2=dict(title='Pressure (bar)', side='right',
                           overlaying='y'),
                template='plotly_white',
                height=400
            )

            return status, time_series_fig, conditions_fig

    def run(self, host='0.0.0.0', port=8050, debug=False):
        """Start dashboard"""
        # app.run() is the entry point in recent Dash releases;
        # run_server() is the older, deprecated name
        self.app.run(host=host, port=port, debug=debug)

# Usage example
# dashboard = DigitalTwinDashboard('http://localhost:5000')
# dashboard.run(port=8050)

print("Dashboard implementation ready")
print("Access at: http://localhost:8050")
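
The dashboard's buffers rely on `collections.deque` with `maxlen`, which silently discards the oldest sample once the buffer is full. The short standard-library sketch below shows that behavior; the `rolling_mean` helper is a hypothetical addition illustrating how a smoothed trend value could be derived from the same buffer.

```python
from collections import deque

# A buffer that keeps only the 5 most recent samples
buffer = deque(maxlen=5)
for sample in range(8):
    buffer.append(sample)  # once full, each append drops the oldest item


def rolling_mean(values, window=3):
    """Mean of the last `window` values, or None if there are none."""
    recent = list(values)[-window:]
    return sum(recent) / len(recent) if recent else None


latest = list(buffer)
smoothed = rolling_mean(buffer)
print(latest, smoothed)
```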

5.6 Security and Access Control

Proper security measures are essential in production environments.

import hashlib
import secrets
from datetime import datetime, timedelta
import json

class SecurityManager:
    """Security Management System"""

    def __init__(self):
        self.users = {}  # In practice, use database
        self.api_keys = {}
        self.access_log = []

    def create_user(self, username, password, role='user'):
        """Create user"""
        # Password hashing
        salt = secrets.token_hex(16)
        password_hash = self._hash_password(password, salt)

        self.users[username] = {
            'password_hash': password_hash,
            'salt': salt,
            'role': role,
            'created_at': datetime.now().isoformat(),
            'active': True
        }

        return True

    def _hash_password(self, password, salt):
        """Hash password"""
        return hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            100000
        ).hex()

    def verify_password(self, username, password):
        """Verify password"""
        if username not in self.users:
            return False

        user = self.users[username]
        password_hash = self._hash_password(password, user['salt'])

        # Constant-time comparison avoids leaking information through timing
        return secrets.compare_digest(password_hash, user['password_hash'])

    def generate_api_key(self, username, expiry_days=365):
        """Generate API key"""
        if username not in self.users:
            return None

        api_key = secrets.token_urlsafe(32)
        expiry = datetime.now() + timedelta(days=expiry_days)

        self.api_keys[api_key] = {
            'username': username,
            'created_at': datetime.now().isoformat(),
            'expires_at': expiry.isoformat(),
            'active': True
        }

        return api_key

    def verify_api_key(self, api_key):
        """Verify API key"""
        if api_key not in self.api_keys:
            return False, "Invalid API key"

        key_info = self.api_keys[api_key]

        if not key_info['active']:
            return False, "API key deactivated"

        expiry = datetime.fromisoformat(key_info['expires_at'])
        if datetime.now() > expiry:
            return False, "API key expired"

        return True, key_info['username']

    def log_access(self, username, action, resource, success=True):
        """Log access"""
        self.access_log.append({
            'timestamp': datetime.now().isoformat(),
            'username': username,
            'action': action,
            'resource': resource,
            'success': success
        })

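    def get_access_log(self, username=None, limit=100):
        """Get the most recent `limit` log entries, optionally for one user"""
        logs = self.access_log

        # Filter first so that `limit` applies to the user's own entries
        if username:
            logs = [log for log in logs if log['username'] == username]

        return logs[-limit:]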

    def check_rate_limit(self, username, max_requests=100, window_minutes=60):
        """Check rate limit"""
        cutoff_time = datetime.now() - timedelta(minutes=window_minutes)

        recent_requests = [
            log for log in self.access_log
            if (log['username'] == username and
                datetime.fromisoformat(log['timestamp']) > cutoff_time)
        ]

        return len(recent_requests) < max_requests

# Usage example
security = SecurityManager()

# Create users
security.create_user('operator1', 'secure_password', role='operator')
security.create_user('admin1', 'admin_password', role='admin')

# Authentication
is_valid = security.verify_password('operator1', 'secure_password')
print(f"Authentication: {is_valid}")

# Issue API key
api_key = security.generate_api_key('operator1', expiry_days=90)
print(f"API Key: {api_key[:20]}...")

# Verify API key
valid, username = security.verify_api_key(api_key)
print(f"API Key Valid: {valid}, User: {username}")

# Access logging
security.log_access('operator1', 'PREDICT', '/api/v1/twin/predict', success=True)
security.log_access('operator1', 'PREDICT', '/api/v1/twin/predict', success=True)

# Rate limit check
within_limit = security.check_rate_limit('operator1', max_requests=100)
print(f"Within rate limit: {within_limit}")

# Check access log
logs = security.get_access_log('operator1')
print(f"Access log entries: {len(logs)}")
⚠️ Security Checklist:
  • Enforce HTTPS communication (TLS 1.2 or higher)
  • Implement authentication and authorization (JWT, OAuth2.0)
  • Rate limiting and DDoS protection
  • Input validation and SQL injection prevention
  • Access log storage and monitoring
  • Regular security audits
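
For the rate-limiting item, scanning the full access log on every request (as `check_rate_limit` does) becomes slow as the log grows. A sliding-window limiter that keeps only recent timestamps per user is one alternative. The class name `SlidingWindowRateLimiter` and its parameters are hypothetical, and the state is held in memory, so it would not be shared across multiple service instances.

```python
import time
from collections import defaultdict, deque


class SlidingWindowRateLimiter:
    """Allow at most max_requests per user within window_seconds."""

    def __init__(self, max_requests=100, window_seconds=3600):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits = defaultdict(deque)  # username -> request timestamps

    def allow(self, username, now=None):
        """Record the request and return True if it is within the limit."""
        now = time.monotonic() if now is None else now
        hits = self._hits[username]
        # Drop timestamps that have fallen out of the window
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False  # rejected requests are not recorded
        hits.append(now)
        return True


limiter = SlidingWindowRateLimiter(max_requests=2, window_seconds=60)
# Explicit timestamps (in seconds) make the window behavior easy to follow
decisions = [limiter.allow('operator1', now=t) for t in (0, 10, 20, 61)]
print(decisions)
```

The request at t=20 is rejected because two requests already fall inside the 60-second window, while the one at t=61 is accepted because the request from t=0 has expired by then.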

5.7 Production System Integration Example

The following example integrates the preceding elements into a complete production system.

import json
import logging
from pathlib import Path

class ProductionDigitalTwinSystem:
    """Production Digital Twin System"""

    def __init__(self, config_path):
        self.config = self._load_config(config_path)
        self.logger = self._setup_logging()

        # Component initialization
        self.edge_twin = EdgeDigitalTwin(self.config['model_path'])
        self.security = SecurityManager()
        self.api_service = None  # In practice, Flask app

        self.logger.info("Production Digital Twin System initialized")

    def _load_config(self, config_path):
        """Load configuration"""
        with open(config_path, 'r') as f:
            return json.load(f)

    def _setup_logging(self):
        """Setup logging"""
        log_dir = Path(self.config.get('log_dir', './logs'))
        log_dir.mkdir(parents=True, exist_ok=True)

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_dir / 'digital_twin.log'),
                logging.StreamHandler()
            ]
        )

        return logging.getLogger('DigitalTwinSystem')

    def start(self):
        """Start system"""
        self.logger.info("Starting Digital Twin System")

        # Health check
        if not self._health_check():
            self.logger.error("Health check failed")
            return False

        # Start API service (actual implementation)
        # self.api_service = DigitalTwinMicroservice(...)
        # self.api_service.run()

        self.logger.info("System started successfully")
        return True

    def _health_check(self):
        """Startup health check"""
        checks = {
            # The attribute always exists (initialized to None), so test its value
            'model_loaded': self.edge_twin.model is not None,
            'config_valid': 'model_path' in self.config,
            'security_ready': self.security is not None
        }

        all_healthy = all(checks.values())

        for check, status in checks.items():
            self.logger.info(f"Health check - {check}: {'OK' if status else 'FAIL'}")

        return all_healthy

    def predict_with_monitoring(self, sensor_data, user):
        """Prediction with monitoring"""
        try:
            # Security check
            if not self.security.check_rate_limit(user):
                self.logger.warning(f"Rate limit exceeded for user: {user}")
                return {'error': 'Rate limit exceeded'}, 429

            # Execute inference
            result = self.edge_twin.predict(sensor_data)

            # Log
            self.security.log_access(user, 'PREDICT', 'twin/predict', True)
            self.logger.info(f"Prediction successful - User: {user}, "
                           f"Time: {result['inference_time_ms']:.2f}ms")

            return result, 200

        except Exception as e:
            # logger.exception records the traceback along with the message
            self.logger.exception(f"Prediction error: {e}")
            self.security.log_access(user, 'PREDICT', 'twin/predict', False)
            return {'error': 'Internal server error'}, 500

    def shutdown(self):
        """Shutdown system"""
        self.logger.info("Shutting down Digital Twin System")

        # Cleanup operations
        # - Close database connections
        # - Flush cache
        # - Release resources

        self.logger.info("System shutdown complete")

# Configuration file example
PRODUCTION_CONFIG = {
    "model_path": "/app/models/production_v1.0",
    "log_dir": "/var/log/digital_twin",
    "api_port": 5000,
    "max_requests_per_hour": 1000,
    "enable_monitoring": True,
    "database": {
        "host": "localhost",
        "port": 5432,
        "name": "digital_twin_db"
    }
}

# Usage example
# system = ProductionDigitalTwinSystem('config.json')
# system.start()

# Execute inference
# result, status = system.predict_with_monitoring(
#     {'temp': 360, 'pressure': 6.0, 'flow': 100},
#     user='operator1'
# )

print("Production system implementation complete")
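
The health check above only tests whether `model_path` is present in the configuration. A slightly stricter check can be sketched as follows. The keys and types mirror the PRODUCTION_CONFIG example; the split into required and optional keys and the function name `validate_config` are assumptions for illustration.

```python
from pathlib import Path

# Keys and types mirror the PRODUCTION_CONFIG example; which keys are
# required versus optional is an assumption for this sketch.
REQUIRED_KEYS = {"model_path": str}
OPTIONAL_KEYS = {
    "log_dir": str,
    "api_port": int,
    "max_requests_per_hour": int,
    "enable_monitoring": bool,
    "database": dict,
}


def _type_ok(value, expected):
    """isinstance check that does not accept bool where int is expected."""
    if expected is int and isinstance(value, bool):
        return False
    return isinstance(value, expected)


def validate_config(config, check_paths=False):
    """Return a list of problems found in config; an empty list means it looks usable."""
    problems = []

    for key, expected in REQUIRED_KEYS.items():
        if key not in config:
            problems.append(f"missing required key: {key}")
        elif not _type_ok(config[key], expected):
            problems.append(f"{key}: expected {expected.__name__}")

    for key, expected in OPTIONAL_KEYS.items():
        if key in config and not _type_ok(config[key], expected):
            problems.append(f"{key}: expected {expected.__name__}")

    port = config.get("api_port")
    if _type_ok(port, int) and not 1 <= port <= 65535:
        problems.append("api_port: must be between 1 and 65535")

    # Optionally confirm that the model directory exists on this machine
    if check_paths and isinstance(config.get("model_path"), str):
        if not Path(config["model_path"]).is_dir():
            problems.append(f"model_path: directory not found: {config['model_path']}")

    return problems
```

Calling `validate_config(config)` right after loading the file and refusing to start when the list is non-empty gives clearer startup errors than a KeyError raised later during initialization.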

📊 Deployment Success Stories:
  • Siemens: Cloud-based digital twin platform managing 100+ plants
  • Shell: Edge + cloud hybrid configuration achieving sub-1ms latency
  • BASF: Microservice architecture realizing 99.99% availability

Learning Objectives Review

Upon completing this chapter, you will be able to explain and implement the following:

Basic Understanding

Practical Skills

Advanced Capabilities

Exercises

Easy (Basic Verification)

Q1: List three main differences between edge deployment and cloud deployment.

Answer:

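Aspect              | Edge Deployment                  | Cloud Deployment
--------------------|----------------------------------|-------------------------------------
Latency             | Low (<1 ms possible)             | High (10-100 ms)
Computing Resources | Limited                          | Scalable
Network Dependency  | Low (offline operation possible) | High (constant connection required)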

Explanation: Edge deployment is suitable for real-time control requiring low latency, while cloud deployment is appropriate for complex analysis and access from multiple locations. In practice, hybrid configurations combining both are common.
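
The trade-off in the table can be turned into a simple routing rule for a hybrid configuration. The sketch below is illustrative only: the function name, its parameters, and the default 50 ms cloud round-trip time are assumptions, not measurements from a real system.

```python
def choose_deployment_target(latency_budget_ms, network_available,
                             needs_heavy_compute, cloud_round_trip_ms=50.0):
    """Pick 'edge' or 'cloud' for one request under simple, assumed rules."""
    if not network_available:
        # Offline operation is only possible at the edge
        return "edge"
    if latency_budget_ms < cloud_round_trip_ms:
        # The cloud round trip alone would exceed the latency budget
        return "edge"
    if needs_heavy_compute:
        # Enough time and connectivity: use the scalable cloud resources
        return "cloud"
    # Default to the edge to keep latency and network dependency low
    return "edge"


# Real-time control loop with a 5 ms budget stays on the edge
print(choose_deployment_target(5, True, True))
# A complex analysis that tolerates 500 ms can go to the cloud
print(choose_deployment_target(500, True, True))
```

In practice the decision usually also depends on data volume, cost, and where the required model is deployed, so a rule like this is only a starting point.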

Medium (Application)

Q2: List five essential security measures for production digital twin systems.

Answer:

  1. Authentication and Authorization: Authenticate users with JWT or OAuth 2.0
  2. Encrypted Communication: Enforce HTTPS (TLS 1.2 or higher)
  3. Input Validation: Prevent SQL injection and XSS attacks
  4. Rate Limiting: Throttle per-user request rates to mitigate abuse and denial-of-service attempts
  5. Audit Logs: Record and monitor all access

Additional Recommendations:

  • API key rotation
  • Principle of least privilege
  • Regular security audits
  • Vulnerability scanning
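
To make the rate-limiting measure concrete, here is a minimal in-memory sliding-window limiter. It is a sketch under stated assumptions: the class name `SlidingWindowRateLimiter` is hypothetical and is not the chapter's SecurityManager implementation, and the state lives in a single process without locking.

```python
import time
from collections import defaultdict, deque


class SlidingWindowRateLimiter:
    """Allow at most max_requests per user within the last window_seconds."""

    def __init__(self, max_requests, window_seconds, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self._events = defaultdict(deque)  # user -> timestamps of accepted requests

    def allow(self, user):
        now = self.clock()
        events = self._events[user]

        # Drop timestamps that have fallen out of the window
        while events and now - events[0] >= self.window_seconds:
            events.popleft()

        if len(events) >= self.max_requests:
            return False

        events.append(now)
        return True


# Example: 1000 requests per hour, matching max_requests_per_hour in the config example
limiter = SlidingWindowRateLimiter(max_requests=1000, window_seconds=3600)
print(limiter.allow("operator1"))
```

A deployment with several API workers would need shared state (for example in a database or cache server) and locking, which this sketch deliberately leaves out.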

Hard (Advanced)

Q3: Extend the provided ProductionDigitalTwinSystem class to add the following features:

  1. Model A/B testing functionality (randomly switch between two model versions)
  2. Prediction result caching (prevent recalculation for identical inputs)
  3. Anomaly detection alerts (notify when predictions are in abnormal range)

Example Answer:

import hashlib
import json
import random
from collections import OrderedDict

class AdvancedProductionSystem(ProductionDigitalTwinSystem):
    """Advanced Production System"""

    def __init__(self, config_path):
        super().__init__(config_path)

        # Two models for A/B testing
        self.model_a = EdgeDigitalTwin(self.config['model_path_a'])
        self.model_b = EdgeDigitalTwin(self.config['model_path_b'])
        self.ab_ratio = 0.5  # 50/50 split

        # Cache (max 1000 entries, LRU)
        self.cache = OrderedDict()
        self.cache_max_size = 1000

        # Anomaly detection thresholds
        self.alert_thresholds = {
            'min': 70,
            'max': 95
        }

    def _get_cache_key(self, sensor_data):
        """Generate cache key"""
        # Generate unique key from sorted sensor data
        sorted_data = json.dumps(sensor_data, sort_keys=True)
        return hashlib.md5(sorted_data.encode()).hexdigest()

    def predict_with_cache(self, sensor_data, user):
        """Prediction with cache"""
        # Apply the same rate limit as predict_with_monitoring, even on cache hits
        if not self.security.check_rate_limit(user):
            self.logger.warning(f"Rate limit exceeded for user: {user}")
            return {'error': 'Rate limit exceeded'}, 429

        cache_key = self._get_cache_key(sensor_data)

        # Cache hit
        if cache_key in self.cache:
            self.logger.info(f"Cache hit for user: {user}")
            self.cache.move_to_end(cache_key)  # Mark as most recently used
            # Return a copy so the cached entry itself is never modified
            result = dict(self.cache[cache_key], from_cache=True)
            self.security.log_access(user, 'PREDICT', 'twin/predict', True)
            return result, 200

        # A/B test: model selection
        use_model_a = random.random() < self.ab_ratio
        model = self.model_a if use_model_a else self.model_b
        model_version = 'A' if use_model_a else 'B'

        # Execute inference
        result = model.predict(sensor_data)
        result['model_version'] = model_version
        result['from_cache'] = False

        # Anomaly detection
        if self._is_anomaly(result['prediction']):
            self._send_alert(user, sensor_data, result)

        # Save a copy to the cache; evict the least recently used entry when full
        self.cache[cache_key] = dict(result)
        if len(self.cache) > self.cache_max_size:
            self.cache.popitem(last=False)

        # Log
        self.security.log_access(user, 'PREDICT', 'twin/predict', True)
        self.logger.info(f"Prediction - User: {user}, Model: {model_version}, "
                        f"Result: {result['prediction']:.2f}")

        return result, 200

    def _is_anomaly(self, prediction):
        """Anomaly detection"""
        return (prediction < self.alert_thresholds['min'] or
                prediction > self.alert_thresholds['max'])

    def _send_alert(self, user, sensor_data, result):
        """Send alert"""
        alert_message = (
            f"ANOMALY DETECTED\n"
            f"User: {user}\n"
            f"Prediction: {result['prediction']:.2f}\n"
            f"Sensor Data: {sensor_data}\n"
            f"Expected Range: [{self.alert_thresholds['min']}, "
            f"{self.alert_thresholds['max']}]"
        )

        self.logger.warning(alert_message)

        # In practice: email/Slack/SMS notification
        print(f"🚨 ALERT: {alert_message}")

    def get_ab_test_stats(self):
        """A/B test statistics"""
        # In practice, aggregate from database
        return {
            'model_a_count': 0,
            'model_b_count': 0,
            'cache_hit_rate': 0.0
        }

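# Usage example
# config.json must contain the base 'model_path' (read by the parent class)
# plus both A/B model paths:
# {
#     "model_path": "./models/v1.0",
#     "model_path_a": "./models/v1.0",
#     "model_path_b": "./models/v1.1",
#     "log_dir": "./logs"
# }
# system = AdvancedProductionSystem('config.json')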
# result, status = system.predict_with_cache(
#     {'temp': 360, 'pressure': 6.0, 'flow': 100},
#     'operator1'
# )
# print(f"Prediction: {result['prediction']:.2f}")
# print(f"Model: {result['model_version']}, Cache: {result['from_cache']}")

print("Advanced production system implementation complete")

Explanation: This implementation compares model versions through A/B testing, avoids recomputing predictions for identical inputs through caching, and alerts operators as soon as a prediction leaves the expected range. Note that a cached result keeps the model version that produced it, so identical inputs are not re-randomized between A and B until the entry is evicted.
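
The `get_ab_test_stats` method in the example answer returns placeholder zeros. One possible in-memory alternative is sketched below; the class name `ABTestStats` and its methods are assumptions for illustration, and it expects result dictionaries carrying the `model_version` and `from_cache` keys set by `predict_with_cache`.

```python
from collections import Counter


class ABTestStats:
    """In-memory counters for A/B usage and cache hit rate."""

    def __init__(self):
        self.model_counts = Counter()  # fresh (non-cached) inferences per model version
        self.cache_hits = 0
        self.total_requests = 0

    def record(self, result):
        """Update the counters from one prediction result dictionary."""
        self.total_requests += 1
        if result.get("from_cache"):
            self.cache_hits += 1
        else:
            self.model_counts[result.get("model_version", "unknown")] += 1

    def summary(self):
        """Return the same keys as get_ab_test_stats in the example answer."""
        if self.total_requests:
            hit_rate = self.cache_hits / self.total_requests
        else:
            hit_rate = 0.0
        return {
            "model_a_count": self.model_counts["A"],
            "model_b_count": self.model_counts["B"],
            "cache_hit_rate": hit_rate,
        }


stats = ABTestStats()
stats.record({"model_version": "A", "from_cache": False})
stats.record({"model_version": "B", "from_cache": False})
stats.record({"model_version": "A", "from_cache": True})
print(stats.summary())
```

These counters are lost on restart, which is why the example answer suggests aggregating from a database in practice. Comparing the two models also requires ground-truth measurements to score each version's predictions, which usage counts alone do not provide.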

Summary

Chapter 5 covered the production deployment of digital twins, with practical implementation examples ranging from edge to cloud deployment and from security to monitoring.

This series has taken you from digital twin fundamentals through applications, implementation, and deployment. The next phase is to apply these techniques to real processes and improve them continuously.

🎓 Next Steps:
  • Conduct a small-scale PoC (proof of concept) on your own process
  • Expand functionality gradually (improve model accuracy, add optimization features)
  • Accumulate operational data and establish a continuous improvement cycle
  • Consider integration with other digitalization initiatives (MES, ERP, etc.)

Disclaimer