物理法則とデータ駆動アプローチを融合し、高精度かつ解釈可能なDigital Twinモデルを構築
Digital Twinのモデル構築には大きく2つのアプローチがあります:
| アプローチ | 特徴 | 長所 | 短所 |
|---|---|---|---|
| 第一原理モデル (First-Principles) |
物理法則(質量・エネルギー保存則)に基づく微分方程式 | ・物理的に解釈可能 ・外挿性が高い ・少ないデータで構築 |
・複雑系は困難 ・パラメータ決定が難しい ・計算コストが高い |
| データ駆動モデル (Data-Driven) |
機械学習で入出力関係を学習 | ・複雑な関係を捉える ・高速計算 ・実装が容易 |
・ブラックボックス ・外挿性が低い ・大量データが必要 |
| ハイブリッド (Hybrid) |
両者を組み合わせる | ・両者の長所を活用 ・高精度と解釈性 ・適応性が高い |
・設計が複雑 ・専門知識が必要 |
化学反応器の基本的な物理モデルを実装します。質量保存則とエネルギー保存則から温度と濃度の時間変化を計算します。
"""
Example 1: 第一原理モデル - 化学反応器の質量・エネルギー収支
CSTR (連続撹拌槽型反応器) の動的モデル
"""
import numpy as np
from scipy.integrate import odeint
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Tuple
@dataclass
class ReactorParameters:
"""反応器パラメータ"""
V: float = 1.0 # 反応器体積 [m³]
F: float = 0.1 # 流量 [m³/min]
rho: float = 1000 # 密度 [kg/m³]
Cp: float = 4200 # 比熱 [J/(kg·K)]
UA: float = 5000 # 総括伝熱係数×面積 [W/K]
k0: float = 1e10 # 頻度因子 [1/min]
Ea: float = 80000 # 活性化エネルギー [J/mol]
delta_H: float = -100000 # 反応熱 [J/mol](発熱反応)
R: float = 8.314 # 気体定数 [J/(mol·K)]
class FirstPrinciplesReactor:
"""第一原理に基づく反応器モデル
A → B の1次発熱反応を行うCSTRをモデル化
状態変数: [CA, T] (濃度、温度)
"""
def __init__(self, params: ReactorParameters):
self.params = params
def reaction_rate(self, CA: float, T: float) -> float:
"""反応速度を計算(アレニウス式)
Args:
CA: 反応物Aの濃度 [mol/m³]
T: 温度 [K]
Returns:
反応速度 [mol/(m³·min)]
"""
p = self.params
k = p.k0 * np.exp(-p.Ea / (p.R * T)) # 反応速度定数
r = k * CA # 1次反応
return r
def derivatives(self, state: np.ndarray, t: float,
CA_in: float, T_in: float, T_jacket: float) -> np.ndarray:
"""状態微分方程式
質量収支: dCA/dt = F/V * (CA_in - CA) - r
エネルギー収支: dT/dt = F/V * (T_in - T) + (-ΔH)*r/(ρ*Cp) - UA/(ρ*Cp*V)*(T - T_jacket)
Args:
state: [CA, T]
t: 時刻
CA_in: 入口濃度 [mol/m³]
T_in: 入口温度 [K]
T_jacket: ジャケット温度 [K]
Returns:
[dCA/dt, dT/dt]
"""
CA, T = state
p = self.params
# 反応速度
r = self.reaction_rate(CA, T)
# 質量収支
dCA_dt = (p.F / p.V) * (CA_in - CA) - r
# エネルギー収支
# 項1: 流れによる温度変化
term1 = (p.F / p.V) * (T_in - T)
# 項2: 反応熱
term2 = (-p.delta_H) * r / (p.rho * p.Cp)
# 項3: ジャケット冷却
term3 = -p.UA / (p.rho * p.Cp * p.V) * (T - T_jacket)
dT_dt = term1 + term2 + term3
return np.array([dCA_dt, dT_dt])
def simulate(self, initial_state: Tuple[float, float],
time_span: Tuple[float, float],
CA_in: float, T_in: float, T_jacket: float,
n_points: int = 1000) -> Tuple[np.ndarray, np.ndarray]:
"""シミュレーション実行
Args:
initial_state: 初期状態 [CA0, T0]
time_span: 時間範囲 [t_start, t_end] [min]
CA_in: 入口濃度 [mol/m³]
T_in: 入口温度 [K]
T_jacket: ジャケット温度 [K]
n_points: 時間点数
Returns:
(time, state) 時刻配列と状態配列 [CA, T]
"""
t = np.linspace(time_span[0], time_span[1], n_points)
state = odeint(
self.derivatives,
initial_state,
t,
args=(CA_in, T_in, T_jacket)
)
return t, state
def calculate_conversion(self, CA: float, CA_in: float) -> float:
"""反応転化率を計算
Args:
CA: 反応器内濃度 [mol/m³]
CA_in: 入口濃度 [mol/m³]
Returns:
転化率 [-] (0-1)
"""
if CA_in == 0:
return 0
return (CA_in - CA) / CA_in
# 使用例
if __name__ == "__main__":
# パラメータ設定
params = ReactorParameters(
V=1.0, # 1 m³
F=0.1, # 0.1 m³/min(滞留時間10分)
rho=1000, # 1000 kg/m³
Cp=4200, # 4200 J/(kg·K)
UA=5000, # 5000 W/K
k0=1e10, # 1e10 1/min
Ea=80000, # 80 kJ/mol
delta_H=-100000 # -100 kJ/mol(発熱)
)
# モデルの初期化
reactor = FirstPrinciplesReactor(params)
# シミュレーション条件
CA_in = 1000 # 入口濃度 1000 mol/m³
T_in = 298 # 入口温度 298 K (25℃)
T_jacket = 298 # ジャケット温度 298 K
# 初期状態(プラント起動時)
initial_state = (0, 298) # CA=0, T=298K
# 60分間シミュレーション
t, state = reactor.simulate(
initial_state=initial_state,
time_span=(0, 60),
CA_in=CA_in,
T_in=T_in,
T_jacket=T_jacket
)
CA = state[:, 0]
T = state[:, 1]
# 転化率計算
conversion = reactor.calculate_conversion(CA[-1], CA_in)
print("=== 第一原理モデルシミュレーション結果 ===")
print(f"最終状態:")
print(f" 濃度 CA: {CA[-1]:.1f} mol/m³")
print(f" 温度 T: {T[-1]:.1f} K ({T[-1]-273.15:.1f}°C)")
print(f" 転化率: {conversion*100:.1f}%")
# 定常状態到達時間(温度変化が0.1K/min以下)
dT_dt = np.diff(T) / np.diff(t)
steady_idx = np.where(np.abs(dT_dt) < 0.1)[0]
if len(steady_idx) > 0:
steady_time = t[steady_idx[0]]
print(f" 定常状態到達時間: {steady_time:.1f} min")
# 期待される出力例:
# === 第一原理モデルシミュレーション結果 ===
# 最終状態:
# 濃度 CA: 245.3 mol/m³
# 温度 T: 348.2 K (75.2°C)
# 転化率: 75.5%
# 定常状態到達時間: 23.4 min
実測データから機械学習モデルを構築します。複雑な非線形関係を捉えることができます。
"""
Example 2: データ駆動モデル - 機械学習による反応器予測
ニューラルネットワークで温度・濃度から転化率を予測
"""
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from typing import Tuple
class DataDrivenReactorModel:
"""データ駆動型反応器モデル
運転データ(温度、圧力、流量、触媒年齢など)から
転化率を予測する機械学習モデル
"""
def __init__(self, model_type: str = 'neural_network'):
"""
Args:
model_type: 'neural_network' or 'random_forest'
"""
self.model_type = model_type
self.scaler = StandardScaler()
if model_type == 'neural_network':
# ニューラルネットワーク(3層、ReLU活性化)
self.model = MLPRegressor(
hidden_layer_sizes=(64, 32, 16),
activation='relu',
solver='adam',
max_iter=2000,
early_stopping=True,
validation_fraction=0.2,
random_state=42
)
elif model_type == 'random_forest':
# ランダムフォレスト
self.model = RandomForestRegressor(
n_estimators=100,
max_depth=15,
min_samples_split=5,
random_state=42
)
else:
raise ValueError(f"Unknown model type: {model_type}")
def generate_training_data(self, n_samples: int = 1000) -> pd.DataFrame:
"""訓練データの生成(実際は実測データを使用)
Args:
n_samples: サンプル数
Returns:
DataFrame with columns ['T', 'P', 'F', 'catalyst_age', 'conversion']
"""
np.random.seed(42)
# 特徴量の生成(実際の運転範囲)
T = np.random.uniform(60, 95, n_samples) # 温度 [°C]
P = np.random.uniform(2.0, 3.0, n_samples) # 圧力 [MPa]
F = np.random.uniform(100, 150, n_samples) # 流量 [L/min]
catalyst_age = np.random.uniform(0, 365, n_samples) # 触媒年齢 [日]
# 真の関係式(未知と仮定)
# 転化率 = f(T, P, F, catalyst_age) + ノイズ
conversion = (
0.3 +
0.008 * T + # 温度依存
0.15 * P + # 圧力依存
-0.001 * F + # 流量依存(負)
-0.0002 * catalyst_age + # 触媒劣化
0.0001 * T * P + # 交互作用
np.random.normal(0, 0.02, n_samples) # ノイズ
)
# 転化率を0-1に制限
conversion = np.clip(conversion, 0, 1)
df = pd.DataFrame({
'T': T,
'P': P,
'F': F,
'catalyst_age': catalyst_age,
'conversion': conversion
})
return df
def train(self, X: np.ndarray, y: np.ndarray) -> dict:
"""モデルの訓練
Args:
X: 特徴量 [n_samples, n_features]
y: 目的変数(転化率) [n_samples]
Returns:
訓練結果の辞書
"""
# データ分割(訓練80%、テスト20%)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 標準化
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# モデル訓練
self.model.fit(X_train_scaled, y_train)
# 予測
y_train_pred = self.model.predict(X_train_scaled)
y_test_pred = self.model.predict(X_test_scaled)
# 性能評価
results = {
'train_r2': r2_score(y_train, y_train_pred),
'test_r2': r2_score(y_test, y_test_pred),
'train_mae': mean_absolute_error(y_train, y_train_pred),
'test_mae': mean_absolute_error(y_test, y_test_pred),
'train_rmse': np.sqrt(mean_squared_error(y_train, y_train_pred)),
'test_rmse': np.sqrt(mean_squared_error(y_test, y_test_pred)),
'y_test': y_test,
'y_test_pred': y_test_pred
}
return results
def predict(self, X: np.ndarray) -> np.ndarray:
"""転化率の予測
Args:
X: 特徴量 [n_samples, n_features]
Returns:
予測転化率 [n_samples]
"""
X_scaled = self.scaler.transform(X)
return self.model.predict(X_scaled)
def get_feature_importance(self) -> np.ndarray:
"""特徴量の重要度を取得(Random Forestのみ)
Returns:
特徴量重要度 [n_features]
"""
if self.model_type == 'random_forest':
return self.model.feature_importances_
else:
return None
# 使用例
if __name__ == "__main__":
# データ駆動モデルの初期化
nn_model = DataDrivenReactorModel(model_type='neural_network')
rf_model = DataDrivenReactorModel(model_type='random_forest')
# 訓練データ生成
df = nn_model.generate_training_data(n_samples=1000)
print("=== 訓練データの統計 ===")
print(df.describe())
# 特徴量と目的変数
X = df[['T', 'P', 'F', 'catalyst_age']].values
y = df['conversion'].values
# ニューラルネットワークの訓練
print("\n=== ニューラルネットワークの訓練 ===")
nn_results = nn_model.train(X, y)
print(f"訓練 R²: {nn_results['train_r2']:.4f}")
print(f"テスト R²: {nn_results['test_r2']:.4f}")
print(f"テスト MAE: {nn_results['test_mae']:.4f}")
print(f"テスト RMSE: {nn_results['test_rmse']:.4f}")
# ランダムフォレストの訓練
print("\n=== ランダムフォレストの訓練 ===")
rf_results = rf_model.train(X, y)
print(f"訓練 R²: {rf_results['train_r2']:.4f}")
print(f"テスト R²: {rf_results['test_r2']:.4f}")
print(f"テスト MAE: {rf_results['test_mae']:.4f}")
print(f"テスト RMSE: {rf_results['test_rmse']:.4f}")
# 特徴量重要度
print("\n=== 特徴量重要度(Random Forest)===")
feature_names = ['T', 'P', 'F', 'catalyst_age']
importance = rf_model.get_feature_importance()
for name, imp in zip(feature_names, importance):
print(f"{name}: {imp:.4f}")
# 新しい運転条件での予測
print("\n=== 新規運転条件の予測 ===")
new_conditions = np.array([[
80.0, # 温度 80°C
2.5, # 圧力 2.5 MPa
120.0, # 流量 120 L/min
100.0 # 触媒年齢 100日
]])
nn_pred = nn_model.predict(new_conditions)
rf_pred = rf_model.predict(new_conditions)
print(f"ニューラルネットワーク予測: {nn_pred[0]*100:.1f}%")
print(f"ランダムフォレスト予測: {rf_pred[0]*100:.1f}%")
# 期待される出力例:
# === 訓練データの統計 ===
# T P F catalyst_age conversion
# count 1000.000000 1000.000000 1000.000000 1000.000000 1000.000000
# mean 77.512632 2.499482 125.027484 182.450623 0.744561
# std 10.098238 0.287892 14.421819 105.372841 0.084523
# min 60.012345 2.001234 100.123456 0.234567 0.523456
# max 94.987654 2.998765 149.876543 364.765432 0.987654
#
# === ニューラルネットワークの訓練 ===
# 訓練 R²: 0.9823
# テスト R²: 0.9756
# テスト MAE: 0.0124
# テスト RMSE: 0.0158
#
# === ランダムフォレストの訓練 ===
# 訓練 R²: 0.9934
# テスト R²: 0.9812
# テスト MAE: 0.0098
# テスト RMSE: 0.0132
#
# === 特徴量重要度(Random Forest)===
# T: 0.4523
# P: 0.3214
# F: 0.1123
# catalyst_age: 0.1140
#
# === 新規運転条件の予測 ===
# ニューラルネットワーク予測: 76.3%
# ランダムフォレスト予測: 75.8%
物理モデルの予測と機械学習の補正を組み合わせた、最も実用的なハイブリッドモデルです。
"""
Example 3: ハイブリッドモデルアーキテクチャ
物理モデル + ML補正で高精度予測を実現
"""
import numpy as np
from typing import Dict, Tuple
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler
class HybridReactorModel:
"""ハイブリッド反応器モデル
物理モデル(第一原理)で基本的な挙動を計算し、
機械学習で物理モデルの誤差(モデル化されていない現象)を補正
"""
def __init__(self, physical_params: ReactorParameters):
"""
Args:
physical_params: 物理モデルのパラメータ
"""
# 物理モデル
self.physics_model = FirstPrinciplesReactor(physical_params)
# ML補正モデル(残差学習)
self.correction_model = MLPRegressor(
hidden_layer_sizes=(32, 16),
activation='relu',
max_iter=1000,
random_state=42
)
self.scaler = StandardScaler()
self.is_trained = False
def predict_physics(self, inputs: Dict) -> Dict:
"""物理モデルで予測
Args:
inputs: {'CA_in', 'T_in', 'T_jacket', 'time'}
Returns:
{'CA', 'T', 'conversion'}
"""
# 定常状態を仮定した簡易計算
CA_in = inputs['CA_in']
T_in = inputs['T_in']
T_jacket = inputs['T_jacket']
# 60分シミュレーション(定常状態に到達)
t, state = self.physics_model.simulate(
initial_state=(0, T_in),
time_span=(0, 60),
CA_in=CA_in,
T_in=T_in,
T_jacket=T_jacket
)
CA = state[-1, 0]
T = state[-1, 1]
conversion = self.physics_model.calculate_conversion(CA, CA_in)
return {
'CA': CA,
'T': T,
'conversion': conversion
}
def train_correction(self, X: np.ndarray, y_true: np.ndarray,
y_physics: np.ndarray):
"""ML補正モデルの訓練
Args:
X: 入力特徴量 [n_samples, n_features]
y_true: 実測値 [n_samples]
y_physics: 物理モデル予測値 [n_samples]
"""
# 残差(実測 - 物理モデル)を学習
residuals = y_true - y_physics
# 特徴量標準化
X_scaled = self.scaler.fit_transform(X)
# 訓練
self.correction_model.fit(X_scaled, residuals)
self.is_trained = True
# 性能評価
residuals_pred = self.correction_model.predict(X_scaled)
r2 = r2_score(residuals, residuals_pred)
mae = mean_absolute_error(residuals, residuals_pred)
print(f"補正モデル訓練完了 - R²: {r2:.4f}, MAE: {mae:.4f}")
def predict_hybrid(self, inputs: Dict) -> Dict:
"""ハイブリッド予測(物理 + ML補正)
Args:
inputs: {'CA_in', 'T_in', 'T_jacket', 'catalyst_age', ...}
Returns:
{'CA', 'T', 'conversion', 'conversion_corrected'}
"""
# 物理モデルで基本予測
physics_result = self.predict_physics(inputs)
# ML補正(訓練済みの場合)
if self.is_trained:
# 特徴量ベクトル作成
features = np.array([[
inputs['CA_in'],
inputs['T_in'],
inputs['T_jacket'],
inputs.get('catalyst_age', 0),
inputs.get('F', 0.1)
]])
features_scaled = self.scaler.transform(features)
correction = self.correction_model.predict(features_scaled)[0]
# 補正後の転化率
conversion_corrected = physics_result['conversion'] + correction
# 0-1の範囲に制限
conversion_corrected = np.clip(conversion_corrected, 0, 1)
else:
conversion_corrected = physics_result['conversion']
return {
'CA': physics_result['CA'],
'T': physics_result['T'],
'conversion_physics': physics_result['conversion'],
'conversion_corrected': conversion_corrected
}
def generate_hybrid_training_data(n_samples: int = 500) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""ハイブリッドモデル用の訓練データ生成
物理モデルだけでは捉えられない現象(触媒劣化、不純物影響など)
が含まれる実測データを模擬
Returns:
(X, y_true, y_physics) 特徴量、実測値、物理予測値
"""
np.random.seed(42)
# 運転条件(特徴量)
CA_in = np.random.uniform(800, 1200, n_samples)
T_in = np.full(n_samples, 298) # 固定
T_jacket = np.random.uniform(290, 310, n_samples)
catalyst_age = np.random.uniform(0, 365, n_samples)
F = np.random.uniform(0.08, 0.12, n_samples)
X = np.column_stack([CA_in, T_in, T_jacket, catalyst_age, F])
# 物理モデルで予測
params = ReactorParameters()
physics_model = FirstPrinciplesReactor(params)
y_physics = np.zeros(n_samples)
for i in range(n_samples):
t, state = physics_model.simulate(
initial_state=(0, T_in[i]),
time_span=(0, 60),
CA_in=CA_in[i],
T_in=T_in[i],
T_jacket=T_jacket[i]
)
CA = state[-1, 0]
y_physics[i] = physics_model.calculate_conversion(CA, CA_in[i])
# 実測値 = 物理モデル + 未モデル化現象(触媒劣化、不純物など)
catalyst_effect = -0.0003 * catalyst_age # 触媒劣化
random_noise = np.random.normal(0, 0.02, n_samples) # ランダムノイズ
y_true = y_physics + catalyst_effect + random_noise
y_true = np.clip(y_true, 0, 1)
return X, y_true, y_physics
# 使用例
if __name__ == "__main__":
# パラメータ設定
params = ReactorParameters()
# ハイブリッドモデルの初期化
hybrid_model = HybridReactorModel(physical_params=params)
# 訓練データ生成
print("=== 訓練データ生成 ===")
X, y_true, y_physics = generate_hybrid_training_data(n_samples=500)
# 物理モデル単独の性能
physics_r2 = r2_score(y_true, y_physics)
physics_mae = mean_absolute_error(y_true, y_physics)
print(f"物理モデル単独 - R²: {physics_r2:.4f}, MAE: {physics_mae:.4f}")
# ML補正モデルの訓練
print("\n=== ML補正モデルの訓練 ===")
hybrid_model.train_correction(X, y_true, y_physics)
# ハイブリッドモデルの性能
y_hybrid = y_physics.copy()
for i in range(len(X)):
inputs = {
'CA_in': X[i, 0],
'T_in': X[i, 1],
'T_jacket': X[i, 2],
'catalyst_age': X[i, 3],
'F': X[i, 4]
}
result = hybrid_model.predict_hybrid(inputs)
y_hybrid[i] = result['conversion_corrected']
hybrid_r2 = r2_score(y_true, y_hybrid)
hybrid_mae = mean_absolute_error(y_true, y_hybrid)
print(f"\nハイブリッドモデル - R²: {hybrid_r2:.4f}, MAE: {hybrid_mae:.4f}")
# 改善率
improvement = (physics_mae - hybrid_mae) / physics_mae * 100
print(f"MAE改善率: {improvement:.1f}%")
# 新規条件での予測比較
print("\n=== 新規条件での予測 ===")
test_inputs = {
'CA_in': 1000,
'T_in': 298,
'T_jacket': 300,
'catalyst_age': 180, # 半年経過の触媒
'F': 0.1
}
result = hybrid_model.predict_hybrid(test_inputs)
print(f"物理モデル予測: {result['conversion_physics']*100:.1f}%")
print(f"ハイブリッド予測: {result['conversion_corrected']*100:.1f}%")
print(f"補正量: {(result['conversion_corrected'] - result['conversion_physics'])*100:.1f}%")
# 期待される出力例:
# === 訓練データ生成 ===
# 物理モデル単独 - R²: 0.8234, MAE: 0.0412
#
# === ML補正モデルの訓練 ===
# 補正モデル訓練完了 - R²: 0.8923, MAE: 0.0098
#
# ハイブリッドモデル - R²: 0.9756, MAE: 0.0123
# MAE改善率: 70.1%
#
# === 新規条件での予測 ===
# 物理モデル予測: 75.5%
# ハイブリッド予測: 70.1%
# 補正量: -5.4%
実測データを使って物理モデルのパラメータを最適化(キャリブレーション)します。
"""
Example 4: モデルキャリブレーション
実測データから物理モデルパラメータを最適化
"""
from scipy.optimize import minimize, differential_evolution
import numpy as np
from typing import List, Tuple
class ModelCalibrator:
"""物理モデルのパラメータキャリブレーション
実測データとの誤差を最小化するパラメータを探索
"""
def __init__(self, reactor_model: FirstPrinciplesReactor):
self.reactor_model = reactor_model
self.original_params = reactor_model.params
def objective_function(self, param_values: np.ndarray,
measured_data: List[Dict]) -> float:
"""最適化の目的関数(実測との誤差)
Args:
param_values: 調整するパラメータ [k0, Ea, UA]
measured_data: 実測データのリスト
[{'CA_in': 1000, 'T_in': 298, 'T_jacket': 300,
'measured_conversion': 0.75, 'measured_T': 348}, ...]
Returns:
平均二乗誤差(MSE)
"""
# パラメータの更新
k0, Ea, UA = param_values
self.reactor_model.params.k0 = k0
self.reactor_model.params.Ea = Ea
self.reactor_model.params.UA = UA
total_error = 0
for data in measured_data:
# モデルで予測
t, state = self.reactor_model.simulate(
initial_state=(0, data['T_in']),
time_span=(0, 60),
CA_in=data['CA_in'],
T_in=data['T_in'],
T_jacket=data['T_jacket']
)
CA_pred = state[-1, 0]
T_pred = state[-1, 1]
# 転化率の誤差
conversion_pred = self.reactor_model.calculate_conversion(
CA_pred, data['CA_in']
)
error_conversion = (conversion_pred - data['measured_conversion']) ** 2
# 温度の誤差(重み付け: 0.1)
error_T = 0.1 * ((T_pred - data['measured_T']) / 100) ** 2
total_error += error_conversion + error_T
mse = total_error / len(measured_data)
return mse
def calibrate(self, measured_data: List[Dict],
method: str = 'differential_evolution') -> Dict:
"""パラメータキャリブレーションの実行
Args:
measured_data: 実測データのリスト
method: 最適化手法 ('L-BFGS-B' or 'differential_evolution')
Returns:
最適化結果 {'k0', 'Ea', 'UA', 'mse'}
"""
# 初期推定値と探索範囲
initial_guess = [
self.original_params.k0,
self.original_params.Ea,
self.original_params.UA
]
bounds = [
(1e8, 1e12), # k0の範囲
(60000, 100000), # Eaの範囲 [J/mol]
(3000, 7000) # UAの範囲 [W/K]
]
if method == 'differential_evolution':
# グローバル最適化(推奨)
result = differential_evolution(
lambda x: self.objective_function(x, measured_data),
bounds=bounds,
seed=42,
maxiter=100,
popsize=15,
tol=1e-6
)
optimal_params = result.x
mse = result.fun
elif method == 'L-BFGS-B':
# 局所最適化
result = minimize(
lambda x: self.objective_function(x, measured_data),
x0=initial_guess,
bounds=bounds,
method='L-BFGS-B'
)
optimal_params = result.x
mse = result.fun
else:
raise ValueError(f"Unknown method: {method}")
return {
'k0': optimal_params[0],
'Ea': optimal_params[1],
'UA': optimal_params[2],
'mse': mse
}
def generate_measured_data(n_points: int = 20) -> List[Dict]:
"""実測データの生成(模擬)
実際のプラントから得られたデータを想定
Returns:
実測データのリスト
"""
np.random.seed(42)
# 真のパラメータ(未知と仮定)
true_params = ReactorParameters(
k0=7.5e9, # 初期推定値1e10より小さい
Ea=85000, # 初期推定値80000より大きい
UA=4500 # 初期推定値5000より小さい
)
true_model = FirstPrinciplesReactor(true_params)
measured_data = []
for _ in range(n_points):
# 運転条件をランダムに変化
CA_in = np.random.uniform(800, 1200)
T_in = 298
T_jacket = np.random.uniform(290, 310)
# 真のモデルでシミュレーション
t, state = true_model.simulate(
initial_state=(0, T_in),
time_span=(0, 60),
CA_in=CA_in,
T_in=T_in,
T_jacket=T_jacket
)
CA = state[-1, 0]
T = state[-1, 1]
conversion = true_model.calculate_conversion(CA, CA_in)
# 測定ノイズを追加
measured_data.append({
'CA_in': CA_in,
'T_in': T_in,
'T_jacket': T_jacket,
'measured_conversion': conversion + np.random.normal(0, 0.01),
'measured_T': T + np.random.normal(0, 1.0)
})
return measured_data
# 使用例
if __name__ == "__main__":
# 初期パラメータ(不正確)
initial_params = ReactorParameters(
k0=1e10, # 真値より大きい
Ea=80000, # 真値より小さい
UA=5000 # 真値より大きい
)
reactor = FirstPrinciplesReactor(initial_params)
# 実測データの生成
print("=== 実測データの生成 ===")
measured_data = generate_measured_data(n_points=20)
print(f"データポイント数: {len(measured_data)}")
# キャリブレーション前の性能評価
print("\n=== キャリブレーション前 ===")
calibrator = ModelCalibrator(reactor)
mse_before = calibrator.objective_function(
[initial_params.k0, initial_params.Ea, initial_params.UA],
measured_data
)
print(f"MSE: {mse_before:.6f}")
# キャリブレーション実行
print("\n=== キャリブレーション実行中... ===")
optimal_result = calibrator.calibrate(
measured_data,
method='differential_evolution'
)
print("\n=== 最適化結果 ===")
print(f"最適 k0: {optimal_result['k0']:.2e} (初期: {initial_params.k0:.2e})")
print(f"最適 Ea: {optimal_result['Ea']:.0f} J/mol (初期: {initial_params.Ea:.0f})")
print(f"最適 UA: {optimal_result['UA']:.0f} W/K (初期: {initial_params.UA:.0f})")
print(f"最小 MSE: {optimal_result['mse']:.6f}")
# 改善率
improvement = (mse_before - optimal_result['mse']) / mse_before * 100
print(f"\nMSE改善率: {improvement:.1f}%")
# 最適パラメータでモデルを更新
reactor.params.k0 = optimal_result['k0']
reactor.params.Ea = optimal_result['Ea']
reactor.params.UA = optimal_result['UA']
# テストデータで検証
print("\n=== テストデータでの検証 ===")
test_data = measured_data[0]
t, state = reactor.simulate(
initial_state=(0, test_data['T_in']),
time_span=(0, 60),
CA_in=test_data['CA_in'],
T_in=test_data['T_in'],
T_jacket=test_data['T_jacket']
)
CA_pred = state[-1, 0]
conversion_pred = reactor.calculate_conversion(CA_pred, test_data['CA_in'])
print(f"予測転化率: {conversion_pred*100:.1f}%")
print(f"実測転化率: {test_data['measured_conversion']*100:.1f}%")
print(f"誤差: {abs(conversion_pred - test_data['measured_conversion'])*100:.1f}%")
# 期待される出力例:
# === 実測データの生成 ===
# データポイント数: 20
#
# === キャリブレーション前 ===
# MSE: 0.012345
#
# === キャリブレーション実行中... ===
#
# === 最適化結果 ===
# 最適 k0: 7.48e+09 (初期: 1.00e+10)
# 最適 Ea: 85023 J/mol (初期: 80000)
# 最適 UA: 4512 W/K (初期: 5000)
# 最小 MSE: 0.000123
#
# MSE改善率: 99.0%
#
# === テストデータでの検証 ===
# 予測転化率: 74.3%
# 実測転化率: 74.5%
# 誤差: 0.2%
物理モデルの予測残差(誤差)を機械学習で学習する、効率的なハイブリッド手法です。
"""
Example 5: 残差学習アプローチ
物理モデルの誤差パターンを機械学習で学習
"""
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel
from typing import Dict, Tuple
class ResidualLearningModel:
"""残差学習ハイブリッドモデル
物理モデル予測と実測値の差(残差)を
ガウス過程回帰で学習し、不確実性も定量化
"""
def __init__(self, physics_model: FirstPrinciplesReactor):
self.physics_model = physics_model
# ガウス過程回帰(不確実性も推定)
kernel = RBF(length_scale=1.0) + WhiteKernel(noise_level=1e-5)
self.gp_model = GaussianProcessRegressor(
kernel=kernel,
n_restarts_optimizer=10,
random_state=42
)
self.is_trained = False
def compute_physics_residuals(self, X: np.ndarray,
y_true: np.ndarray) -> np.ndarray:
"""物理モデルの残差を計算
Args:
X: 特徴量 [n_samples, n_features]
columns: [CA_in, T_in, T_jacket, catalyst_age]
y_true: 実測転化率 [n_samples]
Returns:
残差 [n_samples]
"""
n_samples = X.shape[0]
residuals = np.zeros(n_samples)
for i in range(n_samples):
CA_in, T_in, T_jacket, _ = X[i]
# 物理モデルで予測
t, state = self.physics_model.simulate(
initial_state=(0, T_in),
time_span=(0, 60),
CA_in=CA_in,
T_in=T_in,
T_jacket=T_jacket
)
CA_pred = state[-1, 0]
conversion_pred = self.physics_model.calculate_conversion(
CA_pred, CA_in
)
# 残差 = 実測 - 物理予測
residuals[i] = y_true[i] - conversion_pred
return residuals
def train(self, X: np.ndarray, y_true: np.ndarray):
"""残差学習モデルの訓練
Args:
X: 特徴量 [n_samples, n_features]
y_true: 実測転化率 [n_samples]
"""
print("物理モデルの残差を計算中...")
residuals = self.compute_physics_residuals(X, y_true)
print(f"残差の統計: Mean={np.mean(residuals):.4f}, Std={np.std(residuals):.4f}")
# ガウス過程回帰で残差を学習
print("ガウス過程回帰の訓練中...")
self.gp_model.fit(X, residuals)
self.is_trained = True
print("訓練完了")
def predict(self, X: np.ndarray,
return_std: bool = False) -> Tuple[np.ndarray, np.ndarray]:
"""予測実行(物理 + 残差補正)
Args:
X: 特徴量 [n_samples, n_features]
return_std: 標準偏差(不確実性)も返すか
Returns:
(predictions, std) 予測値と標準偏差
"""
n_samples = X.shape[0]
physics_predictions = np.zeros(n_samples)
# 物理モデルで予測
for i in range(n_samples):
CA_in, T_in, T_jacket, _ = X[i]
t, state = self.physics_model.simulate(
initial_state=(0, T_in),
time_span=(0, 60),
CA_in=CA_in,
T_in=T_in,
T_jacket=T_jacket
)
CA_pred = state[-1, 0]
conversion_pred = self.physics_model.calculate_conversion(
CA_pred, CA_in
)
physics_predictions[i] = conversion_pred
# 残差を予測
if self.is_trained:
if return_std:
residual_pred, residual_std = self.gp_model.predict(
X, return_std=True
)
hybrid_predictions = physics_predictions + residual_pred
return hybrid_predictions, residual_std
else:
residual_pred = self.gp_model.predict(X)
hybrid_predictions = physics_predictions + residual_pred
return hybrid_predictions, None
else:
if return_std:
return physics_predictions, np.zeros(n_samples)
else:
return physics_predictions, None
# 使用例
if __name__ == "__main__":
# 物理モデルの初期化
params = ReactorParameters()
physics_model = FirstPrinciplesReactor(params)
# 残差学習モデルの初期化
residual_model = ResidualLearningModel(physics_model)
# 訓練データ生成
print("=== 訓練データ生成 ===")
np.random.seed(42)
n_train = 50
X_train = np.column_stack([
np.random.uniform(800, 1200, n_train), # CA_in
np.full(n_train, 298), # T_in
np.random.uniform(290, 310, n_train), # T_jacket
np.random.uniform(0, 365, n_train) # catalyst_age
])
# 真の転化率(物理モデル + 未知の影響)
y_train_physics = np.zeros(n_train)
for i in range(n_train):
t, state = physics_model.simulate(
initial_state=(0, X_train[i, 1]),
time_span=(0, 60),
CA_in=X_train[i, 0],
T_in=X_train[i, 1],
T_jacket=X_train[i, 2]
)
CA = state[-1, 0]
y_train_physics[i] = physics_model.calculate_conversion(CA, X_train[i, 0])
# 未知の影響(触媒劣化など)を追加
catalyst_effect = -0.0003 * X_train[:, 3]
y_train_true = y_train_physics + catalyst_effect + np.random.normal(0, 0.01, n_train)
# 訓練
print("\n=== 残差学習モデルの訓練 ===")
residual_model.train(X_train, y_train_true)
# テストデータで評価
print("\n=== テストデータで評価 ===")
n_test = 10
X_test = np.column_stack([
np.random.uniform(800, 1200, n_test),
np.full(n_test, 298),
np.random.uniform(290, 310, n_test),
np.random.uniform(0, 365, n_test)
])
# 真の値(テスト)
y_test_physics = np.zeros(n_test)
for i in range(n_test):
t, state = physics_model.simulate(
initial_state=(0, X_test[i, 1]),
time_span=(0, 60),
CA_in=X_test[i, 0],
T_in=X_test[i, 1],
T_jacket=X_test[i, 2]
)
CA = state[-1, 0]
y_test_physics[i] = physics_model.calculate_conversion(CA, X_test[i, 0])
catalyst_effect_test = -0.0003 * X_test[:, 3]
y_test_true = y_test_physics + catalyst_effect_test
# 予測(不確実性付き)
y_pred, y_std = residual_model.predict(X_test, return_std=True)
# 結果表示
print("\n予測結果:")
print("Index\tTrue\tPhysics\tHybrid\tStd\tError")
print("-" * 60)
for i in range(n_test):
error = abs(y_test_true[i] - y_pred[i])
print(f"{i}\t{y_test_true[i]:.3f}\t{y_test_physics[i]:.3f}\t"
f"{y_pred[i]:.3f}\t{y_std[i]:.3f}\t{error:.3f}")
# 性能評価
mae_physics = np.mean(np.abs(y_test_true - y_test_physics))
mae_hybrid = np.mean(np.abs(y_test_true - y_pred))
print(f"\n=== 性能比較 ===")
print(f"物理モデル MAE: {mae_physics:.4f}")
print(f"ハイブリッド MAE: {mae_hybrid:.4f}")
print(f"改善率: {(mae_physics - mae_hybrid) / mae_physics * 100:.1f}%")
# 期待される出力例:
# === 訓練データ生成 ===
#
# === 残差学習モデルの訓練 ===
# 物理モデルの残差を計算中...
# 残差の統計: Mean=-0.0534, Std=0.0456
# ガウス過程回帰の訓練中...
# 訓練完了
#
# === テストデータで評価 ===
#
# 予測結果:
# Index True Physics Hybrid Std Error
# ------------------------------------------------------------
# 0 0.712 0.756 0.708 0.012 0.004
# 1 0.745 0.789 0.742 0.015 0.003
# 2 0.698 0.745 0.695 0.018 0.003
# 3 0.723 0.768 0.721 0.013 0.002
# 4 0.734 0.779 0.732 0.014 0.002
# 5 0.689 0.738 0.687 0.019 0.002
# 6 0.756 0.798 0.754 0.011 0.002
# 7 0.701 0.748 0.699 0.016 0.002
# 8 0.745 0.787 0.743 0.013 0.002
# 9 0.712 0.759 0.710 0.015 0.002
#
# === 性能比較 ===
# 物理モデル MAE: 0.0456
# ハイブリッド MAE: 0.0025
# 改善率: 94.5%
物理法則を損失関数に組み込んだニューラルネットワーク、PINNの基本実装です。
"""
Example 6: Physics-Informed Neural Networks (PINN) 基礎
物理法則を制約として組み込んだニューラルネットワーク
"""
import torch
import torch.nn as nn
import numpy as np
from typing import Tuple
class PhysicsInformedNN(nn.Module):
"""Physics-Informed Neural Network (PINN)
ニューラルネットワークの損失関数に
物理法則(微分方程式)の残差を組み込む
"""
def __init__(self, input_dim: int = 4, hidden_dim: int = 64):
"""
Args:
input_dim: 入力次元(CA_in, T_in, T_jacket, t)
hidden_dim: 隠れ層の次元
"""
super(PhysicsInformedNN, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, 2) # [CA, T]を出力
)
# 物理パラメータ(学習可能)
self.k0 = nn.Parameter(torch.tensor(1e10, dtype=torch.float32))
self.Ea = nn.Parameter(torch.tensor(80000.0, dtype=torch.float32))
self.UA = nn.Parameter(torch.tensor(5000.0, dtype=torch.float32))
# 固定パラメータ
self.V = 1.0
self.F = 0.1
self.rho = 1000.0
self.Cp = 4200.0
self.delta_H = -100000.0
self.R = 8.314
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""順伝播
Args:
x: [batch, 4] (CA_in, T_in, T_jacket, t)
Returns:
[batch, 2] (CA, T)
"""
return self.network(x)
def reaction_rate(self, CA: torch.Tensor, T: torch.Tensor) -> torch.Tensor:
"""反応速度(アレニウス式)
Args:
CA: 濃度 [batch]
T: 温度 [batch]
Returns:
反応速度 [batch]
"""
k = self.k0 * torch.exp(-self.Ea / (self.R * T))
r = k * CA
return r
def physics_loss(self, x: torch.Tensor,
CA: torch.Tensor, T: torch.Tensor) -> torch.Tensor:
"""物理法則の残差(微分方程式の誤差)
dCA/dt = F/V * (CA_in - CA) - r の残差
dT/dt = ... の残差
Args:
x: 入力 [batch, 4]
CA: 濃度予測 [batch]
T: 温度予測 [batch]
Returns:
物理残差(スカラー)
"""
CA_in = x[:, 0]
T_in = x[:, 1]
T_jacket = x[:, 2]
# 自動微分でdCA/dt, dT/dtを計算
CA.requires_grad_(True)
T.requires_grad_(True)
# 反応速度
r = self.reaction_rate(CA, T)
# 質量収支の右辺
dCA_dt_theory = (self.F / self.V) * (CA_in - CA) - r
# エネルギー収支の右辺
term1 = (self.F / self.V) * (T_in - T)
term2 = (-self.delta_H) * r / (self.rho * self.Cp)
term3 = -self.UA / (self.rho * self.Cp * self.V) * (T - T_jacket)
dT_dt_theory = term1 + term2 + term3
# 実際の時間微分(NNの出力をtで微分)
# 簡略化のため、ここでは理論値との直接比較
# 実際は torch.autograd.grad を使用
physics_residual = torch.mean(dCA_dt_theory ** 2 + dT_dt_theory ** 2)
return physics_residual
def data_loss(self, CA_pred: torch.Tensor, T_pred: torch.Tensor,
CA_true: torch.Tensor, T_true: torch.Tensor) -> torch.Tensor:
"""データ誤差(実測値との差)
Args:
CA_pred, T_pred: 予測値
CA_true, T_true: 実測値
Returns:
データ損失(スカラー)
"""
loss_CA = torch.mean((CA_pred - CA_true) ** 2)
loss_T = torch.mean((T_pred - T_true) ** 2) / 10000 # 温度スケール調整
return loss_CA + loss_T
def train_pinn(model: PhysicsInformedNN,
X_data: np.ndarray, CA_data: np.ndarray, T_data: np.ndarray,
epochs: int = 1000,
lambda_physics: float = 0.1) -> list:
"""PINNの訓練
Args:
model: PINNモデル
X_data: 入力データ [n_samples, 4]
CA_data: 濃度実測値 [n_samples]
T_data: 温度実測値 [n_samples]
epochs: エポック数
lambda_physics: 物理損失の重み
Returns:
損失の履歴
"""
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# データをTensorに変換
X_tensor = torch.tensor(X_data, dtype=torch.float32)
CA_tensor = torch.tensor(CA_data, dtype=torch.float32)
T_tensor = torch.tensor(T_data, dtype=torch.float32)
loss_history = []
for epoch in range(epochs):
optimizer.zero_grad()
# 順伝播
output = model(X_tensor)
CA_pred = output[:, 0]
T_pred = output[:, 1]
# データ損失
loss_data = model.data_loss(CA_pred, T_pred, CA_tensor, T_tensor)
# 物理損失
loss_physics = model.physics_loss(X_tensor, CA_pred, T_pred)
# 総損失
total_loss = loss_data + lambda_physics * loss_physics
# 逆伝播
total_loss.backward()
optimizer.step()
loss_history.append(total_loss.item())
if (epoch + 1) % 100 == 0:
print(f"Epoch {epoch+1}/{epochs} - "
f"Total Loss: {total_loss.item():.4f}, "
f"Data Loss: {loss_data.item():.4f}, "
f"Physics Loss: {loss_physics.item():.4f}")
return loss_history
# 使用例(簡略版)
if __name__ == "__main__":
print("=== Physics-Informed Neural Network (PINN) ===")
# 訓練データ生成(簡略版)
np.random.seed(42)
n_samples = 100
X_data = np.column_stack([
np.random.uniform(800, 1200, n_samples), # CA_in
np.full(n_samples, 298), # T_in
np.random.uniform(290, 310, n_samples), # T_jacket
np.random.uniform(0, 60, n_samples) # t
])
# 模擬的な実測値
CA_data = np.random.uniform(200, 800, n_samples)
T_data = np.random.uniform(320, 360, n_samples)
# PINNモデルの初期化
pinn_model = PhysicsInformedNN(input_dim=4, hidden_dim=64)
print(f"\n初期パラメータ:")
print(f"k0: {pinn_model.k0.item():.2e}")
print(f"Ea: {pinn_model.Ea.item():.0f} J/mol")
print(f"UA: {pinn_model.UA.item():.0f} W/K")
# 訓練(簡略版 - 実際はより多くのエポック)
print(f"\n訓練開始...")
loss_history = train_pinn(
pinn_model,
X_data, CA_data, T_data,
epochs=500,
lambda_physics=0.1
)
print(f"\n最終パラメータ:")
print(f"k0: {pinn_model.k0.item():.2e}")
print(f"Ea: {pinn_model.Ea.item():.0f} J/mol")
print(f"UA: {pinn_model.UA.item():.0f} W/K")
print("\nPINN訓練完了(実際のアプリケーションではより詳細な実装が必要)")
# 期待される出力例:
# === Physics-Informed Neural Network (PINN) ===
#
# 初期パラメータ:
# k0: 1.00e+10
# Ea: 80000 J/mol
# UA: 5000 W/K
#
# 訓練開始...
# Epoch 100/500 - Total Loss: 125432.5678, Data Loss: 123456.7890, Physics Loss: 1975.7788
# Epoch 200/500 - Total Loss: 45678.1234, Data Loss: 44567.2345, Physics Loss: 1110.8889
# Epoch 300/500 - Total Loss: 12345.6789, Data Loss: 11234.5678, Physics Loss: 1111.1111
# Epoch 400/500 - Total Loss: 3456.7890, Data Loss: 2345.6789, Physics Loss: 1111.1101
# Epoch 500/500 - Total Loss: 1234.5678, Data Loss: 123.4567, Physics Loss: 1111.1111
#
# 最終パラメータ:
# k0: 8.23e+09
# Ea: 83215 J/mol
# UA: 4687 W/K
#
# PINN訓練完了(実際のアプリケーションではより詳細な実装が必要)
運転状況に応じて最適なモデル(物理/データ駆動/ハイブリッド)を自動選択します。
"""
Example 7: モデル選択と切り替えロジック
運転状況に応じて最適なモデルを動的に選択
"""
from enum import Enum
from typing import Dict, Optional
import numpy as np
class ModelType(Enum):
"""モデルタイプ"""
PHYSICS_ONLY = "physics"
DATA_DRIVEN = "data_driven"
HYBRID = "hybrid"
class AdaptiveModelSelector:
"""適応的モデル選択器
運転条件、データ品質、予測精度に基づいて
最適なモデルを動的に選択
"""
def __init__(self,
physics_model: FirstPrinciplesReactor,
data_model: DataDrivenReactorModel,
hybrid_model: HybridReactorModel):
"""
Args:
physics_model: 物理モデル
data_model: データ駆動モデル
hybrid_model: ハイブリッドモデル
"""
self.physics_model = physics_model
self.data_model = data_model
self.hybrid_model = hybrid_model
# 各モデルの適用範囲(訓練データ範囲)
self.data_model_range = {
'T': (60, 95),
'P': (2.0, 3.0),
'F': (100, 150),
'catalyst_age': (0, 365)
}
# モデル性能履歴
self.performance_history = {
ModelType.PHYSICS_ONLY: [],
ModelType.DATA_DRIVEN: [],
ModelType.HYBRID: []
}
def is_within_training_range(self, inputs: Dict) -> bool:
"""入力がデータ駆動モデルの訓練範囲内か確認
Args:
inputs: 入力辞書
Returns:
範囲内ならTrue
"""
if 'T' in inputs:
T = inputs['T']
if not (self.data_model_range['T'][0] <= T <= self.data_model_range['T'][1]):
return False
if 'P' in inputs:
P = inputs['P']
if not (self.data_model_range['P'][0] <= P <= self.data_model_range['P'][1]):
return False
# 他の変数も同様にチェック
return True
def assess_data_quality(self, sensor_data: Dict) -> float:
"""センサーデータの品質を評価
Args:
sensor_data: センサーデータ辞書
Returns:
品質スコア (0-1, 1が最高)
"""
quality_score = 1.0
# データ品質フラグのチェック
if sensor_data.get('quality') == 'Bad':
quality_score *= 0.3
elif sensor_data.get('quality') == 'Uncertain':
quality_score *= 0.7
# データ完全性のチェック
required_fields = ['temperature', 'pressure', 'flow_rate']
missing_count = sum(1 for field in required_fields if field not in sensor_data)
quality_score *= (1.0 - 0.2 * missing_count)
# ノイズレベルのチェック(標準偏差が大きい場合)
if 'noise_level' in sensor_data:
quality_score *= max(0.5, 1.0 - sensor_data['noise_level'])
return quality_score
def select_model(self, inputs: Dict,
sensor_data: Dict,
data_quality: Optional[float] = None) -> ModelType:
"""運転状況に基づいて最適なモデルを選択
Args:
inputs: 運転条件
sensor_data: センサーデータ
data_quality: データ品質スコア(Noneなら自動計算)
Returns:
選択されたモデルタイプ
"""
if data_quality is None:
data_quality = self.assess_data_quality(sensor_data)
in_training_range = self.is_within_training_range(inputs)
# 決定ロジック
if data_quality < 0.5:
# データ品質が低い → 物理モデルを使用
return ModelType.PHYSICS_ONLY
elif in_training_range and data_quality >= 0.8:
# 訓練範囲内かつ高品質データ → ハイブリッドモデル(最高精度)
return ModelType.HYBRID
elif in_training_range:
# 訓練範囲内だが品質は中程度 → データ駆動モデル
return ModelType.DATA_DRIVEN
else:
# 訓練範囲外 → 物理モデル(外挿性が高い)
return ModelType.PHYSICS_ONLY
def predict(self, inputs: Dict, sensor_data: Dict) -> Dict:
"""予測実行(最適なモデルを自動選択)
Args:
inputs: 運転条件
sensor_data: センサーデータ
Returns:
予測結果辞書 {'conversion', 'model_used', 'confidence'}
"""
# モデル選択
selected_model = self.select_model(inputs, sensor_data)
# データ品質評価
data_quality = self.assess_data_quality(sensor_data)
# 選択されたモデルで予測
if selected_model == ModelType.PHYSICS_ONLY:
result = self.physics_model.predict_physics(inputs)
confidence = 0.7 # 物理モデルの信頼度
elif selected_model == ModelType.DATA_DRIVEN:
X = np.array([[
inputs.get('T', 80),
inputs.get('P', 2.5),
inputs.get('F', 120),
inputs.get('catalyst_age', 0)
]])
conversion_pred = self.data_model.predict(X)[0]
result = {'conversion': conversion_pred}
confidence = 0.9 * data_quality # データ品質に依存
elif selected_model == ModelType.HYBRID:
result = self.hybrid_model.predict_hybrid(inputs)
confidence = 0.95 * data_quality # 最高精度
else:
raise ValueError(f"Unknown model type: {selected_model}")
# 結果に情報を追加
result['model_used'] = selected_model.value
result['confidence'] = confidence
result['data_quality'] = data_quality
return result
def update_performance(self, model_type: ModelType,
prediction: float, actual: float):
"""モデル性能の履歴を更新(オンライン学習に利用)
Args:
model_type: 使用したモデルタイプ
prediction: 予測値
actual: 実測値
"""
error = abs(prediction - actual)
self.performance_history[model_type].append(error)
# 最新100件のみ保持
if len(self.performance_history[model_type]) > 100:
self.performance_history[model_type] = \
self.performance_history[model_type][-100:]
def get_model_performance_summary(self) -> Dict:
"""各モデルの性能サマリーを取得
Returns:
{'physics': {'mae': 0.05, 'count': 120}, ...}
"""
summary = {}
for model_type, errors in self.performance_history.items():
if len(errors) > 0:
summary[model_type.value] = {
'mae': np.mean(errors),
'std': np.std(errors),
'count': len(errors)
}
return summary
# 使用例
if __name__ == "__main__":
# 各モデルの初期化(前の例から)
params = ReactorParameters()
physics_model = FirstPrinciplesReactor(params)
data_model = DataDrivenReactorModel(model_type='random_forest')
# (訓練は省略)
hybrid_model = HybridReactorModel(physical_params=params)
# (訓練は省略)
# 適応的モデル選択器の初期化
model_selector = AdaptiveModelSelector(
physics_model=physics_model,
data_model=data_model,
hybrid_model=hybrid_model
)
# シナリオ1: 通常運転(高品質データ、訓練範囲内)
print("=== シナリオ1: 通常運転 ===")
inputs1 = {
'CA_in': 1000,
'T_in': 298,
'T_jacket': 300,
'T': 80,
'P': 2.5,
'F': 120,
'catalyst_age': 100
}
sensor_data1 = {
'temperature': 80.0,
'pressure': 2.5,
'flow_rate': 120.0,
'quality': 'Good'
}
result1 = model_selector.predict(inputs1, sensor_data1)
print(f"使用モデル: {result1['model_used']}")
print(f"信頼度: {result1['confidence']:.2f}")
print(f"データ品質: {result1['data_quality']:.2f}")
# シナリオ2: 異常運転(低品質データ)
print("\n=== シナリオ2: 異常運転(低品質データ) ===")
sensor_data2 = {
'temperature': 80.0,
'quality': 'Bad', # 不良データ
'noise_level': 0.8
}
result2 = model_selector.predict(inputs1, sensor_data2)
print(f"使用モデル: {result2['model_used']}")
print(f"信頼度: {result2['confidence']:.2f}")
print(f"データ品質: {result2['data_quality']:.2f}")
# シナリオ3: 訓練範囲外(外挿)
print("\n=== シナリオ3: 訓練範囲外 ===")
inputs3 = {
'CA_in': 1000,
'T_in': 298,
'T_jacket': 300,
'T': 105, # 訓練範囲外(max 95℃)
'P': 2.5,
'F': 120,
'catalyst_age': 100
}
sensor_data3 = {
'temperature': 105.0,
'pressure': 2.5,
'flow_rate': 120.0,
'quality': 'Good'
}
result3 = model_selector.predict(inputs3, sensor_data3)
print(f"使用モデル: {result3['model_used']}")
print(f"信頼度: {result3['confidence']:.2f}")
print(f"理由: 訓練範囲外のため物理モデルを使用")
# 性能履歴の更新(模擬)
print("\n=== モデル性能の更新 ===")
model_selector.update_performance(ModelType.HYBRID, 0.75, 0.74)
model_selector.update_performance(ModelType.HYBRID, 0.76, 0.75)
model_selector.update_performance(ModelType.PHYSICS_ONLY, 0.70, 0.75)
summary = model_selector.get_model_performance_summary()
for model, stats in summary.items():
print(f"{model}: MAE={stats['mae']:.4f}, 使用回数={stats['count']}")
# 期待される出力例:
# === シナリオ1: 通常運転 ===
# 使用モデル: hybrid
# 信頼度: 0.95
# データ品質: 1.00
#
# === シナリオ2: 異常運転(低品質データ) ===
# 使用モデル: physics
# 信頼度: 0.70
# データ品質: 0.06
#
# === シナリオ3: 訓練範囲外 ===
# 使用モデル: physics
# 信頼度: 0.70
# 理由: 訓練範囲外のため物理モデルを使用
#
# === モデル性能の更新 ===
# hybrid: MAE=0.0100, 使用回数=2
# physics: MAE=0.0500, 使用回数=1
この章を完了すると、以下を実装できるようになります:
Q1: ハイブリッドモデルの利点として、最も適切でないものはどれですか?
a) 物理的解釈性と高精度を両立できる
b) 訓練データが少なくても高性能
c) 実装が最も簡単
d) 外挿性とデータフィット性を両立
正解: c) 実装が最も簡単
解説:
ハイブリッドモデルは物理モデルとデータ駆動モデルの統合が必要なため、実装は最も複雑です。しかし、その複雑さと引き換えに以下の利点が得られます:
Q2: 物理モデルのキャリブレーションで、パラメータ最適化の目的関数(MSE)が初期値0.012345から0.000123に改善しました。このとき、RMSEはどれくらい改善しましたか?改善率も計算してください。
計算:
正解: RMSE改善率 = 90.0%
重要なポイント:
Q3: あなたは新しい化学プラントのDigital Twin開発を任されました。以下の条件で、第一原理モデル、データ駆動モデル、ハイブリッドモデルのどれを採用すべきか、理由とともに提案してください。
条件:
推奨: 段階的アプローチ(第一原理 → ハイブリッド)
フェーズ1(最初の1ヶ月): 第一原理モデル開発
フェーズ2(2〜3ヶ月目): ハイブリッド化
データ駆動単独を選ばない理由:
実装スケジュール(3ヶ月):
| 月 | タスク | マイルストーン |
|---|---|---|
| 1 | 第一原理モデル開発、パイロットデータでキャリブレーション | 精度±5%達成 |
| 2 | 本格運転開始、データ収集、ML補正モデル開発 | 500時間データ蓄積 |
| 3 | ハイブリッドモデル統合、検証、デプロイ | 精度±2%達成 |
リスク緩和策:
ハイブリッドモデリングの基礎を習得したら、次はDigital Twinの実践的な応用に進みます。プロセス最適化、予測保全、What-if分析など、実ビジネスでの活用事例を学びます。