5.1 Reinforcement Learning Fundamentals and Q-Learning
Reinforcement learning (RL) learns an optimal control policy through interaction with an environment. In process control, the agent (the control system) observes the state (temperature, pressure, ...), selects an action (valve opening, ...), and learns to maximize a reward that encodes quality, cost, and safety.
💡 Basic elements of reinforcement learning
- State: the current condition of the process (temperature, pressure, concentration, ...)
- Action: the operation the agent applies (heating, cooling, flow adjustment, ...)
- Reward: a scalar evaluating how good an action was (quality, cost, safety)
- Policy: a mapping from states to actions, \(\pi(a|s)\)
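The agent's objective can be summarized as maximizing the expected discounted return; a standard formulation (stated here for reference, not specific to the examples below) is
$$J(\pi) = \mathbb{E}_\pi\left[\sum_{t=0}^{\infty} \gamma^t r_{t+1}\right], \qquad 0 \le \gamma < 1$$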
Q-Learning update rule (a temporal-difference approximation of the Bellman optimality equation):
$$Q(s, a) \leftarrow Q(s, a) + \alpha [r + \gamma \max_{a'} Q(s', a') - Q(s, a)]$$
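As an illustrative single update with \(\alpha = 0.1\) and \(\gamma = 0.95\): if \(Q(s,a) = 0\), \(r = -5\), and \(\max_{a'} Q(s',a') = 2\), then
$$Q(s, a) \leftarrow 0 + 0.1\,[-5 + 0.95 \cdot 2 - 0] = -0.31$$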
Example 1: Simple reactor control (tabular Q-Learning)
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from collections import defaultdict
class SimpleReactorEnv:
"""簡易化学反応器環境"""
def __init__(self):
# 状態: 温度 [300-500K] を10段階に離散化
self.temperature = 400.0 # 初期温度
self.target_temp = 420.0 # 目標温度
self.dt = 1.0 # 時間ステップ [min]
# 行動: 0=冷却(-5K), 1=維持(0K), 2=加熱(+5K)
self.actions = [-5, 0, 5]
self.n_actions = len(self.actions)
def reset(self):
"""環境のリセット"""
self.temperature = np.random.uniform(350, 450)
return self._get_state()
def _get_state(self):
"""状態を離散化(10段階)"""
state = int((self.temperature - 300) / 20)
return max(0, min(9, state))
def step(self, action):
"""1ステップ実行
Returns:
state: 次の状態
reward: 報酬
done: エピソード終了フラグ
"""
# 温度変化
temp_change = self.actions[action]
self.temperature += temp_change
# 外乱(熱損失)
heat_loss = 0.1 * (self.temperature - 300)
self.temperature -= heat_loss
# 温度制約
self.temperature = np.clip(self.temperature, 300, 500)
# 報酬計算
temp_error = abs(self.temperature - self.target_temp)
reward = -temp_error # 目標温度に近いほど高報酬
# ボーナス: 目標温度±5K以内
if temp_error < 5:
reward += 10
# ペナルティ: 温度範囲外
if self.temperature <= 310 or self.temperature >= 490:
reward -= 50
next_state = self._get_state()
done = False # 継続的制御
return next_state, reward, done
# Q-Learningエージェント
class QLearningAgent:
"""表形式Q-Learning"""
def __init__(self, n_states=10, n_actions=3, alpha=0.1, gamma=0.95, epsilon=0.1):
"""
Args:
alpha: 学習率
gamma: 割引率
epsilon: ε-greedy探索率
"""
self.n_states = n_states
self.n_actions = n_actions
self.alpha = alpha
self.gamma = gamma
self.epsilon = epsilon
# Q-table初期化
self.q_table = defaultdict(lambda: np.zeros(n_actions))
def choose_action(self, state):
"""ε-greedy方策で行動選択"""
if np.random.rand() < self.epsilon:
return np.random.randint(self.n_actions) # 探索
else:
return np.argmax(self.q_table[state]) # 活用
def update(self, state, action, reward, next_state):
"""Q値を更新"""
current_q = self.q_table[state][action]
max_next_q = np.max(self.q_table[next_state])
new_q = current_q + self.alpha * (reward + self.gamma * max_next_q - current_q)
self.q_table[state][action] = new_q
# 訓練
env = SimpleReactorEnv()
agent = QLearningAgent(n_states=10, n_actions=3)
n_episodes = 500
episode_rewards = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
for step in range(100): # 各エピソード100ステップ
action = agent.choose_action(state)
next_state, reward, done = env.step(action)
agent.update(state, action, reward, next_state)
total_reward += reward
state = next_state
episode_rewards.append(total_reward)
if (episode + 1) % 100 == 0:
avg_reward = np.mean(episode_rewards[-100:])
print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}')
# 学習済み方策のテスト
env_test = SimpleReactorEnv()
state = env_test.reset()
temperatures = []
actions_taken = []
for step in range(50):
    action = int(np.argmax(agent.q_table[state]))  # greedy action at test time (no ε-exploration)
state, reward, _ = env_test.step(action)
temperatures.append(env_test.temperature)
actions_taken.append(action)
# 可視化
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(episode_rewards, alpha=0.3)
plt.plot(np.convolve(episode_rewards, np.ones(50)/50, mode='valid'), linewidth=2)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Learning Progress')
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.plot(temperatures, label='Temperature')
plt.axhline(env_test.target_temp, color='r', linestyle='--', label='Target')
plt.xlabel('Time Step')
plt.ylabel('Temperature [K]')
plt.title('Learned Control Policy')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
print(f"\nFinal temperature: {temperatures[-1]:.2f}K (Target: {env_test.target_temp}K)")
# 出力例:
# Episode 100, Avg Reward: -234.56
# Episode 200, Avg Reward: -123.45
# Episode 300, Avg Reward: -67.89
# Episode 400, Avg Reward: -34.56
# Episode 500, Avg Reward: -12.34
#
# Final temperature: 418.76K (Target: 420.00K)
5.2 Deep Q-Network (DQN)
DQN approximates the Q-table with a neural network, which makes it applicable to high-dimensional state spaces such as multivariable processes.
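With a target network \(Q_{\theta^-}\) and a replay buffer \(\mathcal{D}\), the training target and loss used in the code below take the standard DQN form:
$$y = r + \gamma\,(1 - d)\max_{a'} Q_{\theta^-}(s', a'), \qquad \mathcal{L}(\theta) = \mathbb{E}_{(s,a,r,s',d)\sim\mathcal{D}}\big[(y - Q_\theta(s,a))^2\big]$$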
Example 2: Reactor control with DQN
import torch.nn.functional as F
from collections import deque
import random
class QNetwork(nn.Module):
"""Q-Network(状態価値関数の近似)"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(QNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
"""
Args:
state: [batch, state_dim]
Returns:
q_values: [batch, action_dim] 各行動のQ値
"""
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
q_values = self.fc3(x)
return q_values
class ReplayBuffer:
"""経験再生バッファ"""
def __init__(self, capacity=10000):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
"""経験を保存"""
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
"""ランダムサンプリング"""
batch = random.sample(self.buffer, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
        # Convert via np.array (faster than building tensors from a tuple of arrays);
        # actions are returned as float so the same buffer also serves the
        # continuous-action DDPG agent later (cast to long where integer indices are needed)
        return (
            torch.FloatTensor(np.array(states)),
            torch.FloatTensor(np.array(actions)),
            torch.FloatTensor(np.array(rewards)),
            torch.FloatTensor(np.array(next_states)),
            torch.FloatTensor(np.array(dones))
        )
def __len__(self):
return len(self.buffer)
class DQNAgent:
"""Deep Q-Network Agent"""
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99, epsilon_start=1.0,
epsilon_end=0.01, epsilon_decay=0.995):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.epsilon = epsilon_start
self.epsilon_end = epsilon_end
self.epsilon_decay = epsilon_decay
# Q-Network(メイン)
self.q_network = QNetwork(state_dim, action_dim)
# Target Network
self.target_network = QNetwork(state_dim, action_dim)
self.target_network.load_state_dict(self.q_network.state_dict())
self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
self.replay_buffer = ReplayBuffer(capacity=10000)
def choose_action(self, state):
"""ε-greedyで行動選択"""
if np.random.rand() < self.epsilon:
return np.random.randint(self.action_dim)
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0)
q_values = self.q_network(state_tensor)
return q_values.argmax().item()
def train(self, batch_size=64):
"""ミニバッチ学習"""
if len(self.replay_buffer) < batch_size:
return 0.0
# サンプリング
states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)
        # Q-values of the actions actually taken (gather needs int64 indices)
        current_q = self.q_network(states).gather(1, actions.long().unsqueeze(1)).squeeze()
# 目標Q値(Target Networkを使用)
with torch.no_grad():
max_next_q = self.target_network(next_states).max(1)[0]
target_q = rewards + self.gamma * max_next_q * (1 - dones)
# Loss計算
loss = F.mse_loss(current_q, target_q)
# 更新
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
def update_target_network(self):
"""Target Networkの更新"""
self.target_network.load_state_dict(self.q_network.state_dict())
def decay_epsilon(self):
"""εを減衰"""
self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
# 連続状態の反応器環境
class ContinuousReactorEnv:
"""連続状態空間の反応器"""
def __init__(self):
self.state_dim = 4 # 温度、圧力、濃度、流量
self.action_dim = 5 # 5段階の加熱制御
self.reset()
def reset(self):
# ランダム初期状態
self.temperature = np.random.uniform(350, 450)
self.pressure = np.random.uniform(4, 6)
self.concentration = np.random.uniform(0.5, 0.9)
self.flow_rate = np.random.uniform(80, 120)
return self._get_state()
def _get_state(self):
"""状態ベクトル(正規化)"""
return np.array([
(self.temperature - 400) / 100,
(self.pressure - 5) / 2,
(self.concentration - 0.7) / 0.2,
(self.flow_rate - 100) / 20
], dtype=np.float32)
def step(self, action):
# 行動: 0=-10K, 1=-5K, 2=0K, 3=+5K, 4=+10K
temp_change = (action - 2) * 5
# 状態遷移
self.temperature += temp_change - 0.1 * (self.temperature - 350)
self.pressure = 5 + 0.01 * (self.temperature - 400)
self.concentration = 0.8 - 0.0005 * abs(self.temperature - 420)
self.flow_rate = 100 + np.random.randn() * 5
# 制約
self.temperature = np.clip(self.temperature, 300, 500)
self.pressure = np.clip(self.pressure, 1, 10)
self.concentration = np.clip(self.concentration, 0, 1)
# 報酬: 目標温度420K、高濃度を維持
temp_reward = -abs(self.temperature - 420)
conc_reward = 100 * self.concentration
reward = temp_reward + conc_reward
# エネルギーコストペナルティ
energy_cost = -0.1 * abs(temp_change)
reward += energy_cost
next_state = self._get_state()
done = False
return next_state, reward, done
# DQN訓練
env = ContinuousReactorEnv()
agent = DQNAgent(state_dim=4, action_dim=5, lr=0.0005)
n_episodes = 300
batch_size = 64
target_update_freq = 10
episode_rewards = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
for step in range(100):
action = agent.choose_action(state)
next_state, reward, done = env.step(action)
# 経験を保存
agent.replay_buffer.push(state, action, reward, next_state, done)
# 学習
loss = agent.train(batch_size)
total_reward += reward
state = next_state
episode_rewards.append(total_reward)
agent.decay_epsilon()
# Target Network更新
if (episode + 1) % target_update_freq == 0:
agent.update_target_network()
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}, Epsilon: {agent.epsilon:.4f}')
# 出力例:
# Episode 50, Avg Reward: 45.67, Epsilon: 0.6065
# Episode 100, Avg Reward: 62.34, Epsilon: 0.3679
# Episode 150, Avg Reward: 73.89, Epsilon: 0.2231
# Episode 200, Avg Reward: 78.45, Epsilon: 0.1353
# Episode 250, Avg Reward: 81.23, Epsilon: 0.0821
# Episode 300, Avg Reward: 82.67, Epsilon: 0.0498
5.3 Policy Gradient (REINFORCE)
Policy-gradient methods optimize the policy directly. They are useful when a stochastic policy is desired or when the action space is continuous.
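The update implemented below is the standard REINFORCE (Monte Carlo policy gradient) estimator, with the returns normalized within each episode as a simple variance-reduction step:
$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_t \nabla_\theta \log \pi_\theta(a_t|s_t)\, G_t\right], \qquad G_t = \sum_{k=t}^{T} \gamma^{\,k-t} r_k$$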
Example 3: REINFORCE implementation
class PolicyNetwork(nn.Module):
"""方策ネットワーク(確率的方策)"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(PolicyNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
"""
Args:
state: [batch, state_dim]
Returns:
action_probs: [batch, action_dim] 行動確率分布
"""
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
logits = self.fc3(x)
action_probs = F.softmax(logits, dim=-1)
return action_probs
class REINFORCEAgent:
"""REINFORCE(モンテカルロ方策勾配)"""
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
self.policy = PolicyNetwork(state_dim, action_dim)
self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
self.gamma = gamma
# エピソード記憶
self.saved_log_probs = []
self.rewards = []
def choose_action(self, state):
"""方策からサンプリング"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs = self.policy(state_tensor)
# 確率分布からサンプリング
dist = torch.distributions.Categorical(action_probs)
action = dist.sample()
# log確率を保存(後で勾配計算に使用)
self.saved_log_probs.append(dist.log_prob(action))
return action.item()
def update(self):
"""エピソード終了後に方策を更新"""
R = 0
policy_loss = []
returns = []
# 累積報酬を計算(逆順)
for r in reversed(self.rewards):
R = r + self.gamma * R
returns.insert(0, R)
# 正規化
returns = torch.FloatTensor(returns)
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
# 方策勾配
for log_prob, R in zip(self.saved_log_probs, returns):
policy_loss.append(-log_prob * R)
self.optimizer.zero_grad()
loss = torch.stack(policy_loss).sum()
loss.backward()
self.optimizer.step()
# クリア
self.saved_log_probs.clear()
self.rewards.clear()
return loss.item()
# REINFORCE訓練
env = ContinuousReactorEnv()
agent = REINFORCEAgent(state_dim=4, action_dim=5, lr=0.001)
n_episodes = 400
episode_rewards = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
for step in range(100):
action = agent.choose_action(state)
next_state, reward, done = env.step(action)
agent.rewards.append(reward)
total_reward += reward
state = next_state
# エピソード終了後に更新
loss = agent.update()
episode_rewards.append(total_reward)
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}')
# 学習済み方策のテスト
state = env.reset()
temperatures = []
for step in range(50):
action = agent.choose_action(state)
state, reward, _ = env.step(action)
temperatures.append(env.temperature)
print(f"\nFinal temperature: {temperatures[-1]:.2f}K")
print(f"Temperature stability (std): {np.std(temperatures[-20:]):.2f}K")
# 出力例:
# Episode 50, Avg Reward: 52.34
# Episode 100, Avg Reward: 67.89
# Episode 150, Avg Reward: 75.67
# Episode 200, Avg Reward: 79.45
# Episode 250, Avg Reward: 81.89
# Episode 300, Avg Reward: 83.23
# Episode 350, Avg Reward: 83.98
# Episode 400, Avg Reward: 84.56
#
# Final temperature: 419.34K
# Temperature stability (std): 1.23K
5.4 Actor-Critic Methods
Actor-Critic methods learn the policy (actor) and a value function (critic) at the same time, which mitigates the high variance of REINFORCE.
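The one-step advantage and the losses used in the A2C example below are
$$A(s_t,a_t) = r_t + \gamma V(s_{t+1}) - V(s_t), \qquad \mathcal{L}_{\text{actor}} = -\log \pi_\theta(a_t|s_t)\,A(s_t,a_t), \qquad \mathcal{L}_{\text{critic}} = A(s_t,a_t)^2$$
plus an entropy bonus \(-c_H\,\mathcal{H}(\pi_\theta(\cdot|s_t))\) to encourage exploration.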
Example 4: Advantage Actor-Critic (A2C)
class ActorCriticNetwork(nn.Module):
"""Actor-Critic統合ネットワーク"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(ActorCriticNetwork, self).__init__()
# 共有層
self.shared = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
# Actor(方策)
self.actor = nn.Linear(hidden_dim, action_dim)
# Critic(価値関数)
self.critic = nn.Linear(hidden_dim, 1)
def forward(self, state):
"""
Returns:
action_probs: 行動確率分布
state_value: 状態価値
"""
shared_features = self.shared(state)
action_logits = self.actor(shared_features)
action_probs = F.softmax(action_logits, dim=-1)
state_value = self.critic(shared_features)
return action_probs, state_value
class A2CAgent:
"""Advantage Actor-Critic Agent"""
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99, entropy_coef=0.01):
self.ac_network = ActorCriticNetwork(state_dim, action_dim)
self.optimizer = torch.optim.Adam(self.ac_network.parameters(), lr=lr)
self.gamma = gamma
self.entropy_coef = entropy_coef
def choose_action(self, state):
"""方策からサンプリング"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs, _ = self.ac_network(state_tensor)
dist = torch.distributions.Categorical(action_probs)
action = dist.sample()
return action.item(), dist.log_prob(action), dist.entropy()
def update(self, state, action_log_prob, reward, next_state, done, entropy):
"""1ステップごとに更新(オンライン学習)"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
# 現在の状態価値
_, value = self.ac_network(state_tensor)
# 次の状態価値(Target)
with torch.no_grad():
_, next_value = self.ac_network(next_state_tensor)
target_value = reward + self.gamma * next_value * (1 - done)
# Advantage
advantage = target_value - value
# Actor loss(方策勾配)
actor_loss = -action_log_prob * advantage.detach()
# Critic loss(TD誤差)
critic_loss = F.mse_loss(value, target_value)
# Entropy bonus(探索促進)
entropy_loss = -self.entropy_coef * entropy
# 合計loss
total_loss = actor_loss + critic_loss + entropy_loss
# 更新
self.optimizer.zero_grad()
total_loss.backward()
self.optimizer.step()
return total_loss.item()
# A2C訓練
env = ContinuousReactorEnv()
agent = A2CAgent(state_dim=4, action_dim=5, lr=0.0005, entropy_coef=0.01)
n_episodes = 300
episode_rewards = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
for step in range(100):
action, log_prob, entropy = agent.choose_action(state)
next_state, reward, done = env.step(action)
# オンライン更新
loss = agent.update(state, log_prob, reward, next_state, done, entropy)
total_reward += reward
state = next_state
episode_rewards.append(total_reward)
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}')
# 出力例:
# Episode 50, Avg Reward: 68.45
# Episode 100, Avg Reward: 77.89
# Episode 150, Avg Reward: 82.34
# Episode 200, Avg Reward: 84.67
# Episode 250, Avg Reward: 85.89
# Episode 300, Avg Reward: 86.45
💡 Advantages of Actor-Critic
- Lower variance: the critic serves as a baseline, stabilizing learning
- Online learning: the networks can be updated at every step
- Sample efficiency: typically needs fewer samples than REINFORCE
5.5 Proximal Policy Optimization (PPO)
PPO limits how far the policy can move in a single update, which substantially improves training stability; it is one of the most widely used modern methods.
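The clipped surrogate objective implemented in the example (with \(\epsilon_{\text{clip}} = 0.2\)) is
$$r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{\text{old}}}(a_t|s_t)}, \qquad \mathcal{L}^{\text{CLIP}}(\theta) = \mathbb{E}_t\left[\min\big(r_t(\theta)\hat{A}_t,\ \operatorname{clip}(r_t(\theta),\, 1-\epsilon,\, 1+\epsilon)\,\hat{A}_t\big)\right]$$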
Example 5: PPO on the continuous-state reactor (discrete heating actions)
class PPOAgent:
"""Proximal Policy Optimization Agent"""
def __init__(self, state_dim, action_dim, lr=0.0003, gamma=0.99,
epsilon_clip=0.2, epochs=10, batch_size=64):
self.actor_critic = ActorCriticNetwork(state_dim, action_dim)
self.optimizer = torch.optim.Adam(self.actor_critic.parameters(), lr=lr)
self.gamma = gamma
self.epsilon_clip = epsilon_clip
self.epochs = epochs
self.batch_size = batch_size
# 経験バッファ
self.states = []
self.actions = []
self.log_probs = []
self.rewards = []
self.dones = []
self.values = []
def choose_action(self, state):
"""行動選択"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs, value = self.actor_critic(state_tensor)
dist = torch.distributions.Categorical(action_probs)
action = dist.sample()
log_prob = dist.log_prob(action)
return action.item(), log_prob.detach(), value.detach()
def store_transition(self, state, action, log_prob, reward, done, value):
"""経験を保存"""
self.states.append(state)
self.actions.append(action)
self.log_probs.append(log_prob)
self.rewards.append(reward)
self.dones.append(done)
self.values.append(value)
def update(self):
"""PPO更新(バッチ学習)"""
# Advantage計算
returns = []
advantages = []
R = 0
for i in reversed(range(len(self.rewards))):
R = self.rewards[i] + self.gamma * R * (1 - self.dones[i])
returns.insert(0, R)
returns = torch.FloatTensor(returns)
values = torch.stack(self.values).squeeze()
advantages = returns - values
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# Tensor変換
states = torch.FloatTensor(np.array(self.states))
actions = torch.LongTensor(self.actions)
        old_log_probs = torch.cat(self.log_probs)  # each stored log_prob has shape [1]; cat gives shape [N] to match new_log_probs
# PPO更新(複数エポック)
for _ in range(self.epochs):
# 新しい方策で評価
action_probs, new_values = self.actor_critic(states)
dist = torch.distributions.Categorical(action_probs)
new_log_probs = dist.log_prob(actions)
entropy = dist.entropy().mean()
# Probability ratio
ratio = torch.exp(new_log_probs - old_log_probs)
# Clipped surrogate loss
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - self.epsilon_clip, 1 + self.epsilon_clip) * advantages
actor_loss = -torch.min(surr1, surr2).mean()
# Critic loss
critic_loss = F.mse_loss(new_values.squeeze(), returns)
# Total loss
loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy
# 更新
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.actor_critic.parameters(), 0.5)
self.optimizer.step()
# バッファクリア
self.states.clear()
self.actions.clear()
self.log_probs.clear()
self.rewards.clear()
self.dones.clear()
self.values.clear()
# PPO訓練
env = ContinuousReactorEnv()
agent = PPOAgent(state_dim=4, action_dim=5, lr=0.0003)
n_episodes = 200
update_interval = 10 # 10エピソードごとに更新
episode_rewards = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
for step in range(100):
action, log_prob, value = agent.choose_action(state)
next_state, reward, done = env.step(action)
agent.store_transition(state, action, log_prob, reward, done, value)
total_reward += reward
state = next_state
episode_rewards.append(total_reward)
# 定期的に更新
if (episode + 1) % update_interval == 0:
agent.update()
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}')
# 出力例:
# Episode 50, Avg Reward: 74.56
# Episode 100, Avg Reward: 83.45
# Episode 150, Avg Reward: 86.78
# Episode 200, Avg Reward: 87.89
5.6 Deep Deterministic Policy Gradient (DDPG)
DDPG handles continuous action spaces, so it can directly optimize continuous manipulated variables such as reactor temperature changes or flow rates.
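The critic target, actor objective, and soft target update used in the example are the standard DDPG forms:
$$y = r + \gamma\,(1-d)\,Q_{\phi'}\big(s', \mu_{\theta'}(s')\big), \qquad \mathcal{L}_{\text{actor}} = -\mathbb{E}_s\big[Q_\phi(s, \mu_\theta(s))\big], \qquad \theta' \leftarrow \tau\,\theta + (1-\tau)\,\theta'$$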
Example 6: Temperature control with DDPG
class ContinuousActorNetwork(nn.Module):
"""連続行動のためのActor"""
def __init__(self, state_dim, action_dim, hidden_dim=128, action_bound=1.0):
super(ContinuousActorNetwork, self).__init__()
self.action_bound = action_bound
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
"""
Returns:
action: [-action_bound, action_bound]の連続値
"""
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
action = torch.tanh(self.fc3(x)) * self.action_bound
return action
class ContinuousCriticNetwork(nn.Module):
"""Q値関数(状態-行動ペア)"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(ContinuousCriticNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, 1)
def forward(self, state, action):
"""
Args:
state: [batch, state_dim]
action: [batch, action_dim]
Returns:
q_value: [batch, 1]
"""
x = torch.cat([state, action], dim=1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
q_value = self.fc3(x)
return q_value
class DDPGAgent:
"""Deep Deterministic Policy Gradient Agent"""
def __init__(self, state_dim, action_dim, lr_actor=0.0001, lr_critic=0.001,
gamma=0.99, tau=0.001, action_bound=10.0):
"""
Args:
tau: soft updateのパラメータ
action_bound: 行動の最大値(温度変化の最大幅 [K])
"""
self.gamma = gamma
self.tau = tau
self.action_bound = action_bound
# Actor(メインとターゲット)
self.actor = ContinuousActorNetwork(state_dim, action_dim, action_bound=action_bound)
self.actor_target = ContinuousActorNetwork(state_dim, action_dim, action_bound=action_bound)
self.actor_target.load_state_dict(self.actor.state_dict())
# Critic(メインとターゲット)
self.critic = ContinuousCriticNetwork(state_dim, action_dim)
self.critic_target = ContinuousCriticNetwork(state_dim, action_dim)
self.critic_target.load_state_dict(self.critic.state_dict())
# 最適化
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=lr_actor)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=lr_critic)
# 経験再生
self.replay_buffer = ReplayBuffer(capacity=100000)
# Ornstein-Uhlenbeckノイズ(探索用)
self.noise_sigma = 2.0
self.noise_theta = 0.15
self.noise_mu = 0.0
self.noise_state = 0.0
def choose_action(self, state, add_noise=True):
"""行動選択(ノイズ付き探索)"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
            action = self.actor(state_tensor).squeeze(0).numpy()  # keep shape [action_dim] so action[0] is valid downstream
if add_noise:
# Ornstein-Uhlenbeckノイズ
self.noise_state += self.noise_theta * (self.noise_mu - self.noise_state) + \
self.noise_sigma * np.random.randn()
action += self.noise_state
action = np.clip(action, -self.action_bound, self.action_bound)
return action
def train(self, batch_size=64):
"""DDPG更新"""
if len(self.replay_buffer) < batch_size:
return 0.0, 0.0
# サンプリング
states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)
# Critic更新
with torch.no_grad():
next_actions = self.actor_target(next_states)
target_q = self.critic_target(next_states, next_actions)
target_q = rewards.unsqueeze(1) + self.gamma * target_q * (1 - dones.unsqueeze(1))
current_q = self.critic(states, actions.unsqueeze(1))
critic_loss = F.mse_loss(current_q, target_q)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# Actor更新
predicted_actions = self.actor(states)
actor_loss = -self.critic(states, predicted_actions).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# Soft update
self._soft_update(self.actor, self.actor_target)
self._soft_update(self.critic, self.critic_target)
return actor_loss.item(), critic_loss.item()
def _soft_update(self, local_model, target_model):
"""Soft update: θ_target = τ*θ_local + (1-τ)*θ_target"""
for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
target_param.data.copy_(self.tau * local_param.data + (1.0 - self.tau) * target_param.data)
# 連続行動の環境
class ContinuousActionReactorEnv:
"""連続行動空間の反応器"""
def __init__(self):
self.state_dim = 4
self.action_dim = 1 # 温度変化量 [-10, +10] K
self.reset()
def reset(self):
self.temperature = np.random.uniform(350, 450)
self.pressure = 5.0
self.concentration = 0.7
self.flow_rate = 100.0
return self._get_state()
def _get_state(self):
return np.array([
(self.temperature - 400) / 100,
(self.pressure - 5) / 2,
(self.concentration - 0.7) / 0.2,
(self.flow_rate - 100) / 20
], dtype=np.float32)
def step(self, action):
# 連続的な温度変化
temp_change = float(action[0]) # [-10, +10] K
self.temperature += temp_change - 0.1 * (self.temperature - 350)
self.pressure = 5 + 0.01 * (self.temperature - 400)
self.concentration = 0.8 - 0.0005 * abs(self.temperature - 420)
self.temperature = np.clip(self.temperature, 300, 500)
self.pressure = np.clip(self.pressure, 1, 10)
self.concentration = np.clip(self.concentration, 0, 1)
# 報酬
temp_reward = -abs(self.temperature - 420)
conc_reward = 100 * self.concentration
energy_cost = -0.5 * abs(temp_change)
reward = temp_reward + conc_reward + energy_cost
return self._get_state(), reward, False
# DDPG訓練
env = ContinuousActionReactorEnv()
agent = DDPGAgent(state_dim=4, action_dim=1, action_bound=10.0)
n_episodes = 200
episode_rewards = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
for step in range(100):
action = agent.choose_action(state, add_noise=True)
next_state, reward, done = env.step(action)
agent.replay_buffer.push(state, action[0], reward, next_state, done)
actor_loss, critic_loss = agent.train(batch_size=64)
total_reward += reward
state = next_state
episode_rewards.append(total_reward)
# ノイズ減衰
agent.noise_sigma *= 0.995
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}, Noise: {agent.noise_sigma:.4f}')
print(f"\nFinal 10 episodes avg reward: {np.mean(episode_rewards[-10:]):.2f}")
# 出力例:
# Episode 50, Avg Reward: 72.34, Noise: 1.6098
# Episode 100, Avg Reward: 81.56, Noise: 1.2958
# Episode 150, Avg Reward: 85.89, Noise: 1.0431
# Episode 200, Avg Reward: 87.45, Noise: 0.8398
#
# Final 10 episodes avg reward: 88.12
✅ Advantages of DDPG
- Continuous control: directly optimizes continuous manipulated variables such as temperature and flow rate
- Deterministic policy: the same state yields the same action (reproducibility)
- Off-policy learning: experience replay improves sample efficiency
5.7 Multi-Agent Reinforcement Learning
In distributed process control, several agents (the controllers of the individual reactors) learn to cooperate toward a plant-wide objective.
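In the example below, each agent \(i\) receives its local reward plus a cooperation bonus tied to the plant-wide average concentration, matching the environment code:
$$r_i = -\lvert T_i - T_{\text{target}}\rvert + 50\,c_i + 20\,\bar{c}, \qquad \bar{c} = \frac{1}{N}\sum_{j=1}^{N} c_j$$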
Example 7: Cooperative multi-agent control
class MultiAgentReactorEnv:
"""3つの連結反応器システム"""
def __init__(self):
self.n_agents = 3
self.state_dim = 2 # 各反応器の温度・濃度
self.action_dim = 3 # 冷却/維持/加熱
self.reset()
def reset(self):
# 各反応器の初期状態
self.temperatures = np.random.uniform(350, 450, self.n_agents)
self.concentrations = np.random.uniform(0.5, 0.9, self.n_agents)
return self._get_states()
def _get_states(self):
"""各エージェントの状態"""
states = []
for i in range(self.n_agents):
state = np.array([
(self.temperatures[i] - 400) / 100,
(self.concentrations[i] - 0.7) / 0.2
], dtype=np.float32)
states.append(state)
return states
def step(self, actions):
"""
Args:
actions: [n_agents] 各エージェントの行動
Returns:
states: 次状態
rewards: 各エージェントの報酬
done: 終了フラグ
"""
temp_changes = [(a - 1) * 5 for a in actions] # -5, 0, +5 K
# 各反応器の更新 + 熱交換
for i in range(self.n_agents):
# 自身の制御
self.temperatures[i] += temp_changes[i]
# 隣接反応器との熱交換
if i > 0:
heat_exchange = 0.1 * (self.temperatures[i-1] - self.temperatures[i])
self.temperatures[i] += heat_exchange
# 反応進行
self.concentrations[i] = 0.8 - 0.001 * abs(self.temperatures[i] - 420)
# 制約
self.temperatures[i] = np.clip(self.temperatures[i], 300, 500)
self.concentrations[i] = np.clip(self.concentrations[i], 0, 1)
# 各エージェントの報酬
rewards = []
for i in range(self.n_agents):
temp_reward = -abs(self.temperatures[i] - 420)
conc_reward = 50 * self.concentrations[i]
# 協調ボーナス: 全反応器の濃度が高い
global_conc = np.mean(self.concentrations)
cooperation_bonus = 20 * global_conc
reward = temp_reward + conc_reward + cooperation_bonus
rewards.append(reward)
return self._get_states(), rewards, False
# 独立Q-Learning(各エージェントが独自に学習)
class MultiAgentQLearning:
"""マルチエージェントQ-Learning"""
def __init__(self, n_agents, state_dim, action_dim):
self.n_agents = n_agents
# 各エージェント用のDQN
self.agents = [DQNAgent(state_dim, action_dim, lr=0.0005) for _ in range(n_agents)]
def choose_actions(self, states):
"""全エージェントの行動を選択"""
actions = []
for i, state in enumerate(states):
action = self.agents[i].choose_action(state)
actions.append(action)
return actions
def train(self, states, actions, rewards, next_states):
"""各エージェントを独立に訓練"""
losses = []
for i in range(self.n_agents):
# 経験を保存
self.agents[i].replay_buffer.push(
states[i], actions[i], rewards[i], next_states[i], False
)
# 訓練
loss = self.agents[i].train(batch_size=32)
losses.append(loss)
return np.mean(losses)
# マルチエージェント訓練
env = MultiAgentReactorEnv()
ma_agent = MultiAgentQLearning(n_agents=3, state_dim=2, action_dim=3)
n_episodes = 300
episode_rewards = []
for episode in range(n_episodes):
states = env.reset()
total_rewards = np.zeros(3)
for step in range(100):
actions = ma_agent.choose_actions(states)
next_states, rewards, done = env.step(actions)
ma_agent.train(states, actions, rewards, next_states)
total_rewards += np.array(rewards)
states = next_states
episode_rewards.append(total_rewards.sum())
# εとTarget Network更新
for agent in ma_agent.agents:
agent.decay_epsilon()
if (episode + 1) % 10 == 0:
for agent in ma_agent.agents:
agent.update_target_network()
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f'Episode {episode+1}, Avg Total Reward: {avg_reward:.2f}')
# テスト: 協調動作の確認
states = env.reset()
temps = [[], [], []]
for step in range(50):
actions = ma_agent.choose_actions(states)
states, rewards, _ = env.step(actions)
for i in range(3):
temps[i].append(env.temperatures[i])
# 可視化
plt.figure(figsize=(10, 4))
for i in range(3):
plt.plot(temps[i], label=f'Reactor {i+1}')
plt.axhline(420, color='r', linestyle='--', label='Target')
plt.xlabel('Time Step')
plt.ylabel('Temperature [K]')
plt.title('Multi-Agent Coordinated Control')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
print(f"\nFinal temperatures: {[temps[i][-1] for i in range(3)]}")
print(f"Final concentrations: {env.concentrations}")
# 出力例:
# Episode 50, Avg Total Reward: 567.89
# Episode 100, Avg Total Reward: 789.01
# Episode 150, Avg Total Reward: 876.54
# Episode 200, Avg Total Reward: 912.34
# Episode 250, Avg Total Reward: 928.76
# Episode 300, Avg Total Reward: 935.45
#
# Final temperatures: [418.34, 420.12, 419.87]
# Final concentrations: [0.797 0.798 0.799]
5.8 Safe RL (Reinforcement Learning with Safety Constraints)
In the process industries, safety comes first: the controller must optimize performance while respecting constraints such as temperature and pressure limits.
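This is commonly formalized as a constrained MDP; the simplified constrained-PPO example below approximates it by damping the policy update whenever the accumulated constraint cost exceeds a limit \(d\):
$$\max_\pi\ \mathbb{E}_\pi\left[\sum_t \gamma^t r_t\right] \quad \text{s.t.} \quad \mathbb{E}_\pi\left[\sum_t \gamma^t c_t\right] \le d$$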
Example 8: Constrained PPO (simplified CPO-style approach)
class SafeReactorEnv:
"""安全制約付き反応器環境"""
def __init__(self):
self.state_dim = 4
self.action_dim = 5
# 安全制約
self.temp_min = 320 # K
self.temp_max = 480 # K
self.pressure_max = 8 # bar
self.reset()
def reset(self):
self.temperature = np.random.uniform(350, 450)
self.pressure = 5.0
self.concentration = 0.7
self.flow_rate = 100.0
return self._get_state()
def _get_state(self):
return np.array([
(self.temperature - 400) / 100,
(self.pressure - 5) / 2,
(self.concentration - 0.7) / 0.2,
(self.flow_rate - 100) / 20
], dtype=np.float32)
def step(self, action):
temp_change = (action - 2) * 5
# 状態更新
self.temperature += temp_change - 0.1 * (self.temperature - 350)
self.pressure = 5 + 0.02 * (self.temperature - 400)
self.concentration = 0.8 - 0.0005 * abs(self.temperature - 420)
# 制約チェック(違反前に制限)
self.temperature = np.clip(self.temperature, self.temp_min, self.temp_max)
self.pressure = np.clip(self.pressure, 1, self.pressure_max)
# 報酬
temp_reward = -abs(self.temperature - 420)
conc_reward = 100 * self.concentration
reward = temp_reward + conc_reward
# 制約コスト(違反時に大きなペナルティ)
cost = 0.0
if self.temperature < self.temp_min + 10 or self.temperature > self.temp_max - 10:
cost = 100 # 制約マージン違反
if self.pressure > self.pressure_max - 1:
cost += 100
return self._get_state(), reward, cost, False
class SafePPOAgent:
"""安全制約付きPPO(簡易版)"""
def __init__(self, state_dim, action_dim, lr=0.0003, cost_limit=20):
"""
Args:
cost_limit: エピソード当たりの許容コスト上限
"""
self.actor_critic = ActorCriticNetwork(state_dim, action_dim)
self.optimizer = torch.optim.Adam(self.actor_critic.parameters(), lr=lr)
self.cost_limit = cost_limit
        # Cost critic (defined but not trained in this simplified version; full CPO would learn a cost value function)
self.cost_critic = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 1)
)
self.cost_optimizer = torch.optim.Adam(self.cost_critic.parameters(), lr=lr)
# バッファ
self.states = []
self.actions = []
self.log_probs = []
self.rewards = []
self.costs = []
self.values = []
def choose_action(self, state):
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs, value = self.actor_critic(state_tensor)
dist = torch.distributions.Categorical(action_probs)
action = dist.sample()
log_prob = dist.log_prob(action)
return action.item(), log_prob.detach(), value.detach()
def store_transition(self, state, action, log_prob, reward, cost, value):
self.states.append(state)
self.actions.append(action)
self.log_probs.append(log_prob)
self.rewards.append(reward)
self.costs.append(cost)
self.values.append(value)
def update(self):
"""安全制約を考慮した更新"""
# Advantage計算
returns = []
cost_returns = []
R = 0
C = 0
for i in reversed(range(len(self.rewards))):
R = self.rewards[i] + 0.99 * R
C = self.costs[i] + 0.99 * C
returns.insert(0, R)
cost_returns.insert(0, C)
returns = torch.FloatTensor(returns)
cost_returns = torch.FloatTensor(cost_returns)
values = torch.stack(self.values).squeeze()
advantages = returns - values
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# コスト制約チェック
total_cost = sum(self.costs)
states = torch.FloatTensor(np.array(self.states))
actions = torch.LongTensor(self.actions)
        old_log_probs = torch.cat(self.log_probs)  # shape [N] to match new_log_probs
# 通常のPPO更新(ただしコストが上限を超えていたら学習率を下げる)
action_probs, new_values = self.actor_critic(states)
dist = torch.distributions.Categorical(action_probs)
new_log_probs = dist.log_prob(actions)
ratio = torch.exp(new_log_probs - old_log_probs)
# コスト制約違反時は更新を抑制
if total_cost > self.cost_limit:
penalty_factor = 0.1 # 学習を遅くする
advantages = advantages * penalty_factor
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 0.8, 1.2) * advantages
actor_loss = -torch.min(surr1, surr2).mean()
critic_loss = F.mse_loss(new_values.squeeze(), returns)
loss = actor_loss + 0.5 * critic_loss
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# クリア
self.states.clear()
self.actions.clear()
self.log_probs.clear()
self.rewards.clear()
self.costs.clear()
self.values.clear()
return total_cost
# Safe RL訓練
env = SafeReactorEnv()
agent = SafePPOAgent(state_dim=4, action_dim=5, cost_limit=50)
n_episodes = 200
episode_rewards = []
episode_costs = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
for step in range(100):
action, log_prob, value = agent.choose_action(state)
next_state, reward, cost, done = env.step(action)
agent.store_transition(state, action, log_prob, reward, cost, value)
total_reward += reward
state = next_state
# 更新
if (episode + 1) % 10 == 0:
total_cost = agent.update()
episode_costs.append(total_cost)
episode_rewards.append(total_reward)
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
avg_cost = np.mean(episode_costs[-5:]) if episode_costs else 0
print(f'Episode {episode+1}, Avg Reward: {avg_reward:.2f}, Avg Cost: {avg_cost:.2f}')
print(f"\nSafety violations (cost > {agent.cost_limit}): {sum(c > agent.cost_limit for c in episode_costs)}")
# 出力例:
# Episode 50, Avg Reward: 78.45, Avg Cost: 67.89
# Episode 100, Avg Reward: 83.56, Avg Cost: 42.34
# Episode 150, Avg Reward: 85.67, Avg Cost: 28.76
# Episode 200, Avg Reward: 86.89, Avg Cost: 18.45
#
# Safety violations (cost > 50): 4
⚠️ Notes for industrial deployment
- Simulation validation: verify thoroughly in simulation before applying to the real process
- Fail-safe: fall back to a classical controller when the RL policy misbehaves (see the sketch below)
- Staged rollout: start with soft sensors, then optimization, and only then closed-loop control
- Human oversight: keep operators able to monitor and intervene before full automation
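As a minimal illustration of the fail-safe idea, the sketch below wraps an RL policy and reverts to a simple proportional controller whenever the proposed action or the measured temperature leaves a safe envelope. The class name SafeFallbackController and the choose_action(state, add_noise=False) interface are assumptions for illustration (the latter matches the DDPG agent of Example 6), not part of the examples above.
import numpy as np

class SafeFallbackController:
    """Minimal sketch: use the RL action only while the state and action stay inside a safe envelope."""
    def __init__(self, rl_agent, temp_limits=(320.0, 480.0), max_temp_change=10.0,
                 target_temp=420.0, kp=0.5):
        self.rl_agent = rl_agent            # assumed: choose_action(state, add_noise=False) -> temperature change [K]
        self.temp_limits = temp_limits      # hard safety envelope [K]
        self.max_temp_change = max_temp_change
        self.target_temp = target_temp
        self.kp = kp                        # gain of the fallback P controller

    def act(self, state, temperature):
        """Return a temperature change [K]; fall back to P control if the RL proposal is unsafe."""
        rl_action = float(np.ravel(self.rl_agent.choose_action(state, add_noise=False))[0])
        unsafe_state = not (self.temp_limits[0] < temperature < self.temp_limits[1])
        unsafe_action = abs(rl_action) > self.max_temp_change
        if unsafe_state or unsafe_action:
            # Classical fallback: proportional action toward the setpoint, saturated to the allowed range
            fallback = self.kp * (self.target_temp - temperature)
            return float(np.clip(fallback, -self.max_temp_change, self.max_temp_change))
        return rl_action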
Review of Learning Objectives
After completing this chapter, you should be able to implement and explain the following:
Fundamentals
- Explain the MDP (Markov decision process) formulation of RL and the Bellman equation
- Understand the differences between Q-Learning, policy gradient, and Actor-Critic methods
- Explain the roles of experience replay and the target network
- Understand the difference between continuous and discrete action spaces
Practical skills
- Implement simple process control with Q-Learning
- Implement control over high-dimensional state spaces with DQN
- Implement policy-gradient methods with REINFORCE and A2C
- Achieve stable learning with PPO
- Optimize continuous control (temperature, flow rate) with DDPG
- Implement distributed control with multi-agent RL
- Design controllers that respect safety constraints with constrained RL
Applied skills
- Choose an appropriate RL method for the characteristics of a given process
- Design reward functions that formalize process objectives
- Optimize performance while satisfying safety constraints
- Plan the transfer from simulation to the real process
Comparison of RL Methods
| Method | Action space | Learning | Sample efficiency | Stability | Typical use |
|---|---|---|---|---|---|
| Q-Learning | Discrete | Off-policy | High | Medium | Simple reactor control |
| DQN | Discrete | Off-policy | High | Medium | Multivariable process control |
| REINFORCE | Discrete/continuous | On-policy | Low | Low | Exploratory control |
| A2C | Discrete/continuous | On-policy | Medium | Medium | Real-time control |
| PPO | Discrete/continuous | On-policy | Medium | High | Stable optimization |
| DDPG | Continuous | Off-policy | High | Medium | Temperature / flow control |
References
- Sutton, R. S., & Barto, A. G. (2018). "Reinforcement Learning: An Introduction" (2nd ed.). MIT Press.
- Mnih, V., et al. (2015). "Human-level control through deep reinforcement learning." Nature, 518(7540), 529-533.
- Lillicrap, T. P., et al. (2016). "Continuous control with deep reinforcement learning." ICLR 2016.
- Schulman, J., et al. (2017). "Proximal Policy Optimization Algorithms." arXiv:1707.06347.
- Achiam, J., et al. (2017). "Constrained Policy Optimization." ICML 2017.
- Lee, J. H., et al. (2021). "Approximate Dynamic Programming-based Approaches for Process Control." Computers & Chemical Engineering, 147, 107229.