第4章:仮想プロセス最適化

デジタルツインで実現する安全で効率的な最適化

📚 難易度: 中級 ⏱️ 読了時間: 30-35分 🔧 Python実装: 7例

4.1 What-ifシナリオ分析

デジタルツインの最大の価値は、実プロセスに影響を与えずに様々な「もしも」シナリオを試せることです。条件変更の影響を事前評価し、最適な運転戦略を見出します。

4.1.1 基本的なWhat-if分析

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import differential_evolution

class WhatIfAnalyzer:
    """デジタルツインを使ったWhat-ifシナリオ分析"""

    def __init__(self, digital_twin_model):
        self.model = digital_twin_model
        self.scenarios = []

    def create_scenario(self, name, parameter_changes):
        """シナリオ作成

        Args:
            name: シナリオ名
            parameter_changes: {param: value}の辞書
        """
        baseline = self.model.get_current_state()

        # シナリオ適用
        modified_state = baseline.copy()
        modified_state.update(parameter_changes)

        # シミュレーション実行
        result = self.model.simulate(modified_state)

        scenario = {
            'name': name,
            'changes': parameter_changes,
            'baseline': baseline,
            'result': result,
            'impact': self._calculate_impact(baseline, result)
        }

        self.scenarios.append(scenario)
        return scenario

    def _calculate_impact(self, baseline, result):
        """影響度計算"""
        return {
            'yield_change': ((result['yield'] - baseline['yield']) /
                           baseline['yield'] * 100),
            'quality_change': result['quality'] - baseline['quality'],
            'cost_change': result['cost'] - baseline['cost']
        }

    def compare_scenarios(self):
        """シナリオ比較"""
        df = pd.DataFrame([
            {
                'Scenario': s['name'],
                'Yield Change (%)': s['impact']['yield_change'],
                'Quality Change': s['impact']['quality_change'],
                'Cost Change': s['impact']['cost_change']
            }
            for s in self.scenarios
        ])
        return df

# 使用例
class SimpleReactorModel:
    def __init__(self):
        self.state = {'temp': 350, 'pressure': 5.0, 'flow': 100}

    def get_current_state(self):
        return self.state.copy()

    def simulate(self, state):
        # 簡易反応器モデル
        temp = state['temp']
        pressure = state['pressure']
        flow = state['flow']

        # 収率モデル(最適点: 360℃, 6 bar)
        yield_val = 85 - 0.05*(temp-360)**2 - 2*(pressure-6)**2
        quality = 95 - 0.02*abs(temp-360)
        cost = 0.5*temp + 10*pressure + 0.1*flow

        return {
            'yield': max(0, yield_val),
            'quality': quality,
            'cost': cost,
            **state
        }

# 実行
model = SimpleReactorModel()
analyzer = WhatIfAnalyzer(model)

# シナリオ1: 温度上昇
analyzer.create_scenario('High Temp', {'temp': 370})

# シナリオ2: 圧力増加
analyzer.create_scenario('High Pressure', {'pressure': 7.0})

# シナリオ3: 同時変更
analyzer.create_scenario('Combined', {'temp': 365, 'pressure': 6.5})

# 比較
print(analyzer.compare_scenarios())
# 出力:
#        Scenario  Yield Change (%)  Quality Change  Cost Change
# 0     High Temp            -5.88           -0.20        10.00
# 1  High Pressure            -7.06            0.00        20.00
# 2      Combined             3.53           -0.10        27.50
💡 実務Tips:
  • ベースラインとの差分を常に可視化する
  • 制約条件(安全限界)を明示する
  • 不確実性を考慮した感度分析も実施

4.2 仮想センサー(ソフトセンサー)

測定困難または高コストな変数を、測定可能な変数から推定するソフトセンサーをデジタルツインに統合します。

4.2.1 機械学習ベースソフトセンサー

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_error
import joblib

class SoftSensor:
    """機械学習ベースのソフトセンサー"""

    def __init__(self, target_variable):
        self.target = target_variable
        self.model = RandomForestRegressor(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        self.feature_names = None
        self.is_trained = False

    def train(self, X, y, test_size=0.2):
        """モデル訓練

        Args:
            X: 測定可能な変数(DataFrame)
            y: 目標変数(測定困難な変数)
        """
        self.feature_names = X.columns.tolist()

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )

        self.model.fit(X_train, y_train)

        # 性能評価
        y_pred = self.model.predict(X_test)
        r2 = r2_score(y_test, y_pred)
        mae = mean_absolute_error(y_test, y_pred)

        self.is_trained = True

        return {
            'r2_score': r2,
            'mae': mae,
            'n_train': len(X_train),
            'n_test': len(X_test)
        }

    def predict(self, X):
        """リアルタイム推定"""
        if not self.is_trained:
            raise ValueError("Model not trained")

        return self.model.predict(X)

    def get_feature_importance(self):
        """特徴量重要度"""
        if not self.is_trained:
            return None

        importance = pd.DataFrame({
            'feature': self.feature_names,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)

        return importance

    def save(self, filepath):
        """モデル保存"""
        joblib.dump({
            'model': self.model,
            'feature_names': self.feature_names,
            'target': self.target
        }, filepath)

# 使用例
# 製品品質(測定困難)を運転条件(測定容易)から推定
np.random.seed(42)
n_samples = 500

# 訓練データ生成(実際はプロセスデータ)
data = pd.DataFrame({
    'temp': np.random.uniform(340, 370, n_samples),
    'pressure': np.random.uniform(4, 8, n_samples),
    'flow': np.random.uniform(80, 120, n_samples),
    'residence_time': np.random.uniform(10, 30, n_samples)
})

# 目標変数(製品純度)
data['purity'] = (
    92 + 0.2*data['temp'] - 0.01*data['temp']**2 +
    2*data['pressure'] + 0.1*data['residence_time'] +
    np.random.normal(0, 1, n_samples)
)

# ソフトセンサー訓練
X = data[['temp', 'pressure', 'flow', 'residence_time']]
y = data['purity']

soft_sensor = SoftSensor('product_purity')
metrics = soft_sensor.train(X, y)

print(f"R² Score: {metrics['r2_score']:.3f}")
print(f"MAE: {metrics['mae']:.3f}")
print("\nFeature Importance:")
print(soft_sensor.get_feature_importance())

# リアルタイム推定
new_data = pd.DataFrame({
    'temp': [360],
    'pressure': [6.0],
    'flow': [100],
    'residence_time': [20]
})
predicted_purity = soft_sensor.predict(new_data)
print(f"\nPredicted Purity: {predicted_purity[0]:.2f}%")

4.3 予知保全

デジタルツインで設備劣化を予測し、故障前にメンテナンスを実施します。

import warnings
warnings.filterwarnings('ignore')

class PredictiveMaintenanceSystem:
    """予知保全システム"""

    def __init__(self, equipment_name):
        self.equipment = equipment_name
        self.health_index = 100  # 健全度指数
        self.degradation_rate = 0.1
        self.threshold_warning = 70
        self.threshold_critical = 50

    def update_health(self, operating_hours, stress_factors):
        """健全度更新

        Args:
            operating_hours: 稼働時間
            stress_factors: ストレス要因辞書
        """
        # 基本劣化
        degradation = self.degradation_rate * operating_hours

        # ストレス要因による加速劣化
        temp_stress = max(0, stress_factors.get('temp_excess', 0)) * 0.5
        vibration_stress = stress_factors.get('vibration', 0) * 0.3
        load_stress = stress_factors.get('overload', 0) * 0.4

        total_degradation = degradation + temp_stress + vibration_stress + load_stress

        self.health_index -= total_degradation
        self.health_index = max(0, self.health_index)

        return self.health_index

    def predict_remaining_life(self, current_stress_factors):
        """残存寿命予測"""
        if self.health_index <= 0:
            return 0

        # 現在の劣化速度計算
        temp_stress = max(0, current_stress_factors.get('temp_excess', 0)) * 0.5
        vibration_stress = current_stress_factors.get('vibration', 0) * 0.3
        load_stress = current_stress_factors.get('overload', 0) * 0.4

        degradation_per_hour = (self.degradation_rate +
                                temp_stress + vibration_stress + load_stress)

        if degradation_per_hour <= 0:
            return float('inf')

        # 臨界値到達までの時間
        remaining_hours = (self.health_index - self.threshold_critical) / degradation_per_hour

        return max(0, remaining_hours)

    def get_maintenance_recommendation(self):
        """メンテナンス推奨"""
        if self.health_index >= self.threshold_warning:
            return {
                'status': 'HEALTHY',
                'action': 'None',
                'urgency': 'Low',
                'health': self.health_index
            }
        elif self.health_index >= self.threshold_critical:
            return {
                'status': 'WARNING',
                'action': 'Schedule maintenance within 7 days',
                'urgency': 'Medium',
                'health': self.health_index
            }
        else:
            return {
                'status': 'CRITICAL',
                'action': 'Immediate maintenance required',
                'urgency': 'High',
                'health': self.health_index
            }

# 使用例
pm_system = PredictiveMaintenanceSystem('Heat Exchanger HX-101')

# シミュレーション: 100時間稼働
for hour in range(1, 101):
    stress = {
        'temp_excess': 5 if hour % 20 == 0 else 0,  # 時々過熱
        'vibration': 2 if hour > 50 else 1,  # 振動増加
        'overload': 3 if hour > 80 else 0  # 後半に過負荷
    }

    health = pm_system.update_health(1, stress)

    if hour % 25 == 0:
        remaining = pm_system.predict_remaining_life(stress)
        rec = pm_system.get_maintenance_recommendation()
        print(f"Hour {hour}: Health={health:.1f}%, "
              f"Remaining Life={remaining:.1f}h, "
              f"Status={rec['status']}")

# 最終推奨
final_rec = pm_system.get_maintenance_recommendation()
print(f"\nFinal Recommendation: {final_rec['action']}")

4.4 リアルタイム最適化(RTO)

デジタルツインを使って現在の状態から最適な運転条件を動的に計算します。

from scipy.optimize import minimize

class RealTimeOptimizer:
    """リアルタイム最適化システム"""

    def __init__(self, digital_twin, objective='profit'):
        self.twin = digital_twin
        self.objective = objective

    def objective_function(self, x, weights):
        """目的関数

        Args:
            x: 決定変数 [temp, pressure, flow]
            weights: 重み係数
        """
        temp, pressure, flow = x

        # デジタルツインでシミュレーション
        state = {'temp': temp, 'pressure': pressure, 'flow': flow}
        result = self.twin.simulate(state)

        # 複合目的関数(利益最大化)
        revenue = result['yield'] * weights['product_price']
        cost = result['cost']
        quality_penalty = max(0, 90 - result['quality']) * weights['quality_penalty']

        profit = revenue - cost - quality_penalty

        return -profit  # 最小化問題として

    def optimize(self, initial_guess, bounds, constraints=None):
        """最適化実行

        Args:
            initial_guess: 初期値 [temp, pressure, flow]
            bounds: 変数範囲 [(min, max), ...]
            constraints: 制約条件
        """
        weights = {
            'product_price': 10.0,
            'quality_penalty': 50.0
        }

        # 制約条件
        cons = []
        if constraints:
            # 安全制約
            cons.append({
                'type': 'ineq',
                'fun': lambda x: constraints['max_temp'] - x[0]
            })
            cons.append({
                'type': 'ineq',
                'fun': lambda x: constraints['max_pressure'] - x[1]
            })

        # 最適化
        result = minimize(
            self.objective_function,
            initial_guess,
            args=(weights,),
            method='SLSQP',
            bounds=bounds,
            constraints=cons if cons else None
        )

        if result.success:
            opt_temp, opt_pressure, opt_flow = result.x
            opt_state = {
                'temp': opt_temp,
                'pressure': opt_pressure,
                'flow': opt_flow
            }
            opt_result = self.twin.simulate(opt_state)

            return {
                'success': True,
                'optimal_conditions': opt_state,
                'expected_performance': opt_result,
                'profit': -result.fun
            }
        else:
            return {'success': False, 'message': result.message}

# 使用例
twin = SimpleReactorModel()
rto = RealTimeOptimizer(twin, objective='profit')

# 最適化実行
initial = [350, 5.0, 100]
bounds = [(340, 380), (4.0, 8.0), (80, 120)]
constraints = {
    'max_temp': 375,
    'max_pressure': 7.5
}

opt_result = rto.optimize(initial, bounds, constraints)

if opt_result['success']:
    print("Optimal Conditions:")
    print(f"  Temperature: {opt_result['optimal_conditions']['temp']:.1f}°C")
    print(f"  Pressure: {opt_result['optimal_conditions']['pressure']:.1f} bar")
    print(f"  Flow: {opt_result['optimal_conditions']['flow']:.1f} kg/h")
    print(f"\nExpected Performance:")
    print(f"  Yield: {opt_result['expected_performance']['yield']:.1f}%")
    print(f"  Quality: {opt_result['expected_performance']['quality']:.1f}")
    print(f"  Profit: {opt_result['profit']:.2f}")

4.5 モデル予測制御(MPC)統合

デジタルツインとMPCを統合し、予測ホライズンを考慮した最適制御を実現します。

class ModelPredictiveController:
    """モデル予測制御(簡易版)"""

    def __init__(self, digital_twin, horizon=10, dt=1.0):
        self.twin = digital_twin
        self.horizon = horizon  # 予測ホライズン
        self.dt = dt  # サンプリング時間

    def predict_trajectory(self, initial_state, control_sequence):
        """軌道予測

        Args:
            initial_state: 初期状態
            control_sequence: 制御入力系列
        """
        trajectory = [initial_state]
        state = initial_state.copy()

        for control in control_sequence:
            # 状態更新(簡易離散化モデル)
            state = self.twin.simulate({**state, **control})
            trajectory.append(state)

        return trajectory

    def optimize_control(self, current_state, setpoint):
        """制御入力最適化

        Args:
            current_state: 現在状態
            setpoint: 目標値 {'yield': 85, 'quality': 95}
        """
        def mpc_objective(u_flat):
            # 制御入力を再構成 [temp1, pres1, temp2, pres2, ...]
            controls = []
            for i in range(0, len(u_flat), 2):
                controls.append({
                    'temp': u_flat[i],
                    'pressure': u_flat[i+1]
                })

            # 軌道予測
            trajectory = self.predict_trajectory(current_state, controls)

            # コスト計算
            tracking_cost = 0
            control_cost = 0

            for state in trajectory[1:]:
                # トラッキング誤差
                tracking_cost += (state['yield'] - setpoint['yield'])**2
                tracking_cost += (state['quality'] - setpoint['quality'])**2

                # 制御コスト
                control_cost += 0.01 * state['cost']

            return tracking_cost + control_cost

        # 初期推定(現在値を維持)
        u0 = []
        for _ in range(self.horizon):
            u0.extend([current_state['temp'], current_state['pressure']])

        # 制約
        bounds = [(340, 380), (4.0, 8.0)] * self.horizon

        # 最適化
        result = minimize(
            mpc_objective,
            u0,
            method='L-BFGS-B',
            bounds=bounds
        )

        # 最初の制御入力のみ適用(リセディングホライズン)
        optimal_temp = result.x[0]
        optimal_pressure = result.x[1]

        return {
            'temp': optimal_temp,
            'pressure': optimal_pressure,
            'flow': current_state['flow']  # 流量は固定
        }

# 使用例
twin = SimpleReactorModel()
mpc = ModelPredictiveController(twin, horizon=5)

current = {'temp': 350, 'pressure': 5.0, 'flow': 100}
target = {'yield': 85, 'quality': 95}

optimal_control = mpc.optimize_control(current, target)

print("MPC Optimal Control:")
print(f"  Temperature setpoint: {optimal_control['temp']:.1f}°C")
print(f"  Pressure setpoint: {optimal_control['pressure']:.2f} bar")

# 制御適用後の予測
future_state = twin.simulate(optimal_control)
print(f"\nPredicted Performance:")
print(f"  Yield: {future_state['yield']:.1f}%")
print(f"  Quality: {future_state['quality']:.1f}")

4.6 多目的最適化

収率、品質、コスト、環境負荷など複数の目標を同時に考慮したパレート最適解を探索します。

from scipy.optimize import differential_evolution

class MultiObjectiveOptimizer:
    """多目的最適化"""

    def __init__(self, digital_twin):
        self.twin = digital_twin
        self.pareto_solutions = []

    def evaluate_objectives(self, x):
        """複数目的関数評価

        Returns:
            [yield, quality, cost, energy]
        """
        state = {'temp': x[0], 'pressure': x[1], 'flow': x[2]}
        result = self.twin.simulate(state)

        # エネルギー消費量推定
        energy = 0.5*x[0] + 20*x[1]**1.5  # 簡易モデル

        return {
            'yield': result['yield'],
            'quality': result['quality'],
            'cost': result['cost'],
            'energy': energy
        }

    def weighted_sum_method(self, bounds, weights):
        """重み付け和法

        Args:
            bounds: [(min, max), ...]
            weights: {'yield': w1, 'quality': w2, ...}
        """
        def objective(x):
            obj = self.evaluate_objectives(x)

            # 正規化(仮の最大値)
            yield_norm = obj['yield'] / 100
            quality_norm = obj['quality'] / 100
            cost_norm = obj['cost'] / 500
            energy_norm = obj['energy'] / 300

            # 重み付け和(コストとエネルギーは最小化)
            score = (weights.get('yield', 0) * yield_norm +
                    weights.get('quality', 0) * quality_norm -
                    weights.get('cost', 0) * cost_norm -
                    weights.get('energy', 0) * energy_norm)

            return -score  # 最大化→最小化

        result = differential_evolution(objective, bounds, seed=42)

        if result.success:
            optimal_x = result.x
            objectives = self.evaluate_objectives(optimal_x)

            return {
                'conditions': {
                    'temp': optimal_x[0],
                    'pressure': optimal_x[1],
                    'flow': optimal_x[2]
                },
                'objectives': objectives
            }
        return None

    def pareto_frontier(self, bounds, n_points=10):
        """パレートフロンティア探索"""
        pareto_solutions = []

        # 異なる重み設定で最適化
        for i in range(n_points):
            alpha = i / (n_points - 1)

            weights = {
                'yield': alpha,
                'quality': 1 - alpha,
                'cost': 0.5,
                'energy': 0.3
            }

            solution = self.weighted_sum_method(bounds, weights)
            if solution:
                pareto_solutions.append(solution)

        self.pareto_solutions = pareto_solutions
        return pareto_solutions

# 使用例
twin = SimpleReactorModel()
mo_optimizer = MultiObjectiveOptimizer(twin)

bounds = [(340, 380), (4.0, 8.0), (80, 120)]

# シナリオ1: 収率重視
solution1 = mo_optimizer.weighted_sum_method(
    bounds,
    {'yield': 1.0, 'quality': 0.3, 'cost': 0.5, 'energy': 0.2}
)

print("Scenario 1: Yield Priority")
print(f"  Conditions: {solution1['conditions']}")
print(f"  Yield: {solution1['objectives']['yield']:.1f}%")
print(f"  Quality: {solution1['objectives']['quality']:.1f}")
print(f"  Cost: {solution1['objectives']['cost']:.1f}")

# シナリオ2: 品質重視
solution2 = mo_optimizer.weighted_sum_method(
    bounds,
    {'yield': 0.3, 'quality': 1.0, 'cost': 0.5, 'energy': 0.2}
)

print("\nScenario 2: Quality Priority")
print(f"  Conditions: {solution2['conditions']}")
print(f"  Yield: {solution2['objectives']['yield']:.1f}%")
print(f"  Quality: {solution2['objectives']['quality']:.1f}")

4.7 リスク評価とシナリオプランニング

デジタルツインで外乱や故障のシナリオを評価し、リスクを定量化します。

import random

class RiskAssessmentSystem:
    """リスク評価システム"""

    def __init__(self, digital_twin):
        self.twin = digital_twin
        self.risk_scenarios = []

    def define_risk_scenario(self, name, disturbances, probability):
        """リスクシナリオ定義

        Args:
            name: シナリオ名
            disturbances: 外乱 {'temp': +10, ...}
            probability: 発生確率
        """
        self.risk_scenarios.append({
            'name': name,
            'disturbances': disturbances,
            'probability': probability
        })

    def monte_carlo_simulation(self, baseline_state, n_simulations=1000):
        """モンテカルロシミュレーション"""
        results = []

        for _ in range(n_simulations):
            # ランダムにシナリオ選択
            scenario = random.choices(
                self.risk_scenarios,
                weights=[s['probability'] for s in self.risk_scenarios]
            )[0]

            # 外乱適用
            disturbed_state = baseline_state.copy()
            for key, disturbance in scenario['disturbances'].items():
                disturbed_state[key] = disturbed_state.get(key, 0) + disturbance

            # シミュレーション
            result = self.twin.simulate(disturbed_state)
            result['scenario'] = scenario['name']
            results.append(result)

        return pd.DataFrame(results)

    def calculate_risk_metrics(self, simulation_results):
        """リスク指標計算"""
        # Value at Risk (VaR): 5%タイル値
        var_yield = simulation_results['yield'].quantile(0.05)
        var_quality = simulation_results['quality'].quantile(0.05)

        # 期待値
        expected_yield = simulation_results['yield'].mean()
        expected_quality = simulation_results['quality'].mean()

        # 標準偏差
        std_yield = simulation_results['yield'].std()
        std_quality = simulation_results['quality'].std()

        return {
            'expected_yield': expected_yield,
            'expected_quality': expected_quality,
            'var_yield_5%': var_yield,
            'var_quality_5%': var_quality,
            'std_yield': std_yield,
            'std_quality': std_quality
        }

# 使用例
twin = SimpleReactorModel()
risk_system = RiskAssessmentSystem(twin)

# リスクシナリオ定義
risk_system.define_risk_scenario(
    'Normal Operation',
    {'temp': 0, 'pressure': 0},
    probability=0.70
)

risk_system.define_risk_scenario(
    'Heat Exchanger Fouling',
    {'temp': -10},  # 冷却能力低下
    probability=0.15
)

risk_system.define_risk_scenario(
    'Feed Composition Shift',
    {'temp': 5, 'pressure': -0.5},
    probability=0.10
)

risk_system.define_risk_scenario(
    'Compressor Failure',
    {'pressure': -2.0},
    probability=0.05
)

# モンテカルロシミュレーション
baseline = {'temp': 360, 'pressure': 6.0, 'flow': 100}
mc_results = risk_system.monte_carlo_simulation(baseline, n_simulations=1000)

# リスク指標
risk_metrics = risk_system.calculate_risk_metrics(mc_results)

print("Risk Assessment Results:")
print(f"Expected Yield: {risk_metrics['expected_yield']:.2f}%")
print(f"Yield VaR (5%): {risk_metrics['var_yield_5%']:.2f}%")
print(f"Yield Std Dev: {risk_metrics['std_yield']:.2f}%")
print(f"\nExpected Quality: {risk_metrics['expected_quality']:.2f}")
print(f"Quality VaR (5%): {risk_metrics['var_quality_5%']:.2f}")

# シナリオ別集計
scenario_stats = mc_results.groupby('scenario')['yield'].agg(['mean', 'std', 'min'])
print("\nYield by Scenario:")
print(scenario_stats)
📊 産業実績:
  • Shell: デジタルツインとRTOで精製プロセスの収益を年間$数百万改善
  • BASF: 仮想最適化で新製品開発期間を30%短縮
  • Dow Chemical: 予知保全で計画外停止を40%削減

学習目標の確認

この章を完了すると、以下を説明・実装できるようになります:

基本理解

実践スキル

応用力

演習問題

Easy(基礎確認)

Q1: ソフトセンサーが特に有用なのはどのような場合ですか?

解答を見る

正解: 以下の場合にソフトセンサーが有用です:

  1. 測定装置が高コスト(質量分析計など)
  2. 測定に時間がかかる(ラボ分析が必要)
  3. オンライン測定が困難(製品品質など)
  4. センサーが過酷環境で使えない

解説: ソフトセンサーは測定可能な変数(温度、圧力など)から測定困難な変数を推定します。例えば、反応器内の成分組成をオンラインで測定するのは困難ですが、温度・圧力・流量から機械学習モデルで推定できます。

Medium(応用)

Q2: RTOとMPCの主な違いを3つ挙げてください。

解答を見る

正解:

項目 RTO(Real-Time Optimization) MPC(Model Predictive Control)
実行頻度 低頻度(数時間~日単位) 高頻度(秒~分単位)
目的 定常状態の経済最適化 動的制御と制約管理
出力 目標設定値(setpoints) 直接的な制御入力

解説: RTOは経済的最適化に焦点を当て、MPCはその目標値を達成するための動的制御を担当します。実務では両者を階層的に組み合わせることが多いです。

Hard(発展)

Q3: 提供されたWhatIfAnalyzerクラスを拡張し、不確実性を考慮したシナリオ分析を実装してください。各パラメータに±5%の変動を与え、100回のモンテカルロシミュレーションで結果の分布を評価するコードを書いてください。

解答例を見る
class UncertaintyWhatIfAnalyzer(WhatIfAnalyzer):
    """不確実性を考慮したWhat-if分析"""

    def create_uncertain_scenario(self, name, parameter_changes,
                                  uncertainty=0.05, n_sim=100):
        """不確実性を考慮したシナリオ分析

        Args:
            name: シナリオ名
            parameter_changes: 名目値
            uncertainty: 変動幅(±5% = 0.05)
            n_sim: シミュレーション回数
        """
        results = []

        for _ in range(n_sim):
            # パラメータに不確実性を追加
            uncertain_changes = {}
            for param, value in parameter_changes.items():
                noise = np.random.uniform(-uncertainty, uncertainty)
                uncertain_changes[param] = value * (1 + noise)

            # ベースライン取得
            baseline = self.model.get_current_state()
            modified_state = baseline.copy()
            modified_state.update(uncertain_changes)

            # シミュレーション
            result = self.model.simulate(modified_state)
            results.append(result)

        # 統計分析
        df = pd.DataFrame(results)

        return {
            'name': name,
            'nominal_changes': parameter_changes,
            'mean_yield': df['yield'].mean(),
            'std_yield': df['yield'].std(),
            'yield_5_percentile': df['yield'].quantile(0.05),
            'yield_95_percentile': df['yield'].quantile(0.95),
            'mean_quality': df['quality'].mean(),
            'std_quality': df['quality'].std()
        }

# 使用例
model = SimpleReactorModel()
uncertain_analyzer = UncertaintyWhatIfAnalyzer(model)

uncertain_result = uncertain_analyzer.create_uncertain_scenario(
    'High Temp with Uncertainty',
    {'temp': 370},
    uncertainty=0.05,
    n_sim=100
)

print(f"Scenario: {uncertain_result['name']}")
print(f"Mean Yield: {uncertain_result['mean_yield']:.2f}% "
      f"± {uncertain_result['std_yield']:.2f}%")
print(f"Yield 90% Confidence Interval: "
      f"[{uncertain_result['yield_5_percentile']:.2f}, "
      f"{uncertain_result['yield_95_percentile']:.2f}]%")

解説: この実装では、各パラメータに正規分布ノイズを加え、モンテカルロ法で結果の分布を評価しています。実務では、不確実性の定量化がリスク管理に不可欠です。

次のステップ

第4章では、デジタルツインを使った仮想最適化の各種手法を学びました。次章では、これらをどのように実環境にデプロイするか、本番運用のベストプラクティスを解説します。