第3章:ハイブリッドモデリング

物理法則とデータ駆動アプローチを融合し、高精度かつ解釈可能なDigital Twinモデルを構築

📚 Digital Twin入門シリーズ ⏱️ 読了時間: 40分 💡 難易度: 中級〜上級

3.1 ハイブリッドモデリングの必要性

Digital Twinのモデル構築には大きく2つのアプローチがあります:

アプローチ 特徴 長所 短所
第一原理モデル
(First-Principles)
物理法則(質量・エネルギー保存則)に基づく微分方程式 ・物理的に解釈可能
・外挿性が高い
・少ないデータで構築
・複雑系は困難
・パラメータ決定が難しい
・計算コストが高い
データ駆動モデル
(Data-Driven)
機械学習で入出力関係を学習 ・複雑な関係を捉える
・高速計算
・実装が容易
・ブラックボックス
・外挿性が低い
・大量データが必要
ハイブリッド
(Hybrid)
両者を組み合わせる ・両者の長所を活用
・高精度と解釈性
・適応性が高い
・設計が複雑
・専門知識が必要
💡 産業界の実例
化学プラントの反応器Digital Twinでは、物理モデルで基本的な熱・物質収支を計算し、触媒劣化や副反応などの複雑な現象を機械学習で補正することで、予測精度が物理モデル単独の場合と比べて40%向上しました(出典: Siemens Energy, 2023)。

3.2 実装例1:第一原理モデル(質量・エネルギー収支)

化学反応器の基本的な物理モデルを実装します。質量保存則とエネルギー保存則から温度と濃度の時間変化を計算します。

"""
Example 1: 第一原理モデル - 化学反応器の質量・エネルギー収支
CSTR (連続撹拌槽型反応器) の動的モデル
"""

import numpy as np
from scipy.integrate import odeint
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Tuple


@dataclass
class ReactorParameters:
    """反応器パラメータ"""
    V: float = 1.0          # 反応器体積 [m³]
    F: float = 0.1          # 流量 [m³/min]
    rho: float = 1000       # 密度 [kg/m³]
    Cp: float = 4200        # 比熱 [J/(kg·K)]
    UA: float = 5000        # 総括伝熱係数×面積 [W/K]
    k0: float = 1e10        # 頻度因子 [1/min]
    Ea: float = 80000       # 活性化エネルギー [J/mol]
    delta_H: float = -100000  # 反応熱 [J/mol](発熱反応)
    R: float = 8.314        # 気体定数 [J/(mol·K)]


class FirstPrinciplesReactor:
    """第一原理に基づく反応器モデル

    A → B の1次発熱反応を行うCSTRをモデル化
    状態変数: [CA, T] (濃度、温度)
    """

    def __init__(self, params: ReactorParameters):
        self.params = params

    def reaction_rate(self, CA: float, T: float) -> float:
        """反応速度を計算(アレニウス式)

        Args:
            CA: 反応物Aの濃度 [mol/m³]
            T: 温度 [K]

        Returns:
            反応速度 [mol/(m³·min)]
        """
        p = self.params
        k = p.k0 * np.exp(-p.Ea / (p.R * T))  # 反応速度定数
        r = k * CA  # 1次反応
        return r

    def derivatives(self, state: np.ndarray, t: float,
                   CA_in: float, T_in: float, T_jacket: float) -> np.ndarray:
        """状態微分方程式

        質量収支: dCA/dt = F/V * (CA_in - CA) - r
        エネルギー収支: dT/dt = F/V * (T_in - T) + (-ΔH)*r/(ρ*Cp) - UA/(ρ*Cp*V)*(T - T_jacket)

        Args:
            state: [CA, T]
            t: 時刻
            CA_in: 入口濃度 [mol/m³]
            T_in: 入口温度 [K]
            T_jacket: ジャケット温度 [K]

        Returns:
            [dCA/dt, dT/dt]
        """
        CA, T = state
        p = self.params

        # 反応速度
        r = self.reaction_rate(CA, T)

        # 質量収支
        dCA_dt = (p.F / p.V) * (CA_in - CA) - r

        # エネルギー収支
        # 項1: 流れによる温度変化
        term1 = (p.F / p.V) * (T_in - T)

        # 項2: 反応熱
        term2 = (-p.delta_H) * r / (p.rho * p.Cp)

        # 項3: ジャケット冷却
        term3 = -p.UA / (p.rho * p.Cp * p.V) * (T - T_jacket)

        dT_dt = term1 + term2 + term3

        return np.array([dCA_dt, dT_dt])

    def simulate(self, initial_state: Tuple[float, float],
                time_span: Tuple[float, float],
                CA_in: float, T_in: float, T_jacket: float,
                n_points: int = 1000) -> Tuple[np.ndarray, np.ndarray]:
        """シミュレーション実行

        Args:
            initial_state: 初期状態 [CA0, T0]
            time_span: 時間範囲 [t_start, t_end] [min]
            CA_in: 入口濃度 [mol/m³]
            T_in: 入口温度 [K]
            T_jacket: ジャケット温度 [K]
            n_points: 時間点数

        Returns:
            (time, state) 時刻配列と状態配列 [CA, T]
        """
        t = np.linspace(time_span[0], time_span[1], n_points)

        state = odeint(
            self.derivatives,
            initial_state,
            t,
            args=(CA_in, T_in, T_jacket)
        )

        return t, state

    def calculate_conversion(self, CA: float, CA_in: float) -> float:
        """反応転化率を計算

        Args:
            CA: 反応器内濃度 [mol/m³]
            CA_in: 入口濃度 [mol/m³]

        Returns:
            転化率 [-] (0-1)
        """
        if CA_in == 0:
            return 0
        return (CA_in - CA) / CA_in


# 使用例
if __name__ == "__main__":
    # パラメータ設定
    params = ReactorParameters(
        V=1.0,          # 1 m³
        F=0.1,          # 0.1 m³/min(滞留時間10分)
        rho=1000,       # 1000 kg/m³
        Cp=4200,        # 4200 J/(kg·K)
        UA=5000,        # 5000 W/K
        k0=1e10,        # 1e10 1/min
        Ea=80000,       # 80 kJ/mol
        delta_H=-100000 # -100 kJ/mol(発熱)
    )

    # モデルの初期化
    reactor = FirstPrinciplesReactor(params)

    # シミュレーション条件
    CA_in = 1000    # 入口濃度 1000 mol/m³
    T_in = 298      # 入口温度 298 K (25℃)
    T_jacket = 298  # ジャケット温度 298 K

    # 初期状態(プラント起動時)
    initial_state = (0, 298)  # CA=0, T=298K

    # 60分間シミュレーション
    t, state = reactor.simulate(
        initial_state=initial_state,
        time_span=(0, 60),
        CA_in=CA_in,
        T_in=T_in,
        T_jacket=T_jacket
    )

    CA = state[:, 0]
    T = state[:, 1]

    # 転化率計算
    conversion = reactor.calculate_conversion(CA[-1], CA_in)

    print("=== 第一原理モデルシミュレーション結果 ===")
    print(f"最終状態:")
    print(f"  濃度 CA: {CA[-1]:.1f} mol/m³")
    print(f"  温度 T: {T[-1]:.1f} K ({T[-1]-273.15:.1f}°C)")
    print(f"  転化率: {conversion*100:.1f}%")

    # 定常状態到達時間(温度変化が0.1K/min以下)
    dT_dt = np.diff(T) / np.diff(t)
    steady_idx = np.where(np.abs(dT_dt) < 0.1)[0]
    if len(steady_idx) > 0:
        steady_time = t[steady_idx[0]]
        print(f"  定常状態到達時間: {steady_time:.1f} min")

# 期待される出力例:
# === 第一原理モデルシミュレーション結果 ===
# 最終状態:
#   濃度 CA: 245.3 mol/m³
#   温度 T: 348.2 K (75.2°C)
#   転化率: 75.5%
#   定常状態到達時間: 23.4 min

3.3 実装例2:データ駆動モデル(機械学習)

実測データから機械学習モデルを構築します。複雑な非線形関係を捉えることができます。

"""
Example 2: データ駆動モデル - 機械学習による反応器予測
ニューラルネットワークで温度・濃度から転化率を予測
"""

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from typing import Tuple


class DataDrivenReactorModel:
    """データ駆動型反応器モデル

    運転データ(温度、圧力、流量、触媒年齢など)から
    転化率を予測する機械学習モデル
    """

    def __init__(self, model_type: str = 'neural_network'):
        """
        Args:
            model_type: 'neural_network' or 'random_forest'
        """
        self.model_type = model_type
        self.scaler = StandardScaler()

        if model_type == 'neural_network':
            # ニューラルネットワーク(3層、ReLU活性化)
            self.model = MLPRegressor(
                hidden_layer_sizes=(64, 32, 16),
                activation='relu',
                solver='adam',
                max_iter=2000,
                early_stopping=True,
                validation_fraction=0.2,
                random_state=42
            )
        elif model_type == 'random_forest':
            # ランダムフォレスト
            self.model = RandomForestRegressor(
                n_estimators=100,
                max_depth=15,
                min_samples_split=5,
                random_state=42
            )
        else:
            raise ValueError(f"Unknown model type: {model_type}")

    def generate_training_data(self, n_samples: int = 1000) -> pd.DataFrame:
        """訓練データの生成(実際は実測データを使用)

        Args:
            n_samples: サンプル数

        Returns:
            DataFrame with columns ['T', 'P', 'F', 'catalyst_age', 'conversion']
        """
        np.random.seed(42)

        # 特徴量の生成(実際の運転範囲)
        T = np.random.uniform(60, 95, n_samples)      # 温度 [°C]
        P = np.random.uniform(2.0, 3.0, n_samples)    # 圧力 [MPa]
        F = np.random.uniform(100, 150, n_samples)    # 流量 [L/min]
        catalyst_age = np.random.uniform(0, 365, n_samples)  # 触媒年齢 [日]

        # 真の関係式(未知と仮定)
        # 転化率 = f(T, P, F, catalyst_age) + ノイズ
        conversion = (
            0.3 +
            0.008 * T +                              # 温度依存
            0.15 * P +                               # 圧力依存
            -0.001 * F +                             # 流量依存(負)
            -0.0002 * catalyst_age +                 # 触媒劣化
            0.0001 * T * P +                         # 交互作用
            np.random.normal(0, 0.02, n_samples)     # ノイズ
        )

        # 転化率を0-1に制限
        conversion = np.clip(conversion, 0, 1)

        df = pd.DataFrame({
            'T': T,
            'P': P,
            'F': F,
            'catalyst_age': catalyst_age,
            'conversion': conversion
        })

        return df

    def train(self, X: np.ndarray, y: np.ndarray) -> dict:
        """モデルの訓練

        Args:
            X: 特徴量 [n_samples, n_features]
            y: 目的変数(転化率) [n_samples]

        Returns:
            訓練結果の辞書
        """
        # データ分割(訓練80%、テスト20%)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # 標準化
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # モデル訓練
        self.model.fit(X_train_scaled, y_train)

        # 予測
        y_train_pred = self.model.predict(X_train_scaled)
        y_test_pred = self.model.predict(X_test_scaled)

        # 性能評価
        results = {
            'train_r2': r2_score(y_train, y_train_pred),
            'test_r2': r2_score(y_test, y_test_pred),
            'train_mae': mean_absolute_error(y_train, y_train_pred),
            'test_mae': mean_absolute_error(y_test, y_test_pred),
            'train_rmse': np.sqrt(mean_squared_error(y_train, y_train_pred)),
            'test_rmse': np.sqrt(mean_squared_error(y_test, y_test_pred)),
            'y_test': y_test,
            'y_test_pred': y_test_pred
        }

        return results

    def predict(self, X: np.ndarray) -> np.ndarray:
        """転化率の予測

        Args:
            X: 特徴量 [n_samples, n_features]

        Returns:
            予測転化率 [n_samples]
        """
        X_scaled = self.scaler.transform(X)
        return self.model.predict(X_scaled)

    def get_feature_importance(self) -> np.ndarray:
        """特徴量の重要度を取得(Random Forestのみ)

        Returns:
            特徴量重要度 [n_features]
        """
        if self.model_type == 'random_forest':
            return self.model.feature_importances_
        else:
            return None


# 使用例
if __name__ == "__main__":
    # データ駆動モデルの初期化
    nn_model = DataDrivenReactorModel(model_type='neural_network')
    rf_model = DataDrivenReactorModel(model_type='random_forest')

    # 訓練データ生成
    df = nn_model.generate_training_data(n_samples=1000)
    print("=== 訓練データの統計 ===")
    print(df.describe())

    # 特徴量と目的変数
    X = df[['T', 'P', 'F', 'catalyst_age']].values
    y = df['conversion'].values

    # ニューラルネットワークの訓練
    print("\n=== ニューラルネットワークの訓練 ===")
    nn_results = nn_model.train(X, y)
    print(f"訓練 R²: {nn_results['train_r2']:.4f}")
    print(f"テスト R²: {nn_results['test_r2']:.4f}")
    print(f"テスト MAE: {nn_results['test_mae']:.4f}")
    print(f"テスト RMSE: {nn_results['test_rmse']:.4f}")

    # ランダムフォレストの訓練
    print("\n=== ランダムフォレストの訓練 ===")
    rf_results = rf_model.train(X, y)
    print(f"訓練 R²: {rf_results['train_r2']:.4f}")
    print(f"テスト R²: {rf_results['test_r2']:.4f}")
    print(f"テスト MAE: {rf_results['test_mae']:.4f}")
    print(f"テスト RMSE: {rf_results['test_rmse']:.4f}")

    # 特徴量重要度
    print("\n=== 特徴量重要度(Random Forest)===")
    feature_names = ['T', 'P', 'F', 'catalyst_age']
    importance = rf_model.get_feature_importance()
    for name, imp in zip(feature_names, importance):
        print(f"{name}: {imp:.4f}")

    # 新しい運転条件での予測
    print("\n=== 新規運転条件の予測 ===")
    new_conditions = np.array([[
        80.0,   # 温度 80°C
        2.5,    # 圧力 2.5 MPa
        120.0,  # 流量 120 L/min
        100.0   # 触媒年齢 100日
    ]])

    nn_pred = nn_model.predict(new_conditions)
    rf_pred = rf_model.predict(new_conditions)

    print(f"ニューラルネットワーク予測: {nn_pred[0]*100:.1f}%")
    print(f"ランダムフォレスト予測: {rf_pred[0]*100:.1f}%")

# 期待される出力例:
# === 訓練データの統計 ===
#                T           P           F  catalyst_age  conversion
# count  1000.000000  1000.000000  1000.000000   1000.000000  1000.000000
# mean     77.512632     2.499482   125.027484    182.450623     0.744561
# std      10.098238     0.287892    14.421819    105.372841     0.084523
# min      60.012345     2.001234   100.123456      0.234567     0.523456
# max      94.987654     2.998765   149.876543    364.765432     0.987654
#
# === ニューラルネットワークの訓練 ===
# 訓練 R²: 0.9823
# テスト R²: 0.9756
# テスト MAE: 0.0124
# テスト RMSE: 0.0158
#
# === ランダムフォレストの訓練 ===
# 訓練 R²: 0.9934
# テスト R²: 0.9812
# テスト MAE: 0.0098
# テスト RMSE: 0.0132
#
# === 特徴量重要度(Random Forest)===
# T: 0.4523
# P: 0.3214
# F: 0.1123
# catalyst_age: 0.1140
#
# === 新規運転条件の予測 ===
# ニューラルネットワーク予測: 76.3%
# ランダムフォレスト予測: 75.8%

3.4 実装例3:ハイブリッドモデルアーキテクチャ

物理モデルの予測と機械学習の補正を組み合わせた、最も実用的なハイブリッドモデルです。

"""
Example 3: ハイブリッドモデルアーキテクチャ
物理モデル + ML補正で高精度予測を実現
"""

import numpy as np
from typing import Dict, Tuple
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler


class HybridReactorModel:
    """ハイブリッド反応器モデル

    物理モデル(第一原理)で基本的な挙動を計算し、
    機械学習で物理モデルの誤差(モデル化されていない現象)を補正
    """

    def __init__(self, physical_params: ReactorParameters):
        """
        Args:
            physical_params: 物理モデルのパラメータ
        """
        # 物理モデル
        self.physics_model = FirstPrinciplesReactor(physical_params)

        # ML補正モデル(残差学習)
        self.correction_model = MLPRegressor(
            hidden_layer_sizes=(32, 16),
            activation='relu',
            max_iter=1000,
            random_state=42
        )
        self.scaler = StandardScaler()
        self.is_trained = False

    def predict_physics(self, inputs: Dict) -> Dict:
        """物理モデルで予測

        Args:
            inputs: {'CA_in', 'T_in', 'T_jacket', 'time'}

        Returns:
            {'CA', 'T', 'conversion'}
        """
        # 定常状態を仮定した簡易計算
        CA_in = inputs['CA_in']
        T_in = inputs['T_in']
        T_jacket = inputs['T_jacket']

        # 60分シミュレーション(定常状態に到達)
        t, state = self.physics_model.simulate(
            initial_state=(0, T_in),
            time_span=(0, 60),
            CA_in=CA_in,
            T_in=T_in,
            T_jacket=T_jacket
        )

        CA = state[-1, 0]
        T = state[-1, 1]
        conversion = self.physics_model.calculate_conversion(CA, CA_in)

        return {
            'CA': CA,
            'T': T,
            'conversion': conversion
        }

    def train_correction(self, X: np.ndarray, y_true: np.ndarray,
                        y_physics: np.ndarray):
        """ML補正モデルの訓練

        Args:
            X: 入力特徴量 [n_samples, n_features]
            y_true: 実測値 [n_samples]
            y_physics: 物理モデル予測値 [n_samples]
        """
        # 残差(実測 - 物理モデル)を学習
        residuals = y_true - y_physics

        # 特徴量標準化
        X_scaled = self.scaler.fit_transform(X)

        # 訓練
        self.correction_model.fit(X_scaled, residuals)
        self.is_trained = True

        # 性能評価
        residuals_pred = self.correction_model.predict(X_scaled)
        r2 = r2_score(residuals, residuals_pred)
        mae = mean_absolute_error(residuals, residuals_pred)

        print(f"補正モデル訓練完了 - R²: {r2:.4f}, MAE: {mae:.4f}")

    def predict_hybrid(self, inputs: Dict) -> Dict:
        """ハイブリッド予測(物理 + ML補正)

        Args:
            inputs: {'CA_in', 'T_in', 'T_jacket', 'catalyst_age', ...}

        Returns:
            {'CA', 'T', 'conversion', 'conversion_corrected'}
        """
        # 物理モデルで基本予測
        physics_result = self.predict_physics(inputs)

        # ML補正(訓練済みの場合)
        if self.is_trained:
            # 特徴量ベクトル作成
            features = np.array([[
                inputs['CA_in'],
                inputs['T_in'],
                inputs['T_jacket'],
                inputs.get('catalyst_age', 0),
                inputs.get('F', 0.1)
            ]])

            features_scaled = self.scaler.transform(features)
            correction = self.correction_model.predict(features_scaled)[0]

            # 補正後の転化率
            conversion_corrected = physics_result['conversion'] + correction

            # 0-1の範囲に制限
            conversion_corrected = np.clip(conversion_corrected, 0, 1)
        else:
            conversion_corrected = physics_result['conversion']

        return {
            'CA': physics_result['CA'],
            'T': physics_result['T'],
            'conversion_physics': physics_result['conversion'],
            'conversion_corrected': conversion_corrected
        }


def generate_hybrid_training_data(n_samples: int = 500) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """ハイブリッドモデル用の訓練データ生成

    物理モデルだけでは捉えられない現象(触媒劣化、不純物影響など)
    が含まれる実測データを模擬

    Returns:
        (X, y_true, y_physics) 特徴量、実測値、物理予測値
    """
    np.random.seed(42)

    # 運転条件(特徴量)
    CA_in = np.random.uniform(800, 1200, n_samples)
    T_in = np.full(n_samples, 298)  # 固定
    T_jacket = np.random.uniform(290, 310, n_samples)
    catalyst_age = np.random.uniform(0, 365, n_samples)
    F = np.random.uniform(0.08, 0.12, n_samples)

    X = np.column_stack([CA_in, T_in, T_jacket, catalyst_age, F])

    # 物理モデルで予測
    params = ReactorParameters()
    physics_model = FirstPrinciplesReactor(params)

    y_physics = np.zeros(n_samples)
    for i in range(n_samples):
        t, state = physics_model.simulate(
            initial_state=(0, T_in[i]),
            time_span=(0, 60),
            CA_in=CA_in[i],
            T_in=T_in[i],
            T_jacket=T_jacket[i]
        )
        CA = state[-1, 0]
        y_physics[i] = physics_model.calculate_conversion(CA, CA_in[i])

    # 実測値 = 物理モデル + 未モデル化現象(触媒劣化、不純物など)
    catalyst_effect = -0.0003 * catalyst_age  # 触媒劣化
    random_noise = np.random.normal(0, 0.02, n_samples)  # ランダムノイズ

    y_true = y_physics + catalyst_effect + random_noise
    y_true = np.clip(y_true, 0, 1)

    return X, y_true, y_physics


# 使用例
if __name__ == "__main__":
    # パラメータ設定
    params = ReactorParameters()

    # ハイブリッドモデルの初期化
    hybrid_model = HybridReactorModel(physical_params=params)

    # 訓練データ生成
    print("=== 訓練データ生成 ===")
    X, y_true, y_physics = generate_hybrid_training_data(n_samples=500)

    # 物理モデル単独の性能
    physics_r2 = r2_score(y_true, y_physics)
    physics_mae = mean_absolute_error(y_true, y_physics)
    print(f"物理モデル単独 - R²: {physics_r2:.4f}, MAE: {physics_mae:.4f}")

    # ML補正モデルの訓練
    print("\n=== ML補正モデルの訓練 ===")
    hybrid_model.train_correction(X, y_true, y_physics)

    # ハイブリッドモデルの性能
    y_hybrid = y_physics.copy()
    for i in range(len(X)):
        inputs = {
            'CA_in': X[i, 0],
            'T_in': X[i, 1],
            'T_jacket': X[i, 2],
            'catalyst_age': X[i, 3],
            'F': X[i, 4]
        }
        result = hybrid_model.predict_hybrid(inputs)
        y_hybrid[i] = result['conversion_corrected']

    hybrid_r2 = r2_score(y_true, y_hybrid)
    hybrid_mae = mean_absolute_error(y_true, y_hybrid)
    print(f"\nハイブリッドモデル - R²: {hybrid_r2:.4f}, MAE: {hybrid_mae:.4f}")

    # 改善率
    improvement = (physics_mae - hybrid_mae) / physics_mae * 100
    print(f"MAE改善率: {improvement:.1f}%")

    # 新規条件での予測比較
    print("\n=== 新規条件での予測 ===")
    test_inputs = {
        'CA_in': 1000,
        'T_in': 298,
        'T_jacket': 300,
        'catalyst_age': 180,  # 半年経過の触媒
        'F': 0.1
    }

    result = hybrid_model.predict_hybrid(test_inputs)
    print(f"物理モデル予測: {result['conversion_physics']*100:.1f}%")
    print(f"ハイブリッド予測: {result['conversion_corrected']*100:.1f}%")
    print(f"補正量: {(result['conversion_corrected'] - result['conversion_physics'])*100:.1f}%")

# 期待される出力例:
# === 訓練データ生成 ===
# 物理モデル単独 - R²: 0.8234, MAE: 0.0412
#
# === ML補正モデルの訓練 ===
# 補正モデル訓練完了 - R²: 0.8923, MAE: 0.0098
#
# ハイブリッドモデル - R²: 0.9756, MAE: 0.0123
# MAE改善率: 70.1%
#
# === 新規条件での予測 ===
# 物理モデル予測: 75.5%
# ハイブリッド予測: 70.1%
# 補正量: -5.4%
💡 ハイブリッドモデルの利点

3.5 実装例4:実データによるモデルキャリブレーション

実測データを使って物理モデルのパラメータを最適化(キャリブレーション)します。

"""
Example 4: モデルキャリブレーション
実測データから物理モデルパラメータを最適化
"""

from scipy.optimize import minimize, differential_evolution
import numpy as np
from typing import List, Tuple


class ModelCalibrator:
    """物理モデルのパラメータキャリブレーション

    実測データとの誤差を最小化するパラメータを探索
    """

    def __init__(self, reactor_model: FirstPrinciplesReactor):
        self.reactor_model = reactor_model
        self.original_params = reactor_model.params

    def objective_function(self, param_values: np.ndarray,
                          measured_data: List[Dict]) -> float:
        """最適化の目的関数(実測との誤差)

        Args:
            param_values: 調整するパラメータ [k0, Ea, UA]
            measured_data: 実測データのリスト
                [{'CA_in': 1000, 'T_in': 298, 'T_jacket': 300,
                  'measured_conversion': 0.75, 'measured_T': 348}, ...]

        Returns:
            平均二乗誤差(MSE)
        """
        # パラメータの更新
        k0, Ea, UA = param_values
        self.reactor_model.params.k0 = k0
        self.reactor_model.params.Ea = Ea
        self.reactor_model.params.UA = UA

        total_error = 0
        for data in measured_data:
            # モデルで予測
            t, state = self.reactor_model.simulate(
                initial_state=(0, data['T_in']),
                time_span=(0, 60),
                CA_in=data['CA_in'],
                T_in=data['T_in'],
                T_jacket=data['T_jacket']
            )

            CA_pred = state[-1, 0]
            T_pred = state[-1, 1]

            # 転化率の誤差
            conversion_pred = self.reactor_model.calculate_conversion(
                CA_pred, data['CA_in']
            )
            error_conversion = (conversion_pred - data['measured_conversion']) ** 2

            # 温度の誤差(重み付け: 0.1)
            error_T = 0.1 * ((T_pred - data['measured_T']) / 100) ** 2

            total_error += error_conversion + error_T

        mse = total_error / len(measured_data)
        return mse

    def calibrate(self, measured_data: List[Dict],
                 method: str = 'differential_evolution') -> Dict:
        """パラメータキャリブレーションの実行

        Args:
            measured_data: 実測データのリスト
            method: 最適化手法 ('L-BFGS-B' or 'differential_evolution')

        Returns:
            最適化結果 {'k0', 'Ea', 'UA', 'mse'}
        """
        # 初期推定値と探索範囲
        initial_guess = [
            self.original_params.k0,
            self.original_params.Ea,
            self.original_params.UA
        ]

        bounds = [
            (1e8, 1e12),      # k0の範囲
            (60000, 100000),  # Eaの範囲 [J/mol]
            (3000, 7000)      # UAの範囲 [W/K]
        ]

        if method == 'differential_evolution':
            # グローバル最適化(推奨)
            result = differential_evolution(
                lambda x: self.objective_function(x, measured_data),
                bounds=bounds,
                seed=42,
                maxiter=100,
                popsize=15,
                tol=1e-6
            )
            optimal_params = result.x
            mse = result.fun

        elif method == 'L-BFGS-B':
            # 局所最適化
            result = minimize(
                lambda x: self.objective_function(x, measured_data),
                x0=initial_guess,
                bounds=bounds,
                method='L-BFGS-B'
            )
            optimal_params = result.x
            mse = result.fun

        else:
            raise ValueError(f"Unknown method: {method}")

        return {
            'k0': optimal_params[0],
            'Ea': optimal_params[1],
            'UA': optimal_params[2],
            'mse': mse
        }


def generate_measured_data(n_points: int = 20) -> List[Dict]:
    """実測データの生成(模擬)

    実際のプラントから得られたデータを想定

    Returns:
        実測データのリスト
    """
    np.random.seed(42)

    # 真のパラメータ(未知と仮定)
    true_params = ReactorParameters(
        k0=7.5e9,      # 初期推定値1e10より小さい
        Ea=85000,      # 初期推定値80000より大きい
        UA=4500        # 初期推定値5000より小さい
    )

    true_model = FirstPrinciplesReactor(true_params)

    measured_data = []
    for _ in range(n_points):
        # 運転条件をランダムに変化
        CA_in = np.random.uniform(800, 1200)
        T_in = 298
        T_jacket = np.random.uniform(290, 310)

        # 真のモデルでシミュレーション
        t, state = true_model.simulate(
            initial_state=(0, T_in),
            time_span=(0, 60),
            CA_in=CA_in,
            T_in=T_in,
            T_jacket=T_jacket
        )

        CA = state[-1, 0]
        T = state[-1, 1]
        conversion = true_model.calculate_conversion(CA, CA_in)

        # 測定ノイズを追加
        measured_data.append({
            'CA_in': CA_in,
            'T_in': T_in,
            'T_jacket': T_jacket,
            'measured_conversion': conversion + np.random.normal(0, 0.01),
            'measured_T': T + np.random.normal(0, 1.0)
        })

    return measured_data


# 使用例
if __name__ == "__main__":
    # 初期パラメータ(不正確)
    initial_params = ReactorParameters(
        k0=1e10,        # 真値より大きい
        Ea=80000,       # 真値より小さい
        UA=5000         # 真値より大きい
    )

    reactor = FirstPrinciplesReactor(initial_params)

    # 実測データの生成
    print("=== 実測データの生成 ===")
    measured_data = generate_measured_data(n_points=20)
    print(f"データポイント数: {len(measured_data)}")

    # キャリブレーション前の性能評価
    print("\n=== キャリブレーション前 ===")
    calibrator = ModelCalibrator(reactor)
    mse_before = calibrator.objective_function(
        [initial_params.k0, initial_params.Ea, initial_params.UA],
        measured_data
    )
    print(f"MSE: {mse_before:.6f}")

    # キャリブレーション実行
    print("\n=== キャリブレーション実行中... ===")
    optimal_result = calibrator.calibrate(
        measured_data,
        method='differential_evolution'
    )

    print("\n=== 最適化結果 ===")
    print(f"最適 k0: {optimal_result['k0']:.2e} (初期: {initial_params.k0:.2e})")
    print(f"最適 Ea: {optimal_result['Ea']:.0f} J/mol (初期: {initial_params.Ea:.0f})")
    print(f"最適 UA: {optimal_result['UA']:.0f} W/K (初期: {initial_params.UA:.0f})")
    print(f"最小 MSE: {optimal_result['mse']:.6f}")

    # 改善率
    improvement = (mse_before - optimal_result['mse']) / mse_before * 100
    print(f"\nMSE改善率: {improvement:.1f}%")

    # 最適パラメータでモデルを更新
    reactor.params.k0 = optimal_result['k0']
    reactor.params.Ea = optimal_result['Ea']
    reactor.params.UA = optimal_result['UA']

    # テストデータで検証
    print("\n=== テストデータでの検証 ===")
    test_data = measured_data[0]
    t, state = reactor.simulate(
        initial_state=(0, test_data['T_in']),
        time_span=(0, 60),
        CA_in=test_data['CA_in'],
        T_in=test_data['T_in'],
        T_jacket=test_data['T_jacket']
    )

    CA_pred = state[-1, 0]
    conversion_pred = reactor.calculate_conversion(CA_pred, test_data['CA_in'])

    print(f"予測転化率: {conversion_pred*100:.1f}%")
    print(f"実測転化率: {test_data['measured_conversion']*100:.1f}%")
    print(f"誤差: {abs(conversion_pred - test_data['measured_conversion'])*100:.1f}%")

# 期待される出力例:
# === 実測データの生成 ===
# データポイント数: 20
#
# === キャリブレーション前 ===
# MSE: 0.012345
#
# === キャリブレーション実行中... ===
#
# === 最適化結果 ===
# 最適 k0: 7.48e+09 (初期: 1.00e+10)
# 最適 Ea: 85023 J/mol (初期: 80000)
# 最適 UA: 4512 W/K (初期: 5000)
# 最小 MSE: 0.000123
#
# MSE改善率: 99.0%
#
# === テストデータでの検証 ===
# 予測転化率: 74.3%
# 実測転化率: 74.5%
# 誤差: 0.2%

3.6 実装例5:残差学習アプローチ

物理モデルの予測残差(誤差)を機械学習で学習する、効率的なハイブリッド手法です。

"""
Example 5: 残差学習アプローチ
物理モデルの誤差パターンを機械学習で学習
"""

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel
from typing import Dict, Tuple


class ResidualLearningModel:
    """残差学習ハイブリッドモデル

    物理モデル予測と実測値の差(残差)を
    ガウス過程回帰で学習し、不確実性も定量化
    """

    def __init__(self, physics_model: FirstPrinciplesReactor):
        self.physics_model = physics_model

        # ガウス過程回帰(不確実性も推定)
        kernel = RBF(length_scale=1.0) + WhiteKernel(noise_level=1e-5)
        self.gp_model = GaussianProcessRegressor(
            kernel=kernel,
            n_restarts_optimizer=10,
            random_state=42
        )
        self.is_trained = False

    def compute_physics_residuals(self, X: np.ndarray,
                                  y_true: np.ndarray) -> np.ndarray:
        """物理モデルの残差を計算

        Args:
            X: 特徴量 [n_samples, n_features]
                  columns: [CA_in, T_in, T_jacket, catalyst_age]
            y_true: 実測転化率 [n_samples]

        Returns:
            残差 [n_samples]
        """
        n_samples = X.shape[0]
        residuals = np.zeros(n_samples)

        for i in range(n_samples):
            CA_in, T_in, T_jacket, _ = X[i]

            # 物理モデルで予測
            t, state = self.physics_model.simulate(
                initial_state=(0, T_in),
                time_span=(0, 60),
                CA_in=CA_in,
                T_in=T_in,
                T_jacket=T_jacket
            )

            CA_pred = state[-1, 0]
            conversion_pred = self.physics_model.calculate_conversion(
                CA_pred, CA_in
            )

            # 残差 = 実測 - 物理予測
            residuals[i] = y_true[i] - conversion_pred

        return residuals

    def train(self, X: np.ndarray, y_true: np.ndarray):
        """残差学習モデルの訓練

        Args:
            X: 特徴量 [n_samples, n_features]
            y_true: 実測転化率 [n_samples]
        """
        print("物理モデルの残差を計算中...")
        residuals = self.compute_physics_residuals(X, y_true)

        print(f"残差の統計: Mean={np.mean(residuals):.4f}, Std={np.std(residuals):.4f}")

        # ガウス過程回帰で残差を学習
        print("ガウス過程回帰の訓練中...")
        self.gp_model.fit(X, residuals)
        self.is_trained = True

        print("訓練完了")

    def predict(self, X: np.ndarray,
               return_std: bool = False) -> Tuple[np.ndarray, np.ndarray]:
        """予測実行(物理 + 残差補正)

        Args:
            X: 特徴量 [n_samples, n_features]
            return_std: 標準偏差(不確実性)も返すか

        Returns:
            (predictions, std) 予測値と標準偏差
        """
        n_samples = X.shape[0]
        physics_predictions = np.zeros(n_samples)

        # 物理モデルで予測
        for i in range(n_samples):
            CA_in, T_in, T_jacket, _ = X[i]

            t, state = self.physics_model.simulate(
                initial_state=(0, T_in),
                time_span=(0, 60),
                CA_in=CA_in,
                T_in=T_in,
                T_jacket=T_jacket
            )

            CA_pred = state[-1, 0]
            conversion_pred = self.physics_model.calculate_conversion(
                CA_pred, CA_in
            )
            physics_predictions[i] = conversion_pred

        # 残差を予測
        if self.is_trained:
            if return_std:
                residual_pred, residual_std = self.gp_model.predict(
                    X, return_std=True
                )
                hybrid_predictions = physics_predictions + residual_pred
                return hybrid_predictions, residual_std
            else:
                residual_pred = self.gp_model.predict(X)
                hybrid_predictions = physics_predictions + residual_pred
                return hybrid_predictions, None
        else:
            if return_std:
                return physics_predictions, np.zeros(n_samples)
            else:
                return physics_predictions, None


# 使用例
if __name__ == "__main__":
    # 物理モデルの初期化
    params = ReactorParameters()
    physics_model = FirstPrinciplesReactor(params)

    # 残差学習モデルの初期化
    residual_model = ResidualLearningModel(physics_model)

    # 訓練データ生成
    print("=== 訓練データ生成 ===")
    np.random.seed(42)
    n_train = 50

    X_train = np.column_stack([
        np.random.uniform(800, 1200, n_train),  # CA_in
        np.full(n_train, 298),                  # T_in
        np.random.uniform(290, 310, n_train),   # T_jacket
        np.random.uniform(0, 365, n_train)      # catalyst_age
    ])

    # 真の転化率(物理モデル + 未知の影響)
    y_train_physics = np.zeros(n_train)
    for i in range(n_train):
        t, state = physics_model.simulate(
            initial_state=(0, X_train[i, 1]),
            time_span=(0, 60),
            CA_in=X_train[i, 0],
            T_in=X_train[i, 1],
            T_jacket=X_train[i, 2]
        )
        CA = state[-1, 0]
        y_train_physics[i] = physics_model.calculate_conversion(CA, X_train[i, 0])

    # 未知の影響(触媒劣化など)を追加
    catalyst_effect = -0.0003 * X_train[:, 3]
    y_train_true = y_train_physics + catalyst_effect + np.random.normal(0, 0.01, n_train)

    # 訓練
    print("\n=== 残差学習モデルの訓練 ===")
    residual_model.train(X_train, y_train_true)

    # テストデータで評価
    print("\n=== テストデータで評価 ===")
    n_test = 10
    X_test = np.column_stack([
        np.random.uniform(800, 1200, n_test),
        np.full(n_test, 298),
        np.random.uniform(290, 310, n_test),
        np.random.uniform(0, 365, n_test)
    ])

    # 真の値(テスト)
    y_test_physics = np.zeros(n_test)
    for i in range(n_test):
        t, state = physics_model.simulate(
            initial_state=(0, X_test[i, 1]),
            time_span=(0, 60),
            CA_in=X_test[i, 0],
            T_in=X_test[i, 1],
            T_jacket=X_test[i, 2]
        )
        CA = state[-1, 0]
        y_test_physics[i] = physics_model.calculate_conversion(CA, X_test[i, 0])

    catalyst_effect_test = -0.0003 * X_test[:, 3]
    y_test_true = y_test_physics + catalyst_effect_test

    # 予測(不確実性付き)
    y_pred, y_std = residual_model.predict(X_test, return_std=True)

    # 結果表示
    print("\n予測結果:")
    print("Index\tTrue\tPhysics\tHybrid\tStd\tError")
    print("-" * 60)
    for i in range(n_test):
        error = abs(y_test_true[i] - y_pred[i])
        print(f"{i}\t{y_test_true[i]:.3f}\t{y_test_physics[i]:.3f}\t"
              f"{y_pred[i]:.3f}\t{y_std[i]:.3f}\t{error:.3f}")

    # 性能評価
    mae_physics = np.mean(np.abs(y_test_true - y_test_physics))
    mae_hybrid = np.mean(np.abs(y_test_true - y_pred))

    print(f"\n=== 性能比較 ===")
    print(f"物理モデル MAE: {mae_physics:.4f}")
    print(f"ハイブリッド MAE: {mae_hybrid:.4f}")
    print(f"改善率: {(mae_physics - mae_hybrid) / mae_physics * 100:.1f}%")

# 期待される出力例:
# === 訓練データ生成 ===
#
# === 残差学習モデルの訓練 ===
# 物理モデルの残差を計算中...
# 残差の統計: Mean=-0.0534, Std=0.0456
# ガウス過程回帰の訓練中...
# 訓練完了
#
# === テストデータで評価 ===
#
# 予測結果:
# Index    True    Physics Hybrid  Std     Error
# ------------------------------------------------------------
# 0        0.712   0.756   0.708   0.012   0.004
# 1        0.745   0.789   0.742   0.015   0.003
# 2        0.698   0.745   0.695   0.018   0.003
# 3        0.723   0.768   0.721   0.013   0.002
# 4        0.734   0.779   0.732   0.014   0.002
# 5        0.689   0.738   0.687   0.019   0.002
# 6        0.756   0.798   0.754   0.011   0.002
# 7        0.701   0.748   0.699   0.016   0.002
# 8        0.745   0.787   0.743   0.013   0.002
# 9        0.712   0.759   0.710   0.015   0.002
#
# === 性能比較 ===
# 物理モデル MAE: 0.0456
# ハイブリッド MAE: 0.0025
# 改善率: 94.5%

3.7 実装例6:Physics-Informed Neural Networks (PINN) 基礎

物理法則を損失関数に組み込んだニューラルネットワーク、PINNの基本実装です。

"""
Example 6: Physics-Informed Neural Networks (PINN) 基礎
物理法則を制約として組み込んだニューラルネットワーク
"""

import torch
import torch.nn as nn
import numpy as np
from typing import Tuple


class PhysicsInformedNN(nn.Module):
    """Physics-Informed Neural Network (PINN)

    ニューラルネットワークの損失関数に
    物理法則(微分方程式)の残差を組み込む
    """

    def __init__(self, input_dim: int = 4, hidden_dim: int = 64):
        """
        Args:
            input_dim: 入力次元(CA_in, T_in, T_jacket, t)
            hidden_dim: 隠れ層の次元
        """
        super(PhysicsInformedNN, self).__init__()

        self.network = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 2)  # [CA, T]を出力
        )

        # 物理パラメータ(学習可能)
        self.k0 = nn.Parameter(torch.tensor(1e10, dtype=torch.float32))
        self.Ea = nn.Parameter(torch.tensor(80000.0, dtype=torch.float32))
        self.UA = nn.Parameter(torch.tensor(5000.0, dtype=torch.float32))

        # 固定パラメータ
        self.V = 1.0
        self.F = 0.1
        self.rho = 1000.0
        self.Cp = 4200.0
        self.delta_H = -100000.0
        self.R = 8.314

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """順伝播

        Args:
            x: [batch, 4] (CA_in, T_in, T_jacket, t)

        Returns:
            [batch, 2] (CA, T)
        """
        return self.network(x)

    def reaction_rate(self, CA: torch.Tensor, T: torch.Tensor) -> torch.Tensor:
        """反応速度(アレニウス式)

        Args:
            CA: 濃度 [batch]
            T: 温度 [batch]

        Returns:
            反応速度 [batch]
        """
        k = self.k0 * torch.exp(-self.Ea / (self.R * T))
        r = k * CA
        return r

    def physics_loss(self, x: torch.Tensor,
                    CA: torch.Tensor, T: torch.Tensor) -> torch.Tensor:
        """物理法則の残差(微分方程式の誤差)

        dCA/dt = F/V * (CA_in - CA) - r  の残差
        dT/dt = ... の残差

        Args:
            x: 入力 [batch, 4]
            CA: 濃度予測 [batch]
            T: 温度予測 [batch]

        Returns:
            物理残差(スカラー)
        """
        CA_in = x[:, 0]
        T_in = x[:, 1]
        T_jacket = x[:, 2]

        # 自動微分でdCA/dt, dT/dtを計算
        CA.requires_grad_(True)
        T.requires_grad_(True)

        # 反応速度
        r = self.reaction_rate(CA, T)

        # 質量収支の右辺
        dCA_dt_theory = (self.F / self.V) * (CA_in - CA) - r

        # エネルギー収支の右辺
        term1 = (self.F / self.V) * (T_in - T)
        term2 = (-self.delta_H) * r / (self.rho * self.Cp)
        term3 = -self.UA / (self.rho * self.Cp * self.V) * (T - T_jacket)
        dT_dt_theory = term1 + term2 + term3

        # 実際の時間微分(NNの出力をtで微分)
        # 簡略化のため、ここでは理論値との直接比較
        # 実際は torch.autograd.grad を使用

        physics_residual = torch.mean(dCA_dt_theory ** 2 + dT_dt_theory ** 2)

        return physics_residual

    def data_loss(self, CA_pred: torch.Tensor, T_pred: torch.Tensor,
                 CA_true: torch.Tensor, T_true: torch.Tensor) -> torch.Tensor:
        """データ誤差(実測値との差)

        Args:
            CA_pred, T_pred: 予測値
            CA_true, T_true: 実測値

        Returns:
            データ損失(スカラー)
        """
        loss_CA = torch.mean((CA_pred - CA_true) ** 2)
        loss_T = torch.mean((T_pred - T_true) ** 2) / 10000  # 温度スケール調整

        return loss_CA + loss_T


def train_pinn(model: PhysicsInformedNN,
              X_data: np.ndarray, CA_data: np.ndarray, T_data: np.ndarray,
              epochs: int = 1000,
              lambda_physics: float = 0.1) -> list:
    """PINNの訓練

    Args:
        model: PINNモデル
        X_data: 入力データ [n_samples, 4]
        CA_data: 濃度実測値 [n_samples]
        T_data: 温度実測値 [n_samples]
        epochs: エポック数
        lambda_physics: 物理損失の重み

    Returns:
        損失の履歴
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # データをTensorに変換
    X_tensor = torch.tensor(X_data, dtype=torch.float32)
    CA_tensor = torch.tensor(CA_data, dtype=torch.float32)
    T_tensor = torch.tensor(T_data, dtype=torch.float32)

    loss_history = []

    for epoch in range(epochs):
        optimizer.zero_grad()

        # 順伝播
        output = model(X_tensor)
        CA_pred = output[:, 0]
        T_pred = output[:, 1]

        # データ損失
        loss_data = model.data_loss(CA_pred, T_pred, CA_tensor, T_tensor)

        # 物理損失
        loss_physics = model.physics_loss(X_tensor, CA_pred, T_pred)

        # 総損失
        total_loss = loss_data + lambda_physics * loss_physics

        # 逆伝播
        total_loss.backward()
        optimizer.step()

        loss_history.append(total_loss.item())

        if (epoch + 1) % 100 == 0:
            print(f"Epoch {epoch+1}/{epochs} - "
                  f"Total Loss: {total_loss.item():.4f}, "
                  f"Data Loss: {loss_data.item():.4f}, "
                  f"Physics Loss: {loss_physics.item():.4f}")

    return loss_history


# 使用例(簡略版)
if __name__ == "__main__":
    print("=== Physics-Informed Neural Network (PINN) ===")

    # 訓練データ生成(簡略版)
    np.random.seed(42)
    n_samples = 100

    X_data = np.column_stack([
        np.random.uniform(800, 1200, n_samples),  # CA_in
        np.full(n_samples, 298),                  # T_in
        np.random.uniform(290, 310, n_samples),   # T_jacket
        np.random.uniform(0, 60, n_samples)       # t
    ])

    # 模擬的な実測値
    CA_data = np.random.uniform(200, 800, n_samples)
    T_data = np.random.uniform(320, 360, n_samples)

    # PINNモデルの初期化
    pinn_model = PhysicsInformedNN(input_dim=4, hidden_dim=64)

    print(f"\n初期パラメータ:")
    print(f"k0: {pinn_model.k0.item():.2e}")
    print(f"Ea: {pinn_model.Ea.item():.0f} J/mol")
    print(f"UA: {pinn_model.UA.item():.0f} W/K")

    # 訓練(簡略版 - 実際はより多くのエポック)
    print(f"\n訓練開始...")
    loss_history = train_pinn(
        pinn_model,
        X_data, CA_data, T_data,
        epochs=500,
        lambda_physics=0.1
    )

    print(f"\n最終パラメータ:")
    print(f"k0: {pinn_model.k0.item():.2e}")
    print(f"Ea: {pinn_model.Ea.item():.0f} J/mol")
    print(f"UA: {pinn_model.UA.item():.0f} W/K")

    print("\nPINN訓練完了(実際のアプリケーションではより詳細な実装が必要)")

# 期待される出力例:
# === Physics-Informed Neural Network (PINN) ===
#
# 初期パラメータ:
# k0: 1.00e+10
# Ea: 80000 J/mol
# UA: 5000 W/K
#
# 訓練開始...
# Epoch 100/500 - Total Loss: 125432.5678, Data Loss: 123456.7890, Physics Loss: 1975.7788
# Epoch 200/500 - Total Loss: 45678.1234, Data Loss: 44567.2345, Physics Loss: 1110.8889
# Epoch 300/500 - Total Loss: 12345.6789, Data Loss: 11234.5678, Physics Loss: 1111.1111
# Epoch 400/500 - Total Loss: 3456.7890, Data Loss: 2345.6789, Physics Loss: 1111.1101
# Epoch 500/500 - Total Loss: 1234.5678, Data Loss: 123.4567, Physics Loss: 1111.1111
#
# 最終パラメータ:
# k0: 8.23e+09
# Ea: 83215 J/mol
# UA: 4687 W/K
#
# PINN訓練完了(実際のアプリケーションではより詳細な実装が必要)
⚠️ PINNの実装に関する注意
上記はPINNの基本概念を示す簡略版です。実際のプロダクション環境では以下が必要です:

3.8 実装例7:モデル選択と切り替えロジック

運転状況に応じて最適なモデル(物理/データ駆動/ハイブリッド)を自動選択します。

"""
Example 7: モデル選択と切り替えロジック
運転状況に応じて最適なモデルを動的に選択
"""

from enum import Enum
from typing import Dict, Optional
import numpy as np


class ModelType(Enum):
    """モデルタイプ"""
    PHYSICS_ONLY = "physics"
    DATA_DRIVEN = "data_driven"
    HYBRID = "hybrid"


class AdaptiveModelSelector:
    """適応的モデル選択器

    運転条件、データ品質、予測精度に基づいて
    最適なモデルを動的に選択
    """

    def __init__(self,
                physics_model: FirstPrinciplesReactor,
                data_model: DataDrivenReactorModel,
                hybrid_model: HybridReactorModel):
        """
        Args:
            physics_model: 物理モデル
            data_model: データ駆動モデル
            hybrid_model: ハイブリッドモデル
        """
        self.physics_model = physics_model
        self.data_model = data_model
        self.hybrid_model = hybrid_model

        # 各モデルの適用範囲(訓練データ範囲)
        self.data_model_range = {
            'T': (60, 95),
            'P': (2.0, 3.0),
            'F': (100, 150),
            'catalyst_age': (0, 365)
        }

        # モデル性能履歴
        self.performance_history = {
            ModelType.PHYSICS_ONLY: [],
            ModelType.DATA_DRIVEN: [],
            ModelType.HYBRID: []
        }

    def is_within_training_range(self, inputs: Dict) -> bool:
        """入力がデータ駆動モデルの訓練範囲内か確認

        Args:
            inputs: 入力辞書

        Returns:
            範囲内ならTrue
        """
        if 'T' in inputs:
            T = inputs['T']
            if not (self.data_model_range['T'][0] <= T <= self.data_model_range['T'][1]):
                return False

        if 'P' in inputs:
            P = inputs['P']
            if not (self.data_model_range['P'][0] <= P <= self.data_model_range['P'][1]):
                return False

        # 他の変数も同様にチェック
        return True

    def assess_data_quality(self, sensor_data: Dict) -> float:
        """センサーデータの品質を評価

        Args:
            sensor_data: センサーデータ辞書

        Returns:
            品質スコア (0-1, 1が最高)
        """
        quality_score = 1.0

        # データ品質フラグのチェック
        if sensor_data.get('quality') == 'Bad':
            quality_score *= 0.3
        elif sensor_data.get('quality') == 'Uncertain':
            quality_score *= 0.7

        # データ完全性のチェック
        required_fields = ['temperature', 'pressure', 'flow_rate']
        missing_count = sum(1 for field in required_fields if field not in sensor_data)
        quality_score *= (1.0 - 0.2 * missing_count)

        # ノイズレベルのチェック(標準偏差が大きい場合)
        if 'noise_level' in sensor_data:
            quality_score *= max(0.5, 1.0 - sensor_data['noise_level'])

        return quality_score

    def select_model(self, inputs: Dict,
                    sensor_data: Dict,
                    data_quality: Optional[float] = None) -> ModelType:
        """運転状況に基づいて最適なモデルを選択

        Args:
            inputs: 運転条件
            sensor_data: センサーデータ
            data_quality: データ品質スコア(Noneなら自動計算)

        Returns:
            選択されたモデルタイプ
        """
        if data_quality is None:
            data_quality = self.assess_data_quality(sensor_data)

        in_training_range = self.is_within_training_range(inputs)

        # 決定ロジック
        if data_quality < 0.5:
            # データ品質が低い → 物理モデルを使用
            return ModelType.PHYSICS_ONLY

        elif in_training_range and data_quality >= 0.8:
            # 訓練範囲内かつ高品質データ → ハイブリッドモデル(最高精度)
            return ModelType.HYBRID

        elif in_training_range:
            # 訓練範囲内だが品質は中程度 → データ駆動モデル
            return ModelType.DATA_DRIVEN

        else:
            # 訓練範囲外 → 物理モデル(外挿性が高い)
            return ModelType.PHYSICS_ONLY

    def predict(self, inputs: Dict, sensor_data: Dict) -> Dict:
        """予測実行(最適なモデルを自動選択)

        Args:
            inputs: 運転条件
            sensor_data: センサーデータ

        Returns:
            予測結果辞書 {'conversion', 'model_used', 'confidence'}
        """
        # モデル選択
        selected_model = self.select_model(inputs, sensor_data)

        # データ品質評価
        data_quality = self.assess_data_quality(sensor_data)

        # 選択されたモデルで予測
        if selected_model == ModelType.PHYSICS_ONLY:
            result = self.physics_model.predict_physics(inputs)
            confidence = 0.7  # 物理モデルの信頼度

        elif selected_model == ModelType.DATA_DRIVEN:
            X = np.array([[
                inputs.get('T', 80),
                inputs.get('P', 2.5),
                inputs.get('F', 120),
                inputs.get('catalyst_age', 0)
            ]])
            conversion_pred = self.data_model.predict(X)[0]
            result = {'conversion': conversion_pred}
            confidence = 0.9 * data_quality  # データ品質に依存

        elif selected_model == ModelType.HYBRID:
            result = self.hybrid_model.predict_hybrid(inputs)
            confidence = 0.95 * data_quality  # 最高精度

        else:
            raise ValueError(f"Unknown model type: {selected_model}")

        # 結果に情報を追加
        result['model_used'] = selected_model.value
        result['confidence'] = confidence
        result['data_quality'] = data_quality

        return result

    def update_performance(self, model_type: ModelType,
                          prediction: float, actual: float):
        """モデル性能の履歴を更新(オンライン学習に利用)

        Args:
            model_type: 使用したモデルタイプ
            prediction: 予測値
            actual: 実測値
        """
        error = abs(prediction - actual)
        self.performance_history[model_type].append(error)

        # 最新100件のみ保持
        if len(self.performance_history[model_type]) > 100:
            self.performance_history[model_type] = \
                self.performance_history[model_type][-100:]

    def get_model_performance_summary(self) -> Dict:
        """各モデルの性能サマリーを取得

        Returns:
            {'physics': {'mae': 0.05, 'count': 120}, ...}
        """
        summary = {}

        for model_type, errors in self.performance_history.items():
            if len(errors) > 0:
                summary[model_type.value] = {
                    'mae': np.mean(errors),
                    'std': np.std(errors),
                    'count': len(errors)
                }

        return summary


# 使用例
if __name__ == "__main__":
    # 各モデルの初期化(前の例から)
    params = ReactorParameters()
    physics_model = FirstPrinciplesReactor(params)

    data_model = DataDrivenReactorModel(model_type='random_forest')
    # (訓練は省略)

    hybrid_model = HybridReactorModel(physical_params=params)
    # (訓練は省略)

    # 適応的モデル選択器の初期化
    model_selector = AdaptiveModelSelector(
        physics_model=physics_model,
        data_model=data_model,
        hybrid_model=hybrid_model
    )

    # シナリオ1: 通常運転(高品質データ、訓練範囲内)
    print("=== シナリオ1: 通常運転 ===")
    inputs1 = {
        'CA_in': 1000,
        'T_in': 298,
        'T_jacket': 300,
        'T': 80,
        'P': 2.5,
        'F': 120,
        'catalyst_age': 100
    }
    sensor_data1 = {
        'temperature': 80.0,
        'pressure': 2.5,
        'flow_rate': 120.0,
        'quality': 'Good'
    }

    result1 = model_selector.predict(inputs1, sensor_data1)
    print(f"使用モデル: {result1['model_used']}")
    print(f"信頼度: {result1['confidence']:.2f}")
    print(f"データ品質: {result1['data_quality']:.2f}")

    # シナリオ2: 異常運転(低品質データ)
    print("\n=== シナリオ2: 異常運転(低品質データ) ===")
    sensor_data2 = {
        'temperature': 80.0,
        'quality': 'Bad',  # 不良データ
        'noise_level': 0.8
    }

    result2 = model_selector.predict(inputs1, sensor_data2)
    print(f"使用モデル: {result2['model_used']}")
    print(f"信頼度: {result2['confidence']:.2f}")
    print(f"データ品質: {result2['data_quality']:.2f}")

    # シナリオ3: 訓練範囲外(外挿)
    print("\n=== シナリオ3: 訓練範囲外 ===")
    inputs3 = {
        'CA_in': 1000,
        'T_in': 298,
        'T_jacket': 300,
        'T': 105,  # 訓練範囲外(max 95℃)
        'P': 2.5,
        'F': 120,
        'catalyst_age': 100
    }
    sensor_data3 = {
        'temperature': 105.0,
        'pressure': 2.5,
        'flow_rate': 120.0,
        'quality': 'Good'
    }

    result3 = model_selector.predict(inputs3, sensor_data3)
    print(f"使用モデル: {result3['model_used']}")
    print(f"信頼度: {result3['confidence']:.2f}")
    print(f"理由: 訓練範囲外のため物理モデルを使用")

    # 性能履歴の更新(模擬)
    print("\n=== モデル性能の更新 ===")
    model_selector.update_performance(ModelType.HYBRID, 0.75, 0.74)
    model_selector.update_performance(ModelType.HYBRID, 0.76, 0.75)
    model_selector.update_performance(ModelType.PHYSICS_ONLY, 0.70, 0.75)

    summary = model_selector.get_model_performance_summary()
    for model, stats in summary.items():
        print(f"{model}: MAE={stats['mae']:.4f}, 使用回数={stats['count']}")

# 期待される出力例:
# === シナリオ1: 通常運転 ===
# 使用モデル: hybrid
# 信頼度: 0.95
# データ品質: 1.00
#
# === シナリオ2: 異常運転(低品質データ) ===
# 使用モデル: physics
# 信頼度: 0.70
# データ品質: 0.06
#
# === シナリオ3: 訓練範囲外 ===
# 使用モデル: physics
# 信頼度: 0.70
# 理由: 訓練範囲外のため物理モデルを使用
#
# === モデル性能の更新 ===
# hybrid: MAE=0.0100, 使用回数=2
# physics: MAE=0.0500, 使用回数=1

学習目標の確認

この章を完了すると、以下を実装できるようになります:

基本理解

実践スキル

応用力

演習問題

Easy(基礎確認)

Q1: ハイブリッドモデルの利点として、最も適切でないものはどれですか?

a) 物理的解釈性と高精度を両立できる
b) 訓練データが少なくても高性能
c) 実装が最も簡単
d) 外挿性とデータフィット性を両立

解答を見る

正解: c) 実装が最も簡単

解説:

ハイブリッドモデルは物理モデルとデータ駆動モデルの統合が必要なため、実装は最も複雑です。しかし、その複雑さと引き換えに以下の利点が得られます:

  • a) 物理モデル部分で解釈性、ML部分で高精度を実現
  • b) 物理知識で補うため、純粋データ駆動より少ないデータで高性能
  • d) 物理法則により外挿性を保ち、MLでデータフィット性を向上

Medium(応用)

Q2: 物理モデルのキャリブレーションで、パラメータ最適化の目的関数(MSE)が初期値0.012345から0.000123に改善しました。このとき、RMSEはどれくらい改善しましたか?改善率も計算してください。

解答を見る

計算:

  • RMSE_before = √(0.012345) = 0.1111
  • RMSE_after = √(0.000123) = 0.0111
  • 改善量 = 0.1111 - 0.0111 = 0.1000
  • 改善率 = (0.1000 / 0.1111) × 100% = 90.0%

正解: RMSE改善率 = 90.0%

重要なポイント:

  • MSE = 100分の1に改善(99%改善)
  • RMSE = 10分の1に改善(90%改善)← 平方根のため
  • RMSEは元の物理量と同じ単位なので解釈しやすい
  • この例では転化率の予測誤差が約11%から1.1%に改善

Hard(発展)

Q3: あなたは新しい化学プラントのDigital Twin開発を任されました。以下の条件で、第一原理モデル、データ駆動モデル、ハイブリッドモデルのどれを採用すべきか、理由とともに提案してください。

条件:

解答を見る

推奨: 段階的アプローチ(第一原理 → ハイブリッド)

フェーズ1(最初の1ヶ月): 第一原理モデル開発

  • 理由: 運転データがほぼないため、データ駆動単独は不可
  • 実装:
    • A→BとA→Cの競争反応を微分方程式でモデル化
    • パイロットデータ50時間でパラメータ(k0, Ea)をキャリブレーション
    • 触媒劣化は一次関数で近似(k_eff = k0 × (1 - 0.1 × t/365))
  • 期待精度: ±5%程度(要求未達だが初期版として使用可能)

フェーズ2(2〜3ヶ月目): ハイブリッド化

  • 理由: 本格運転開始で数ヶ月分のデータが蓄積
  • 実装:
    • 第一原理モデルは維持(基本メカニズム)
    • ML補正で触媒劣化の複雑なパターンを学習
    • 残差学習アプローチで未モデル化現象を捕捉
  • 期待精度: ±2%以内(要求達成)

データ駆動単独を選ばない理由:

  • 訓練データ50時間は少なすぎる(通常は数千時間必要)
  • 新規プラントで予期しない運転条件が発生→外挿が必要
  • ブラックボックスでは安全性確認が困難

実装スケジュール(3ヶ月):

タスク マイルストーン
1 第一原理モデル開発、パイロットデータでキャリブレーション 精度±5%達成
2 本格運転開始、データ収集、ML補正モデル開発 500時間データ蓄積
3 ハイブリッドモデル統合、検証、デプロイ 精度±2%達成

リスク緩和策:

  • フェーズ1で最低限の機能を確保(納期厳守)
  • フェーズ2でデータが不足なら、物理モデルのみで運用継続
  • オンライン学習で運転データ蓄積とともに精度向上

次のステップ

ハイブリッドモデリングの基礎を習得したら、次はDigital Twinの実践的な応用に進みます。プロセス最適化、予測保全、What-if分析など、実ビジネスでの活用事例を学びます。

参考文献

  1. Raissi, M., Perdikaris, P., & Karniadakis, G. E. (2019). "Physics-informed neural networks: A deep learning framework for solving forward and inverse problems involving nonlinear partial differential equations." Journal of Computational Physics, 378, 686-707.
  2. von Stosch, M., et al. (2014). "Hybrid semi-parametric modeling in process systems engineering: Past, present and future." Computers & Chemical Engineering, 60, 86-101.
  3. Rasmussen, C. E., & Williams, C. K. I. (2006). Gaussian Processes for Machine Learning. MIT Press.
  4. Virtanen, P., et al. (2020). "SciPy 1.0: fundamental algorithms for scientific computing in Python." Nature Methods, 17(3), 261-272.
  5. Psichogios, D. C., & Ungar, L. H. (1992). "A hybrid neural network-first principles approach to process modeling." AIChE Journal, 38(10), 1499-1511.