第5章:強化学習によるプロセス制御最適化

MDPからDDPGまで - 自律的なプロセス制御の実現

📖 読了時間: 40-45分 💡 難易度: 上級 🔬 実例: 反応器制御・最適化

5.1 強化学習の基礎とQ-Learning

強化学習は、環境との相互作用を通じて最適な行動方策を学習します。プロセス制御では、エージェント(制御システム)が状態(温度・圧力など)を観測し、行動(バルブ開度など)を選択して、報酬(品質・コスト)を最大化します。

💡 強化学習の基本要素

  • 状態(State): プロセスの現在の状態(温度、圧力、濃度など)
  • 行動(Action): エージェントが取る操作(加熱、冷却、流量調整など)
  • 報酬(Reward): 行動の良し悪しを評価する指標(品質、コスト、安全性)
  • 方策(Policy): 状態から行動への写像 \(\pi(a|s)\)

Q-Learningの更新式(Bellman最適方程式に基づくTD更新):

$$Q(s, a) \leftarrow Q(s, a) + \alpha [r + \gamma \max_{a'} Q(s', a') - Q(s, a)]$$

例1: 簡易反応器制御(離散Q-Learning)

import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from collections import defaultdict

class SimpleReactorEnv:
    """Toy chemical reactor whose temperature is observed in 10 discrete bins.

    The temperature is kept in [300, 500] K and reported to the agent as a
    bin index 0..9 (20 K per bin).  Three discrete actions are available:
    0 = cool (-5 K), 1 = hold (0 K), 2 = heat (+5 K).

    One step maps T -> (1 - k) * (T + a) + 300 * k, where ``a`` is the
    action's temperature change and ``k`` is ``heat_loss_coef``.  Under
    continuous heating (a = +5) the fixed point is 300 + 5 * (1 - k) / k, so
    the 420 K target can only be held when k <= 0.04.  With k = 0.1 the
    fixed point is 345 K and the target is unreachable, hence the smaller
    default.

    Args:
        heat_loss_coef: Fraction of the excess temperature above 300 K that
            is lost to the surroundings on every step.
    """

    def __init__(self, heat_loss_coef=0.02):
        # State: temperature in [300, 500] K, discretised into 10 bins.
        self.temperature = 400.0  # initial temperature [K]
        self.target_temp = 420.0  # target temperature [K]
        self.dt = 1.0  # time step [min]
        self.heat_loss_coef = heat_loss_coef

        # Actions: 0 = cool (-5 K), 1 = hold (0 K), 2 = heat (+5 K)
        self.actions = [-5, 0, 5]
        self.n_actions = len(self.actions)

    def reset(self):
        """Draw a new initial temperature in [350, 450) K and return its bin."""
        self.temperature = np.random.uniform(350, 450)
        return self._get_state()

    def _get_state(self):
        """Return the temperature bin index, clamped to 0..9."""
        state = int((self.temperature - 300) / 20)
        return max(0, min(9, state))

    def step(self, action):
        """Apply one control action and advance the reactor by one step.

        Args:
            action: Index into ``self.actions`` (0, 1 or 2).

        Returns:
            state: Bin index of the new temperature.
            reward: Negative absolute temperature error, +10 when the error
                is below 5 K, -50 when the temperature is at or beyond
                310 K / 490 K.
            done: Always ``False`` (continuing control task).
        """
        # Actuator effect first, then heat loss computed on the new value.
        self.temperature += self.actions[action]
        self.temperature -= self.heat_loss_coef * (self.temperature - 300)

        # Physical limits of the reactor.
        self.temperature = np.clip(self.temperature, 300, 500)

        # Closer to the target means a higher (less negative) reward.
        temp_error = abs(self.temperature - self.target_temp)
        reward = -temp_error

        # Bonus for staying within 5 K of the target.
        if temp_error < 5:
            reward += 10

        # Penalty for drifting to the edge of the allowed range.
        if self.temperature <= 310 or self.temperature >= 490:
            reward -= 50

        return self._get_state(), reward, False

# Q-learning agent
class QLearningAgent:
    """Tabular Q-learning with an epsilon-greedy behaviour policy."""

    def __init__(self, n_states=10, n_actions=3, alpha=0.1, gamma=0.95, epsilon=0.1):
        """
        Args:
            n_states: Number of discrete states (stored, not used for sizing).
            n_actions: Number of discrete actions.
            alpha: Learning rate.
            gamma: Discount factor.
            epsilon: Probability of taking a random action.
        """
        self.n_states, self.n_actions = n_states, n_actions
        self.alpha, self.gamma, self.epsilon = alpha, gamma, epsilon

        # Rows are created lazily, all-zero, the first time a state is seen.
        self.q_table = defaultdict(lambda: np.zeros(n_actions))

    def choose_action(self, state):
        """Pick an action: random with probability epsilon, else greedy."""
        explore = np.random.rand() < self.epsilon
        if explore:
            return np.random.randint(self.n_actions)
        return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state):
        """Apply the one-step Q-learning update for a single transition."""
        q_row = self.q_table[state]
        old_value = q_row[action]
        bootstrap = np.max(self.q_table[next_state])
        td_error = reward + self.gamma * bootstrap - old_value
        q_row[action] = old_value + self.alpha * td_error

# Training: run tabular Q-learning on the discretised reactor.
env = SimpleReactorEnv()
agent = QLearningAgent(n_states=10, n_actions=3)

n_episodes = 500
episode_rewards = []  # undiscounted reward sum of every episode

for episode in range(n_episodes):
    state = env.reset()
    total_reward = 0

    for step in range(100):  # 100 steps per episode
        action = agent.choose_action(state)
        # `done` is always False for this environment, so every episode
        # runs the full 100 steps.
        next_state, reward, done = env.step(action)

        agent.update(state, action, reward, next_state)

        total_reward += reward
        state = next_state

    episode_rewards.append(total_reward)

    if (episode + 1) % 100 == 0:
        avg_reward = np.mean(episode_rewards[-100:])
        print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}')

# Test the learned policy on a fresh environment instance.
env_test = SimpleReactorEnv()
state = env_test.reset()

temperatures = []
actions_taken = []

for step in range(50):
    # NOTE(review): choose_action is still epsilon-greedy here (the agent was
    # built with the default epsilon=0.1), so the rollout includes random
    # actions and is not a purely greedy evaluation.
    action = agent.choose_action(state)
    state, reward, _ = env_test.step(action)

    temperatures.append(env_test.temperature)
    actions_taken.append(action)

# Visualisation: learning curve (left) and test-rollout temperature (right).
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(episode_rewards, alpha=0.3)
# 50-episode moving average; mode='valid' makes this curve 49 points shorter
# than the raw curve.
plt.plot(np.convolve(episode_rewards, np.ones(50)/50, mode='valid'), linewidth=2)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Learning Progress')
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.plot(temperatures, label='Temperature')
plt.axhline(env_test.target_temp, color='r', linestyle='--', label='Target')
plt.xlabel('Time Step')
plt.ylabel('Temperature [K]')
plt.title('Learned Control Policy')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()

print(f"\nFinal temperature: {temperatures[-1]:.2f}K (Target: {env_test.target_temp}K)")

# Example output (illustrative only; the actual numbers depend on the random
# draws, and target_temp is printed without a format spec, i.e. as 420.0):
# Episode 100, Avg Reward: -234.56
# Episode 200, Avg Reward: -123.45
# Episode 300, Avg Reward: -67.89
# Episode 400, Avg Reward: -34.56
# Episode 500, Avg Reward: -12.34
#
# Final temperature: 418.76K (Target: 420.0K)

5.2 Deep Q-Network(DQN)

DQNは、Q-tableをニューラルネットワークで近似します。高次元の状態空間(多変数プロセス)に対応できます。

例2: DQNによる反応器制御

import torch.nn.functional as F
from collections import deque
import random

class QNetwork(nn.Module):
    """Two-hidden-layer MLP approximating the action-value function Q(s, .)."""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()

        # Layers are created in input-to-output order.
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """Map a batch of states to one Q-value per action.

        Args:
            state: Tensor of shape [batch, state_dim].

        Returns:
            Tensor of shape [batch, action_dim] with the Q-value of every
            action (no activation on the output layer).
        """
        hidden = torch.relu(self.fc1(state))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)

class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions for experience replay.

    The buffer is shared by agents with discrete actions (integer indices,
    needed as ``int64`` for ``gather``) and agents with continuous actions
    (floats), so ``sample`` picks the action dtype from the stored values
    instead of always truncating to integers.
    """

    def __init__(self, capacity=10000):
        # deque(maxlen=...) silently drops the oldest transition when full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition ``(state, action, reward, next_state, done)``."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions uniformly without replacement.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of
            tensors.  ``actions`` is ``int64`` when every stored action is an
            integer and ``float32`` otherwise; all other tensors are
            ``float32``.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Integer actions are indices (DQN); anything else is a continuous
        # control value (DDPG) and must keep its fractional part.
        action_array = np.asarray(actions)
        if np.issubdtype(action_array.dtype, np.integer):
            action_tensor = torch.as_tensor(action_array, dtype=torch.long)
        else:
            action_tensor = torch.as_tensor(action_array, dtype=torch.float32)

        # Stack through NumPy first: building a tensor from a sequence of
        # separate ndarrays is much slower than converting one array.
        return (
            torch.as_tensor(np.asarray(states, dtype=np.float32)),
            action_tensor,
            torch.as_tensor(np.asarray(rewards, dtype=np.float32)),
            torch.as_tensor(np.asarray(next_states, dtype=np.float32)),
            torch.as_tensor(np.asarray(dones, dtype=np.float32)),
        )

    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    """Deep Q-Network agent with a target network and experience replay."""

    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99, epsilon_start=1.0,
                 epsilon_end=0.01, epsilon_decay=0.995):
        """
        Args:
            state_dim: Size of the state vector.
            action_dim: Number of discrete actions.
            lr: Adam learning rate for the online network.
            gamma: Discount factor.
            epsilon_start: Initial exploration probability.
            epsilon_end: Lower bound for the exploration probability.
            epsilon_decay: Multiplicative decay applied by ``decay_epsilon``.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay

        # Online network first, then the target network, which starts as an
        # exact copy of the online weights.
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_network = QNetwork(state_dim, action_dim)
        self.update_target_network()

        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
        self.replay_buffer = ReplayBuffer(capacity=10000)

    def choose_action(self, state):
        """Return an action index chosen epsilon-greedily from the online net."""
        explore = np.random.rand() < self.epsilon
        if explore:
            return np.random.randint(self.action_dim)

        observation = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            return self.q_network(observation).argmax().item()

    def train(self, batch_size=64):
        """Run one mini-batch gradient step.

        Returns:
            The scalar MSE loss, or 0.0 while the replay buffer holds fewer
            than ``batch_size`` transitions.
        """
        if len(self.replay_buffer) < batch_size:
            return 0.0

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)

        # Q-value of the action that was actually taken in each transition.
        all_q = self.q_network(states)
        current_q = all_q.gather(1, actions.unsqueeze(1)).squeeze()

        # Bootstrapped target from the (frozen) target network.
        with torch.no_grad():
            best_next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + self.gamma * best_next_q * (1 - dones)

        loss = F.mse_loss(current_q, target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        online_weights = self.q_network.state_dict()
        self.target_network.load_state_dict(online_weights)

    def decay_epsilon(self):
        """Multiply epsilon by the decay factor, never going below epsilon_end."""
        decayed = self.epsilon * self.epsilon_decay
        self.epsilon = max(self.epsilon_end, decayed)

# Reactor environment with a continuous state
class ContinuousReactorEnv:
    """Reactor with a 4-dimensional continuous state and 5 discrete actions."""

    def __init__(self):
        self.state_dim = 4  # temperature, pressure, concentration, flow rate
        self.action_dim = 5  # five heating levels

        self.reset()

    def reset(self):
        """Randomise all four process variables and return the observation."""
        self.temperature = np.random.uniform(350, 450)
        self.pressure = np.random.uniform(4, 6)
        self.concentration = np.random.uniform(0.5, 0.9)
        self.flow_rate = np.random.uniform(80, 120)

        return self._get_state()

    def _get_state(self):
        """Return the normalised state vector as a float32 array of length 4."""
        raw = (self.temperature, self.pressure, self.concentration, self.flow_rate)
        centres = (400, 5, 0.7, 100)
        scales = (100, 2, 0.2, 20)
        normalised = [(value - centre) / scale
                      for value, centre, scale in zip(raw, centres, scales)]
        return np.array(normalised, dtype=np.float32)

    def step(self, action):
        """Apply a discrete heating action and advance one step.

        Args:
            action: Integer 0..4 mapped to -10, -5, 0, +5, +10 K.

        Returns:
            Tuple ``(next_state, reward, done)``; ``done`` is always False.
        """
        delta_t = (action - 2) * 5

        # Transition: heating plus relaxation towards 350 K; pressure and
        # concentration follow the temperature, flow rate is pure noise.
        self.temperature += delta_t - 0.1 * (self.temperature - 350)
        self.pressure = 5 + 0.01 * (self.temperature - 400)
        self.concentration = 0.8 - 0.0005 * abs(self.temperature - 420)
        self.flow_rate = 100 + np.random.randn() * 5

        # Clamp to the allowed ranges (the derived values above are computed
        # from the unclamped temperature).
        self.temperature = np.clip(self.temperature, 300, 500)
        self.pressure = np.clip(self.pressure, 1, 10)
        self.concentration = np.clip(self.concentration, 0, 1)

        # Reward: stay near 420 K and keep the concentration high ...
        tracking_term = -abs(self.temperature - 420)
        yield_term = 100 * self.concentration
        reward = tracking_term + yield_term

        # ... minus a small penalty for the energy spent on the action.
        reward += -0.1 * abs(delta_t)

        return self._get_state(), reward, False

# DQN training
env = ContinuousReactorEnv()
agent = DQNAgent(state_dim=4, action_dim=5, lr=0.0005)

n_episodes = 300
batch_size = 64
target_update_freq = 10  # episodes between hard target-network syncs

episode_rewards = []

for episode in range(n_episodes):
    state = env.reset()
    total_reward = 0

    for step in range(100):
        action = agent.choose_action(state)
        next_state, reward, done = env.step(action)

        # Store the transition in the replay buffer.
        agent.replay_buffer.push(state, action, reward, next_state, done)

        # One learning step per environment step; train() returns 0.0 until
        # the buffer holds at least batch_size transitions.
        loss = agent.train(batch_size)

        total_reward += reward
        state = next_state

    episode_rewards.append(total_reward)
    # Exploration is decayed once per episode, not once per step.
    agent.decay_epsilon()

    # Periodic target-network update.
    if (episode + 1) % target_update_freq == 0:
        agent.update_target_network()

    if (episode + 1) % 50 == 0:
        avg_reward = np.mean(episode_rewards[-50:])
        print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}, Epsilon: {agent.epsilon:.4f}')

# Example output (reward figures are illustrative; the epsilon column follows
# 0.995 ** episode, i.e. the default epsilon_start=1.0 and epsilon_decay=0.995):
# Episode 50, Avg Reward: 45.67, Epsilon: 0.7783
# Episode 100, Avg Reward: 62.34, Epsilon: 0.6058
# Episode 150, Avg Reward: 73.89, Epsilon: 0.4715
# Episode 200, Avg Reward: 78.45, Epsilon: 0.3670
# Episode 250, Avg Reward: 81.23, Epsilon: 0.2856
# Episode 300, Avg Reward: 82.67, Epsilon: 0.2223

5.3 Policy Gradient(REINFORCE)

方策勾配法は、方策を直接最適化します。連続的な行動空間や確率的な方策が必要な場合に有効です。

例3: REINFORCEアルゴリズム実装

class PolicyNetwork(nn.Module):
    """Stochastic policy: maps a state to a categorical action distribution."""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()

        # Layers are created in input-to-output order.
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """Compute action probabilities for a batch of states.

        Args:
            state: Tensor of shape [batch, state_dim].

        Returns:
            Tensor of shape [batch, action_dim]; each row is a softmax
            distribution over the actions.
        """
        hidden = torch.relu(self.fc1(state))
        hidden = torch.relu(self.fc2(hidden))
        return torch.softmax(self.fc3(hidden), dim=-1)

class REINFORCEAgent:
    """REINFORCE (Monte-Carlo policy gradient) for discrete actions.

    Usage per episode: call ``choose_action`` for every step, append the
    resulting reward to ``self.rewards``, then call ``update`` once.
    """

    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        """
        Args:
            state_dim: Size of the state vector.
            action_dim: Number of discrete actions.
            lr: Adam learning rate.
            gamma: Discount factor used for the returns.
        """
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = gamma

        # Per-episode memory, cleared by update().
        self.saved_log_probs = []
        self.rewards = []

    def choose_action(self, state):
        """Sample an action from the policy and remember its log-probability.

        Every call appends to ``self.saved_log_probs``; the entry is consumed
        by the next ``update``.
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.policy(state_tensor)

        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()

        # Keep the graph-attached log-prob for the gradient step.
        self.saved_log_probs.append(dist.log_prob(action))

        return action.item()

    def update(self):
        """Run one policy-gradient step on the stored episode.

        Returns:
            The scalar loss, or 0.0 when no complete (log-prob, reward) pair
            was recorded, in which case nothing is optimised.
        """
        if not self.rewards or not self.saved_log_probs:
            # Nothing to learn from; drop leftovers so the next episode
            # starts with aligned buffers.
            self.saved_log_probs.clear()
            self.rewards.clear()
            return 0.0

        # Discounted returns, accumulated back-to-front.  Appending and
        # reversing once avoids the quadratic cost of insert(0, ...).
        returns = []
        running_return = 0.0
        for reward in reversed(self.rewards):
            running_return = reward + self.gamma * running_return
            returns.append(running_return)
        returns.reverse()

        returns = torch.tensor(returns, dtype=torch.float32)
        # Normalise to reduce variance.  The sample standard deviation of a
        # single value is NaN, so a one-step episode keeps its raw return.
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # zip() pairs each log-prob with its return and ignores any surplus
        # entries on either side.
        policy_loss = [-log_prob * ret
                       for log_prob, ret in zip(self.saved_log_probs, returns)]

        self.optimizer.zero_grad()
        loss = torch.stack(policy_loss).sum()
        loss.backward()
        self.optimizer.step()

        self.saved_log_probs.clear()
        self.rewards.clear()

        return loss.item()

# REINFORCE training
env = ContinuousReactorEnv()
agent = REINFORCEAgent(state_dim=4, action_dim=5, lr=0.001)

n_episodes = 400
episode_rewards = []

for episode in range(n_episodes):
    state = env.reset()
    total_reward = 0

    for step in range(100):
        # choose_action also records the log-probability of the sampled action.
        action = agent.choose_action(state)
        next_state, reward, done = env.step(action)

        agent.rewards.append(reward)
        total_reward += reward
        state = next_state

    # Monte-Carlo update once the whole episode has been collected.
    loss = agent.update()
    episode_rewards.append(total_reward)

    if (episode + 1) % 50 == 0:
        avg_reward = np.mean(episode_rewards[-50:])
        print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}')

# Test the learned policy
state = env.reset()
temperatures = []

for step in range(50):
    # NOTE(review): actions are still sampled from the stochastic policy, and
    # each call appends to agent.saved_log_probs without a matching update().
    action = agent.choose_action(state)
    state, reward, _ = env.step(action)
    temperatures.append(env.temperature)

print(f"\nFinal temperature: {temperatures[-1]:.2f}K")
print(f"Temperature stability (std): {np.std(temperatures[-20:]):.2f}K")

# Example output (illustrative only; actual numbers depend on the random draws):
# Episode 50, Avg Reward: 52.34
# Episode 100, Avg Reward: 67.89
# Episode 150, Avg Reward: 75.67
# Episode 200, Avg Reward: 79.45
# Episode 250, Avg Reward: 81.89
# Episode 300, Avg Reward: 83.23
# Episode 350, Avg Reward: 83.98
# Episode 400, Avg Reward: 84.56
#
# Final temperature: 419.34K
# Temperature stability (std): 1.23K

5.4 Actor-Critic法

Actor-Criticは方策(Actor)と価値関数(Critic)を同時に学習します。REINFORCEの高分散問題を改善できます。

例4: Advantage Actor-Critic(A2C)

class ActorCriticNetwork(nn.Module):
    """Actor and critic heads on top of a shared two-layer feature trunk."""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()

        # Shared trunk (Linear -> ReLU -> Linear -> ReLU).
        trunk_layers = [
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        ]
        self.shared = nn.Sequential(*trunk_layers)

        # Actor head: action logits.
        self.actor = nn.Linear(hidden_dim, action_dim)

        # Critic head: scalar state value.
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        """Evaluate both heads on a batch of states.

        Returns:
            action_probs: Softmax distribution over actions,
                shape [batch, action_dim].
            state_value: Critic estimate, shape [batch, 1].
        """
        features = self.shared(state)
        action_probs = torch.softmax(self.actor(features), dim=-1)
        state_value = self.critic(features)
        return action_probs, state_value

class A2CAgent:
    """Advantage actor-critic agent that learns from single transitions."""

    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99, entropy_coef=0.01):
        """
        Args:
            state_dim: Size of the state vector.
            action_dim: Number of discrete actions.
            lr: Adam learning rate for the shared actor-critic network.
            gamma: Discount factor.
            entropy_coef: Weight of the entropy bonus in the loss.
        """
        self.ac_network = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.ac_network.parameters(), lr=lr)
        self.gamma = gamma
        self.entropy_coef = entropy_coef

    def choose_action(self, state):
        """Sample an action from the current policy.

        Returns:
            Tuple ``(action, log_prob, entropy)``: the action index as a
            Python int, plus the graph-attached log-probability and policy
            entropy needed by ``update``.
        """
        observation = torch.FloatTensor(state).unsqueeze(0)
        probs, _ = self.ac_network(observation)

        policy = torch.distributions.Categorical(probs)
        sampled = policy.sample()

        return sampled.item(), policy.log_prob(sampled), policy.entropy()

    def update(self, state, action_log_prob, reward, next_state, done, entropy):
        """Perform one online gradient step from a single transition.

        Returns:
            The combined actor + critic + entropy loss as a Python float.
        """
        current_obs = torch.FloatTensor(state).unsqueeze(0)
        next_obs = torch.FloatTensor(next_state).unsqueeze(0)

        # Critic estimate for the state the action was taken in.
        _, value = self.ac_network(current_obs)

        # One-step TD target; no gradient flows through the bootstrap term.
        with torch.no_grad():
            _, next_value = self.ac_network(next_obs)
            target_value = reward + self.gamma * next_value * (1 - done)

        advantage = target_value - value

        # Policy-gradient term (advantage treated as a constant), TD-error
        # regression for the critic, and an entropy bonus for exploration.
        actor_loss = -action_log_prob * advantage.detach()
        critic_loss = F.mse_loss(value, target_value)
        entropy_loss = -self.entropy_coef * entropy

        total_loss = actor_loss + critic_loss + entropy_loss

        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return total_loss.item()

# A2C training
env = ContinuousReactorEnv()
agent = A2CAgent(state_dim=4, action_dim=5, lr=0.0005, entropy_coef=0.01)

n_episodes = 300
episode_rewards = []

for episode in range(n_episodes):
    state = env.reset()
    total_reward = 0

    for step in range(100):
        # log_prob and entropy stay attached to the graph built in
        # choose_action and are consumed by the update below.
        action, log_prob, entropy = agent.choose_action(state)
        next_state, reward, done = env.step(action)

        # Online update after every single transition.
        loss = agent.update(state, log_prob, reward, next_state, done, entropy)

        total_reward += reward
        state = next_state

    episode_rewards.append(total_reward)

    if (episode + 1) % 50 == 0:
        avg_reward = np.mean(episode_rewards[-50:])
        print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}')

# Example output (illustrative only; actual numbers depend on the random draws):
# Episode 50, Avg Reward: 68.45
# Episode 100, Avg Reward: 77.89
# Episode 150, Avg Reward: 82.34
# Episode 200, Avg Reward: 84.67
# Episode 250, Avg Reward: 85.89
# Episode 300, Avg Reward: 86.45

💡 Actor-Criticの利点

  • 低分散: Criticによるベースライン補正で学習安定
  • オンライン学習: 1ステップごとに更新可能
  • サンプル効率: REINFORCEより少ないサンプルで学習

5.5 Proximal Policy Optimization(PPO)

PPOは方策の更新幅を制限することで、学習の安定性を向上させます。現在の最先端手法の一つです。

例5: PPOによる反応器制御(連続状態・離散行動)

class PPOAgent:
    """Proximal Policy Optimization agent (clipped surrogate, discrete actions).

    Transitions are collected with ``choose_action`` / ``store_transition``
    and consumed in one batch by ``update``, which also empties the buffer.
    """

    def __init__(self, state_dim, action_dim, lr=0.0003, gamma=0.99,
                 epsilon_clip=0.2, epochs=10, batch_size=64):
        """
        Args:
            state_dim: Size of the state vector.
            action_dim: Number of discrete actions.
            lr: Adam learning rate.
            gamma: Discount factor.
            epsilon_clip: Clipping range of the probability ratio.
            epochs: Number of gradient steps per ``update`` call.
            batch_size: Stored for callers; ``update`` always uses the whole
                buffer as a single batch.
        """
        self.actor_critic = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.actor_critic.parameters(), lr=lr)
        self.gamma = gamma
        self.epsilon_clip = epsilon_clip
        self.epochs = epochs
        self.batch_size = batch_size

        # Rollout buffer (parallel lists, one entry per transition).
        self.states = []
        self.actions = []
        self.log_probs = []
        self.rewards = []
        self.dones = []
        self.values = []

    def choose_action(self, state):
        """Sample an action and return it with its log-prob and state value.

        Returns:
            Tuple ``(action, log_prob, value)``; the two tensors are detached
            because they serve as fixed "old policy" quantities in ``update``.
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs, value = self.actor_critic(state_tensor)

        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        return action.item(), log_prob.detach(), value.detach()

    def store_transition(self, state, action, log_prob, reward, done, value):
        """Append one transition to the rollout buffer."""
        self.states.append(state)
        self.actions.append(action)
        self.log_probs.append(log_prob)
        self.rewards.append(reward)
        self.dones.append(done)
        self.values.append(value)

    def update(self):
        """Run ``epochs`` PPO gradient steps on the buffer, then clear it.

        Does nothing when the buffer is empty.
        """
        if not self.rewards:
            return

        # Discounted returns, accumulated back-to-front; a `done` flag cuts
        # the bootstrap at episode boundaries.  Appending and reversing once
        # avoids the quadratic cost of insert(0, ...).
        returns = []
        running_return = 0.0
        for reward, done in zip(reversed(self.rewards), reversed(self.dones)):
            running_return = reward + self.gamma * running_return * (1 - done)
            returns.append(running_return)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)

        # Everything below is flattened to shape [N].  The stored log-probs
        # have shape [1] each, so a plain stack would be [N, 1] and would
        # broadcast against the [N] new log-probs into an [N, N] ratio.
        values = torch.stack(self.values).reshape(-1)
        old_log_probs = torch.stack(self.log_probs).reshape(-1)

        advantages = returns - values
        # The sample standard deviation of one value is NaN, so a
        # single-transition batch keeps its raw advantage.
        if advantages.numel() > 1:
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        states = torch.FloatTensor(np.array(self.states))
        actions = torch.LongTensor(self.actions)

        for _ in range(self.epochs):
            # Re-evaluate the stored transitions under the current policy.
            action_probs, new_values = self.actor_critic(states)
            dist = torch.distributions.Categorical(action_probs)
            new_log_probs = dist.log_prob(actions)
            entropy = dist.entropy().mean()

            # Probability ratio pi_new(a|s) / pi_old(a|s), shape [N].
            ratio = torch.exp(new_log_probs - old_log_probs)

            # Clipped surrogate objective.
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.epsilon_clip, 1 + self.epsilon_clip) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()

            # Critic regression towards the Monte-Carlo returns.
            critic_loss = F.mse_loss(new_values.reshape(-1), returns)

            loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy

            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.actor_critic.parameters(), 0.5)
            self.optimizer.step()

        # The rollout is on-policy data; discard it after the update.
        self.states.clear()
        self.actions.clear()
        self.log_probs.clear()
        self.rewards.clear()
        self.dones.clear()
        self.values.clear()

# PPO training
env = ContinuousReactorEnv()
agent = PPOAgent(state_dim=4, action_dim=5, lr=0.0003)

n_episodes = 200
update_interval = 10  # run a PPO update every 10 episodes
episode_rewards = []

for episode in range(n_episodes):
    state = env.reset()
    total_reward = 0

    for step in range(100):
        action, log_prob, value = agent.choose_action(state)
        next_state, reward, done = env.step(action)

        # NOTE(review): `done` is always False for this environment, so the
        # stored transitions carry no episode boundary; the returns computed
        # in agent.update() therefore run across the 10 buffered episodes.
        agent.store_transition(state, action, log_prob, reward, done, value)

        total_reward += reward
        state = next_state

    episode_rewards.append(total_reward)

    # Periodic batch update on everything collected since the last one.
    if (episode + 1) % update_interval == 0:
        agent.update()

    if (episode + 1) % 50 == 0:
        avg_reward = np.mean(episode_rewards[-50:])
        print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}')

# Example output (illustrative only; actual numbers depend on the random draws):
# Episode 50, Avg Reward: 74.56
# Episode 100, Avg Reward: 83.45
# Episode 150, Avg Reward: 86.78
# Episode 200, Avg Reward: 87.89

5.6 Deep Deterministic Policy Gradient(DDPG)

DDPGは連続的な行動空間に対応した手法です。反応器の温度制御など、連続値の操作量を最適化できます。

例6: DDPGによる温度制御

class ContinuousActorNetwork(nn.Module):
    """Deterministic actor producing bounded continuous actions."""

    def __init__(self, state_dim, action_dim, hidden_dim=128, action_bound=1.0):
        super().__init__()
        self.action_bound = action_bound

        # Layers are created in input-to-output order.
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """Map states to actions.

        Returns:
            Tensor of continuous actions in [-action_bound, action_bound]
            (tanh output scaled by the bound).
        """
        hidden = torch.relu(self.fc1(state))
        hidden = torch.relu(self.fc2(hidden))
        squashed = torch.tanh(self.fc3(hidden))
        return squashed * self.action_bound

class ContinuousCriticNetwork(nn.Module):
    """Q-function over (state, action) pairs for continuous actions."""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()

        # The first layer consumes the concatenated state and action.
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        """Estimate Q(state, action).

        Args:
            state: Tensor of shape [batch, state_dim].
            action: Tensor of shape [batch, action_dim].

        Returns:
            Tensor of shape [batch, 1].
        """
        joint = torch.cat((state, action), dim=1)
        hidden = torch.relu(self.fc1(joint))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)

class DDPGAgent:
    """Deep Deterministic Policy Gradient agent for continuous actions."""

    def __init__(self, state_dim, action_dim, lr_actor=0.0001, lr_critic=0.001,
                 gamma=0.99, tau=0.001, action_bound=10.0):
        """
        Args:
            state_dim: Size of the state vector.
            action_dim: Size of the action vector.
            lr_actor: Adam learning rate of the actor.
            lr_critic: Adam learning rate of the critic.
            gamma: Discount factor.
            tau: Interpolation weight of the soft target update.
            action_bound: Maximum absolute action value (largest temperature
                change per step, in K).
        """
        self.gamma = gamma
        self.tau = tau
        self.action_bound = action_bound

        # Actor (online and target); the target starts as an exact copy.
        self.actor = ContinuousActorNetwork(state_dim, action_dim, action_bound=action_bound)
        self.actor_target = ContinuousActorNetwork(state_dim, action_dim, action_bound=action_bound)
        self.actor_target.load_state_dict(self.actor.state_dict())

        # Critic (online and target); the target starts as an exact copy.
        self.critic = ContinuousCriticNetwork(state_dim, action_dim)
        self.critic_target = ContinuousCriticNetwork(state_dim, action_dim)
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=lr_actor)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=lr_critic)

        self.replay_buffer = ReplayBuffer(capacity=100000)

        # Ornstein-Uhlenbeck exploration noise (one scalar process shared by
        # all action components).
        self.noise_sigma = 2.0
        self.noise_theta = 0.15
        self.noise_mu = 0.0
        self.noise_state = 0.0

    def choose_action(self, state, add_noise=True):
        """Return the actor's action, optionally perturbed by OU noise.

        Args:
            state: State vector of length ``state_dim``.
            add_noise: Add exploration noise when True.

        Returns:
            float32 array of shape ``[action_dim]`` clipped to
            ``[-action_bound, action_bound]``.
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
            # squeeze(0) drops only the batch axis.  A bare squeeze() would
            # also drop the action axis when action_dim == 1 and return a
            # 0-d value that cannot be indexed as action[0].
            action = self.actor(state_tensor).squeeze(0).numpy()

        if add_noise:
            # Advance the Ornstein-Uhlenbeck process by one step.
            self.noise_state += self.noise_theta * (self.noise_mu - self.noise_state) + \
                               self.noise_sigma * np.random.randn()
            action = (action + self.noise_state).astype(np.float32)

        action = np.clip(action, -self.action_bound, self.action_bound)
        return action

    def train(self, batch_size=64):
        """Run one DDPG update (critic, actor, then soft target updates).

        Returns:
            Tuple ``(actor_loss, critic_loss)``, or ``(0.0, 0.0)`` while the
            replay buffer holds fewer than ``batch_size`` transitions.
        """
        if len(self.replay_buffer) < batch_size:
            return 0.0, 0.0

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)

        # Bring the batch into the [B, k] float layout the critic expects,
        # whether actions were stored as scalars or as vectors.
        actions = actions.float().reshape(states.shape[0], -1)
        rewards = rewards.reshape(-1, 1)
        dones = dones.reshape(-1, 1)

        # Critic update: regress Q(s, a) onto the bootstrapped target.
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_q = self.critic_target(next_states, next_actions)
            target_q = rewards + self.gamma * target_q * (1 - dones)

        current_q = self.critic(states, actions)
        critic_loss = F.mse_loss(current_q, target_q)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor update: ascend the critic's estimate of the actor's actions.
        predicted_actions = self.actor(states)
        actor_loss = -self.critic(states, predicted_actions).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Let the target networks slowly track the online networks.
        self._soft_update(self.actor, self.actor_target)
        self._soft_update(self.critic, self.critic_target)

        return actor_loss.item(), critic_loss.item()

    def _soft_update(self, local_model, target_model):
        """Soft update: theta_target = tau * theta_local + (1 - tau) * theta_target."""
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(self.tau * local_param.data + (1.0 - self.tau) * target_param.data)

# Environment with a continuous action
class ContinuousActionReactorEnv:
    """Reactor controlled by one continuous action (temperature change)."""

    def __init__(self):
        self.state_dim = 4
        self.action_dim = 1  # temperature change in [-10, +10] K
        self.reset()

    def reset(self):
        """Randomise the temperature, reset the other variables, return the state."""
        self.temperature = np.random.uniform(350, 450)
        self.pressure = 5.0
        self.concentration = 0.7
        self.flow_rate = 100.0
        return self._get_state()

    def _get_state(self):
        """Return the normalised state vector as a float32 array of length 4."""
        raw = (self.temperature, self.pressure, self.concentration, self.flow_rate)
        centres = (400, 5, 0.7, 100)
        scales = (100, 2, 0.2, 20)
        normalised = [(value - centre) / scale
                      for value, centre, scale in zip(raw, centres, scales)]
        return np.array(normalised, dtype=np.float32)

    def step(self, action):
        """Apply a continuous temperature change and advance one step.

        Args:
            action: Indexable whose first element is the temperature
                change in K.

        Returns:
            Tuple ``(next_state, reward, done)``; ``done`` is always False.
        """
        delta_t = float(action[0])  # [-10, +10] K

        # Heating plus relaxation towards 350 K; pressure and concentration
        # are derived from the (not yet clamped) temperature.
        self.temperature += delta_t - 0.1 * (self.temperature - 350)
        self.pressure = 5 + 0.01 * (self.temperature - 400)
        self.concentration = 0.8 - 0.0005 * abs(self.temperature - 420)

        self.temperature = np.clip(self.temperature, 300, 500)
        self.pressure = np.clip(self.pressure, 1, 10)
        self.concentration = np.clip(self.concentration, 0, 1)

        # Reward: track 420 K, keep concentration high, pay for actuation.
        tracking_term = -abs(self.temperature - 420)
        yield_term = 100 * self.concentration
        actuation_term = -0.5 * abs(delta_t)
        reward = tracking_term + yield_term + actuation_term

        return self._get_state(), reward, False

# DDPG訓練
env = ContinuousActionReactorEnv()
agent = DDPGAgent(state_dim=4, action_dim=1, action_bound=10.0)

n_episodes = 200
episode_rewards = []

for episode in range(n_episodes):
    state = env.reset()
    total_reward = 0

    for step in range(100):
        action = agent.choose_action(state, add_noise=True)
        next_state, reward, done = env.step(action)

        agent.replay_buffer.push(state, action[0], reward, next_state, done)

        actor_loss, critic_loss = agent.train(batch_size=64)

        total_reward += reward
        state = next_state

    episode_rewards.append(total_reward)

    # ノイズ減衰
    agent.noise_sigma *= 0.995

    if (episode + 1) % 50 == 0:
        avg_reward = np.mean(episode_rewards[-50:])
        print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}, Noise: {agent.noise_sigma:.4f}')

print(f"\nFinal 10 episodes avg reward: {np.mean(episode_rewards[-10:]):.2f}")

# 出力例:
# Episode 50, Avg Reward: 72.34, Noise: 1.6098
# Episode 100, Avg Reward: 81.56, Noise: 1.2958
# Episode 150, Avg Reward: 85.89, Noise: 1.0431
# Episode 200, Avg Reward: 87.45, Noise: 0.8398
#
# Final 10 episodes avg reward: 88.12

✅ DDPGの利点

  • 連続制御: 温度・流量など連続値の操作量を直接最適化
  • 決定論的方策: 同じ状態で同じ行動(再現性)
  • オフポリシー学習: 経験再生によるサンプル効率

5.7 マルチエージェント強化学習

分散型プロセス制御では、複数のエージェント(各反応器の制御システム)が協調して最適化します。

例7: 協調型マルチエージェント制御

class MultiAgentReactorEnv:
    """System of three reactors connected in series, one agent per reactor.

    Each agent observes only its own reactor (normalized temperature and
    concentration) and chooses one of three discrete actions:
    0 = cool (-5 K), 1 = hold (0 K), 2 = heat (+5 K).
    """

    def __init__(self):
        self.n_agents = 3
        self.state_dim = 2  # per reactor: temperature, concentration
        self.action_dim = 3  # cool / hold / heat

        self.reset()

    def reset(self):
        """Draw random initial conditions for every reactor and return the states."""
        self.temperatures = np.random.uniform(350, 450, self.n_agents)
        self.concentrations = np.random.uniform(0.5, 0.9, self.n_agents)
        return self._get_states()

    def _get_states(self):
        """Return one normalized float32 vector [temperature, concentration] per agent."""
        return [
            np.array([(temp - 400) / 100, (conc - 0.7) / 0.2], dtype=np.float32)
            for temp, conc in zip(self.temperatures, self.concentrations)
        ]

    def step(self, actions):
        """Advance all reactors by one step.

        Args:
            actions: sequence with one action (0, 1 or 2) per agent.

        Returns:
            states: list of next states, one per agent.
            rewards: list of rewards, one per agent.
            done: always False; the caller decides the episode length.
        """
        temp_changes = [(a - 1) * 5 for a in actions]  # -5, 0, +5 K

        for i in range(self.n_agents):
            # Own control action
            self.temperatures[i] += temp_changes[i]

            # Heat exchange with the upstream neighbour, which has already
            # been updated in this step
            if i > 0:
                self.temperatures[i] += 0.1 * (self.temperatures[i - 1] - self.temperatures[i])

            # Apply the temperature limits first so that the concentration
            # below is derived from the temperature the reactor really ends
            # up at; otherwise the stored (T, c) pair and the reward disagree
            # whenever the clip is active.
            self.temperatures[i] = np.clip(self.temperatures[i], 300, 500)

            # Reaction progress: concentration peaks at 420 K
            self.concentrations[i] = np.clip(
                0.8 - 0.001 * abs(self.temperatures[i] - 420), 0, 1
            )

        # Cooperation bonus is shared by all agents, so compute it once
        cooperation_bonus = 20 * np.mean(self.concentrations)

        rewards = [
            -abs(temp - 420) + 50 * conc + cooperation_bonus
            for temp, conc in zip(self.temperatures, self.concentrations)
        ]

        return self._get_states(), rewards, False

# Independent Q-learning: every agent owns a separate learner and trains on its own data
class MultiAgentQLearning:
    """Runs one independent DQN learner per agent."""

    def __init__(self, n_agents, state_dim, action_dim):
        self.n_agents = n_agents
        # One DQNAgent instance per agent
        self.agents = []
        for _ in range(n_agents):
            self.agents.append(DQNAgent(state_dim, action_dim, lr=0.0005))

    def choose_actions(self, states):
        """Return the action of agent i for states[i], in agent order."""
        return [
            self.agents[idx].choose_action(observation)
            for idx, observation in enumerate(states)
        ]

    def train(self, states, actions, rewards, next_states):
        """Store each agent's own transition, train it once, and return the mean loss."""
        per_agent_loss = []
        for idx in range(self.n_agents):
            learner = self.agents[idx]

            # Record the transition; it is always stored as non-terminal
            learner.replay_buffer.push(
                states[idx], actions[idx], rewards[idx], next_states[idx], False
            )

            # One training call per agent per step
            per_agent_loss.append(learner.train(batch_size=32))

        return np.mean(per_agent_loss)

# Multi-agent training: 300 episodes of 100 steps with three independent learners.
env = MultiAgentReactorEnv()
ma_agent = MultiAgentQLearning(n_agents=3, state_dim=2, action_dim=3)

n_episodes = 300
episode_rewards = []  # per episode: reward summed over all steps and all agents

for episode in range(n_episodes):
    states = env.reset()
    total_rewards = np.zeros(3)  # one accumulator per agent

    for step in range(100):
        actions = ma_agent.choose_actions(states)
        next_states, rewards, done = env.step(actions)

        # Stores the transitions and trains every agent once; the returned mean loss is ignored.
        ma_agent.train(states, actions, rewards, next_states)

        total_rewards += np.array(rewards)
        states = next_states

    episode_rewards.append(total_rewards.sum())

    # Epsilon decay (every episode) and target-network update (every 10 episodes).
    # Note: this loop variable rebinds the module-level name `agent`.
    for agent in ma_agent.agents:
        agent.decay_epsilon()

    if (episode + 1) % 10 == 0:
        for agent in ma_agent.agents:
            agent.update_target_network()

    # Progress report every 50 episodes (mean over the last 50 episode totals).
    if (episode + 1) % 50 == 0:
        avg_reward = np.mean(episode_rewards[-50:])
        print(f'Episode {episode+1}, Avg Total Reward: {avg_reward:.2f}')

# Test: roll out 50 steps and record the reactor temperatures.
# NOTE(review): actions come from the same choose_actions() used in training; whether
# exploration is still active here depends on DQNAgent.choose_action - confirm.
states = env.reset()
temps = [[], [], []]  # temperature trace of each reactor

for step in range(50):
    actions = ma_agent.choose_actions(states)
    states, rewards, _ = env.step(actions)

    for i in range(3):
        temps[i].append(env.temperatures[i])

# Visualization: temperature traces against the 420 K target line.
plt.figure(figsize=(10, 4))
for i in range(3):
    plt.plot(temps[i], label=f'Reactor {i+1}')
plt.axhline(420, color='r', linestyle='--', label='Target')
plt.xlabel('Time Step')
plt.ylabel('Temperature [K]')
plt.title('Multi-Agent Coordinated Control')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()

print(f"\nFinal temperatures: {[temps[i][-1] for i in range(3)]}")
print(f"Final concentrations: {env.concentrations}")

# Example output (illustrative; actual numbers vary from run to run):
# Episode 50, Avg Total Reward: 567.89
# Episode 100, Avg Total Reward: 789.01
# Episode 150, Avg Total Reward: 876.54
# Episode 200, Avg Total Reward: 912.34
# Episode 250, Avg Total Reward: 928.76
# Episode 300, Avg Total Reward: 935.45
#
# Final temperatures: [418.34, 420.12, 419.87]
# Final concentrations: [0.797 0.798 0.799]

5.8 Safe RL(安全制約付き強化学習)

プロセス産業では安全性が最優先です。制約条件(温度上限、圧力範囲)を満たしながら最適化します。

例8: 制約付きPPO(CPO概念)

class SafeReactorEnv:
    """Reactor environment that reports a safety cost in addition to the reward.

    The state is the normalized temperature, pressure, concentration and flow
    rate. Actions 0..4 map to temperature changes of -10, -5, 0, +5, +10 K.
    """

    def __init__(self):
        self.state_dim = 4
        self.action_dim = 5

        # Safety limits; step() keeps temperature and pressure inside them
        self.temp_min = 320  # K
        self.temp_max = 480  # K
        self.pressure_max = 8  # bar

        self.reset()

    def reset(self):
        """Start from a random temperature in [350, 450) and return the state."""
        self.temperature = np.random.uniform(350, 450)
        self.pressure = 5.0
        self.concentration = 0.7
        self.flow_rate = 100.0
        return self._get_state()

    def _get_state(self):
        """Return the normalized state as a float32 vector of length 4."""
        return np.array([
            (self.temperature - 400) / 100,
            (self.pressure - 5) / 2,
            (self.concentration - 0.7) / 0.2,
            (self.flow_rate - 100) / 20
        ], dtype=np.float32)

    def step(self, action):
        """Apply one control action.

        Args:
            action: integer 0..4; (action - 2) * 5 is the temperature change.

        Returns:
            state: next normalized state.
            reward: tracking term -|T - 420| plus 100 * concentration.
            cost: 0, 100 or 200 depending on how many safety margins are entered.
            done: always False; the caller decides the episode length.
        """
        temp_change = (action - 2) * 5

        # Temperature: control input plus relaxation toward 350 K, limited to
        # the safe range before anything else is derived from it.
        self.temperature += temp_change - 0.1 * (self.temperature - 350)
        self.temperature = np.clip(self.temperature, self.temp_min, self.temp_max)

        # Pressure and concentration follow the limited temperature, so the
        # returned state and the reward stay consistent when the clip is active.
        self.pressure = np.clip(5 + 0.02 * (self.temperature - 400), 1, self.pressure_max)
        self.concentration = 0.8 - 0.0005 * abs(self.temperature - 420)

        # Reward
        temp_reward = -abs(self.temperature - 420)
        conc_reward = 100 * self.concentration
        reward = temp_reward + conc_reward

        # Constraint cost: charged when the state is within 10 K of a
        # temperature limit or within 1 bar of the pressure limit.
        cost = 0.0
        if self.temperature < self.temp_min + 10 or self.temperature > self.temp_max - 10:
            cost = 100.0
        # With the coefficients above, pressure reaches at most 6.6 bar at
        # temp_max, so this branch only fires if the constants are changed.
        if self.pressure > self.pressure_max - 1:
            cost += 100.0

        return self._get_state(), reward, cost, False

class SafePPOAgent:
    """PPO agent with a simple safety-cost limit (simplified, CPO-inspired).

    Transitions are buffered with store_transition() and consumed by update(),
    which performs one clipped-surrogate PPO step. When the summed cost of the
    buffered transitions exceeds cost_limit, the advantages are scaled down so
    the policy moves less on that batch.
    """

    def __init__(self, state_dim, action_dim, lr=0.0003, cost_limit=20):
        """
        Args:
            state_dim: input size of the networks.
            action_dim: second argument of ActorCriticNetwork (presumably the
                number of discrete actions).
            lr: learning rate of both Adam optimizers.
            cost_limit: upper bound on the summed cost of the transitions
                buffered between two update() calls (one episode when update()
                is called once per episode).
        """
        self.actor_critic = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.actor_critic.parameters(), lr=lr)
        self.cost_limit = cost_limit

        # Critic for the cost signal. It is allocated here but this simplified
        # update() does not train or query it.
        self.cost_critic = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )
        self.cost_optimizer = torch.optim.Adam(self.cost_critic.parameters(), lr=lr)

        # Rollout buffers, filled by store_transition() and cleared by update()
        self.states = []
        self.actions = []
        self.log_probs = []
        self.rewards = []
        self.costs = []
        self.values = []

    def choose_action(self, state):
        """Sample an action for a single state.

        Returns:
            (action, log_prob, value): the action as a Python int, and the
            detached log-probability and value tensors, which still carry the
            batch dimension of size 1.
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        # No gradient is needed while acting; update() re-evaluates the policy.
        with torch.no_grad():
            action_probs, value = self.actor_critic(state_tensor)

        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        return action.item(), log_prob.detach(), value.detach()

    def store_transition(self, state, action, log_prob, reward, cost, value):
        """Append one transition to the rollout buffers."""
        self.states.append(state)
        self.actions.append(action)
        self.log_probs.append(log_prob)
        self.rewards.append(reward)
        self.costs.append(cost)
        self.values.append(value)

    def update(self):
        """Run one PPO update on the buffered transitions and clear the buffers.

        Returns:
            Sum of the buffered costs (0 when the buffer is empty).
        """
        if not self.rewards:
            return 0

        # Discounted reward-to-go with gamma = 0.99. The whole buffer is
        # treated as one trajectory.
        returns = []
        running = 0.0
        for reward in reversed(self.rewards):
            running = reward + 0.99 * running
            returns.append(running)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)

        # choose_action() returns tensors that keep a batch dimension of 1.
        # Flatten to shape [T] so every per-sample quantity lines up
        # element-wise instead of broadcasting to a [T, T] matrix.
        values = torch.stack(self.values).reshape(-1)
        old_log_probs = torch.stack(self.log_probs).reshape(-1)

        advantages = returns - values
        # The standard deviation of a single sample is NaN, so normalize only
        # when there is more than one transition.
        if advantages.numel() > 1:
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        total_cost = sum(self.costs)

        states = torch.as_tensor(np.array(self.states), dtype=torch.float32)
        actions = torch.tensor(self.actions, dtype=torch.long)

        action_probs, new_values = self.actor_critic(states)
        dist = torch.distributions.Categorical(action_probs)
        new_log_probs = dist.log_prob(actions)

        ratio = torch.exp(new_log_probs - old_log_probs)

        # Over the cost limit: shrink the advantages so this batch changes the
        # policy less.
        if total_cost > self.cost_limit:
            penalty_factor = 0.1
            advantages = advantages * penalty_factor

        # Clipped surrogate objective (clip range 0.2)
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 0.8, 1.2) * advantages
        actor_loss = -torch.min(surr1, surr2).mean()

        critic_loss = nn.functional.mse_loss(new_values.reshape(-1), returns)

        loss = actor_loss + 0.5 * critic_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Clear the rollout buffers
        self.states.clear()
        self.actions.clear()
        self.log_probs.clear()
        self.rewards.clear()
        self.costs.clear()
        self.values.clear()

        return total_cost

# Safe RL training: 200 episodes of 100 steps; the policy is updated every 10 episodes.
env = SafeReactorEnv()
# cost_limit=50 overrides the constructor default of 20.
agent = SafePPOAgent(state_dim=4, action_dim=5, cost_limit=50)

n_episodes = 200
episode_rewards = []  # undiscounted reward sum of every episode
episode_costs = []  # one entry per update() call, not per episode

for episode in range(n_episodes):
    state = env.reset()
    total_reward = 0

    for step in range(100):
        action, log_prob, value = agent.choose_action(state)
        next_state, reward, cost, done = env.step(action)

        agent.store_transition(state, action, log_prob, reward, cost, value)

        total_reward += reward
        state = next_state

    # Update: the buffers are only cleared inside update(), so each call sees the
    # transitions of the last 10 episodes and returns their summed cost.
    if (episode + 1) % 10 == 0:
        total_cost = agent.update()
        episode_costs.append(total_cost)

    episode_rewards.append(total_reward)

    # Progress report every 50 episodes: mean of the last 50 episode rewards and of
    # the last 5 update-batch costs.
    if (episode + 1) % 50 == 0:
        avg_reward = np.mean(episode_rewards[-50:])
        avg_cost = np.mean(episode_costs[-5:]) if episode_costs else 0
        print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}, Avg Cost: {avg_cost:.2f}')

# Counts the update batches (10 episodes each) whose summed cost exceeded the limit.
print(f"\nSafety violations (cost > {agent.cost_limit}): {sum(c > agent.cost_limit for c in episode_costs)}")

# Example output (illustrative; actual numbers vary from run to run):
# Episode 50, Avg Reward: 78.45, Avg Cost: 67.89
# Episode 100, Avg Reward: 83.56, Avg Cost: 42.34
# Episode 150, Avg Reward: 85.67, Avg Cost: 28.76
# Episode 200, Avg Reward: 86.89, Avg Cost: 18.45
#
# Safety violations (cost > 50): 4

⚠️ 産業実装時の注意点

  • シミュレーション検証: 実プロセスに適用前に十分な検証
  • フェイルセーフ: RLが失敗した場合は古典制御へフォールバック
  • 段階的導入: まずソフトセンサー、次に最適化、最後に制御
  • 人間の監視: 完全自動化前に人間が監視・介入可能に

学習目標の確認

この章を完了すると、以下を実装・説明できるようになります:

基本理解

実践スキル

応用力

RL手法比較表

手法 行動空間 学習方式 サンプル効率 安定性 適用例
Q-Learning 離散 オフポリシー 簡易反応器制御
DQN 離散 オフポリシー 多変数プロセス制御
REINFORCE 離散/連続 オンポリシー 探索的制御
A2C 離散/連続 オンポリシー リアルタイム制御
PPO 離散/連続 オンポリシー 安定した最適化
DDPG 連続 オフポリシー 温度・流量制御

参考文献

  1. Sutton, R. S., & Barto, A. G. (2018). "Reinforcement Learning: An Introduction" (2nd ed.). MIT Press.
  2. Mnih, V., et al. (2015). "Human-level control through deep reinforcement learning." Nature, 518(7540), 529-533.
  3. Lillicrap, T. P., et al. (2016). "Continuous control with deep reinforcement learning." ICLR 2016.
  4. Schulman, J., et al. (2017). "Proximal Policy Optimization Algorithms." arXiv:1707.06347.
  5. Achiam, J., et al. (2017). "Constrained Policy Optimization." ICML 2017.
  6. Lee, J. H., et al. (2021). "Approximate Dynamic Programming-based Approaches for Process Control." Computers & Chemical Engineering, 147, 107229.