This chapter covers Deploying Digital Twins. You will learn differences between edge, benefits of containerization, and RESTful API design principles.
5.1 Edge Computing Deployment
To achieve low-latency real-time inference at plant sites, we deploy digital twins on edge devices.
5.1.1 Lightweight Model Deployment
import pickle
import numpy as np
from pathlib import Path
import json
import time
class EdgeDigitalTwin:
"""Lightweight Digital Twin for Edge Devices"""
def __init__(self, model_path):
self.model = None
self.scaler = None
self.config = {}
self.load_model(model_path)
def load_model(self, model_path):
"""Load model"""
model_dir = Path(model_path)
# Model file
with open(model_dir / 'model.pkl', 'rb') as f:
self.model = pickle.load(f)
# Scaler
with open(model_dir / 'scaler.pkl', 'rb') as f:
self.scaler = pickle.load(f)
# Configuration file
with open(model_dir / 'config.json', 'r') as f:
self.config = json.load(f)
print(f"Model loaded: {self.config['model_type']}")
print(f"Features: {self.config['features']}")
def predict(self, sensor_data):
"""Real-time inference
Args:
sensor_data: Dictionary of {feature_name: value}
Returns:
Prediction result and confidence interval
"""
start_time = time.time()
# Feature extraction
X = np.array([[sensor_data[f] for f in self.config['features']]])
# Preprocessing
X_scaled = self.scaler.transform(X)
# Inference
prediction = self.model.predict(X_scaled)[0]
# Confidence interval (for ensemble models)
if hasattr(self.model, 'estimators_'):
predictions = [est.predict(X_scaled)[0]
for est in self.model.estimators_]
confidence_interval = (
np.percentile(predictions, 5),
np.percentile(predictions, 95)
)
else:
confidence_interval = None
inference_time = (time.time() - start_time) * 1000 # ms
return {
'prediction': prediction,
'confidence_interval': confidence_interval,
'inference_time_ms': inference_time,
'timestamp': time.time()
}
def validate_input(self, sensor_data):
"""Validate input data"""
missing_features = [f for f in self.config['features']
if f not in sensor_data]
if missing_features:
return False, f"Missing features: {missing_features}"
# Range check
for feature, value in sensor_data.items():
if feature in self.config.get('valid_ranges', {}):
min_val, max_val = self.config['valid_ranges'][feature]
if not (min_val <= value <= max_val):
return False, f"{feature}={value} out of range [{min_val}, {max_val}]"
return True, "OK"
def get_model_info(self):
"""Get model information"""
return {
'type': self.config['model_type'],
'version': self.config.get('version', 'unknown'),
'features': self.config['features'],
'trained_date': self.config.get('trained_date', 'unknown')
}
# Model packaging function
def package_model_for_edge(model, scaler, features, output_dir,
valid_ranges=None):
"""Package model for edge deployment"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Save model
with open(output_path / 'model.pkl', 'wb') as f:
pickle.dump(model, f)
# Save scaler
with open(output_path / 'scaler.pkl', 'wb') as f:
pickle.dump(scaler, f)
# Configuration file
config = {
'model_type': type(model).__name__,
'features': features,
'version': '1.0.0',
'trained_date': time.strftime('%Y-%m-%d'),
'valid_ranges': valid_ranges or {}
}
with open(output_path / 'config.json', 'w') as f:
json.dump(config, f, indent=2)
print(f"Model packaged to {output_path}")
# Usage example
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
# Model training (simple example)
np.random.seed(42)
X_train = np.random.rand(100, 3)
y_train = X_train[:, 0] * 10 + X_train[:, 1] * 5 + np.random.randn(100)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)
model = RandomForestRegressor(n_estimators=10, max_depth=5, random_state=42)
model.fit(X_scaled, y_train)
# Packaging
package_model_for_edge(
model, scaler,
features=['temp', 'pressure', 'flow'],
output_dir='./edge_model',
valid_ranges={'temp': (300, 400), 'pressure': (1, 10), 'flow': (50, 150)}
)
# Usage on edge device
edge_twin = EdgeDigitalTwin('./edge_model')
# Real-time inference
sensor_reading = {'temp': 350, 'pressure': 5.5, 'flow': 100}
result = edge_twin.predict(sensor_reading)
print(f"Prediction: {result['prediction']:.2f}")
print(f"Inference Time: {result['inference_time_ms']:.2f} ms")
if result['confidence_interval']:
print(f"90% CI: [{result['confidence_interval'][0]:.2f}, "
f"{result['confidence_interval'][1]:.2f}]")
- Minimize model size (quantization, pruning)
- Keep inference time under 1 second (real-time constraints)
- Always implement input validation (anomaly prevention)
- Provide fail-safe mode
5.2 Cloud Deployment Architecture
Host digital twins in a scalable cloud environment to enable access from multiple plants.
5.2.1 Microservice Architecture
from flask import Flask, request, jsonify
from functools import wraps
import logging
from datetime import datetime
class DigitalTwinMicroservice:
"""Digital Twin Microservice"""
def __init__(self, model_path, service_name='digital-twin'):
self.app = Flask(service_name)
self.model = EdgeDigitalTwin(model_path)
self.request_count = 0
# Logging setup
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(service_name)
self._setup_routes()
def _setup_routes(self):
"""Setup API endpoints"""
@self.app.route('/health', methods=['GET'])
def health_check():
"""Health check"""
return jsonify({
'status': 'healthy',
'service': 'digital-twin',
'model_version': self.model.config.get('version', 'unknown'),
'timestamp': datetime.now().isoformat()
})
@self.app.route('/predict', methods=['POST'])
@self._log_request
def predict():
"""Prediction endpoint"""
try:
data = request.get_json()
# Input validation
valid, message = self.model.validate_input(data)
if not valid:
return jsonify({'error': message}), 400
# Execute inference
result = self.model.predict(data)
self.request_count += 1
return jsonify({
'success': True,
'result': result,
'request_id': self.request_count
})
except Exception as e:
self.logger.error(f"Prediction error: {str(e)}")
return jsonify({'error': str(e)}), 500
@self.app.route('/model/info', methods=['GET'])
def model_info():
"""Model information"""
return jsonify(self.model.get_model_info())
@self.app.route('/metrics', methods=['GET'])
def metrics():
"""Metrics"""
return jsonify({
'total_requests': self.request_count,
'model_version': self.model.config.get('version'),
'uptime_seconds': self._get_uptime()
})
def _log_request(self, f):
"""Request logging decorator"""
@wraps(f)
def decorated_function(*args, **kwargs):
self.logger.info(f"Request: {request.method} {request.path}")
response = f(*args, **kwargs)
self.logger.info(f"Response: {response[1] if isinstance(response, tuple) else 200}")
return response
return decorated_function
def _get_uptime(self):
"""Uptime (simplified version)"""
return 0 # Implementation omitted
def run(self, host='0.0.0.0', port=5000, debug=False):
"""Start service"""
self.logger.info(f"Starting Digital Twin Microservice on {host}:{port}")
self.app.run(host=host, port=port, debug=debug)
# Usage example (at runtime)
# service = DigitalTwinMicroservice('./edge_model')
# service.run(port=5000)
# Client-side usage example
import requests
def call_digital_twin_api(sensor_data, api_url='http://localhost:5000'):
"""Call Digital Twin API"""
try:
# Inference request
response = requests.post(
f"{api_url}/predict",
json=sensor_data,
timeout=5.0
)
if response.status_code == 200:
return response.json()
else:
print(f"Error: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return None
# API call example
sensor_data = {'temp': 360, 'pressure': 6.0, 'flow': 100}
result = call_digital_twin_api(sensor_data)
if result and result['success']:
print(f"Prediction: {result['result']['prediction']:.2f}")
print(f"Inference time: {result['result']['inference_time_ms']:.2f} ms")
5.3 Docker Containerization
Containerize digital twins for environment-independent and easy deployment.
# Dockerfile example (displayed as text within Python code)
DOCKERFILE_CONTENT = """
# Base image
FROM python:3.9-slim
# Working directory
WORKDIR /app
# System dependencies
RUN apt-get update && apt-get install -y \\
gcc \\
&& rm -rf /var/lib/apt/lists/*
# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Application code
COPY ./digital_twin /app/digital_twin
COPY ./edge_model /app/edge_model
# Run as non-root user
RUN useradd -m -u 1000 dtuser && chown -R dtuser:dtuser /app
USER dtuser
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
CMD python -c "import requests; requests.get('http://localhost:5000/health')"
# Expose port
EXPOSE 5000
# Startup command
CMD ["python", "-m", "digital_twin.service"]
"""
# requirements.txt content
REQUIREMENTS_CONTENT = """
flask==2.3.0
numpy==1.24.0
scikit-learn==1.3.0
requests==2.31.0
gunicorn==21.0.0
"""
# Docker compose configuration
DOCKER_COMPOSE_CONTENT = """
version: '3.8'
services:
digital-twin:
build: .
ports:
- "5000:5000"
environment:
- MODEL_PATH=/app/edge_model
- LOG_LEVEL=INFO
volumes:
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
interval: 30s
timeout: 5s
retries: 3
# Load balancer (for multiple instances)
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- digital-twin
restart: unless-stopped
"""
# Docker build and run script
class DockerDeployment:
"""Docker Deployment Management"""
@staticmethod
def create_dockerfile(output_dir='.'):
"""Create Dockerfile"""
with open(f"{output_dir}/Dockerfile", 'w') as f:
f.write(DOCKERFILE_CONTENT)
print("Dockerfile created")
@staticmethod
def create_requirements(output_dir='.'):
"""Create requirements.txt"""
with open(f"{output_dir}/requirements.txt", 'w') as f:
f.write(REQUIREMENTS_CONTENT)
print("requirements.txt created")
@staticmethod
def create_docker_compose(output_dir='.'):
"""Create docker-compose.yml"""
with open(f"{output_dir}/docker-compose.yml", 'w') as f:
f.write(DOCKER_COMPOSE_CONTENT)
print("docker-compose.yml created")
@staticmethod
def build_image(tag='digital-twin:latest'):
"""Build Docker image"""
import subprocess
cmd = f"docker build -t {tag} ."
subprocess.run(cmd, shell=True)
print(f"Image built: {tag}")
@staticmethod
def run_container(tag='digital-twin:latest', port=5000):
"""Start container"""
import subprocess
cmd = f"docker run -d -p {port}:5000 --name dt-service {tag}"
subprocess.run(cmd, shell=True)
print(f"Container started on port {port}")
# Usage example
# deployment = DockerDeployment()
# deployment.create_dockerfile()
# deployment.create_requirements()
# deployment.create_docker_compose()
# deployment.build_image()
# deployment.run_container()
print("Docker deployment files ready")
print("To build: docker build -t digital-twin .")
print("To run: docker-compose up -d")
5.4 RESTful API Design
Enable access to digital twins through standard RESTful APIs.
from flask import Flask, request, jsonify, make_response
from functools import wraps
import jwt
import datetime
class SecureDigitalTwinAPI:
"""Secure Digital Twin API"""
def __init__(self, model_path, secret_key):
self.app = Flask(__name__)
self.app.config['SECRET_KEY'] = secret_key
self.model = EdgeDigitalTwin(model_path)
self._setup_secure_routes()
def _token_required(self, f):
"""JWT authentication decorator"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'message': 'Token is missing'}), 401
try:
# "Bearer " format
token = token.split()[1] if ' ' in token else token
data = jwt.decode(token, self.app.config['SECRET_KEY'],
algorithms=['HS256'])
current_user = data['user']
except:
return jsonify({'message': 'Token is invalid'}), 401
return f(current_user, *args, **kwargs)
return decorated
def _setup_secure_routes(self):
"""Setup secure API endpoints"""
@self.app.route('/api/v1/auth/login', methods=['POST'])
def login():
"""Authentication (token issuance)"""
auth = request.get_json()
# In practice, verify user with DB
if auth.get('username') == 'admin' and auth.get('password') == 'secret':
token = jwt.encode({
'user': auth['username'],
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)
}, self.app.config['SECRET_KEY'], algorithm='HS256')
return jsonify({'token': token})
return jsonify({'message': 'Invalid credentials'}), 401
@self.app.route('/api/v1/twin/predict', methods=['POST'])
@self._token_required
def predict(current_user):
"""Prediction (authentication required)"""
data = request.get_json()
valid, message = self.model.validate_input(data)
if not valid:
return jsonify({'error': message}), 400
result = self.model.predict(data)
return jsonify({
'user': current_user,
'prediction': result['prediction'],
'confidence_interval': result['confidence_interval'],
'timestamp': result['timestamp']
})
@self.app.route('/api/v1/twin/batch', methods=['POST'])
@self._token_required
def batch_predict(current_user):
"""Batch prediction"""
data = request.get_json()
inputs = data.get('inputs', [])
if len(inputs) > 100:
return jsonify({'error': 'Batch size limit: 100'}), 400
results = []
for inp in inputs:
valid, _ = self.model.validate_input(inp)
if valid:
result = self.model.predict(inp)
results.append({
'input': inp,
'prediction': result['prediction']
})
return jsonify({
'user': current_user,
'total': len(inputs),
'successful': len(results),
'results': results
})
@self.app.route('/api/v1/twin/status', methods=['GET'])
@self._token_required
def status(current_user):
"""Status check"""
return jsonify({
'status': 'operational',
'model_info': self.model.get_model_info(),
'user': current_user
})
# Usage example (client side)
class DigitalTwinClient:
"""Digital Twin API Client"""
def __init__(self, api_url, username, password):
self.api_url = api_url
self.token = None
self._authenticate(username, password)
def _authenticate(self, username, password):
"""Authentication"""
response = requests.post(
f"{self.api_url}/api/v1/auth/login",
json={'username': username, 'password': password}
)
if response.status_code == 200:
self.token = response.json()['token']
print("Authenticated successfully")
else:
raise Exception("Authentication failed")
def predict(self, sensor_data):
"""Prediction request"""
headers = {'Authorization': f'Bearer {self.token}'}
response = requests.post(
f"{self.api_url}/api/v1/twin/predict",
json=sensor_data,
headers=headers
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Prediction failed: {response.status_code}")
def batch_predict(self, inputs_list):
"""Batch prediction"""
headers = {'Authorization': f'Bearer {self.token}'}
response = requests.post(
f"{self.api_url}/api/v1/twin/batch",
json={'inputs': inputs_list},
headers=headers
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Batch prediction failed: {response.status_code}")
# Usage example
# client = DigitalTwinClient('http://localhost:5000', 'admin', 'secret')
# result = client.predict({'temp': 360, 'pressure': 6.0, 'flow': 100})
# print(f"Prediction: {result['prediction']}")
print("Secure API implementation ready")
5.5 Visualization Dashboard
Build a real-time monitoring dashboard using Plotly Dash.
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.graph_objs as go
import pandas as pd
from collections import deque
import threading
import time
class DigitalTwinDashboard:
"""Digital Twin Dashboard"""
def __init__(self, digital_twin_api_url):
self.api_url = digital_twin_api_url
self.app = dash.Dash(__name__)
# Data buffers (latest 100 points)
self.time_buffer = deque(maxlen=100)
self.prediction_buffer = deque(maxlen=100)
self.temp_buffer = deque(maxlen=100)
self.pressure_buffer = deque(maxlen=100)
self._setup_layout()
self._setup_callbacks()
def _setup_layout(self):
"""Layout setup"""
self.app.layout = html.Div([
html.H1("Digital Twin Dashboard",
style={'textAlign': 'center', 'color': '#2c3e50'}),
html.Div([
html.Div([
html.H3("Current Status"),
html.Div(id='status-display',
style={'fontSize': '24px', 'fontWeight': 'bold'})
], style={'width': '48%', 'display': 'inline-block'}),
html.Div([
html.H3("Model Info"),
html.Div(id='model-info')
], style={'width': '48%', 'float': 'right',
'display': 'inline-block'})
]),
html.Div([
html.H3("Input Controls"),
html.Label("Temperature (°C):"),
dcc.Slider(id='temp-slider', min=340, max=380, value=360,
marks={i: str(i) for i in range(340, 381, 10)}),
html.Label("Pressure (bar):"),
dcc.Slider(id='pressure-slider', min=4, max=8, value=6,
step=0.5,
marks={i: str(i) for i in range(4, 9)}),
html.Label("Flow (kg/h):"),
dcc.Slider(id='flow-slider', min=80, max=120, value=100,
marks={i: str(i) for i in range(80, 121, 10)}),
html.Button('Predict', id='predict-button', n_clicks=0)
], style={'margin': '20px'}),
html.Div([
dcc.Graph(id='prediction-time-series'),
dcc.Graph(id='process-conditions')
]),
dcc.Interval(id='interval-component', interval=2000, n_intervals=0)
])
def _setup_callbacks(self):
"""Callback setup"""
@self.app.callback(
Output('status-display', 'children'),
Output('prediction-time-series', 'figure'),
Output('process-conditions', 'figure'),
Input('predict-button', 'n_clicks'),
Input('interval-component', 'n_intervals'),
State('temp-slider', 'value'),
State('pressure-slider', 'value'),
State('flow-slider', 'value')
)
def update_dashboard(n_clicks, n_intervals, temp, pressure, flow):
# Execute inference (on button click or interval)
if n_clicks > 0 or n_intervals > 0:
sensor_data = {
'temp': temp,
'pressure': pressure,
'flow': flow
}
# API call (actual implementation)
# result = call_digital_twin_api(sensor_data, self.api_url)
# Simplified version: local calculation
prediction = 85 - 0.05*(temp-360)**2 - 2*(pressure-6)**2
# Update buffers
self.time_buffer.append(time.time())
self.prediction_buffer.append(prediction)
self.temp_buffer.append(temp)
self.pressure_buffer.append(pressure)
# Status display
if len(self.prediction_buffer) > 0:
status = f"Latest Prediction: {self.prediction_buffer[-1]:.2f}%"
else:
status = "No data"
# Prediction time series graph
time_series_fig = go.Figure()
if len(self.time_buffer) > 0:
time_series_fig.add_trace(go.Scatter(
x=list(range(len(self.prediction_buffer))),
y=list(self.prediction_buffer),
mode='lines+markers',
name='Yield Prediction',
line=dict(color='#11998e', width=2)
))
time_series_fig.update_layout(
title='Yield Prediction Over Time',
xaxis_title='Sample',
yaxis_title='Yield (%)',
template='plotly_white',
height=400
)
# Process conditions graph
conditions_fig = go.Figure()
if len(self.temp_buffer) > 0:
conditions_fig.add_trace(go.Scatter(
x=list(range(len(self.temp_buffer))),
y=list(self.temp_buffer),
mode='lines',
name='Temperature',
yaxis='y1'
))
conditions_fig.add_trace(go.Scatter(
x=list(range(len(self.pressure_buffer))),
y=list(self.pressure_buffer),
mode='lines',
name='Pressure',
yaxis='y2'
))
conditions_fig.update_layout(
title='Process Conditions',
xaxis=dict(title='Sample'),
yaxis=dict(title='Temperature (°C)', side='left'),
yaxis2=dict(title='Pressure (bar)', side='right',
overlaying='y'),
template='plotly_white',
height=400
)
return status, time_series_fig, conditions_fig
def run(self, host='0.0.0.0', port=8050, debug=False):
"""Start dashboard"""
self.app.run_server(host=host, port=port, debug=debug)
# Usage example
# dashboard = DigitalTwinDashboard('http://localhost:5000')
# dashboard.run(port=8050)
print("Dashboard implementation ready")
print("Access at: http://localhost:8050")
5.6 Security and Access Control
Proper security measures are essential in production environments.
import hashlib
import secrets
from datetime import datetime, timedelta
import json
class SecurityManager:
"""Security Management System"""
def __init__(self):
self.users = {} # In practice, use database
self.api_keys = {}
self.access_log = []
def create_user(self, username, password, role='user'):
"""Create user"""
# Password hashing
salt = secrets.token_hex(16)
password_hash = self._hash_password(password, salt)
self.users[username] = {
'password_hash': password_hash,
'salt': salt,
'role': role,
'created_at': datetime.now().isoformat(),
'active': True
}
return True
def _hash_password(self, password, salt):
"""Hash password"""
return hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000
).hex()
def verify_password(self, username, password):
"""Verify password"""
if username not in self.users:
return False
user = self.users[username]
password_hash = self._hash_password(password, user['salt'])
return password_hash == user['password_hash']
def generate_api_key(self, username, expiry_days=365):
"""Generate API key"""
if username not in self.users:
return None
api_key = secrets.token_urlsafe(32)
expiry = datetime.now() + timedelta(days=expiry_days)
self.api_keys[api_key] = {
'username': username,
'created_at': datetime.now().isoformat(),
'expires_at': expiry.isoformat(),
'active': True
}
return api_key
def verify_api_key(self, api_key):
"""Verify API key"""
if api_key not in self.api_keys:
return False, "Invalid API key"
key_info = self.api_keys[api_key]
if not key_info['active']:
return False, "API key deactivated"
expiry = datetime.fromisoformat(key_info['expires_at'])
if datetime.now() > expiry:
return False, "API key expired"
return True, key_info['username']
def log_access(self, username, action, resource, success=True):
"""Log access"""
self.access_log.append({
'timestamp': datetime.now().isoformat(),
'username': username,
'action': action,
'resource': resource,
'success': success
})
def get_access_log(self, username=None, limit=100):
"""Get access log"""
logs = self.access_log[-limit:]
if username:
logs = [log for log in logs if log['username'] == username]
return logs
def check_rate_limit(self, username, max_requests=100, window_minutes=60):
"""Check rate limit"""
cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
recent_requests = [
log for log in self.access_log
if (log['username'] == username and
datetime.fromisoformat(log['timestamp']) > cutoff_time)
]
return len(recent_requests) < max_requests
# Usage example
security = SecurityManager()
# Create users
security.create_user('operator1', 'secure_password', role='operator')
security.create_user('admin1', 'admin_password', role='admin')
# Authentication
is_valid = security.verify_password('operator1', 'secure_password')
print(f"Authentication: {is_valid}")
# Issue API key
api_key = security.generate_api_key('operator1', expiry_days=90)
print(f"API Key: {api_key[:20]}...")
# Verify API key
valid, username = security.verify_api_key(api_key)
print(f"API Key Valid: {valid}, User: {username}")
# Access logging
security.log_access('operator1', 'PREDICT', '/api/v1/twin/predict', success=True)
security.log_access('operator1', 'PREDICT', '/api/v1/twin/predict', success=True)
# Rate limit check
within_limit = security.check_rate_limit('operator1', max_requests=100)
print(f"Within rate limit: {within_limit}")
# Check access log
logs = security.get_access_log('operator1')
print(f"Access log entries: {len(logs)}")
- Enforce HTTPS communication (TLS 1.2 or higher)
- Implement authentication and authorization (JWT, OAuth2.0)
- Rate limiting and DDoS protection
- Input validation and SQL injection prevention
- Access log storage and monitoring
- Regular security audits
5.7 Production System Integration Example
A complete production system example integrating all elements.
import logging
from pathlib import Path
class ProductionDigitalTwinSystem:
"""Production Digital Twin System"""
def __init__(self, config_path):
self.config = self._load_config(config_path)
self.logger = self._setup_logging()
# Component initialization
self.edge_twin = EdgeDigitalTwin(self.config['model_path'])
self.security = SecurityManager()
self.api_service = None # In practice, Flask app
self.logger.info("Production Digital Twin System initialized")
def _load_config(self, config_path):
"""Load configuration"""
with open(config_path, 'r') as f:
return json.load(f)
def _setup_logging(self):
"""Setup logging"""
log_dir = Path(self.config.get('log_dir', './logs'))
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_dir / 'digital_twin.log'),
logging.StreamHandler()
]
)
return logging.getLogger('DigitalTwinSystem')
def start(self):
"""Start system"""
self.logger.info("Starting Digital Twin System")
# Health check
if not self._health_check():
self.logger.error("Health check failed")
return False
# Start API service (actual implementation)
# self.api_service = DigitalTwinMicroservice(...)
# self.api_service.run()
self.logger.info("System started successfully")
return True
def _health_check(self):
"""Startup health check"""
checks = {
'model_loaded': hasattr(self.edge_twin, 'model'),
'config_valid': 'model_path' in self.config,
'security_ready': self.security is not None
}
all_healthy = all(checks.values())
for check, status in checks.items():
self.logger.info(f"Health check - {check}: {'OK' if status else 'FAIL'}")
return all_healthy
def predict_with_monitoring(self, sensor_data, user):
"""Prediction with monitoring"""
try:
# Security check
if not self.security.check_rate_limit(user):
self.logger.warning(f"Rate limit exceeded for user: {user}")
return {'error': 'Rate limit exceeded'}, 429
# Execute inference
result = self.edge_twin.predict(sensor_data)
# Log
self.security.log_access(user, 'PREDICT', 'twin/predict', True)
self.logger.info(f"Prediction successful - User: {user}, "
f"Time: {result['inference_time_ms']:.2f}ms")
return result, 200
except Exception as e:
self.logger.error(f"Prediction error: {str(e)}")
self.security.log_access(user, 'PREDICT', 'twin/predict', False)
return {'error': 'Internal server error'}, 500
def shutdown(self):
"""Shutdown system"""
self.logger.info("Shutting down Digital Twin System")
# Cleanup operations
# - Close database connections
# - Flush cache
# - Release resources
self.logger.info("System shutdown complete")
# Configuration file example
PRODUCTION_CONFIG = {
"model_path": "/app/models/production_v1.0",
"log_dir": "/var/log/digital_twin",
"api_port": 5000,
"max_requests_per_hour": 1000,
"enable_monitoring": True,
"database": {
"host": "localhost",
"port": 5432,
"name": "digital_twin_db"
}
}
# Usage example
# system = ProductionDigitalTwinSystem('config.json')
# system.start()
# Execute inference
# result, status = system.predict_with_monitoring(
# {'temp': 360, 'pressure': 6.0, 'flow': 100},
# user='operator1'
# )
print("Production system implementation complete")
- Siemens: Cloud-based digital twin platform managing 100+ plants
- Shell: Edge + cloud hybrid configuration achieving sub-1ms latency
- BASF: Microservice architecture realizing 99.99% availability
Learning Objectives Review
Upon completing this chapter, you will be able to explain and implement the following:
Basic Understanding
- ✅ Explain differences between edge and cloud deployment
- ✅ Understand benefits of containerization
- ✅ Understand RESTful API design principles
- ✅ Explain security requirements for production systems
Practical Skills
- ✅ Package lightweight models for edge devices
- ✅ Implement RESTful API with Flask
- ✅ Create Dockerfile and containerize
- ✅ Build dashboards with Plotly Dash
- ✅ Implement basic security features
Advanced Capabilities
- ✅ Select appropriate deployment strategy based on requirements
- ✅ Design microservice architecture
- ✅ Develop production monitoring and logging strategies
- ✅ Assess security risks and implement countermeasures
Exercises
Easy (Basic Verification)
Q1: List three main differences between edge deployment and cloud deployment.
View Answer
Answer:
| Aspect | Edge Deployment | Cloud Deployment |
|---|---|---|
| Latency | Low (<1ms possible) | High (10-100ms) |
| Computing Resources | Limited | Scalable |
| Network Dependency | Low (offline operation possible) | High (constant connection required) |
Explanation: Edge deployment is suitable for real-time control requiring low latency, while cloud deployment is appropriate for complex analysis and access from multiple locations. In practice, hybrid configurations combining both are common.
Medium (Application)
Q2: List five essential security measures for production digital twin systems.
View Answer
Answer:
- Authentication and Authorization: User authentication via JWT, OAuth2.0
- Encrypted Communication: Enforce HTTPS (TLS 1.2 or higher)
- Input Validation: Prevent SQL injection and XSS attacks
- Rate Limiting: DDoS attack prevention
- Audit Logs: Record and monitor all access
Additional Recommendations:
- API key rotation
- Principle of least privilege
- Regular security audits
- Vulnerability scanning
Hard (Advanced)
Q3: Extend the provided ProductionDigitalTwinSystem class to add the following features:
- Model A/B testing functionality (randomly switch between two model versions)
- Prediction result caching (prevent recalculation for identical inputs)
- Anomaly detection alerts (notify when predictions are in abnormal range)
View Example Answer
import hashlib
from collections import OrderedDict
import random
class AdvancedProductionSystem(ProductionDigitalTwinSystem):
"""Advanced Production System"""
def __init__(self, config_path):
super().__init__(config_path)
# Two models for A/B testing
self.model_a = EdgeDigitalTwin(self.config['model_path_a'])
self.model_b = EdgeDigitalTwin(self.config['model_path_b'])
self.ab_ratio = 0.5 # 50/50 split
# Cache (max 1000 entries, LRU)
self.cache = OrderedDict()
self.cache_max_size = 1000
# Anomaly detection thresholds
self.alert_thresholds = {
'min': 70,
'max': 95
}
def _get_cache_key(self, sensor_data):
"""Generate cache key"""
# Generate unique key from sorted sensor data
sorted_data = json.dumps(sensor_data, sort_keys=True)
return hashlib.md5(sorted_data.encode()).hexdigest()
def predict_with_cache(self, sensor_data, user):
"""Prediction with cache"""
cache_key = self._get_cache_key(sensor_data)
# Cache hit
if cache_key in self.cache:
self.logger.info(f"Cache hit for user: {user}")
result = self.cache[cache_key]
result['from_cache'] = True
return result, 200
# A/B test: model selection
use_model_a = random.random() < self.ab_ratio
model = self.model_a if use_model_a else self.model_b
model_version = 'A' if use_model_a else 'B'
# Execute inference
result = model.predict(sensor_data)
result['model_version'] = model_version
result['from_cache'] = False
# Anomaly detection
if self._is_anomaly(result['prediction']):
self._send_alert(user, sensor_data, result)
# Save to cache (LRU)
self.cache[cache_key] = result
if len(self.cache) > self.cache_max_size:
self.cache.popitem(last=False) # Remove oldest
# Log
self.security.log_access(user, 'PREDICT', 'twin/predict', True)
self.logger.info(f"Prediction - User: {user}, Model: {model_version}, "
f"Result: {result['prediction']:.2f}")
return result, 200
def _is_anomaly(self, prediction):
"""Anomaly detection"""
return (prediction < self.alert_thresholds['min'] or
prediction > self.alert_thresholds['max'])
def _send_alert(self, user, sensor_data, result):
"""Send alert"""
alert_message = (
f"ANOMALY DETECTED\\n"
f"User: {user}\\n"
f"Prediction: {result['prediction']:.2f}\\n"
f"Sensor Data: {sensor_data}\\n"
f"Expected Range: [{self.alert_thresholds['min']}, "
f"{self.alert_thresholds['max']}]"
)
self.logger.warning(alert_message)
# In practice: email/Slack/SMS notification
print(f"🚨 ALERT: {alert_message}")
def get_ab_test_stats(self):
"""A/B test statistics"""
# In practice, aggregate from database
return {
'model_a_count': 0,
'model_b_count': 0,
'cache_hit_rate': 0.0
}
# Usage example
# config = {
# 'model_path_a': './models/v1.0',
# 'model_path_b': './models/v1.1',
# 'log_dir': './logs'
# }
# system = AdvancedProductionSystem('config.json')
# result, status = system.predict_with_cache(
# {'temp': 360, 'pressure': 6.0, 'flow': 100},
# 'operator1'
# )
# print(f"Prediction: {result['prediction']:.2f}")
# print(f"Model: {result['model_version']}, Cache: {result['from_cache']}")
print("Advanced production system implementation complete")
Explanation: This implementation compares model performance through A/B testing, reduces computational cost with caching, and immediately alerts operators through anomaly detection. These features significantly enhance operational efficiency in production systems.
Summary
In Chapter 5, we learned about production deployment of digital twins. Through practical implementation examples covering edge to cloud, security to monitoring, we deepened our understanding.
Through this series, you have mastered the complete picture from digital twin fundamentals to applications, implementation, and deployment. Next is the phase of applying to actual processes and continuously improving.
- Conduct small-scale PoC (proof of concept) in your own processes
- Gradually expand functionality (improve model accuracy, add optimization features)
- Accumulate operational data and establish continuous improvement cycle
- Consider integration with other digitalization initiatives (MES, ERP, etc.)
References
- Montgomery, D. C. (2019). Design and Analysis of Experiments (9th ed.). Wiley.
- Box, G. E. P., Hunter, J. S., & Hunter, W. G. (2005). Statistics for Experimenters: Design, Innovation, and Discovery (2nd ed.). Wiley.
- Seborg, D. E., Edgar, T. F., Mellichamp, D. A., & Doyle III, F. J. (2016). Process Dynamics and Control (4th ed.). Wiley.
- McKay, M. D., Beckman, R. J., & Conover, W. J. (2000). "A Comparison of Three Methods for Selecting Values of Input Variables in the Analysis of Output from a Computer Code." Technometrics, 42(1), 55-61.
Disclaimer
- This content is provided solely for educational, research, and informational purposes and does not constitute professional advice (legal, accounting, technical warranty, etc.).
- This content and accompanying code examples are provided "AS IS" without any warranty, express or implied, including but not limited to merchantability, fitness for a particular purpose, non-infringement, accuracy, completeness, operation, or safety.
- The author and Tohoku University assume no responsibility for the content, availability, or safety of external links, third-party data, tools, libraries, etc.
- To the maximum extent permitted by applicable law, the author and Tohoku University shall not be liable for any direct, indirect, incidental, special, consequential, or punitive damages arising from the use, execution, or interpretation of this content.
- The content may be changed, updated, or discontinued without notice.
- The copyright and license of this content are subject to the stated conditions (e.g., CC BY 4.0). Such licenses typically include no-warranty clauses.