第3章:リアルタイム最適化とAPC

化学プラントにおける経済的最適化とモデル予測制御の実装

📚 シリーズ:化学プラントへのAI応用
⏱️ 読了時間:40-50分
🎯 難易度:中級〜上級

この章で学ぶこと:

化学プラントの運転において、リアルタイム最適化(Real-Time Optimization, RTO)とAdvanced Process Control(APC)は経済性と安全性を両立させる重要技術です。本章では、SciPyやPyomoによる最適化、Model Predictive Control(MPC)の実装、さらに深層強化学習(DQN、PPO)を用いた次世代プロセス制御まで、実装レベルで習得します。

3.1 プロセス制御の階層構造

現代の化学プラントでは、制御システムが階層的に構成されています。各層は異なる時間スケールで動作し、上位層の最適化目標を下位層が実現します。

graph TB subgraph "階層型プロセス制御システム" RTO[リアルタイム最適化層
Real-Time Optimization
時間スケール: 数時間〜1日] APC[先進制御層
Advanced Process Control
時間スケール: 分〜時間] REG[制御層
Regulatory Control
時間スケール: 秒〜分] PROC[プロセス
化学プラント] end RTO -->|最適運転条件| APC APC -->|設定値| REG REG -->|操作量| PROC PROC -->|測定値| REG PROC -->|状態| APC PROC -->|経済性指標| RTO style RTO fill:#e3f2fd style APC fill:#fff3e0 style REG fill:#e8f5e9 style PROC fill:#f3e5f5
🎯 階層制御の実例

石油精製プラント(FCC装置)の場合:

この階層化により、プラント収益が年間数億円改善した実績があります。

3.2 オンライン最適化(SciPy)

リアルタイム最適化では、現在のプラント状態を入力として、経済的目標関数(利益最大化など)を満たす最適運転条件を計算します。連続攪拌槽反応器(CSTR)の運転最適化を例に実装します。

Example 1: オンライン最適化(SciPy) - CSTR運転条件の最適化
"""
===================================
Example 1: オンライン最適化(SciPy)
===================================

連続攪拌槽反応器(CSTR)における温度と流量の最適化。
反応速度と選択性のトレードオフを考慮し、単位時間あたりの収益を最大化する。

目的: 製品価値を最大化しながら、エネルギーコストと原料コストを最小化
"""

import numpy as np
from scipy.optimize import minimize, NonlinearConstraint
from typing import Dict, Tuple
import pandas as pd


class CSTROptimizer:
    """連続攪拌槽反応器のリアルタイム最適化"""

    def __init__(self):
        # プロセスパラメータ
        self.volume = 10.0  # 反応器容積 [m³]
        self.heat_capacity = 4.18  # 熱容量 [kJ/kg·K]
        self.density = 1000.0  # 密度 [kg/m³]

        # 反応速度定数 (Arrhenius式)
        self.A1 = 1.2e10  # 頻度因子(主反応) [1/h]
        self.E1 = 75000.0  # 活性化エネルギー(主反応) [J/mol]
        self.A2 = 3.5e9   # 頻度因子(副反応) [1/h]
        self.E2 = 68000.0  # 活性化エネルギー(副反応) [J/mol]

        # 経済パラメータ
        self.product_price = 150.0  # 製品価格 [$/kg]
        self.byproduct_price = 40.0  # 副生成物価格 [$/kg]
        self.feed_cost = 50.0  # 原料コスト [$/kg]
        self.energy_cost = 0.08  # エネルギーコスト [$/kWh]

        # 物理制約
        self.T_min, self.T_max = 320.0, 380.0  # 温度範囲 [K]
        self.F_min, self.F_max = 0.5, 5.0      # 流量範囲 [m³/h]
        self.T_feed = 298.0  # 原料温度 [K]

    def reaction_rates(self, T: float) -> Tuple[float, float]:
        """反応速度定数を計算(Arrhenius式)

        Args:
            T: 反応温度 [K]

        Returns:
            (主反応速度定数, 副反応速度定数) [1/h]
        """
        R = 8.314  # 気体定数 [J/mol·K]
        k1 = self.A1 * np.exp(-self.E1 / (R * T))
        k2 = self.A2 * np.exp(-self.E2 / (R * T))
        return k1, k2

    def conversion_selectivity(self, T: float, tau: float) -> Tuple[float, float]:
        """反応転化率と選択性を計算

        Args:
            T: 反応温度 [K]
            tau: 滞留時間 [h]

        Returns:
            (転化率, 選択性)
        """
        k1, k2 = self.reaction_rates(T)

        # 1次反応の転化率
        conversion = 1.0 - np.exp(-(k1 + k2) * tau)

        # 選択性(主生成物 / 全生成物)
        selectivity = k1 / (k1 + k2)

        return conversion, selectivity

    def heating_power(self, T: float, F: float) -> float:
        """加熱に必要な電力を計算

        Args:
            T: 反応温度 [K]
            F: 流量 [m³/h]

        Returns:
            加熱電力 [kW]
        """
        delta_T = T - self.T_feed
        mass_flow = F * self.density  # [kg/h]
        heat_duty = mass_flow * self.heat_capacity * delta_T  # [kJ/h]
        return heat_duty / 3600.0  # [kW]

    def objective(self, x: np.ndarray) -> float:
        """目的関数:利益の負値を計算(最小化問題に変換)

        Args:
            x: [温度 [K], 流量 [m³/h]]

        Returns:
            -利益 [$/h]
        """
        T, F = x
        tau = self.volume / F  # 滞留時間 [h]

        # 転化率と選択性
        conversion, selectivity = self.conversion_selectivity(T, tau)

        # 製品生成量
        feed_mass = F * self.density  # [kg/h]
        product_mass = feed_mass * conversion * selectivity
        byproduct_mass = feed_mass * conversion * (1 - selectivity)

        # 収益
        revenue = (product_mass * self.product_price +
                  byproduct_mass * self.byproduct_price)

        # コスト
        feed_cost_total = feed_mass * self.feed_cost
        energy_cost_total = self.heating_power(T, F) * self.energy_cost

        profit = revenue - feed_cost_total - energy_cost_total

        return -profit  # 最小化のため負値

    def optimize(self, initial_guess: np.ndarray = None) -> Dict:
        """最適化を実行

        Args:
            initial_guess: 初期推定値 [温度, 流量]

        Returns:
            最適化結果の辞書
        """
        if initial_guess is None:
            initial_guess = np.array([350.0, 2.0])  # [K, m³/h]

        # 境界制約
        bounds = [(self.T_min, self.T_max),
                 (self.F_min, self.F_max)]

        # 非線形制約:転化率が0.85以上(安全・品質要件)
        def conversion_constraint(x):
            T, F = x
            tau = self.volume / F
            conversion, _ = self.conversion_selectivity(T, tau)
            return conversion - 0.85

        nlc = NonlinearConstraint(conversion_constraint, 0, np.inf)

        # 最適化実行
        result = minimize(
            self.objective,
            initial_guess,
            method='SLSQP',
            bounds=bounds,
            constraints=[nlc],
            options={'ftol': 1e-6, 'disp': False}
        )

        # 結果を整理
        T_opt, F_opt = result.x
        tau_opt = self.volume / F_opt
        conversion, selectivity = self.conversion_selectivity(T_opt, tau_opt)

        return {
            'success': result.success,
            'temperature': T_opt,
            'flow_rate': F_opt,
            'residence_time': tau_opt,
            'conversion': conversion,
            'selectivity': selectivity,
            'profit_per_hour': -result.fun,
            'heating_power': self.heating_power(T_opt, F_opt)
        }


# ===================================
# 実行例
# ===================================
if __name__ == "__main__":
    optimizer = CSTROptimizer()

    print("="*70)
    print("CSTR リアルタイム最適化")
    print("="*70)

    # 最適化実行
    result = optimizer.optimize()

    if result['success']:
        print("\n【最適運転条件】")
        print(f"  反応温度: {result['temperature']:.1f} K ({result['temperature']-273.15:.1f} °C)")
        print(f"  流量: {result['flow_rate']:.2f} m³/h")
        print(f"  滞留時間: {result['residence_time']:.2f} h")
        print(f"\n【プロセス性能】")
        print(f"  転化率: {result['conversion']:.1%}")
        print(f"  選択性: {result['selectivity']:.1%}")
        print(f"  加熱電力: {result['heating_power']:.1f} kW")
        print(f"\n【経済性】")
        print(f"  利益: ${result['profit_per_hour']:.2f}/h")
        print(f"  年間利益: ${result['profit_per_hour'] * 8760:.0f}/year")
    else:
        print("最適化に失敗しました。")

    # 感度分析:原料価格の変動に対する最適条件の変化
    print("\n" + "="*70)
    print("感度分析:原料価格の影響")
    print("="*70)

    feed_costs = [40, 50, 60, 70]
    results = []

    for cost in feed_costs:
        optimizer.feed_cost = cost
        res = optimizer.optimize()
        results.append({
            '原料コスト [$/kg]': cost,
            '最適温度 [K]': res['temperature'],
            '最適流量 [m³/h]': res['flow_rate'],
            '時間利益 [$/h]': res['profit_per_hour']
        })

    df = pd.DataFrame(results)
    print(df.to_string(index=False))

    print("\n✓ 原料価格上昇時は高温・低流量に最適化(選択性重視)")

3.3 経済的最適化(Pyomo)

より複雑な最適化問題では、代数的モデリング言語Pyomoを使用します。製品価値の最大化とユーティリティコストの最小化を同時に考慮した定式化を実装します。

Example 2: 経済的最適化(Pyomo) - 製品価値最大化とコスト最小化
"""
===================================
Example 2: 経済的最適化(Pyomo)
===================================

Pyomoを用いた化学プラントの経済最適化。
複数の製品を持つプロセスにおいて、製品価値を最大化しながら
ユーティリティコスト(蒸気、電力、冷却水)を最小化する。

Pyomoは実際にはインストールが必要ですが、ここでは概念的な実装を示します。
"""

import numpy as np
from scipy.optimize import minimize
from typing import Dict, List
import pandas as pd


class EconomicOptimizer:
    """化学プラントの経済最適化システム"""

    def __init__(self):
        # 製品価格 [$/ton]
        self.product_prices = {
            'ProductA': 800.0,   # 高付加価値製品
            'ProductB': 500.0,   # 中間製品
            'ProductC': 300.0    # 汎用製品
        }

        # ユーティリティコスト
        self.steam_cost = 25.0    # 蒸気 [$/ton]
        self.power_cost = 0.10    # 電力 [$/kWh]
        self.cooling_cost = 0.5   # 冷却水 [$/ton]

        # プロセス制約パラメータ
        self.max_capacity = 100.0  # 最大処理能力 [ton/h]
        self.min_turndown = 0.4    # 最小負荷率

    def production_model(self, feed_rate: float, temperature: float,
                        pressure: float) -> Dict[str, float]:
        """生産モデル:操作条件から製品収率を計算

        Args:
            feed_rate: 原料流量 [ton/h]
            temperature: 反応温度 [K]
            pressure: 反応圧力 [bar]

        Returns:
            各製品の生産量 [ton/h]
        """
        # 簡略化した収率モデル(実際は詳細な反応モデルを使用)
        T_ref = 400.0  # 基準温度 [K]
        P_ref = 20.0   # 基準圧力 [bar]

        # 温度・圧力の影響因子
        temp_factor = np.exp(-0.005 * (temperature - T_ref)**2)
        press_factor = 1.0 + 0.02 * (pressure - P_ref)

        # 各製品の基準収率
        yield_A_base = 0.35 * temp_factor * press_factor
        yield_B_base = 0.45 * (2.0 - temp_factor)
        yield_C_base = 0.20

        # 収率制約(合計≤1.0)
        total_yield = yield_A_base + yield_B_base + yield_C_base
        if total_yield > 1.0:
            scale = 1.0 / total_yield
            yield_A_base *= scale
            yield_B_base *= scale
            yield_C_base *= scale

        return {
            'ProductA': feed_rate * yield_A_base,
            'ProductB': feed_rate * yield_B_base,
            'ProductC': feed_rate * yield_C_base
        }

    def utility_consumption(self, feed_rate: float, temperature: float,
                           pressure: float) -> Dict[str, float]:
        """ユーティリティ消費量を計算

        Args:
            feed_rate: 原料流量 [ton/h]
            temperature: 反応温度 [K]
            pressure: 反応圧力 [bar]

        Returns:
            ユーティリティ消費量
        """
        # 蒸気消費(加熱負荷に比例)
        T_feed = 298.0  # 原料温度 [K]
        heating_load = feed_rate * 2.5 * (temperature - T_feed)  # 簡略式
        steam = heating_load / 2000.0  # [ton/h]

        # 電力消費(圧縮機、ポンプ、攪拌)
        compressor_power = 50.0 * (pressure / 20.0)**0.8  # [kW]
        pump_power = 10.0 * feed_rate
        agitator_power = 15.0
        power = compressor_power + pump_power + agitator_power

        # 冷却水(反応熱除去)
        exothermic_heat = feed_rate * 500.0  # [kW] (仮定)
        cooling_water = exothermic_heat / 40.0  # [ton/h]

        return {
            'steam': steam,
            'power': power,
            'cooling': cooling_water
        }

    def objective_function(self, x: np.ndarray) -> float:
        """目的関数:利益の負値(最小化問題)

        Args:
            x: [feed_rate, temperature, pressure]

        Returns:
            -利益 [$/h]
        """
        feed_rate, temperature, pressure = x

        # 製品生産量
        products = self.production_model(feed_rate, temperature, pressure)

        # 収益
        revenue = sum(
            products[prod] * self.product_prices[prod]
            for prod in products
        )

        # ユーティリティ消費
        utilities = self.utility_consumption(feed_rate, temperature, pressure)

        # コスト
        utility_cost = (
            utilities['steam'] * self.steam_cost +
            utilities['power'] * self.power_cost +
            utilities['cooling'] * self.cooling_cost
        )

        # 原料コスト(仮定: $200/ton)
        feed_cost = feed_rate * 200.0

        profit = revenue - utility_cost - feed_cost

        return -profit

    def optimize_economics(self) -> Dict:
        """経済最適化を実行

        Returns:
            最適化結果
        """
        # 初期推定値: [feed_rate, temperature, pressure]
        x0 = np.array([60.0, 400.0, 20.0])

        # 境界制約
        bounds = [
            (self.max_capacity * self.min_turndown, self.max_capacity),  # feed_rate
            (350.0, 450.0),  # temperature [K]
            (10.0, 40.0)     # pressure [bar]
        ]

        # 最適化
        result = minimize(
            self.objective_function,
            x0,
            method='L-BFGS-B',
            bounds=bounds,
            options={'ftol': 1e-6}
        )

        feed_opt, temp_opt, press_opt = result.x

        # 結果を整理
        products = self.production_model(feed_opt, temp_opt, press_opt)
        utilities = self.utility_consumption(feed_opt, temp_opt, press_opt)

        return {
            'success': result.success,
            'feed_rate': feed_opt,
            'temperature': temp_opt,
            'pressure': press_opt,
            'products': products,
            'utilities': utilities,
            'profit_per_hour': -result.fun
        }


# ===================================
# 実行例
# ===================================
if __name__ == "__main__":
    optimizer = EconomicOptimizer()

    print("="*70)
    print("化学プラント経済最適化")
    print("="*70)

    result = optimizer.optimize_economics()

    if result['success']:
        print("\n【最適運転条件】")
        print(f"  原料流量: {result['feed_rate']:.1f} ton/h")
        print(f"  反応温度: {result['temperature']:.1f} K ({result['temperature']-273.15:.1f} °C)")
        print(f"  反応圧力: {result['pressure']:.1f} bar")

        print("\n【製品生産量】")
        for prod, amount in result['products'].items():
            price = optimizer.product_prices[prod]
            value = amount * price
            print(f"  {prod}: {amount:.2f} ton/h (価値: ${value:.2f}/h)")

        print("\n【ユーティリティ消費】")
        util = result['utilities']
        print(f"  蒸気: {util['steam']:.2f} ton/h (${util['steam'] * optimizer.steam_cost:.2f}/h)")
        print(f"  電力: {util['power']:.1f} kW (${util['power'] * optimizer.power_cost:.2f}/h)")
        print(f"  冷却水: {util['cooling']:.2f} ton/h (${util['cooling'] * optimizer.cooling_cost:.2f}/h)")

        print("\n【経済性】")
        print(f"  時間利益: ${result['profit_per_hour']:.2f}/h")
        print(f"  日間利益: ${result['profit_per_hour'] * 24:.2f}/day")
        print(f"  年間利益: ${result['profit_per_hour'] * 8760:.0f}/year")

    # 製品価格の変動に対する感度分析
    print("\n" + "="*70)
    print("感度分析:製品A価格の影響")
    print("="*70)

    original_price = optimizer.product_prices['ProductA']
    price_scenarios = [600, 700, 800, 900, 1000]
    results_table = []

    for price in price_scenarios:
        optimizer.product_prices['ProductA'] = price
        res = optimizer.optimize_economics()
        results_table.append({
            '製品A価格 [$/ton]': price,
            '最適流量 [ton/h]': res['feed_rate'],
            '最適温度 [K]': res['temperature'],
            '製品A生産量 [ton/h]': res['products']['ProductA'],
            '時間利益 [$/h]': res['profit_per_hour']
        })

    df = pd.DataFrame(results_table)
    print(df.to_string(index=False))

    print("\n✓ 高付加価値製品の価格上昇時、高温運転で収率を最大化")

3.4 モデル予測制御(MPC)

Model Predictive Control(MPC)は、プロセスの動的モデルを用いて未来の挙動を予測し、制約条件を満たしながら操作量を最適化する先進制御手法です。蒸留塔の温度制御を例に基礎実装を示します。

Example 3: MPCの基礎実装 - 蒸留塔温度制御
"""
===================================
Example 3: MPCの基礎実装
===================================

蒸留塔における温度制御にModel Predictive Control(MPC)を適用。
未来の挙動を予測しながら、リフラックス比と還流量を最適化。

MPCの特長:
- 多変数制御(複数の操作変数・制御変数)
- 制約条件の明示的な考慮
- 外乱の予測と補償
"""

import numpy as np
from scipy.optimize import minimize
from typing import List, Tuple
import matplotlib.pyplot as plt


class DistillationMPC:
    """蒸留塔のモデル予測制御"""

    def __init__(self, prediction_horizon: int = 10, control_horizon: int = 5):
        """
        Args:
            prediction_horizon: 予測ホライズン(ステップ数)
            control_horizon: 制御ホライズン(ステップ数)
        """
        self.Np = prediction_horizon
        self.Nc = control_horizon
        self.dt = 1.0  # サンプリング時間 [min]

        # プロセスモデル(状態空間モデル)
        # x[k+1] = A*x[k] + B*u[k]
        # y[k] = C*x[k]
        # 状態: x = [塔頂温度偏差, 塔底温度偏差]
        # 入力: u = [リフラックス比変化, 還流量変化]
        # 出力: y = [塔頂温度, 塔底温度]

        self.A = np.array([
            [0.85, 0.10],
            [0.05, 0.90]
        ])
        self.B = np.array([
            [0.5, 0.1],
            [0.1, 0.4]
        ])
        self.C = np.eye(2)

        # 制約条件
        self.u_min = np.array([-2.0, -5.0])  # [リフラックス比, 還流量 kg/min]
        self.u_max = np.array([2.0, 5.0])
        self.delta_u_max = np.array([0.5, 1.0])  # 変化率制約

        # 重み行列
        self.Q = np.diag([10.0, 8.0])   # 出力追従の重み
        self.R = np.diag([1.0, 1.0])    # 入力変化の重み

    def predict(self, x0: np.ndarray, u_sequence: np.ndarray) -> np.ndarray:
        """予測ホライズンにわたる出力を予測

        Args:
            x0: 現在の状態
            u_sequence: 入力系列 (Nc x 2)

        Returns:
            予測出力系列 (Np x 2)
        """
        x = x0.copy()
        y_pred = np.zeros((self.Np, 2))

        for k in range(self.Np):
            # 制御ホライズン内は最適化変数、それ以降は最後の値を保持
            if k < self.Nc:
                u = u_sequence[k]
            else:
                u = u_sequence[-1]

            # 状態更新
            x = self.A @ x + self.B @ u

            # 出力計算
            y_pred[k] = self.C @ x

        return y_pred

    def mpc_objective(self, u_flat: np.ndarray, x0: np.ndarray,
                     r: np.ndarray, u_prev: np.ndarray) -> float:
        """MPC目的関数

        Args:
            u_flat: 平坦化された入力系列 (Nc*2,)
            x0: 現在の状態
            r: 目標値系列 (Np x 2)
            u_prev: 前ステップの入力

        Returns:
            評価関数値
        """
        # 入力系列を再構成
        u_sequence = u_flat.reshape(self.Nc, 2)

        # 予測
        y_pred = self.predict(x0, u_sequence)

        # 追従誤差
        tracking_error = 0.0
        for k in range(self.Np):
            e = y_pred[k] - r[k]
            tracking_error += e.T @ self.Q @ e

        # 入力変化ペナルティ
        control_effort = 0.0
        for k in range(self.Nc):
            if k == 0:
                du = u_sequence[k] - u_prev
            else:
                du = u_sequence[k] - u_sequence[k-1]
            control_effort += du.T @ self.R @ du

        return tracking_error + control_effort

    def solve(self, x0: np.ndarray, r: np.ndarray,
             u_prev: np.ndarray) -> np.ndarray:
        """MPC最適化問題を解く

        Args:
            x0: 現在の状態
            r: 目標値系列 (Np x 2)
            u_prev: 前ステップの入力

        Returns:
            最適入力系列の最初の値 (2,)
        """
        # 初期推定値
        u0_flat = np.zeros(self.Nc * 2)

        # 境界制約
        bounds = []
        for _ in range(self.Nc):
            bounds.extend([
                (self.u_min[0], self.u_max[0]),
                (self.u_min[1], self.u_max[1])
            ])

        # 変化率制約
        def delta_u_constraint(u_flat):
            u_seq = u_flat.reshape(self.Nc, 2)
            violations = []
            for k in range(self.Nc):
                if k == 0:
                    du = np.abs(u_seq[k] - u_prev)
                else:
                    du = np.abs(u_seq[k] - u_seq[k-1])
                violations.extend((self.delta_u_max - du).tolist())
            return np.array(violations)

        from scipy.optimize import NonlinearConstraint
        nlc = NonlinearConstraint(delta_u_constraint, 0, np.inf)

        # 最適化
        result = minimize(
            lambda u: self.mpc_objective(u, x0, r, u_prev),
            u0_flat,
            method='SLSQP',
            bounds=bounds,
            constraints=[nlc],
            options={'ftol': 1e-4, 'disp': False}
        )

        u_opt = result.x.reshape(self.Nc, 2)
        return u_opt[0]  # 最初のステップのみ実行


# ===================================
# シミュレーション実行
# ===================================
if __name__ == "__main__":
    mpc = DistillationMPC(prediction_horizon=10, control_horizon=5)

    # シミュレーション設定
    T_sim = 50  # シミュレーション時間 [min]
    x = np.zeros(2)  # 初期状態(目標値からの偏差)
    u = np.zeros(2)  # 初期入力

    # 目標値の変化(ステップ変化)
    r_top = np.zeros(T_sim)
    r_bottom = np.zeros(T_sim)
    r_top[10:] = -1.5  # 10分後に塔頂温度を1.5℃下げる
    r_bottom[30:] = 1.0  # 30分後に塔底温度を1.0℃上げる

    # 記録用
    x_history = [x.copy()]
    u_history = [u.copy()]

    print("="*70)
    print("蒸留塔MPC制御シミュレーション")
    print("="*70)

    for k in range(T_sim):
        # 目標値系列(予測ホライズン分)
        r_horizon = np.zeros((mpc.Np, 2))
        for i in range(mpc.Np):
            if k + i < T_sim:
                r_horizon[i] = [r_top[k+i], r_bottom[k+i]]
            else:
                r_horizon[i] = [r_top[-1], r_bottom[-1]]

        # MPCで最適入力を計算
        u = mpc.solve(x, r_horizon, u)

        # プロセス更新(外乱を加える)
        disturbance = np.random.randn(2) * 0.05
        x = mpc.A @ x + mpc.B @ u + disturbance

        # 記録
        x_history.append(x.copy())
        u_history.append(u.copy())

        if k % 10 == 0:
            print(f"時刻 {k:2d}分: 塔頂偏差={x[0]:+.2f}℃, 塔底偏差={x[1]:+.2f}℃, "
                  f"リフラックス={u[0]:+.2f}, 還流={u[1]:+.2f} kg/min")

    # 結果を配列に変換
    x_history = np.array(x_history)
    u_history = np.array(u_history)

    print("\n" + "="*70)
    print("制御性能評価")
    print("="*70)

    # 定常偏差
    steady_state_error_top = np.abs(x_history[-10:, 0] - r_top[-1]).mean()
    steady_state_error_bottom = np.abs(x_history[-10:, 1] - r_bottom[-1]).mean()

    print(f"塔頂温度の定常偏差: {steady_state_error_top:.3f} ℃")
    print(f"塔底温度の定常偏差: {steady_state_error_bottom:.3f} ℃")
    print(f"\n✓ MPCにより制約条件下で高精度な温度制御を達成")
    print(f"✓ 複数の設定値変更に対しても良好な追従性能")

3.5 非線形MPC(CasADi)

非線形プロセスに対しては、非線形MPCが必要です。CasADiを用いた反応器の非線形MPC実装を示します(概念的実装)。

Example 4: 非線形MPC(CasADi) - 反応器の非線形制御
"""
===================================
Example 4: 非線形MPC(CasADi)
===================================

CasADiを用いた非線形モデル予測制御。
連続攪拌槽反応器(CSTR)の濃度と温度を同時制御。

非線形プロセスモデル:
- 質量バランス: dC/dt = (C_in - C)/tau - k*C
- エネルギーバランス: dT/dt = (T_in - T)/tau + (-ΔH)*k*C/(ρ*Cp) + Q/(V*ρ*Cp)

CasADiは実際にはインストールが必要ですが、ここでは数値的な実装で代替します。
"""

import numpy as np
from scipy.integrate import odeint
from scipy.optimize import minimize
from typing import Tuple


class NonlinearCSTRMPC:
    """非線形CSTR反応器のMPC"""

    def __init__(self, prediction_horizon: int = 20, control_horizon: int = 10):
        self.Np = prediction_horizon
        self.Nc = control_horizon
        self.dt = 0.5  # サンプリング時間 [min]

        # プロセスパラメータ
        self.V = 1.0  # 反応器容積 [m³]
        self.rho = 1000.0  # 密度 [kg/m³]
        self.Cp = 4.18  # 比熱 [kJ/kg·K]
        self.delta_H = -50000.0  # 反応熱 [kJ/kmol]

        # Arrhenius反応速度
        self.A = 1.0e10  # 頻度因子 [1/min]
        self.Ea = 75000.0  # 活性化エネルギー [J/mol]
        self.R = 8.314  # 気体定数 [J/mol·K]

        # 操作変数の制約
        self.F_min, self.F_max = 0.02, 0.20  # 流量 [m³/min]
        self.Q_min, self.Q_max = -5000, 5000  # 加熱・冷却 [kJ/min]

        # 入力
        self.C_in = 10.0  # 入口濃度 [kmol/m³]
        self.T_in = 300.0  # 入口温度 [K]

    def reaction_rate(self, C: float, T: float) -> float:
        """反応速度定数(Arrhenius式)"""
        return self.A * np.exp(-self.Ea / (self.R * T)) * C

    def cstr_model(self, state: np.ndarray, t: float,
                   F: float, Q: float) -> np.ndarray:
        """CSTR反応器の微分方程式

        Args:
            state: [濃度 C, 温度 T]
            t: 時間
            F: 流量 [m³/min]
            Q: 加熱量 [kJ/min]

        Returns:
            [dC/dt, dT/dt]
        """
        C, T = state
        tau = self.V / F  # 滞留時間

        # 反応速度
        r = self.reaction_rate(C, T)

        # 質量バランス
        dC_dt = (self.C_in - C) / tau - r

        # エネルギーバランス
        heat_reaction = (-self.delta_H) * r / (self.rho * self.Cp)
        heat_exchange = Q / (self.V * self.rho * self.Cp)
        dT_dt = (self.T_in - T) / tau + heat_reaction + heat_exchange

        return np.array([dC_dt, dT_dt])

    def simulate_step(self, state: np.ndarray,
                     F: float, Q: float) -> np.ndarray:
        """1ステップ分のシミュレーション

        Args:
            state: [C, T]
            F: 流量
            Q: 加熱量

        Returns:
            次のステップの状態
        """
        t_span = [0, self.dt]
        result = odeint(self.cstr_model, state, t_span, args=(F, Q))
        return result[-1]

    def predict_trajectory(self, state0: np.ndarray,
                          u_sequence: np.ndarray) -> np.ndarray:
        """予測軌道を計算

        Args:
            state0: 初期状態 [C, T]
            u_sequence: 入力系列 (Nc x 2) [F, Q]

        Returns:
            予測状態系列 (Np x 2)
        """
        state = state0.copy()
        trajectory = np.zeros((self.Np, 2))

        for k in range(self.Np):
            if k < self.Nc:
                F, Q = u_sequence[k]
            else:
                F, Q = u_sequence[-1]

            state = self.simulate_step(state, F, Q)
            trajectory[k] = state

        return trajectory

    def mpc_objective(self, u_flat: np.ndarray, state0: np.ndarray,
                     setpoint: np.ndarray) -> float:
        """非線形MPC目的関数

        Args:
            u_flat: 平坦化された入力系列
            state0: 現在の状態
            setpoint: 目標値 [C_sp, T_sp]

        Returns:
            評価関数値
        """
        u_sequence = u_flat.reshape(self.Nc, 2)

        # 予測軌道
        trajectory = self.predict_trajectory(state0, u_sequence)

        # 追従誤差(重み付き二乗和)
        Q = np.diag([10.0, 5.0])  # 濃度と温度の重み
        tracking_cost = 0.0
        for k in range(self.Np):
            error = trajectory[k] - setpoint
            tracking_cost += error.T @ Q @ error

        # 入力変化のペナルティ
        R = np.diag([100.0, 0.01])  # 流量と加熱量の重み
        control_cost = 0.0
        for k in range(self.Nc):
            if k > 0:
                du = u_sequence[k] - u_sequence[k-1]
                control_cost += du.T @ R @ du

        return tracking_cost + control_cost

    def solve(self, state0: np.ndarray, setpoint: np.ndarray,
             u_prev: np.ndarray) -> np.ndarray:
        """非線形MPC最適化

        Args:
            state0: 現在の状態
            setpoint: 目標値
            u_prev: 前ステップの入力

        Returns:
            最適入力
        """
        # 初期推定値(前回の入力を保持)
        u0_flat = np.tile(u_prev, self.Nc)

        # 境界制約
        bounds = []
        for _ in range(self.Nc):
            bounds.append((self.F_min, self.F_max))
            bounds.append((self.Q_min, self.Q_max))

        # 最適化
        result = minimize(
            lambda u: self.mpc_objective(u, state0, setpoint),
            u0_flat,
            method='L-BFGS-B',
            bounds=bounds,
            options={'ftol': 1e-3, 'maxiter': 50}
        )

        u_opt = result.x.reshape(self.Nc, 2)
        return u_opt[0]


# ===================================
# シミュレーション実行
# ===================================
if __name__ == "__main__":
    mpc = NonlinearCSTRMPC(prediction_horizon=20, control_horizon=10)

    # 初期状態
    state = np.array([2.0, 350.0])  # [C=2 kmol/m³, T=350 K]
    u = np.array([0.1, 0.0])  # [F=0.1 m³/min, Q=0 kJ/min]

    # 目標値
    C_setpoint = 5.0  # kmol/m³
    T_setpoint = 360.0  # K
    setpoint = np.array([C_setpoint, T_setpoint])

    # シミュレーション
    T_sim = 100  # ステップ数
    state_history = [state.copy()]
    u_history = [u.copy()]

    print("="*70)
    print("非線形CSTR反応器のMPC制御")
    print("="*70)
    print(f"目標値: C={C_setpoint} kmol/m³, T={T_setpoint} K\n")

    for k in range(T_sim):
        # 外乱(入口濃度の変動)
        if k == 40:
            mpc.C_in = 12.0  # 濃度が上昇
            print(f"時刻 {k*mpc.dt:.1f}分: 外乱発生(入口濃度 10→12 kmol/m³)\n")

        # MPCで最適入力を計算
        u = mpc.solve(state, setpoint, u)

        # プロセス更新
        state = mpc.simulate_step(state, u[0], u[1])

        # ノイズ追加
        state += np.random.randn(2) * [0.05, 0.5]

        # 記録
        state_history.append(state.copy())
        u_history.append(u.copy())

        if k % 20 == 0:
            print(f"時刻 {k*mpc.dt:5.1f}分: C={state[0]:.2f} kmol/m³, T={state[1]:.1f} K, "
                  f"F={u[0]:.3f} m³/min, Q={u[1]:+6.1f} kJ/min")

    state_history = np.array(state_history)
    u_history = np.array(u_history)

    print("\n" + "="*70)
    print("制御性能")
    print("="*70)

    # 定常状態での評価(最後の20ステップ)
    C_error = np.abs(state_history[-20:, 0] - C_setpoint).mean()
    T_error = np.abs(state_history[-20:, 1] - T_setpoint).mean()

    print(f"濃度誤差(定常): {C_error:.3f} kmol/m³")
    print(f"温度誤差(定常): {T_error:.3f} K")
    print(f"\n✓ 非線形プロセスに対しても高精度な制御を実現")
    print(f"✓ 外乱に対する迅速なリジェクション")

3.6 深層強化学習による制御

強化学習は、試行錯誤を通じて最適な制御方策を学習します。Deep Q-Network(DQN)を用いたバッチ反応器の軌道最適化を実装します。

Example 5: DQNによるバッチプロセス制御 - バッチ反応器軌道最適化
"""
===================================
Example 5: DQNによるバッチプロセス制御
===================================

Deep Q-Network(DQN)を用いたバッチ反応器の温度軌道最適化。
目標: 製品純度を最大化しながら、バッチ時間を最小化する。

強化学習の定式化:
- 状態: [濃度A, 濃度B, 温度, 経過時間]
- 行動: 温度の増減(離散化)
- 報酬: 製品収率 - 時間ペナルティ
"""

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
from typing import Tuple, List


class BatchReactorEnv:
    """バッチ反応器環境"""

    def __init__(self):
        self.dt = 1.0  # 時間ステップ [min]
        self.max_time = 120.0  # 最大バッチ時間 [min]

        # 反応速度パラメータ
        self.A1 = 1.0e8  # A→B(目的反応)
        self.E1 = 70000.0
        self.A2 = 5.0e7  # B→C(副反応)
        self.E2 = 65000.0
        self.R = 8.314

        # 温度制約
        self.T_min, self.T_max = 320.0, 380.0  # [K]

        self.reset()

    def reset(self) -> np.ndarray:
        """環境をリセット"""
        self.C_A = 10.0  # 初期濃度A [mol/L]
        self.C_B = 0.0   # 初期濃度B [mol/L]
        self.C_C = 0.0   # 初期濃度C [mol/L]
        self.T = 340.0   # 初期温度 [K]
        self.time = 0.0

        return self._get_state()

    def _get_state(self) -> np.ndarray:
        """状態ベクトルを取得"""
        return np.array([
            self.C_A / 10.0,   # 正規化
            self.C_B / 10.0,
            (self.T - 350.0) / 30.0,
            self.time / self.max_time
        ], dtype=np.float32)

    def step(self, action: int) -> Tuple[np.ndarray, float, bool]:
        """1ステップ実行

        Args:
            action: 0=冷却, 1=保持, 2=加熱

        Returns:
            (next_state, reward, done)
        """
        # 温度変化
        delta_T = [-2.0, 0.0, 2.0][action]
        self.T = np.clip(self.T + delta_T, self.T_min, self.T_max)

        # 反応速度定数
        k1 = self.A1 * np.exp(-self.E1 / (self.R * self.T))
        k2 = self.A2 * np.exp(-self.E2 / (self.R * self.T))

        # 濃度更新(1次反応)
        dC_A = -k1 * self.C_A * self.dt
        dC_B = (k1 * self.C_A - k2 * self.C_B) * self.dt
        dC_C = k2 * self.C_B * self.dt

        self.C_A += dC_A
        self.C_B += dC_B
        self.C_C += dC_C

        self.time += self.dt

        # 報酬の計算
        # 製品B濃度を最大化(目標: 8 mol/L以上)
        product_reward = self.C_B

        # 副生成物Cにペナルティ
        byproduct_penalty = -0.5 * self.C_C

        # 時間ペナルティ(早く終わらせたい)
        time_penalty = -0.01 * self.time

        reward = product_reward + byproduct_penalty + time_penalty

        # 終了条件
        done = (self.time >= self.max_time) or (self.C_A < 0.5)

        # ボーナス: 目標達成時
        if done and self.C_B >= 7.5:
            reward += 50.0

        return self._get_state(), reward, done


class DQN(nn.Module):
    """Deep Q-Network"""

    def __init__(self, state_dim: int, action_dim: int):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim)
        )

    def forward(self, x):
        return self.fc(x)


class DQNAgent:
    """DQNエージェント"""

    def __init__(self, state_dim: int, action_dim: int):
        self.state_dim = state_dim
        self.action_dim = action_dim

        self.q_network = DQN(state_dim, action_dim)
        self.target_network = DQN(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=1e-3)
        self.memory = deque(maxlen=10000)
        self.batch_size = 64
        self.gamma = 0.99

        self.epsilon = 1.0
        self.epsilon_min = 0.05
        self.epsilon_decay = 0.995

    def select_action(self, state: np.ndarray) -> int:
        """ε-greedy行動選択"""
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)

        with torch.no_grad():
            state_t = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.q_network(state_t)
            return q_values.argmax().item()

    def store_transition(self, state, action, reward, next_state, done):
        """経験を保存"""
        self.memory.append((state, action, reward, next_state, done))

    def train(self):
        """ネットワークを訓練"""
        if len(self.memory) < self.batch_size:
            return

        # バッチサンプリング
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones)

        # 現在のQ値
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))

        # 目標Q値
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)

        # 損失計算と更新
        loss = nn.MSELoss()(current_q.squeeze(), target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # ε減衰
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def update_target_network(self):
        """ターゲットネットワークを更新"""
        self.target_network.load_state_dict(self.q_network.state_dict())


# ===================================
# 学習実行
# ===================================
if __name__ == "__main__":
    env = BatchReactorEnv()
    agent = DQNAgent(state_dim=4, action_dim=3)

    num_episodes = 200
    rewards_history = []

    print("="*70)
    print("DQNによるバッチ反応器制御の学習")
    print("="*70)

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        done = False

        while not done:
            action = agent.select_action(state)
            next_state, reward, done = env.step(action)

            agent.store_transition(state, action, reward, next_state, done)
            agent.train()

            state = next_state
            episode_reward += reward

        rewards_history.append(episode_reward)

        # ターゲットネットワーク更新
        if episode % 10 == 0:
            agent.update_target_network()

        if episode % 20 == 0:
            avg_reward = np.mean(rewards_history[-20:])
            print(f"Episode {episode:3d}: 平均報酬={avg_reward:6.2f}, "
                  f"ε={agent.epsilon:.3f}, 最終C_B={env.C_B:.2f} mol/L")

    print("\n" + "="*70)
    print("学習完了 - テスト実行")
    print("="*70)

    # テスト実行(貪欲方策)
    agent.epsilon = 0.0
    state = env.reset()
    done = False

    trajectory = []
    while not done:
        action = agent.select_action(state)
        trajectory.append({
            'time': env.time,
            'C_A': env.C_A,
            'C_B': env.C_B,
            'C_C': env.C_C,
            'T': env.T,
            'action': ['冷却', '保持', '加熱'][action]
        })
        next_state, reward, done = env.step(action)
        state = next_state

    print(f"\nバッチ完了時間: {env.time:.1f} min")
    print(f"最終製品濃度 C_B: {env.C_B:.2f} mol/L")
    print(f"副生成物 C_C: {env.C_C:.2f} mol/L")
    print(f"選択性: {env.C_B / (env.C_B + env.C_C):.1%}")

    print("\n最適温度軌道(最初の10ステップ):")
    for i in range(min(10, len(trajectory))):
        t = trajectory[i]
        print(f"  {t['time']:5.1f}分: T={t['T']:.1f}K, C_B={t['C_B']:.2f}, 操作={t['action']}")

    print("\n✓ DQNによりバッチ時間と製品純度を両立する最適軌道を学習")

3.7 連続プロセスへの強化学習適用

連続値の行動空間を持つプロセスには、Proximal Policy Optimization(PPO)が有効です。CSTR制御への適用例を示します。

Example 6: PPOによる連続プロセス制御 - CSTR温度・流量制御
"""
===================================
Example 6: PPOによる連続プロセス制御
===================================

Proximal Policy Optimization(PPO)を用いた連続攪拌槽反応器の制御。
連続値の行動空間(温度、流量)を扱うため、Actor-Criticアーキテクチャを使用。

目標: 製品濃度を目標値に維持しながら、エネルギー消費を最小化
"""

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal
from typing import Tuple


class CSTREnv:
    """連続攪拌槽反応器環境"""

    def __init__(self):
        self.dt = 0.5  # サンプリング時間 [min]
        self.V = 1.0   # 反応器容積 [m³]

        # 反応速度パラメータ
        self.A = 1.0e9
        self.Ea = 72000.0
        self.R = 8.314

        # 目標値
        self.C_target = 3.0  # 目標製品濃度 [mol/L]

        self.reset()

    def reset(self) -> np.ndarray:
        """環境をリセット"""
        self.C = 2.0 + np.random.randn() * 0.5  # 濃度 [mol/L]
        self.T = 350.0 + np.random.randn() * 5.0  # 温度 [K]
        self.C_in = 8.0  # 入口濃度 [mol/L]

        return self._get_state()

    def _get_state(self) -> np.ndarray:
        """状態ベクトル"""
        return np.array([
            (self.C - self.C_target) / self.C_target,  # 濃度偏差(正規化)
            (self.T - 350.0) / 30.0  # 温度偏差(正規化)
        ], dtype=np.float32)

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool]:
        """1ステップ実行

        Args:
            action: [流量変化率, 温度変化](-1〜1に正規化)

        Returns:
            (next_state, reward, done)
        """
        # 行動を実際の物理量に変換
        F = 0.1 + 0.05 * action[0]  # 流量 0.05〜0.15 [m³/min]
        delta_T = 5.0 * action[1]    # 温度変化 -5〜+5 [K]

        self.T = np.clip(self.T + delta_T, 320.0, 380.0)

        # 反応速度定数
        k = self.A * np.exp(-self.Ea / (self.R * self.T))

        # 濃度更新(CSTR物質収支)
        tau = self.V / F
        dC = ((self.C_in - self.C) / tau - k * self.C) * self.dt
        self.C += dC

        # ノイズ(外乱)
        self.C += np.random.randn() * 0.05
        self.T += np.random.randn() * 1.0

        # 報酬の計算
        # 1. 濃度追従(主目的)
        error = abs(self.C - self.C_target)
        tracking_reward = -10.0 * error

        # 2. エネルギーペナルティ(加熱コスト)
        heating_cost = -0.01 * abs(delta_T)

        # 3. 流量変化ペナルティ(スムーズな操作)
        flow_penalty = -0.1 * abs(action[0])

        reward = tracking_reward + heating_cost + flow_penalty

        # ボーナス: 目標範囲内(±0.2 mol/L)
        if error < 0.2:
            reward += 5.0

        done = False  # 連続プロセスなので終了なし

        return self._get_state(), reward, done


class ActorCritic(nn.Module):
    """Actor-Criticネットワーク"""

    def __init__(self, state_dim: int, action_dim: int):
        super(ActorCritic, self).__init__()

        # 共有層
        self.shared = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 64),
            nn.Tanh()
        )

        # Actor(方策)
        self.actor_mean = nn.Linear(64, action_dim)
        self.actor_log_std = nn.Parameter(torch.zeros(action_dim))

        # Critic(価値関数)
        self.critic = nn.Linear(64, 1)

    def forward(self, state):
        """順伝播"""
        shared_features = self.shared(state)

        # Actor: 正規分布のパラメータ
        action_mean = torch.tanh(self.actor_mean(shared_features))
        action_std = torch.exp(self.actor_log_std)

        # Critic: 状態価値
        value = self.critic(shared_features)

        return action_mean, action_std, value

    def get_action(self, state):
        """行動をサンプリング"""
        action_mean, action_std, value = self.forward(state)
        dist = Normal(action_mean, action_std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(-1)

        return action, log_prob, value


class PPOAgent:
    """PPOエージェント"""

    def __init__(self, state_dim: int, action_dim: int):
        self.ac = ActorCritic(state_dim, action_dim)
        self.optimizer = optim.Adam(self.ac.parameters(), lr=3e-4)

        self.gamma = 0.99
        self.lam = 0.95  # GAE lambda
        self.clip_epsilon = 0.2
        self.epochs = 10

    def compute_gae(self, rewards, values, dones):
        """Generalized Advantage Estimation"""
        advantages = []
        gae = 0

        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[t + 1]

            delta = rewards[t] + self.gamma * next_value - values[t]
            gae = delta + self.gamma * self.lam * gae
            advantages.insert(0, gae)

        return advantages

    def update(self, states, actions, old_log_probs, returns, advantages):
        """PPO更新"""
        states = torch.FloatTensor(np.array(states))
        actions = torch.FloatTensor(np.array(actions))
        old_log_probs = torch.FloatTensor(old_log_probs)
        returns = torch.FloatTensor(returns)
        advantages = torch.FloatTensor(advantages)

        # 正規化
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        for _ in range(self.epochs):
            action_mean, action_std, values = self.ac(states)
            dist = Normal(action_mean, action_std)
            new_log_probs = dist.log_prob(actions).sum(-1)

            # Importance sampling ratio
            ratio = torch.exp(new_log_probs - old_log_probs)

            # Clipped surrogate objective
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()

            # Value loss
            critic_loss = nn.MSELoss()(values.squeeze(), returns)

            # 合計損失
            loss = actor_loss + 0.5 * critic_loss

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()


# ===================================
# 学習実行
# ===================================
if __name__ == "__main__":
    env = CSTREnv()
    agent = PPOAgent(state_dim=2, action_dim=2)

    num_episodes = 300
    steps_per_episode = 200

    print("="*70)
    print("PPOによるCSTR連続制御の学習")
    print("="*70)

    for episode in range(num_episodes):
        state = env.reset()

        states, actions, log_probs, rewards, values = [], [], [], [], []

        for step in range(steps_per_episode):
            state_t = torch.FloatTensor(state)
            action, log_prob, value = agent.ac.get_action(state_t.unsqueeze(0))

            action_np = action.squeeze().detach().numpy()
            next_state, reward, done = env.step(action_np)

            states.append(state)
            actions.append(action_np)
            log_probs.append(log_prob.item())
            rewards.append(reward)
            values.append(value.item())

            state = next_state

        # GAE計算
        advantages = agent.compute_gae(rewards, values, [False] * len(rewards))
        returns = [adv + val for adv, val in zip(advantages, values)]

        # PPO更新
        agent.update(states, actions, log_probs, returns, advantages)

        if episode % 30 == 0:
            avg_reward = np.mean(rewards)
            final_error = abs(env.C - env.C_target)
            print(f"Episode {episode:3d}: 平均報酬={avg_reward:6.2f}, "
                  f"最終誤差={final_error:.3f} mol/L")

    print("\n" + "="*70)
    print("学習完了 - テスト実行")
    print("="*70)

    # テスト実行
    state = env.reset()
    trajectory = []

    for step in range(100):
        state_t = torch.FloatTensor(state)
        with torch.no_grad():
            action_mean, _, _ = agent.ac(state_t.unsqueeze(0))
            action = action_mean.squeeze().numpy()

        trajectory.append({
            'step': step,
            'C': env.C,
            'T': env.T,
            'action_flow': action[0],
            'action_temp': action[1]
        })

        next_state, reward, done = env.step(action)
        state = next_state

    # 性能評価
    concentrations = [t['C'] for t in trajectory]
    mean_error = np.mean([abs(c - env.C_target) for c in concentrations])
    std_error = np.std([abs(c - env.C_target) for c in concentrations])

    print(f"\n平均追従誤差: {mean_error:.3f} mol/L")
    print(f"誤差標準偏差: {std_error:.3f} mol/L")

    print("\n制御軌道(最初の10ステップ):")
    for i in range(10):
        t = trajectory[i]
        print(f"  Step {t['step']:3d}: C={t['C']:.2f} mol/L, T={t['T']:.1f}K, "
              f"Flow action={t['action_flow']:+.2f}, Temp action={t['action_temp']:+.2f}")

    print("\n✓ PPOにより連続値制御を学習し、目標値追従を実現")

3.8 マルチ目的最適化

化学プラントでは、収率とエネルギー消費など複数の目標がトレードオフ関係にあります。NSGA-IIによるマルチ目的最適化を実装します。

Example 7: マルチ目的最適化(NSGA-II) - 収率とエネルギーのトレードオフ
"""
===================================
Example 7: マルチ目的最適化(NSGA-II)
===================================

NSGA-II(Non-dominated Sorting Genetic Algorithm II)を用いた
化学プロセスのマルチ目的最適化。

目的:
1. 製品収率の最大化
2. エネルギー消費の最小化

これらは互いにトレードオフ関係にあり、パレート最適解集合を求める。

pymooは実際にはインストールが必要ですが、ここでは基本的な遺伝的アルゴリズムで代替実装します。
"""

import numpy as np
from typing import List, Tuple
import pandas as pd


class MultiObjectiveOptimizer:
    """マルチ目的最適化器(NSGA-II風)"""

    def __init__(self, population_size: int = 50, generations: int = 100):
        self.pop_size = population_size
        self.n_gen = generations

        # 決定変数の範囲
        # x = [温度 [K], 圧力 [bar], 流量 [m³/h], 触媒量 [kg]]
        self.bounds_lower = np.array([350.0, 10.0, 1.0, 50.0])
        self.bounds_upper = np.array([420.0, 50.0, 5.0, 200.0])

    def process_model(self, x: np.ndarray) -> Tuple[float, float]:
        """プロセスモデル:収率とエネルギーを計算

        Args:
            x: [温度, 圧力, 流量, 触媒量]

        Returns:
            (収率, エネルギー消費)
        """
        T, P, F, Cat = x

        # 収率モデル(簡略化)
        # 高温・高圧・多触媒で収率向上、ただし飽和あり
        T_factor = 1.0 / (1.0 + np.exp(-(T - 380.0) / 10.0))
        P_factor = np.log(P / 10.0) / np.log(5.0)
        Cat_factor = np.sqrt(Cat / 50.0)

        yield_fraction = 0.5 + 0.4 * T_factor + 0.2 * P_factor + 0.15 * Cat_factor

        # 流量に比例した生産量
        productivity = F * yield_fraction  # [m³/h * fraction]

        # エネルギー消費モデル
        # 加熱エネルギー(温度に比例)
        heating_energy = F * 1000.0 * 4.18 * (T - 300.0) / 3600.0  # [kW]

        # 圧縮エネルギー(圧力に非線形)
        compression_energy = 10.0 * F * (P / 10.0)**1.2  # [kW]

        total_energy = heating_energy + compression_energy

        return productivity, total_energy

    def objectives(self, x: np.ndarray) -> np.ndarray:
        """目的関数(最小化問題に統一)

        Returns:
            [-収率(最大化→最小化), エネルギー消費(最小化)]
        """
        productivity, energy = self.process_model(x)
        return np.array([-productivity, energy])

    def dominates(self, obj1: np.ndarray, obj2: np.ndarray) -> bool:
        """obj1がobj2を支配するか判定"""
        return np.all(obj1 <= obj2) and np.any(obj1 < obj2)

    def non_dominated_sort(self, objectives: np.ndarray) -> List[List[int]]:
        """非支配ソート

        Args:
            objectives: (pop_size x 2) 目的関数値

        Returns:
            フロントごとのインデックスリスト
        """
        pop_size = len(objectives)
        domination_count = np.zeros(pop_size, dtype=int)
        dominated_solutions = [[] for _ in range(pop_size)]

        fronts = [[]]

        for i in range(pop_size):
            for j in range(i + 1, pop_size):
                if self.dominates(objectives[i], objectives[j]):
                    dominated_solutions[i].append(j)
                    domination_count[j] += 1
                elif self.dominates(objectives[j], objectives[i]):
                    dominated_solutions[j].append(i)
                    domination_count[i] += 1

        for i in range(pop_size):
            if domination_count[i] == 0:
                fronts[0].append(i)

        current_front = 0
        while len(fronts[current_front]) > 0:
            next_front = []
            for i in fronts[current_front]:
                for j in dominated_solutions[i]:
                    domination_count[j] -= 1
                    if domination_count[j] == 0:
                        next_front.append(j)
            current_front += 1
            fronts.append(next_front)

        return fronts[:-1]  # 最後の空リストを除く

    def crowding_distance(self, objectives: np.ndarray, front: List[int]) -> np.ndarray:
        """混雑度距離を計算"""
        n = len(front)
        if n <= 2:
            return np.full(n, np.inf)

        distances = np.zeros(n)

        for m in range(objectives.shape[1]):  # 各目的について
            sorted_idx = np.argsort(objectives[front, m])

            distances[sorted_idx[0]] = np.inf
            distances[sorted_idx[-1]] = np.inf

            obj_range = objectives[front[sorted_idx[-1]], m] - objectives[front[sorted_idx[0]], m]
            if obj_range == 0:
                continue

            for i in range(1, n - 1):
                distances[sorted_idx[i]] += (
                    (objectives[front[sorted_idx[i + 1]], m] -
                     objectives[front[sorted_idx[i - 1]], m]) / obj_range
                )

        return distances

    def optimize(self) -> Tuple[np.ndarray, np.ndarray]:
        """NSGA-II最適化を実行

        Returns:
            (パレート解集合, 目的関数値)
        """
        # 初期個体群を生成
        population = np.random.uniform(
            self.bounds_lower,
            self.bounds_upper,
            (self.pop_size, len(self.bounds_lower))
        )

        for generation in range(self.n_gen):
            # 目的関数評価
            objectives = np.array([self.objectives(ind) for ind in population])

            # 非支配ソート
            fronts = self.non_dominated_sort(objectives)

            # 次世代の選択
            next_population = []
            for front in fronts:
                if len(next_population) + len(front) <= self.pop_size:
                    next_population.extend(front)
                else:
                    # 混雑度距離でソート
                    distances = self.crowding_distance(objectives, front)
                    sorted_idx = np.argsort(distances)[::-1]
                    remaining = self.pop_size - len(next_population)
                    next_population.extend([front[i] for i in sorted_idx[:remaining]])
                    break

            # 交叉と突然変異
            selected = population[next_population]
            offspring = []

            for i in range(0, len(selected) - 1, 2):
                # SBX交叉(簡略版)
                alpha = np.random.rand(len(self.bounds_lower))
                child1 = alpha * selected[i] + (1 - alpha) * selected[i + 1]
                child2 = (1 - alpha) * selected[i] + alpha * selected[i + 1]

                # 多項式突然変異(簡略版)
                if np.random.rand() < 0.1:
                    child1 += np.random.randn(len(self.bounds_lower)) * 0.1 * (self.bounds_upper - self.bounds_lower)
                if np.random.rand() < 0.1:
                    child2 += np.random.randn(len(self.bounds_lower)) * 0.1 * (self.bounds_upper - self.bounds_lower)

                # 境界制約
                child1 = np.clip(child1, self.bounds_lower, self.bounds_upper)
                child2 = np.clip(child2, self.bounds_lower, self.bounds_upper)

                offspring.extend([child1, child2])

            population = np.array(offspring[:self.pop_size])

            if generation % 20 == 0:
                print(f"世代 {generation}: パレートフロントサイズ={len(fronts[0])}")

        # 最終評価
        objectives = np.array([self.objectives(ind) for ind in population])
        fronts = self.non_dominated_sort(objectives)
        pareto_front = fronts[0]

        return population[pareto_front], objectives[pareto_front]


# ===================================
# 実行例
# ===================================
if __name__ == "__main__":
    optimizer = MultiObjectiveOptimizer(population_size=50, generations=100)

    print("="*70)
    print("マルチ目的最適化(NSGA-II)")
    print("="*70)
    print("目的1: 製品収率の最大化")
    print("目的2: エネルギー消費の最小化\n")

    # 最適化実行
    pareto_solutions, pareto_objectives = optimizer.optimize()

    print("\n" + "="*70)
    print(f"パレート最適解: {len(pareto_solutions)} 個")
    print("="*70)

    # 結果を整理
    results = []
    for i, (sol, obj) in enumerate(zip(pareto_solutions, pareto_objectives)):
        T, P, F, Cat = sol
        productivity = -obj[0]  # 最小化問題から戻す
        energy = obj[1]

        results.append({
            '解番号': i + 1,
            '温度 [K]': T,
            '圧力 [bar]': P,
            '流量 [m³/h]': F,
            '触媒 [kg]': Cat,
            '収率生産性 [m³/h]': productivity,
            'エネルギー [kW]': energy,
            'エネルギー原単位 [kW/(m³/h)]': energy / productivity
        })

    df = pd.DataFrame(results)

    # 代表的な解を表示(収率重視、バランス、省エネ重視)
    print("\n【代表的なパレート解】")
    print("\n1. 収率重視(エネルギー消費大):")
    idx_max_yield = df['収率生産性 [m³/h]'].idxmax()
    print(df.loc[idx_max_yield].to_string())

    print("\n2. バランス型:")
    df['バランススコア'] = (df['収率生産性 [m³/h]'].rank() + (1 / df['エネルギー [kW]']).rank()) / 2
    idx_balanced = df['バランススコア'].idxmax()
    print(df.loc[idx_balanced].to_string())

    print("\n3. 省エネ重視(収率低):")
    idx_min_energy = df['エネルギー [kW]'].idxmin()
    print(df.loc[idx_min_energy].to_string())

    print("\n" + "="*70)
    print("トレードオフ分析")
    print("="*70)

    # 収率10%向上のためのエネルギーコスト増加
    sorted_df = df.sort_values('収率生産性 [m³/h]')
    if len(sorted_df) > 1:
        yield_range = sorted_df['収率生産性 [m³/h]'].max() - sorted_df['収率生産性 [m³/h]'].min()
        energy_range = sorted_df['エネルギー [kW]'].max() - sorted_df['エネルギー [kW]'].min()

        print(f"収率10%向上のためのエネルギー増加: 約{energy_range / yield_range * 0.1 * sorted_df['収率生産性 [m³/h]'].mean():.1f} kW")

    print("\n✓ パレートフロントにより、意思決定者が収率とエネルギーのトレードオフを選択可能")

3.9 統合APC+最適化システム

最後に、RTO層とAPC層を統合した実用的なプラント制御システムの実装例を示します。

Example 8: 統合APC+最適化システム - 階層型プラント制御
"""
===================================
Example 8: 統合APC+最適化システム
===================================

リアルタイム最適化(RTO)層とAdvanced Process Control(APC)層を
統合した階層型プラント制御システム。

階層構造:
- RTO層(上位): 経済的最適化により最適運転条件を決定
- APC層(下位): MPCでRTOの目標値を高精度に追従

化学プラント(蒸留塔+反応器)への適用例
"""

import numpy as np
from scipy.optimize import minimize
from typing import Dict, Tuple
import pandas as pd


class RTOLayer:
    """リアルタイム最適化層"""

    def __init__(self):
        # 経済パラメータ
        self.product_price = 120.0  # $/ton
        self.feed_cost = 50.0  # $/ton
        self.steam_cost = 25.0  # $/ton
        self.power_cost = 0.10  # $/kWh

    def steady_state_model(self, x: np.ndarray) -> Dict:
        """定常状態プロセスモデル

        Args:
            x: [反応温度, 蒸留還流比]

        Returns:
            プロセス出力の辞書
        """
        T_reactor, reflux_ratio = x

        # 反応収率(温度依存)
        yield_base = 0.75
        temp_effect = 0.002 * (T_reactor - 370.0)
        yield_fraction = yield_base + temp_effect

        # 製品純度(還流比依存)
        purity_base = 0.90
        reflux_effect = 0.15 * (1.0 - np.exp(-0.5 * (reflux_ratio - 2.0)))
        purity = min(0.99, purity_base + reflux_effect)

        # ユーティリティ消費
        reactor_heat = 50.0 + 0.5 * (T_reactor - 350.0)**2  # kW
        steam_consumption = 2.0 + 0.8 * reflux_ratio  # ton/h
        power = 30.0 + 5.0 * reflux_ratio  # kW

        return {
            'yield': yield_fraction,
            'purity': purity,
            'reactor_heat': reactor_heat,
            'steam': steam_consumption,
            'power': power
        }

    def economic_objective(self, x: np.ndarray, feed_rate: float) -> float:
        """経済目的関数:利益の負値

        Args:
            x: [反応温度, 還流比]
            feed_rate: 原料流量 [ton/h]

        Returns:
            -利益 [$/h]
        """
        outputs = self.steady_state_model(x)

        # 製品生産量
        product_rate = feed_rate * outputs['yield'] * outputs['purity']

        # 収益
        revenue = product_rate * self.product_price

        # コスト
        feed_cost = feed_rate * self.feed_cost
        steam_cost = outputs['steam'] * self.steam_cost
        power_cost = outputs['power'] * self.power_cost

        profit = revenue - feed_cost - steam_cost - power_cost

        return -profit  # 最小化のため負値

    def optimize(self, feed_rate: float) -> Dict:
        """RTOを実行

        Args:
            feed_rate: 現在の原料流量 [ton/h]

        Returns:
            最適運転条件
        """
        # 初期推定値
        x0 = np.array([370.0, 3.0])

        # 境界制約
        bounds = [
            (350.0, 390.0),  # 反応温度 [K]
            (2.0, 5.0)       # 還流比
        ]

        # 制約: 製品純度95%以上
        def purity_constraint(x):
            outputs = self.steady_state_model(x)
            return outputs['purity'] - 0.95

        from scipy.optimize import NonlinearConstraint
        nlc = NonlinearConstraint(purity_constraint, 0, np.inf)

        # 最適化
        result = minimize(
            lambda x: self.economic_objective(x, feed_rate),
            x0,
            method='SLSQP',
            bounds=bounds,
            constraints=[nlc]
        )

        T_opt, reflux_opt = result.x
        outputs = self.steady_state_model(result.x)

        return {
            'T_reactor_sp': T_opt,
            'reflux_ratio_sp': reflux_opt,
            'predicted_yield': outputs['yield'],
            'predicted_purity': outputs['purity'],
            'predicted_profit': -result.fun
        }


class APCLayer:
    """先進制御層(MPC)"""

    def __init__(self):
        # プロセスモデル(線形化近似)
        # 状態: [反応温度偏差, 還流比偏差]
        self.A = np.array([
            [0.90, 0.05],
            [0.00, 0.85]
        ])
        self.B = np.array([
            [0.8, 0.0],
            [0.0, 0.6]
        ])

        # 制御ホライズン
        self.Np = 15
        self.Nc = 8

    def mpc_control(self, current_state: np.ndarray,
                    setpoint: np.ndarray) -> np.ndarray:
        """MPC制御

        Args:
            current_state: 現在の状態偏差 [T偏差, 還流比偏差]
            setpoint: 目標値偏差(RTOからの指令)

        Returns:
            最適操作量
        """
        # 簡略化したMPC(実際はより詳細な実装が必要)
        # ここでは比例制御で代替
        Kp = np.array([2.0, 1.5])
        u = Kp * (setpoint - current_state)

        # 操作量制約
        u = np.clip(u, [-5.0, -0.5], [5.0, 0.5])

        return u


class IntegratedControlSystem:
    """統合制御システム"""

    def __init__(self):
        self.rto = RTOLayer()
        self.apc = APCLayer()

        # RTO実行間隔(APCより長い)
        self.rto_interval = 30  # APCサイクルの30倍

        # 現在の状態
        self.T_reactor = 365.0
        self.reflux_ratio = 3.2

    def run_rto(self, feed_rate: float) -> Dict:
        """RTO層を実行"""
        print("\n" + "="*70)
        print("RTO層: 経済最適化を実行")
        print("="*70)

        result = self.rto.optimize(feed_rate)

        print(f"  最適反応温度: {result['T_reactor_sp']:.1f} K")
        print(f"  最適還流比: {result['reflux_ratio_sp']:.2f}")
        print(f"  予測収率: {result['predicted_yield']:.1%}")
        print(f"  予測純度: {result['predicted_purity']:.1%}")
        print(f"  予測利益: ${result['predicted_profit']:.2f}/h")

        return result

    def run_apc(self, rto_setpoint: Dict) -> np.ndarray:
        """APC層を実行"""
        # 現在の偏差
        current_state = np.array([
            self.T_reactor - rto_setpoint['T_reactor_sp'],
            self.reflux_ratio - rto_setpoint['reflux_ratio_sp']
        ])

        # 目標偏差(ゼロ)
        setpoint = np.array([0.0, 0.0])

        # MPC制御
        u = self.apc.mpc_control(current_state, setpoint)

        return u

    def simulate(self, feed_rate: float, simulation_steps: int = 100):
        """統合システムをシミュレーション

        Args:
            feed_rate: 原料流量 [ton/h]
            simulation_steps: シミュレーションステップ数
        """
        print("="*70)
        print("統合APC+最適化システム シミュレーション")
        print("="*70)

        # 初回RTO実行
        rto_result = self.run_rto(feed_rate)

        history = []

        for step in range(simulation_steps):
            # RTO更新(周期的)
            if step % self.rto_interval == 0 and step > 0:
                rto_result = self.run_rto(feed_rate)

            # APC実行(毎ステップ)
            u = self.run_apc(rto_result)

            # プロセス更新(簡略化)
            self.T_reactor += u[0] * 0.5 + np.random.randn() * 0.5
            self.reflux_ratio += u[1] * 0.3 + np.random.randn() * 0.05

            # 物理制約
            self.T_reactor = np.clip(self.T_reactor, 350.0, 390.0)
            self.reflux_ratio = np.clip(self.reflux_ratio, 2.0, 5.0)

            # 記録
            outputs = self.rto.steady_state_model(
                [self.T_reactor, self.reflux_ratio]
            )

            history.append({
                'step': step,
                'T_reactor': self.T_reactor,
                'T_setpoint': rto_result['T_reactor_sp'],
                'reflux': self.reflux_ratio,
                'reflux_setpoint': rto_result['reflux_ratio_sp'],
                'yield': outputs['yield'],
                'purity': outputs['purity']
            })

            if step % 20 == 0:
                print(f"\nステップ {step:3d}:")
                print(f"  反応温度: {self.T_reactor:.1f}K (SP: {rto_result['T_reactor_sp']:.1f}K)")
                print(f"  還流比: {self.reflux_ratio:.2f} (SP: {rto_result['reflux_ratio_sp']:.2f})")
                print(f"  収率: {outputs['yield']:.1%}, 純度: {outputs['purity']:.1%}")

        return pd.DataFrame(history)


# ===================================
# 実行例
# ===================================
if __name__ == "__main__":
    system = IntegratedControlSystem()

    # 原料流量
    feed_rate = 10.0  # ton/h

    # シミュレーション実行
    df = system.simulate(feed_rate, simulation_steps=100)

    print("\n" + "="*70)
    print("制御性能評価")
    print("="*70)

    # 追従性能
    T_error = np.abs(df['T_reactor'] - df['T_setpoint']).mean()
    reflux_error = np.abs(df['reflux'] - df['reflux_setpoint']).mean()

    print(f"\n反応温度の平均追従誤差: {T_error:.2f} K")
    print(f"還流比の平均追従誤差: {reflux_error:.3f}")

    # プロセス性能
    avg_yield = df['yield'].mean()
    avg_purity = df['purity'].mean()

    print(f"\n平均収率: {avg_yield:.1%}")
    print(f"平均純度: {avg_purity:.1%}")

    print("\n" + "="*70)
    print("システムの特長")
    print("="*70)
    print("✓ RTO層で経済的最適運転条件を決定")
    print("✓ APC層(MPC)で高精度な目標値追従を実現")
    print("✓ 階層化により計算負荷を分散、実時間制御を達成")
    print("✓ プラント全体の経済性を最大化しながら品質を保証")

まとめ

本章では、化学プラントにおけるリアルタイム最適化とAdvanced Process Controlの実装技術を学びました。主要なポイントは以下の通りです:

学習内容の振り返り

  1. オンライン最適化(SciPy):CSTR運転条件を経済的目標関数で最適化し、利益最大化を実現
  2. 経済的最適化(Pyomo):製品価値とユーティリティコストのトレードオフを考慮した複雑な最適化問題を定式化
  3. MPC基礎実装:蒸留塔の温度制御において、未来予測と制約条件を考慮した最適操作を計算
  4. 非線形MPC(CasADi):反応器の非線形ダイナミクスに対応した高度なモデル予測制御を実装
  5. DQNバッチ制御:深層強化学習によりバッチ反応器の最適温度軌道を自律的に学習
  6. PPO連続制御:連続値行動空間を扱うPPOで、CSTR濃度制御の最適方策を獲得
  7. マルチ目的最適化:NSGA-IIにより収率とエネルギーのパレート最適解集合を探索
  8. 統合APC+RTO:階層型制御システムで経済最適化と高精度追従制御を両立
🎯 実務への適用

導入効果の実例:

次章への接続

次章では、プラント全体のサプライチェーン最適化とデジタルツインについて学びます。需要予測、生産計画、在庫最適化、さらにリアルタイムシミュレーションによるプラント運用支援まで、実装レベルで習得します。