5.1 エッジコンピューティングデプロイ
プラント現場での低レイテンシーリアルタイム推論を実現するため、エッジデバイス上でデジタルツインを稼働させます。
5.1.1 軽量モデルのデプロイ
import pickle
import numpy as np
from pathlib import Path
import json
import time
class EdgeDigitalTwin:
"""エッジデバイス用軽量デジタルツイン"""
def __init__(self, model_path):
self.model = None
self.scaler = None
self.config = {}
self.load_model(model_path)
def load_model(self, model_path):
"""モデル読み込み"""
model_dir = Path(model_path)
# モデル本体
with open(model_dir / 'model.pkl', 'rb') as f:
self.model = pickle.load(f)
# スケーラー
with open(model_dir / 'scaler.pkl', 'rb') as f:
self.scaler = pickle.load(f)
# 設定ファイル
with open(model_dir / 'config.json', 'r') as f:
self.config = json.load(f)
print(f"Model loaded: {self.config['model_type']}")
print(f"Features: {self.config['features']}")
def predict(self, sensor_data):
"""リアルタイム推論
Args:
sensor_data: {feature_name: value}の辞書
Returns:
予測結果と信頼区間
"""
start_time = time.time()
# 特徴量抽出
X = np.array([[sensor_data[f] for f in self.config['features']]])
# 前処理
X_scaled = self.scaler.transform(X)
# 推論
prediction = self.model.predict(X_scaled)[0]
# 信頼区間(アンサンブルモデルの場合)
if hasattr(self.model, 'estimators_'):
predictions = [est.predict(X_scaled)[0]
for est in self.model.estimators_]
confidence_interval = (
np.percentile(predictions, 5),
np.percentile(predictions, 95)
)
else:
confidence_interval = None
inference_time = (time.time() - start_time) * 1000 # ms
return {
'prediction': prediction,
'confidence_interval': confidence_interval,
'inference_time_ms': inference_time,
'timestamp': time.time()
}
def validate_input(self, sensor_data):
"""入力データ検証"""
missing_features = [f for f in self.config['features']
if f not in sensor_data]
if missing_features:
return False, f"Missing features: {missing_features}"
# 範囲チェック
for feature, value in sensor_data.items():
if feature in self.config.get('valid_ranges', {}):
min_val, max_val = self.config['valid_ranges'][feature]
if not (min_val <= value <= max_val):
return False, f"{feature}={value} out of range [{min_val}, {max_val}]"
return True, "OK"
def get_model_info(self):
"""モデル情報取得"""
return {
'type': self.config['model_type'],
'version': self.config.get('version', 'unknown'),
'features': self.config['features'],
'trained_date': self.config.get('trained_date', 'unknown')
}
# モデルパッケージング関数
def package_model_for_edge(model, scaler, features, output_dir,
valid_ranges=None):
"""エッジ用モデルパッケージング"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# モデル保存
with open(output_path / 'model.pkl', 'wb') as f:
pickle.dump(model, f)
# スケーラー保存
with open(output_path / 'scaler.pkl', 'wb') as f:
pickle.dump(scaler, f)
# 設定ファイル
config = {
'model_type': type(model).__name__,
'features': features,
'version': '1.0.0',
'trained_date': time.strftime('%Y-%m-%d'),
'valid_ranges': valid_ranges or {}
}
with open(output_path / 'config.json', 'w') as f:
json.dump(config, f, indent=2)
print(f"Model packaged to {output_path}")
# 使用例
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
# モデル訓練(簡易例)
np.random.seed(42)
X_train = np.random.rand(100, 3)
y_train = X_train[:, 0] * 10 + X_train[:, 1] * 5 + np.random.randn(100)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)
model = RandomForestRegressor(n_estimators=10, max_depth=5, random_state=42)
model.fit(X_scaled, y_train)
# パッケージング
package_model_for_edge(
model, scaler,
features=['temp', 'pressure', 'flow'],
output_dir='./edge_model',
valid_ranges={'temp': (300, 400), 'pressure': (1, 10), 'flow': (50, 150)}
)
# エッジデバイスでの使用
edge_twin = EdgeDigitalTwin('./edge_model')
# リアルタイム推論
sensor_reading = {'temp': 350, 'pressure': 5.5, 'flow': 100}
result = edge_twin.predict(sensor_reading)
print(f"Prediction: {result['prediction']:.2f}")
print(f"Inference Time: {result['inference_time_ms']:.2f} ms")
if result['confidence_interval']:
print(f"90% CI: [{result['confidence_interval'][0]:.2f}, "
f"{result['confidence_interval'][1]:.2f}]")
- モデルサイズを最小化(量子化、枝刈り)
- 推論時間を1秒以内に(リアルタイム制約)
- 入力検証を必ず実施(異常値対策)
- フェイルセーフモードを用意
5.2 クラウドデプロイアーキテクチャ
スケーラブルなクラウド環境でデジタルツインをホスティングし、複数プラントからのアクセスを可能にします。
5.2.1 マイクロサービスアーキテクチャ
from flask import Flask, request, jsonify
from functools import wraps
import logging
from datetime import datetime
class DigitalTwinMicroservice:
"""デジタルツインマイクロサービス"""
def __init__(self, model_path, service_name='digital-twin'):
self.app = Flask(service_name)
self.model = EdgeDigitalTwin(model_path)
self.request_count = 0
# ロギング設定
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(service_name)
self._setup_routes()
def _setup_routes(self):
"""APIエンドポイント設定"""
@self.app.route('/health', methods=['GET'])
def health_check():
"""ヘルスチェック"""
return jsonify({
'status': 'healthy',
'service': 'digital-twin',
'model_version': self.model.config.get('version', 'unknown'),
'timestamp': datetime.now().isoformat()
})
@self.app.route('/predict', methods=['POST'])
@self._log_request
def predict():
"""推論エンドポイント"""
try:
data = request.get_json()
# 入力検証
valid, message = self.model.validate_input(data)
if not valid:
return jsonify({'error': message}), 400
# 推論実行
result = self.model.predict(data)
self.request_count += 1
return jsonify({
'success': True,
'result': result,
'request_id': self.request_count
})
except Exception as e:
self.logger.error(f"Prediction error: {str(e)}")
return jsonify({'error': str(e)}), 500
@self.app.route('/model/info', methods=['GET'])
def model_info():
"""モデル情報"""
return jsonify(self.model.get_model_info())
@self.app.route('/metrics', methods=['GET'])
def metrics():
"""メトリクス"""
return jsonify({
'total_requests': self.request_count,
'model_version': self.model.config.get('version'),
'uptime_seconds': self._get_uptime()
})
def _log_request(self, f):
"""リクエストログデコレータ"""
@wraps(f)
def decorated_function(*args, **kwargs):
self.logger.info(f"Request: {request.method} {request.path}")
response = f(*args, **kwargs)
self.logger.info(f"Response: {response[1] if isinstance(response, tuple) else 200}")
return response
return decorated_function
def _get_uptime(self):
"""稼働時間(簡易版)"""
return 0 # 実装は省略
def run(self, host='0.0.0.0', port=5000, debug=False):
"""サービス起動"""
self.logger.info(f"Starting Digital Twin Microservice on {host}:{port}")
self.app.run(host=host, port=port, debug=debug)
# 使用例(実行時)
# service = DigitalTwinMicroservice('./edge_model')
# service.run(port=5000)
# クライアント側の使用例
import requests
def call_digital_twin_api(sensor_data, api_url='http://localhost:5000'):
"""デジタルツインAPI呼び出し"""
try:
# 推論リクエスト
response = requests.post(
f"{api_url}/predict",
json=sensor_data,
timeout=5.0
)
if response.status_code == 200:
return response.json()
else:
print(f"Error: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return None
# API呼び出し例
sensor_data = {'temp': 360, 'pressure': 6.0, 'flow': 100}
result = call_digital_twin_api(sensor_data)
if result and result['success']:
print(f"Prediction: {result['result']['prediction']:.2f}")
print(f"Inference time: {result['result']['inference_time_ms']:.2f} ms")
5.3 Dockerコンテナ化
デジタルツインをコンテナ化し、環境非依存かつ容易にデプロイできるようにします。
# Dockerfile の作成例(Python コード内でテキストとして表示)
DOCKERFILE_CONTENT = """
# ベースイメージ
FROM python:3.9-slim
# 作業ディレクトリ
WORKDIR /app
# システム依存関係
RUN apt-get update && apt-get install -y \\
gcc \\
&& rm -rf /var/lib/apt/lists/*
# Python依存関係
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# アプリケーションコード
COPY ./digital_twin /app/digital_twin
COPY ./edge_model /app/edge_model
# 非rootユーザーで実行
RUN useradd -m -u 1000 dtuser && chown -R dtuser:dtuser /app
USER dtuser
# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
CMD python -c "import requests; requests.get('http://localhost:5000/health')"
# ポート公開
EXPOSE 5000
# 起動コマンド
CMD ["python", "-m", "digital_twin.service"]
"""
# requirements.txt の内容
REQUIREMENTS_CONTENT = """
flask==2.3.0
numpy==1.24.0
scikit-learn==1.3.0
requests==2.31.0
gunicorn==21.0.0
"""
# Docker compose 設定
DOCKER_COMPOSE_CONTENT = """
version: '3.8'
services:
digital-twin:
build: .
ports:
- "5000:5000"
environment:
- MODEL_PATH=/app/edge_model
- LOG_LEVEL=INFO
volumes:
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
interval: 30s
timeout: 5s
retries: 3
# ロードバランサー(複数インスタンス用)
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- digital-twin
restart: unless-stopped
"""
# Docker ビルド・実行スクリプト
class DockerDeployment:
"""Docker デプロイメント管理"""
@staticmethod
def create_dockerfile(output_dir='.'):
"""Dockerfile 作成"""
with open(f"{output_dir}/Dockerfile", 'w') as f:
f.write(DOCKERFILE_CONTENT)
print("Dockerfile created")
@staticmethod
def create_requirements(output_dir='.'):
"""requirements.txt 作成"""
with open(f"{output_dir}/requirements.txt", 'w') as f:
f.write(REQUIREMENTS_CONTENT)
print("requirements.txt created")
@staticmethod
def create_docker_compose(output_dir='.'):
"""docker-compose.yml 作成"""
with open(f"{output_dir}/docker-compose.yml", 'w') as f:
f.write(DOCKER_COMPOSE_CONTENT)
print("docker-compose.yml created")
@staticmethod
def build_image(tag='digital-twin:latest'):
"""Docker イメージビルド"""
import subprocess
cmd = f"docker build -t {tag} ."
subprocess.run(cmd, shell=True)
print(f"Image built: {tag}")
@staticmethod
def run_container(tag='digital-twin:latest', port=5000):
"""コンテナ起動"""
import subprocess
cmd = f"docker run -d -p {port}:5000 --name dt-service {tag}"
subprocess.run(cmd, shell=True)
print(f"Container started on port {port}")
# 使用例
# deployment = DockerDeployment()
# deployment.create_dockerfile()
# deployment.create_requirements()
# deployment.create_docker_compose()
# deployment.build_image()
# deployment.run_container()
print("Docker deployment files ready")
print("To build: docker build -t digital-twin .")
print("To run: docker-compose up -d")
5.4 RESTful API設計
標準的なRESTful APIを通じてデジタルツインへアクセスできるようにします。
from flask import Flask, request, jsonify, make_response
from functools import wraps
import jwt
import datetime
class SecureDigitalTwinAPI:
"""セキュアなデジタルツインAPI"""
def __init__(self, model_path, secret_key):
self.app = Flask(__name__)
self.app.config['SECRET_KEY'] = secret_key
self.model = EdgeDigitalTwin(model_path)
self._setup_secure_routes()
def _token_required(self, f):
"""JWT認証デコレータ"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'message': 'Token is missing'}), 401
try:
# "Bearer " 形式
token = token.split()[1] if ' ' in token else token
data = jwt.decode(token, self.app.config['SECRET_KEY'],
algorithms=['HS256'])
current_user = data['user']
except:
return jsonify({'message': 'Token is invalid'}), 401
return f(current_user, *args, **kwargs)
return decorated
def _setup_secure_routes(self):
"""セキュアなAPIエンドポイント"""
@self.app.route('/api/v1/auth/login', methods=['POST'])
def login():
"""認証(トークン発行)"""
auth = request.get_json()
# 実際は DB でユーザー検証
if auth.get('username') == 'admin' and auth.get('password') == 'secret':
token = jwt.encode({
'user': auth['username'],
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)
}, self.app.config['SECRET_KEY'], algorithm='HS256')
return jsonify({'token': token})
return jsonify({'message': 'Invalid credentials'}), 401
@self.app.route('/api/v1/twin/predict', methods=['POST'])
@self._token_required
def predict(current_user):
"""推論(認証必須)"""
data = request.get_json()
valid, message = self.model.validate_input(data)
if not valid:
return jsonify({'error': message}), 400
result = self.model.predict(data)
return jsonify({
'user': current_user,
'prediction': result['prediction'],
'confidence_interval': result['confidence_interval'],
'timestamp': result['timestamp']
})
@self.app.route('/api/v1/twin/batch', methods=['POST'])
@self._token_required
def batch_predict(current_user):
"""バッチ推論"""
data = request.get_json()
inputs = data.get('inputs', [])
if len(inputs) > 100:
return jsonify({'error': 'Batch size limit: 100'}), 400
results = []
for inp in inputs:
valid, _ = self.model.validate_input(inp)
if valid:
result = self.model.predict(inp)
results.append({
'input': inp,
'prediction': result['prediction']
})
return jsonify({
'user': current_user,
'total': len(inputs),
'successful': len(results),
'results': results
})
@self.app.route('/api/v1/twin/status', methods=['GET'])
@self._token_required
def status(current_user):
"""ステータス確認"""
return jsonify({
'status': 'operational',
'model_info': self.model.get_model_info(),
'user': current_user
})
# 使用例(クライアント側)
class DigitalTwinClient:
"""デジタルツインAPIクライアント"""
def __init__(self, api_url, username, password):
self.api_url = api_url
self.token = None
self._authenticate(username, password)
def _authenticate(self, username, password):
"""認証"""
response = requests.post(
f"{self.api_url}/api/v1/auth/login",
json={'username': username, 'password': password}
)
if response.status_code == 200:
self.token = response.json()['token']
print("Authenticated successfully")
else:
raise Exception("Authentication failed")
def predict(self, sensor_data):
"""推論リクエスト"""
headers = {'Authorization': f'Bearer {self.token}'}
response = requests.post(
f"{self.api_url}/api/v1/twin/predict",
json=sensor_data,
headers=headers
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Prediction failed: {response.status_code}")
def batch_predict(self, inputs_list):
"""バッチ推論"""
headers = {'Authorization': f'Bearer {self.token}'}
response = requests.post(
f"{self.api_url}/api/v1/twin/batch",
json={'inputs': inputs_list},
headers=headers
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Batch prediction failed: {response.status_code}")
# 使用例
# client = DigitalTwinClient('http://localhost:5000', 'admin', 'secret')
# result = client.predict({'temp': 360, 'pressure': 6.0, 'flow': 100})
# print(f"Prediction: {result['prediction']}")
print("Secure API implementation ready")
5.5 可視化ダッシュボード
Plotly Dashを使ってリアルタイムモニタリングダッシュボードを構築します。
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.graph_objs as go
import pandas as pd
from collections import deque
import threading
import time
class DigitalTwinDashboard:
"""デジタルツインダッシュボード"""
def __init__(self, digital_twin_api_url):
self.api_url = digital_twin_api_url
self.app = dash.Dash(__name__)
# データバッファ(最新100ポイント)
self.time_buffer = deque(maxlen=100)
self.prediction_buffer = deque(maxlen=100)
self.temp_buffer = deque(maxlen=100)
self.pressure_buffer = deque(maxlen=100)
self._setup_layout()
self._setup_callbacks()
def _setup_layout(self):
"""レイアウト設定"""
self.app.layout = html.Div([
html.H1("Digital Twin Dashboard",
style={'textAlign': 'center', 'color': '#2c3e50'}),
html.Div([
html.Div([
html.H3("Current Status"),
html.Div(id='status-display',
style={'fontSize': '24px', 'fontWeight': 'bold'})
], style={'width': '48%', 'display': 'inline-block'}),
html.Div([
html.H3("Model Info"),
html.Div(id='model-info')
], style={'width': '48%', 'float': 'right',
'display': 'inline-block'})
]),
html.Div([
html.H3("Input Controls"),
html.Label("Temperature (°C):"),
dcc.Slider(id='temp-slider', min=340, max=380, value=360,
marks={i: str(i) for i in range(340, 381, 10)}),
html.Label("Pressure (bar):"),
dcc.Slider(id='pressure-slider', min=4, max=8, value=6,
step=0.5,
marks={i: str(i) for i in range(4, 9)}),
html.Label("Flow (kg/h):"),
dcc.Slider(id='flow-slider', min=80, max=120, value=100,
marks={i: str(i) for i in range(80, 121, 10)}),
html.Button('Predict', id='predict-button', n_clicks=0)
], style={'margin': '20px'}),
html.Div([
dcc.Graph(id='prediction-time-series'),
dcc.Graph(id='process-conditions')
]),
dcc.Interval(id='interval-component', interval=2000, n_intervals=0)
])
def _setup_callbacks(self):
"""コールバック設定"""
@self.app.callback(
Output('status-display', 'children'),
Output('prediction-time-series', 'figure'),
Output('process-conditions', 'figure'),
Input('predict-button', 'n_clicks'),
Input('interval-component', 'n_intervals'),
State('temp-slider', 'value'),
State('pressure-slider', 'value'),
State('flow-slider', 'value')
)
def update_dashboard(n_clicks, n_intervals, temp, pressure, flow):
# 推論実行(ボタンクリック時またはインターバル)
if n_clicks > 0 or n_intervals > 0:
sensor_data = {
'temp': temp,
'pressure': pressure,
'flow': flow
}
# API呼び出し(実際の実装)
# result = call_digital_twin_api(sensor_data, self.api_url)
# 簡易版: ローカル計算
prediction = 85 - 0.05*(temp-360)**2 - 2*(pressure-6)**2
# バッファ更新
self.time_buffer.append(time.time())
self.prediction_buffer.append(prediction)
self.temp_buffer.append(temp)
self.pressure_buffer.append(pressure)
# ステータス表示
if len(self.prediction_buffer) > 0:
status = f"Latest Prediction: {self.prediction_buffer[-1]:.2f}%"
else:
status = "No data"
# 予測値時系列グラフ
time_series_fig = go.Figure()
if len(self.time_buffer) > 0:
time_series_fig.add_trace(go.Scatter(
x=list(range(len(self.prediction_buffer))),
y=list(self.prediction_buffer),
mode='lines+markers',
name='Yield Prediction',
line=dict(color='#11998e', width=2)
))
time_series_fig.update_layout(
title='Yield Prediction Over Time',
xaxis_title='Sample',
yaxis_title='Yield (%)',
template='plotly_white',
height=400
)
# プロセス条件グラフ
conditions_fig = go.Figure()
if len(self.temp_buffer) > 0:
conditions_fig.add_trace(go.Scatter(
x=list(range(len(self.temp_buffer))),
y=list(self.temp_buffer),
mode='lines',
name='Temperature',
yaxis='y1'
))
conditions_fig.add_trace(go.Scatter(
x=list(range(len(self.pressure_buffer))),
y=list(self.pressure_buffer),
mode='lines',
name='Pressure',
yaxis='y2'
))
conditions_fig.update_layout(
title='Process Conditions',
xaxis=dict(title='Sample'),
yaxis=dict(title='Temperature (°C)', side='left'),
yaxis2=dict(title='Pressure (bar)', side='right',
overlaying='y'),
template='plotly_white',
height=400
)
return status, time_series_fig, conditions_fig
def run(self, host='0.0.0.0', port=8050, debug=False):
"""ダッシュボード起動"""
self.app.run_server(host=host, port=port, debug=debug)
# 使用例
# dashboard = DigitalTwinDashboard('http://localhost:5000')
# dashboard.run(port=8050)
print("Dashboard implementation ready")
print("Access at: http://localhost:8050")
5.6 セキュリティとアクセス制御
本番環境では適切なセキュリティ対策が不可欠です。
import hashlib
import secrets
from datetime import datetime, timedelta
import json
class SecurityManager:
"""セキュリティ管理システム"""
def __init__(self):
self.users = {} # 実際はデータベース
self.api_keys = {}
self.access_log = []
def create_user(self, username, password, role='user'):
"""ユーザー作成"""
# パスワードハッシュ化
salt = secrets.token_hex(16)
password_hash = self._hash_password(password, salt)
self.users[username] = {
'password_hash': password_hash,
'salt': salt,
'role': role,
'created_at': datetime.now().isoformat(),
'active': True
}
return True
def _hash_password(self, password, salt):
"""パスワードハッシュ"""
return hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000
).hex()
def verify_password(self, username, password):
"""パスワード検証"""
if username not in self.users:
return False
user = self.users[username]
password_hash = self._hash_password(password, user['salt'])
return password_hash == user['password_hash']
def generate_api_key(self, username, expiry_days=365):
"""APIキー生成"""
if username not in self.users:
return None
api_key = secrets.token_urlsafe(32)
expiry = datetime.now() + timedelta(days=expiry_days)
self.api_keys[api_key] = {
'username': username,
'created_at': datetime.now().isoformat(),
'expires_at': expiry.isoformat(),
'active': True
}
return api_key
def verify_api_key(self, api_key):
"""APIキー検証"""
if api_key not in self.api_keys:
return False, "Invalid API key"
key_info = self.api_keys[api_key]
if not key_info['active']:
return False, "API key deactivated"
expiry = datetime.fromisoformat(key_info['expires_at'])
if datetime.now() > expiry:
return False, "API key expired"
return True, key_info['username']
def log_access(self, username, action, resource, success=True):
"""アクセスログ"""
self.access_log.append({
'timestamp': datetime.now().isoformat(),
'username': username,
'action': action,
'resource': resource,
'success': success
})
def get_access_log(self, username=None, limit=100):
"""アクセスログ取得"""
logs = self.access_log[-limit:]
if username:
logs = [log for log in logs if log['username'] == username]
return logs
def check_rate_limit(self, username, max_requests=100, window_minutes=60):
"""レート制限チェック"""
cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
recent_requests = [
log for log in self.access_log
if (log['username'] == username and
datetime.fromisoformat(log['timestamp']) > cutoff_time)
]
return len(recent_requests) < max_requests
# 使用例
security = SecurityManager()
# ユーザー作成
security.create_user('operator1', 'secure_password', role='operator')
security.create_user('admin1', 'admin_password', role='admin')
# 認証
is_valid = security.verify_password('operator1', 'secure_password')
print(f"Authentication: {is_valid}")
# APIキー発行
api_key = security.generate_api_key('operator1', expiry_days=90)
print(f"API Key: {api_key[:20]}...")
# APIキー検証
valid, username = security.verify_api_key(api_key)
print(f"API Key Valid: {valid}, User: {username}")
# アクセスログ
security.log_access('operator1', 'PREDICT', '/api/v1/twin/predict', success=True)
security.log_access('operator1', 'PREDICT', '/api/v1/twin/predict', success=True)
# レート制限チェック
within_limit = security.check_rate_limit('operator1', max_requests=100)
print(f"Within rate limit: {within_limit}")
# アクセスログ確認
logs = security.get_access_log('operator1')
print(f"Access log entries: {len(logs)}")
- HTTPS通信の強制(TLS 1.2以上)
- 認証・認可の実装(JWT, OAuth2.0)
- レート制限とDDoS対策
- 入力検証とSQLインジェクション対策
- アクセスログの保存と監視
- 定期的なセキュリティ監査
5.7 本番システム統合例
すべての要素を統合した完全な本番システムの例です。
import logging
from pathlib import Path
class ProductionDigitalTwinSystem:
"""本番デジタルツインシステム"""
def __init__(self, config_path):
self.config = self._load_config(config_path)
self.logger = self._setup_logging()
# コンポーネント初期化
self.edge_twin = EdgeDigitalTwin(self.config['model_path'])
self.security = SecurityManager()
self.api_service = None # 実際はFlaskアプリ
self.logger.info("Production Digital Twin System initialized")
def _load_config(self, config_path):
"""設定読み込み"""
with open(config_path, 'r') as f:
return json.load(f)
def _setup_logging(self):
"""ロギング設定"""
log_dir = Path(self.config.get('log_dir', './logs'))
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_dir / 'digital_twin.log'),
logging.StreamHandler()
]
)
return logging.getLogger('DigitalTwinSystem')
def start(self):
"""システム起動"""
self.logger.info("Starting Digital Twin System")
# ヘルスチェック
if not self._health_check():
self.logger.error("Health check failed")
return False
# API サービス起動(実際の実装)
# self.api_service = DigitalTwinMicroservice(...)
# self.api_service.run()
self.logger.info("System started successfully")
return True
def _health_check(self):
"""起動時ヘルスチェック"""
checks = {
'model_loaded': hasattr(self.edge_twin, 'model'),
'config_valid': 'model_path' in self.config,
'security_ready': self.security is not None
}
all_healthy = all(checks.values())
for check, status in checks.items():
self.logger.info(f"Health check - {check}: {'OK' if status else 'FAIL'}")
return all_healthy
def predict_with_monitoring(self, sensor_data, user):
"""監視付き推論"""
try:
# セキュリティチェック
if not self.security.check_rate_limit(user):
self.logger.warning(f"Rate limit exceeded for user: {user}")
return {'error': 'Rate limit exceeded'}, 429
# 推論実行
result = self.edge_twin.predict(sensor_data)
# ログ記録
self.security.log_access(user, 'PREDICT', 'twin/predict', True)
self.logger.info(f"Prediction successful - User: {user}, "
f"Time: {result['inference_time_ms']:.2f}ms")
return result, 200
except Exception as e:
self.logger.error(f"Prediction error: {str(e)}")
self.security.log_access(user, 'PREDICT', 'twin/predict', False)
return {'error': 'Internal server error'}, 500
def shutdown(self):
"""システム停止"""
self.logger.info("Shutting down Digital Twin System")
# クリーンアップ処理
# - データベース接続クローズ
# - キャッシュのフラッシュ
# - リソース解放
self.logger.info("System shutdown complete")
# 設定ファイル例
PRODUCTION_CONFIG = {
"model_path": "/app/models/production_v1.0",
"log_dir": "/var/log/digital_twin",
"api_port": 5000,
"max_requests_per_hour": 1000,
"enable_monitoring": True,
"database": {
"host": "localhost",
"port": 5432,
"name": "digital_twin_db"
}
}
# 使用例
# system = ProductionDigitalTwinSystem('config.json')
# system.start()
# 推論実行
# result, status = system.predict_with_monitoring(
# {'temp': 360, 'pressure': 6.0, 'flow': 100},
# user='operator1'
# )
print("Production system implementation complete")
- Siemens: クラウドベースのデジタルツインプラットフォームで100+プラントを管理
- Shell: エッジ+クラウドハイブリッド構成で1ms未満のレイテンシーを達成
- BASF: マイクロサービスアーキテクチャで99.99%の可用性を実現
学習目標の確認
この章を完了すると、以下を説明・実装できるようになります:
基本理解
- ✅ エッジとクラウドのデプロイメントの違いを説明できる
- ✅ コンテナ化の利点を理解している
- ✅ RESTful APIの設計原則を理解している
- ✅ 本番システムに必要なセキュリティ要件を説明できる
実践スキル
- ✅ 軽量モデルをエッジデバイス用にパッケージングできる
- ✅ FlaskでRESTful APIを実装できる
- ✅ Dockerfileを作成しコンテナ化できる
- ✅ Plotly Dashでダッシュボードを構築できる
- ✅ 基本的なセキュリティ機能を実装できる
応用力
- ✅ 要件に応じた適切なデプロイメント戦略を選択できる
- ✅ マイクロサービスアーキテクチャを設計できる
- ✅ 本番環境の監視とログ戦略を立案できる
- ✅ セキュリティリスクを評価し対策を講じられる
演習問題
Easy(基礎確認)
Q1: エッジデプロイとクラウドデプロイの主な違いを3つ挙げてください。
解答を見る
正解:
| 項目 | エッジデプロイ | クラウドデプロイ |
|---|---|---|
| レイテンシー | 低(<1ms可能) | 高(10-100ms) |
| 計算リソース | 制限あり | スケーラブル |
| ネットワーク依存 | 低(オフライン動作可) | 高(常時接続必要) |
解説: エッジデプロイは低レイテンシーが必要なリアルタイム制御に、クラウドデプロイは複雑な分析や複数拠点からのアクセスに適しています。実務では両者を組み合わせたハイブリッド構成が一般的です。
Medium(応用)
Q2: 本番環境のデジタルツインシステムで必須のセキュリティ対策を5つ挙げてください。
解答を見る
正解:
- 認証・認可: JWT、OAuth2.0によるユーザー認証
- 暗号化通信: HTTPS (TLS 1.2以上) の強制
- 入力検証: SQLインジェクション、XSS対策
- レート制限: DDoS攻撃対策
- 監査ログ: すべてのアクセスを記録・監視
追加推奨事項:
- APIキーのローテーション
- 最小権限の原則
- 定期的なセキュリティ監査
- 脆弱性スキャン
Hard(発展)
Q3: 提供されたProductionDigitalTwinSystemクラスを拡張し、以下の機能を追加してください:
- モデルのA/Bテスト機能(2つのモデルバージョンをランダムに切り替え)
- 予測結果のキャッシング(同じ入力への再計算を防ぐ)
- 異常検知アラート(予測値が異常範囲の場合に通知)
解答例を見る
import hashlib
from collections import OrderedDict
import random
class AdvancedProductionSystem(ProductionDigitalTwinSystem):
"""拡張本番システム"""
def __init__(self, config_path):
super().__init__(config_path)
# A/Bテスト用に2つのモデル
self.model_a = EdgeDigitalTwin(self.config['model_path_a'])
self.model_b = EdgeDigitalTwin(self.config['model_path_b'])
self.ab_ratio = 0.5 # 50/50分割
# キャッシュ(最大1000エントリ、LRU)
self.cache = OrderedDict()
self.cache_max_size = 1000
# 異常検知しきい値
self.alert_thresholds = {
'min': 70,
'max': 95
}
def _get_cache_key(self, sensor_data):
"""キャッシュキー生成"""
# センサーデータをソートして一意なキーを生成
sorted_data = json.dumps(sensor_data, sort_keys=True)
return hashlib.md5(sorted_data.encode()).hexdigest()
def predict_with_cache(self, sensor_data, user):
"""キャッシュ付き推論"""
cache_key = self._get_cache_key(sensor_data)
# キャッシュヒット
if cache_key in self.cache:
self.logger.info(f"Cache hit for user: {user}")
result = self.cache[cache_key]
result['from_cache'] = True
return result, 200
# A/Bテスト: モデル選択
use_model_a = random.random() < self.ab_ratio
model = self.model_a if use_model_a else self.model_b
model_version = 'A' if use_model_a else 'B'
# 推論実行
result = model.predict(sensor_data)
result['model_version'] = model_version
result['from_cache'] = False
# 異常検知
if self._is_anomaly(result['prediction']):
self._send_alert(user, sensor_data, result)
# キャッシュ保存(LRU)
self.cache[cache_key] = result
if len(self.cache) > self.cache_max_size:
self.cache.popitem(last=False) # 最古を削除
# ログ記録
self.security.log_access(user, 'PREDICT', 'twin/predict', True)
self.logger.info(f"Prediction - User: {user}, Model: {model_version}, "
f"Result: {result['prediction']:.2f}")
return result, 200
def _is_anomaly(self, prediction):
"""異常判定"""
return (prediction < self.alert_thresholds['min'] or
prediction > self.alert_thresholds['max'])
def _send_alert(self, user, sensor_data, result):
"""アラート送信"""
alert_message = (
f"ANOMALY DETECTED\\n"
f"User: {user}\\n"
f"Prediction: {result['prediction']:.2f}\\n"
f"Sensor Data: {sensor_data}\\n"
f"Expected Range: [{self.alert_thresholds['min']}, "
f"{self.alert_thresholds['max']}]"
)
self.logger.warning(alert_message)
# 実際は email/Slack/SMS 通知
print(f"🚨 ALERT: {alert_message}")
def get_ab_test_stats(self):
"""A/Bテスト統計"""
# 実際はデータベースから集計
return {
'model_a_count': 0,
'model_b_count': 0,
'cache_hit_rate': 0.0
}
# 使用例
# config = {
# 'model_path_a': './models/v1.0',
# 'model_path_b': './models/v1.1',
# 'log_dir': './logs'
# }
# system = AdvancedProductionSystem('config.json')
# result, status = system.predict_with_cache(
# {'temp': 360, 'pressure': 6.0, 'flow': 100},
# 'operator1'
# )
# print(f"Prediction: {result['prediction']:.2f}")
# print(f"Model: {result['model_version']}, Cache: {result['from_cache']}")
print("Advanced production system implementation complete")
解説: この実装では、A/Bテストでモデル性能を比較し、キャッシングで計算コストを削減し、異常検知で運転員に即座にアラートを送信します。本番システムではこれらの機能が運用効率を大きく向上させます。
まとめ
第5章では、デジタルツインの本番デプロイメントについて学びました。エッジからクラウドまで、セキュリティからモニタリングまで、実用的な実装例を通じて理解を深めました。
このシリーズを通じて、デジタルツインの基礎から応用、実装、デプロイまでの全体像を習得しました。次は、実際のプロセスに適用し、継続的に改善していくフェーズです。
- 自社プロセスでの小規模PoC(概念実証)を実施
- 段階的に機能を拡張(モデル精度向上、最適化機能追加)
- 運用データを蓄積し、継続的な改善サイクルを確立
- 他のデジタル化施策(MES、ERPなど)との統合を検討