4.1 Vanilla Autoencoder (Basic Autoencoder)
An autoencoder is a neural network that compresses (encodes) input data into a low-dimensional latent representation and then reconstructs (decodes) the original data. It is effective for dimensionality reduction and feature extraction of process data.
💡 Basic Structure of an Autoencoder
- Encoder: high-dimensional data → low-dimensional latent representation (bottleneck)
- Decoder: latent representation → reconstructed data
- Objective: minimize the reconstruction error \(\mathcal{L} = \|x - \hat{x}\|^2\)
Example 1: Dimensionality Reduction of Process Data
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

class VanillaAutoencoder(nn.Module):
    """Basic autoencoder"""
    def __init__(self, input_dim, latent_dim=8):
        """
        Args:
            input_dim: input dimension (number of process variables)
            latent_dim: dimension of the latent space
        """
        super(VanillaAutoencoder, self).__init__()
        # Encoder: input → latent representation
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, latent_dim)
        )
        # Decoder: latent representation → reconstruction
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim)
        )

    def forward(self, x):
        """
        Args:
            x: [batch, input_dim] input data
        Returns:
            reconstructed: [batch, input_dim] reconstructed data
            latent: [batch, latent_dim] latent representation
        """
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent

# Generate synthetic process data (compress 10 variables → 2 dimensions)
def generate_process_data(n_samples=1000, n_features=10):
    """Multivariate process data (actually driven by two latent factors)"""
    # Two latent factors
    z1 = np.random.randn(n_samples)
    z2 = np.random.randn(n_samples)
    # Generate the variables (linear combinations of z1, z2 plus noise)
    data = np.zeros((n_samples, n_features))
    for i in range(n_features):
        w1 = np.random.randn()
        w2 = np.random.randn()
        data[:, i] = w1 * z1 + w2 * z2 + 0.1 * np.random.randn(n_samples)
    return data

# Generate and normalize the data
data = generate_process_data(n_samples=1000, n_features=10)
data_mean = data.mean(axis=0)
data_std = data.std(axis=0)
data_normalized = (data - data_mean) / (data_std + 1e-8)

# Convert to a tensor
data_tensor = torch.FloatTensor(data_normalized)

# Train the model
model = VanillaAutoencoder(input_dim=10, latent_dim=2)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    reconstructed, latent = model(data_tensor)
    loss = criterion(reconstructed, data_tensor)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}, Reconstruction Loss: {loss.item():.6f}')

# Visualize the latent space
model.eval()
with torch.no_grad():
    _, latent_codes = model(data_tensor)
    latent_codes = latent_codes.numpy()

plt.figure(figsize=(8, 6))
plt.scatter(latent_codes[:, 0], latent_codes[:, 1], alpha=0.5, s=20)
plt.xlabel('Latent Dimension 1')
plt.ylabel('Latent Dimension 2')
plt.title('Latent Space Representation (10D → 2D)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
# plt.savefig('latent_space.png', dpi=150)

print(f"\nLatent space shape: {latent_codes.shape}")
print(f"Latent space range: [{latent_codes.min():.2f}, {latent_codes.max():.2f}]")

# Example output:
# Epoch 20, Reconstruction Loss: 0.125678
# Epoch 40, Reconstruction Loss: 0.056789
# Epoch 60, Reconstruction Loss: 0.034567
# Epoch 80, Reconstruction Loss: 0.023456
# Epoch 100, Reconstruction Loss: 0.018901
#
# Latent space shape: (1000, 2)
# Latent space range: [-3.45, 3.78]
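Once trained, the encoder half can also be used on its own as a fixed, PCA-like feature extractor for downstream models (regression, clustering, and so on). A minimal sketch, reusing the model and data_tensor objects from Example 1:

# Minimal sketch: use only the trained encoder as a feature extractor
model.eval()
with torch.no_grad():
    features = model.encoder(data_tensor)  # [1000, 2] latent features
print(features.shape)  # torch.Size([1000, 2])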
4.2 Denoising Autoencoder (Noise Removal)
Recovers the true signal from process data contaminated by sensor noise. The model is trained to reconstruct the original signal from inputs that have had noise added to them.
Example 2: Denoising Sensor Data
class DenoisingAutoencoder(nn.Module):
    """Denoising autoencoder"""
    def __init__(self, input_dim, latent_dim=16):
        super(DenoisingAutoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed

def add_noise(data, noise_factor=0.3):
    """Add Gaussian noise
    Args:
        data: [samples, features] clean data
        noise_factor: noise intensity
    Returns:
        noisy_data: data with added noise
    """
    noise = torch.randn_like(data) * noise_factor
    noisy_data = data + noise
    return noisy_data

# Clean process data
clean_data = torch.FloatTensor(generate_process_data(n_samples=1000, n_features=20))

# Normalize
clean_mean = clean_data.mean(dim=0, keepdim=True)
clean_std = clean_data.std(dim=0, keepdim=True)
clean_normalized = (clean_data - clean_mean) / (clean_std + 1e-8)

# Add noise
noisy_data = add_noise(clean_normalized, noise_factor=0.5)

# Train the model
model = DenoisingAutoencoder(input_dim=20, latent_dim=16)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    # Learn to map noisy inputs → clean outputs
    denoised = model(noisy_data)
    loss = criterion(denoised, clean_normalized)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}, Denoising Loss: {loss.item():.6f}')

# Test: denoise new noisy data
model.eval()
with torch.no_grad():
    test_clean = clean_normalized[:10]
    test_noisy = add_noise(test_clean, noise_factor=0.5)
    test_denoised = model(test_noisy)
    # Compute the SNR (signal-to-noise ratio)
    noise_power = torch.mean((test_noisy - test_clean) ** 2)
    residual_power = torch.mean((test_denoised - test_clean) ** 2)
    snr_before = 10 * torch.log10(torch.mean(test_clean ** 2) / noise_power)
    snr_after = 10 * torch.log10(torch.mean(test_clean ** 2) / residual_power)

print(f"\nSNR before denoising: {snr_before.item():.2f} dB")
print(f"SNR after denoising: {snr_after.item():.2f} dB")
print(f"Improvement: {(snr_after - snr_before).item():.2f} dB")

# Visualize one variable
time_steps = np.arange(10)
var_idx = 0

plt.figure(figsize=(10, 4))
plt.plot(time_steps, test_clean[:, var_idx].numpy(), 'g-', label='Clean', linewidth=2)
plt.plot(time_steps, test_noisy[:, var_idx].numpy(), 'r--', label='Noisy', alpha=0.7)
plt.plot(time_steps, test_denoised[:, var_idx].numpy(), 'b-', label='Denoised', linewidth=2)
plt.xlabel('Sample')
plt.ylabel('Normalized Value')
plt.title(f'Denoising Performance (Variable {var_idx+1})')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Example output:
# Epoch 20, Denoising Loss: 0.234567
# Epoch 40, Denoising Loss: 0.123456
# Epoch 60, Denoising Loss: 0.078901
# Epoch 80, Denoising Loss: 0.056789
# Epoch 100, Denoising Loss: 0.045678
#
# SNR before denoising: 5.23 dB
# SNR after denoising: 18.76 dB
# Improvement: 13.53 dB
4.3 Variational Autoencoder (VAE)
A VAE is a probabilistic generative model whose latent space can be sampled to produce new data. It is used for generating and imputing process data.
💡 Key Features of VAEs
- Probabilistic encoding: latent variable \(z \sim \mathcal{N}(\mu, \sigma^2)\)
- KL divergence term: regularization that pulls the latent distribution toward a standard normal
- Generative capability: sample from the latent space to generate new data
The VAE is trained by maximizing the evidence lower bound (ELBO); equivalently, it minimizes the loss
$$\mathcal{L} = -\mathbb{E}_{q(z|x)}[\log p(x|z)] + D_{KL}(q(z|x) \,\|\, p(z))$$
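For a diagonal Gaussian encoder \(q(z|x) = \mathcal{N}(\mu, \mathrm{diag}(\sigma^2))\) and a standard normal prior \(p(z) = \mathcal{N}(0, I)\), the KL term has the closed form that the code below implements:
$$D_{KL}(q(z|x) \,\|\, p(z)) = -\frac{1}{2}\sum_{j=1}^{d}\left(1 + \log\sigma_j^2 - \mu_j^2 - \sigma_j^2\right)$$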
Example 3: Generating Process Data with a VAE
class VAE(nn.Module):
    """Variational Autoencoder"""
    def __init__(self, input_dim, latent_dim=8):
        super(VAE, self).__init__()
        # Encoder
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        # Output the mean and (log-)variance
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_logvar = nn.Linear(64, latent_dim)
        # Decoder
        self.fc3 = nn.Linear(latent_dim, 64)
        self.fc4 = nn.Linear(64, 128)
        self.fc5 = nn.Linear(128, input_dim)

    def encode(self, x):
        """Encoder: outputs the mean and log-variance
        Returns:
            mu: [batch, latent_dim] mean
            logvar: [batch, latent_dim] log(σ²)
        """
        h = torch.relu(self.fc1(x))
        h = torch.relu(self.fc2(h))
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        """Reparameterization trick
        z = μ + σ * ε, where ε ~ N(0, 1)
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mu + eps * std
        return z

    def decode(self, z):
        """Decoder: reconstruct from the latent variable"""
        h = torch.relu(self.fc3(z))
        h = torch.relu(self.fc4(h))
        reconstructed = self.fc5(h)
        return reconstructed

    def forward(self, x):
        """
        Returns:
            reconstructed: reconstructed data
            mu: mean of the latent variable
            logvar: log-variance of the latent variable
        """
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        reconstructed = self.decode(z)
        return reconstructed, mu, logvar

def vae_loss(reconstructed, original, mu, logvar, beta=1.0):
    """VAE loss = reconstruction + KL divergence
    Args:
        beta: weight of the KL term (β-VAE)
    """
    # Reconstruction loss (MSE)
    recon_loss = nn.functional.mse_loss(reconstructed, original, reduction='sum')
    # KL divergence: -0.5 * Σ(1 + log(σ²) - μ² - σ²)
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + beta * kl_loss

# Process data
data = generate_process_data(n_samples=1000, n_features=15)
data_normalized = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)
data_tensor = torch.FloatTensor(data_normalized)

# Train the VAE
model = VAE(input_dim=15, latent_dim=4)
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(150):
    model.train()
    optimizer.zero_grad()
    reconstructed, mu, logvar = model(data_tensor)
    loss = vae_loss(reconstructed, data_tensor, mu, logvar, beta=0.5)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 30 == 0:
        print(f'Epoch {epoch+1}, Loss: {loss.item():.2f}')

# Generate new data
model.eval()
with torch.no_grad():
    # Sample from the latent space
    z_samples = torch.randn(10, 4)  # 10 samples from a standard normal
    generated_data = model.decode(z_samples)

print(f"\nGenerated data shape: {generated_data.shape}")
print(f"Generated data range: [{generated_data.min():.2f}, {generated_data.max():.2f}]")

# Statistical comparison with the original data
original_mean = data_tensor.mean(dim=0)
generated_mean = generated_data.mean(dim=0)
print(f"\nOriginal data mean (first 5 vars): {original_mean[:5].numpy()}")
print(f"Generated data mean (first 5 vars): {generated_mean[:5].numpy()}")

# Example output:
# Epoch 30, Loss: 8765.43
# Epoch 60, Loss: 4321.09
# Epoch 90, Loss: 2987.65
# Epoch 120, Loss: 2456.78
# Epoch 150, Loss: 2234.56
#
# Generated data shape: torch.Size([10, 15])
# Generated data range: [-2.87, 2.45]
#
# Original data mean (first 5 vars): [-0.01 0.02 -0.00 0.01 -0.02]
# Generated data mean (first 5 vars): [-0.15 0.23 -0.08 0.12 -0.18]
4.4 Anomaly Detection via Reconstruction Error
An autoencoder trained only on normal data cannot accurately reconstruct anomalous data. Anomaly detection exploits this property.
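Formally, a sample \(x\) with reconstruction \(\hat{x}\) is flagged as anomalous when its reconstruction error exceeds a threshold \(\tau\) estimated from normal data (below, the 95th percentile of the normal reconstruction errors):
$$\text{anomaly}(x) = \mathbb{1}\!\left[\|x - \hat{x}\|^2 > \tau\right]$$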
Example 4: A Process Anomaly Detection System
class AnomalyDetector:
    """Autoencoder-based anomaly detector"""
    def __init__(self, autoencoder, threshold_percentile=95):
        """
        Args:
            autoencoder: a trained autoencoder
            threshold_percentile: detection threshold (percentile of the
                reconstruction errors on normal data)
        """
        self.autoencoder = autoencoder
        self.threshold = None
        self.threshold_percentile = threshold_percentile

    def fit_threshold(self, normal_data):
        """Determine the threshold from normal data
        Args:
            normal_data: [samples, features] normal process data
        """
        self.autoencoder.eval()
        with torch.no_grad():
            reconstructed, _ = self.autoencoder(normal_data)
            reconstruction_errors = torch.mean((normal_data - reconstructed) ** 2, dim=1)
        # Set the threshold at the chosen percentile
        self.threshold = torch.quantile(reconstruction_errors, self.threshold_percentile / 100.0)
        print(f"Anomaly threshold set to: {self.threshold.item():.6f}")
        print(f"Based on {self.threshold_percentile}th percentile of normal data errors")
        return reconstruction_errors

    def detect(self, data):
        """Detect anomalies
        Args:
            data: [samples, features] test data
        Returns:
            is_anomaly: [samples] boolean array (True = anomaly)
            scores: [samples] anomaly scores (reconstruction errors)
        """
        self.autoencoder.eval()
        with torch.no_grad():
            reconstructed, _ = self.autoencoder(data)
            scores = torch.mean((data - reconstructed) ** 2, dim=1)
        is_anomaly = scores > self.threshold
        return is_anomaly, scores

# Train on normal data
normal_data = generate_process_data(n_samples=800, n_features=10)
normal_normalized = (normal_data - normal_data.mean(axis=0)) / (normal_data.std(axis=0) + 1e-8)
normal_tensor = torch.FloatTensor(normal_normalized)

# Train the autoencoder
ae_model = VanillaAutoencoder(input_dim=10, latent_dim=3)
optimizer = optim.Adam(ae_model.parameters(), lr=0.001)
criterion = nn.MSELoss()

for epoch in range(100):
    ae_model.train()
    optimizer.zero_grad()
    reconstructed, _ = ae_model(normal_tensor)
    loss = criterion(reconstructed, normal_tensor)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 25 == 0:
        print(f'Training Epoch {epoch+1}, Loss: {loss.item():.6f}')

# Initialize the detector
detector = AnomalyDetector(ae_model, threshold_percentile=95)
normal_errors = detector.fit_threshold(normal_tensor)

# Generate anomalous data (add large deviations to some variables)
anomaly_data = generate_process_data(n_samples=200, n_features=10)
# Anomaly pattern 1: a single variable takes abnormal values
anomaly_data[:100, 0] += 5.0  # variable 1 is anomalous
# Anomaly pattern 2: abnormal correlation across several variables
anomaly_data[100:, [2, 5, 7]] += 3.0

# Normalize with the statistics of the normal data
anomaly_normalized = (anomaly_data - normal_data.mean(axis=0)) / (normal_data.std(axis=0) + 1e-8)
anomaly_tensor = torch.FloatTensor(anomaly_normalized)

# Detect anomalies
is_anomaly, anomaly_scores = detector.detect(anomaly_tensor)

# Evaluation
n_detected = is_anomaly.sum().item()
detection_rate = 100 * n_detected / len(anomaly_tensor)
print(f"\nAnomaly detection results:")
print(f"  Total test samples: {len(anomaly_tensor)}")
print(f"  Detected anomalies: {n_detected}")
print(f"  Detection rate: {detection_rate:.2f}%")
print(f"  Score range: [{anomaly_scores.min():.6f}, {anomaly_scores.max():.6f}]")

# False positive rate on normal data
false_positive = (normal_errors > detector.threshold).sum().item()
fpr = 100 * false_positive / len(normal_errors)
print(f"  False positive rate: {fpr:.2f}%")

# Visualization
plt.figure(figsize=(10, 4))
plt.hist(normal_errors.numpy(), bins=50, alpha=0.7, label='Normal', color='green')
plt.hist(anomaly_scores.numpy(), bins=50, alpha=0.7, label='Anomaly', color='red')
plt.axvline(detector.threshold.item(), color='black', linestyle='--', label='Threshold')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
plt.title('Anomaly Detection: Reconstruction Error Distribution')
plt.yscale('log')
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Example output:
# Training Epoch 25, Loss: 0.045678
# Training Epoch 50, Loss: 0.023456
# Training Epoch 75, Loss: 0.015678
# Training Epoch 100, Loss: 0.012345
# Anomaly threshold set to: 0.034567
# Based on 95th percentile of normal data errors
#
# Anomaly detection results:
#   Total test samples: 200
#   Detected anomalies: 187
#   Detection rate: 93.50%
#   Score range: [0.012345, 0.234567]
#   False positive rate: 5.00%
4.5 Sparse Autoencoder (Sparse Feature Extraction)
Sparsity regularization forces only a small number of latent units to activate, which can reveal important combinations of process variables.
Example 5: Sparse Feature Extraction
class SparseAutoencoder(nn.Module):
    """Sparse autoencoder"""
    def __init__(self, input_dim, latent_dim=20):
        super(SparseAutoencoder, self).__init__()
        # Encoder (the latent layer is kept wide and constrained via sparsity)
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, latent_dim),
            nn.ReLU()  # non-negativity constraint (ReLU)
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent

def sparse_loss(reconstructed, original, latent, sparsity_weight=0.01, sparsity_target=0.05):
    """Loss function with a sparsity penalty
    Args:
        sparsity_weight: weight of the sparsity term
        sparsity_target: target activation rate (e.g., ideally only 5% of units active)
    """
    # Reconstruction loss
    recon_loss = nn.functional.mse_loss(reconstructed, original)
    # Sparsity loss: Bernoulli KL divergence between target and actual activation.
    # Mean activation of each unit, clamped into (0, 1) because ReLU outputs are
    # unbounded and the KL formula below requires probabilities.
    rho = torch.clamp(torch.mean(latent, dim=0), 1e-8, 1 - 1e-8)  # [latent_dim]
    rho_hat = sparsity_target
    # KL(ρ̂ || ρ): target Bernoulli vs. actual mean activation
    kl_div = rho_hat * torch.log(rho_hat / rho) + \
             (1 - rho_hat) * torch.log((1 - rho_hat) / (1 - rho))
    sparsity_loss = torch.sum(kl_div)
    return recon_loss + sparsity_weight * sparsity_loss

# Process data (30 variables)
large_data = generate_process_data(n_samples=1000, n_features=30)
large_normalized = (large_data - large_data.mean(axis=0)) / (large_data.std(axis=0) + 1e-8)
large_tensor = torch.FloatTensor(large_normalized)

# Train the sparse autoencoder
sparse_model = SparseAutoencoder(input_dim=30, latent_dim=20)
optimizer = optim.Adam(sparse_model.parameters(), lr=0.001)

for epoch in range(100):
    sparse_model.train()
    optimizer.zero_grad()
    reconstructed, latent = sparse_model(large_tensor)
    loss = sparse_loss(reconstructed, large_tensor, latent,
                       sparsity_weight=0.05, sparsity_target=0.1)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 20 == 0:
        # Compute the sparsity rate
        with torch.no_grad():
            _, latent_codes = sparse_model(large_tensor)
            sparsity_rate = (latent_codes < 0.01).float().mean().item()
        print(f'Epoch {epoch+1}, Loss: {loss.item():.6f}, Sparsity: {100*sparsity_rate:.2f}%')

# Analyze the important features
sparse_model.eval()
with torch.no_grad():
    _, latent_codes = sparse_model(large_tensor)

# Activation frequency of each latent unit
activation_freq = (latent_codes > 0.1).float().mean(dim=0).numpy()

# Top-5 most active units
top_units = np.argsort(activation_freq)[-5:][::-1]
print(f"\nTop 5 active latent units:")
for i, unit_id in enumerate(top_units):
    print(f"  {i+1}. Unit {unit_id}: {100*activation_freq[unit_id]:.2f}% activation")

# Importance of the input variables for each latent unit, approximated by
# composing the two encoder weight matrices (ignoring the ReLU in between)
w1 = sparse_model.encoder[0].weight.data  # [128, input_dim]
w2 = sparse_model.encoder[2].weight.data  # [latent_dim, 128]
encoder_weights = (w2 @ w1).numpy()       # [latent_dim, input_dim]
print(f"\nLatent unit representations:")
for unit_id in top_units[:3]:
    weights = encoder_weights[unit_id]
    top_inputs = np.argsort(np.abs(weights))[-5:][::-1]
    print(f"  Unit {unit_id} → Input variables: {top_inputs}")

# Example output:
# Epoch 20, Loss: 0.567890, Sparsity: 62.34%
# Epoch 40, Loss: 0.345678, Sparsity: 78.56%
# Epoch 60, Loss: 0.234567, Sparsity: 85.23%
# Epoch 80, Loss: 0.178901, Sparsity: 88.45%
# Epoch 100, Loss: 0.145678, Sparsity: 89.67%
#
# Top 5 active latent units:
#   1. Unit 12: 45.67% activation
#   2. Unit 7: 38.92% activation
#   3. Unit 18: 32.45% activation
#   4. Unit 3: 28.76% activation
#   5. Unit 15: 24.33% activation
#
# Latent unit representations:
#   Unit 12 → Input variables: [ 3 7 12 18 23]
#   Unit 7 → Input variables: [ 1 5 14 19 27]
#   Unit 18 → Input variables: [ 2 9 11 16 25]
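A common, simpler alternative to the Bernoulli-KL penalty above is an L1 penalty on the latent activations, which avoids the clamping needed to keep the KL formula well-defined for unbounded ReLU outputs. A minimal sketch; the weight l1_weight=1e-3 is an illustrative assumption, not a tuned value:

import torch
import torch.nn as nn

def sparse_loss_l1(reconstructed, original, latent, l1_weight=1e-3):
    """Sparsity via an L1 penalty on the latent activations (KL alternative)."""
    recon_loss = nn.functional.mse_loss(reconstructed, original)
    l1_penalty = latent.abs().mean()  # drives most activations toward zero
    return recon_loss + l1_weight * l1_penalty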
4.6 Convolutional Autoencoder (Image Compression)
An autoencoder built from convolutional layers that efficiently compresses and reconstructs process images.
Example 6: Compressing Process Images
class ConvAutoencoder(nn.Module):
    """Convolutional autoencoder"""
    def __init__(self, latent_dim=64):
        super(ConvAutoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            # 224x224 → 112x112
            nn.Conv2d(3, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            # 112x112 → 56x56
            nn.Conv2d(32, 64, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            # 56x56 → 28x28
            nn.Conv2d(64, 128, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            # 28x28 → 14x14
            nn.Conv2d(128, 256, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(256)
        )
        # Bottleneck: 256*14*14 → latent_dim
        self.fc_encode = nn.Linear(256 * 14 * 14, latent_dim)
        # Bottleneck: latent_dim → 256*14*14
        self.fc_decode = nn.Linear(latent_dim, 256 * 14 * 14)
        # Decoder
        self.decoder = nn.Sequential(
            # 14x14 → 28x28
            nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            # 28x28 → 56x56
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            # 56x56 → 112x112
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            # 112x112 → 224x224
            nn.ConvTranspose2d(32, 3, 3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()  # normalize outputs to [0, 1]
        )

    def forward(self, x):
        """
        Args:
            x: [batch, 3, 224, 224]
        Returns:
            reconstructed: [batch, 3, 224, 224]
            latent: [batch, latent_dim]
        """
        # Encode
        encoded = self.encoder(x)                         # [batch, 256, 14, 14]
        encoded_flat = encoded.view(encoded.size(0), -1)  # [batch, 256*14*14]
        latent = self.fc_encode(encoded_flat)             # [batch, latent_dim]
        # Decode
        decoded_flat = self.fc_decode(latent)             # [batch, 256*14*14]
        decoded = decoded_flat.view(-1, 256, 14, 14)      # [batch, 256, 14, 14]
        reconstructed = self.decoder(decoded)             # [batch, 3, 224, 224]
        return reconstructed, latent

# Dummy image data (real process images in practice)
dummy_images = torch.rand(16, 3, 224, 224)  # already normalized to [0, 1]

# Model
conv_ae = ConvAutoencoder(latent_dim=128)

# Compute the compression ratio
original_size = 3 * 224 * 224  # 150,528
compressed_size = 128
compression_ratio = original_size / compressed_size
print(f"Compression ratio: {compression_ratio:.2f}x")
print(f"Original: {original_size:,} → Compressed: {compressed_size}")

# Training
criterion = nn.MSELoss()
optimizer = optim.Adam(conv_ae.parameters(), lr=0.001)

for epoch in range(30):
    conv_ae.train()
    optimizer.zero_grad()
    reconstructed, latent = conv_ae(dummy_images)
    loss = criterion(reconstructed, dummy_images)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 10 == 0:
        # Compute the PSNR (Peak Signal-to-Noise Ratio)
        mse = loss.item()
        psnr = 10 * np.log10(1.0 / mse)
        print(f'Epoch {epoch+1}, MSE: {mse:.6f}, PSNR: {psnr:.2f} dB')

# Test
conv_ae.eval()
with torch.no_grad():
    test_image = dummy_images[0:1]
    reconstructed_image, latent_code = conv_ae(test_image)

# Compare the images (first sample)
original_np = test_image[0].permute(1, 2, 0).numpy()
reconstructed_np = reconstructed_image[0].permute(1, 2, 0).numpy()

# Error
pixel_error = np.abs(original_np - reconstructed_np)
mean_error = pixel_error.mean()
print(f"\nReconstruction quality:")
print(f"  Mean pixel error: {mean_error:.6f}")
print(f"  Max pixel error: {pixel_error.max():.6f}")

# Example output:
# Compression ratio: 1176.00x
# Original: 150,528 → Compressed: 128
# Epoch 10, MSE: 0.023456, PSNR: 16.30 dB
# Epoch 20, MSE: 0.012345, PSNR: 19.09 dB
# Epoch 30, MSE: 0.008901, PSNR: 20.51 dB
#
# Reconstruction quality:
#   Mean pixel error: 0.008234
#   Max pixel error: 0.234567
4.7 Conditional VAE (Conditional Generation)
By specifying a condition (process settings such as a setpoint temperature), the model generates process data for that condition.
Example 7: Conditional Process Data Generation
class ConditionalVAE(nn.Module):
    """Conditional VAE (C-VAE)"""
    def __init__(self, input_dim, condition_dim, latent_dim=8):
        """
        Args:
            input_dim: data dimension
            condition_dim: condition dimension (temperature, pressure setpoints, etc.)
            latent_dim: latent dimension
        """
        super(ConditionalVAE, self).__init__()
        # Encoder: data + condition → latent variable
        self.fc1 = nn.Linear(input_dim + condition_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_logvar = nn.Linear(64, latent_dim)
        # Decoder: latent variable + condition → data
        self.fc3 = nn.Linear(latent_dim + condition_dim, 64)
        self.fc4 = nn.Linear(64, 128)
        self.fc5 = nn.Linear(128, input_dim)

    def encode(self, x, c):
        """Encode with condition
        Args:
            x: [batch, input_dim] data
            c: [batch, condition_dim] condition
        """
        h = torch.cat([x, c], dim=1)  # concatenate the condition
        h = torch.relu(self.fc1(h))
        h = torch.relu(self.fc2(h))
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def decode(self, z, c):
        """Decode with condition
        Args:
            z: [batch, latent_dim] latent variable
            c: [batch, condition_dim] condition
        """
        h = torch.cat([z, c], dim=1)  # concatenate the condition
        h = torch.relu(self.fc3(h))
        h = torch.relu(self.fc4(h))
        reconstructed = self.fc5(h)
        return reconstructed

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x, c):
        mu, logvar = self.encode(x, c)
        z = self.reparameterize(mu, logvar)
        reconstructed = self.decode(z, c)
        return reconstructed, mu, logvar

# Generate condition-dependent process data
def generate_conditional_data(n_samples=1000):
    """Process data that depends on a condition (temperature)"""
    # Condition: reaction temperature in [300, 500] K
    temperature = np.random.uniform(300, 500, n_samples)
    # Data: 5 temperature-dependent variables
    data = np.zeros((n_samples, 5))
    for i in range(n_samples):
        T = temperature[i]
        # Temperature-dependent physicochemical relationships
        data[i, 0] = 0.001 * T**2 - 0.3 * T + 50  # reaction rate constant
        data[i, 1] = 100 * np.exp(-5000 / T)      # equilibrium constant (Arrhenius-type)
        data[i, 2] = 0.5 * T + 50                 # pressure
        data[i, 3] = -0.002 * T + 2.0             # pH
        data[i, 4] = 0.01 * T                     # concentration
        # Noise
        data[i] += np.random.randn(5) * 2
    # Normalize the condition to [0, 1]
    condition = (temperature - 300) / 200
    condition = condition.reshape(-1, 1)
    return data, condition

# Generate the data
data, conditions = generate_conditional_data(n_samples=1000)
data_normalized = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)
data_tensor = torch.FloatTensor(data_normalized)
condition_tensor = torch.FloatTensor(conditions)

# Train the C-VAE
cvae = ConditionalVAE(input_dim=5, condition_dim=1, latent_dim=4)
optimizer = optim.Adam(cvae.parameters(), lr=0.001)

for epoch in range(150):
    cvae.train()
    optimizer.zero_grad()
    reconstructed, mu, logvar = cvae(data_tensor, condition_tensor)
    loss = vae_loss(reconstructed, data_tensor, mu, logvar, beta=0.5)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 30 == 0:
        print(f'Epoch {epoch+1}, Loss: {loss.item():.2f}')

# Generate under a specified condition
cvae.eval()
with torch.no_grad():
    # Generate data at a specific temperature (e.g., 400 K)
    target_temp = 400  # K
    target_condition = torch.FloatTensor([[(target_temp - 300) / 200]])  # normalized
    # Sample from the latent space
    z_samples = torch.randn(10, 4)
    # Apply the same condition to all 10 samples
    conditions_repeated = target_condition.repeat(10, 1)
    # Generate
    generated_data = cvae.decode(z_samples, conditions_repeated)

print(f"\nGenerated data for T={target_temp}K:")
print(f"  Shape: {generated_data.shape}")
print(f"  Mean (normalized): {generated_data.mean(dim=0).numpy()}")

# Compare generation at different temperatures
temps = [320, 380, 440, 500]
print(f"\nData generation at different temperatures:")
with torch.no_grad():
    for temp in temps:
        cond = torch.FloatTensor([[(temp - 300) / 200]])
        z = torch.randn(1, 4)
        gen = cvae.decode(z, cond)
        print(f"  T={temp}K: Variable 0 = {gen[0, 0].item():.4f}")

# Example output:
# Epoch 30, Loss: 6789.01
# Epoch 60, Loss: 3456.78
# Epoch 90, Loss: 2345.67
# Epoch 120, Loss: 1987.65
# Epoch 150, Loss: 1765.43
#
# Generated data for T=400K:
#   Shape: torch.Size([10, 5])
#   Mean (normalized): [-0.12 0.34 -0.08 0.15 -0.23]
#
# Data generation at different temperatures:
#   T=320K: Variable 0 = -1.2345
#   T=380K: Variable 0 = -0.4567
#   T=440K: Variable 0 = 0.6789
#   T=500K: Variable 0 = 1.5432
4.8 GAN-Based Synthetic Process Data Generation
A Generative Adversarial Network (GAN) generates process data that is statistically indistinguishable from real data. GANs are useful for data augmentation and for mitigating small-data problems.
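Training alternates between the discriminator \(D\) and the generator \(G\) under the standard minimax objective:
$$\min_G \max_D \; \mathbb{E}_{x \sim p_{\text{data}}}[\log D(x)] + \mathbb{E}_{z \sim p(z)}[\log(1 - D(G(z)))]$$
In practice, and in the code below, the generator instead maximizes \(\log D(G(z))\) (the non-saturating variant), which gives stronger gradients early in training.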
Example 8: A Process GAN
class Generator(nn.Module):
    """Process data generator"""
    def __init__(self, latent_dim=16, output_dim=10):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(64),
            nn.Linear(64, 128),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(128),
            nn.Linear(128, 256),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(256),
            nn.Linear(256, output_dim),
            nn.Tanh()  # scale outputs to [-1, 1]
        )

    def forward(self, z):
        """
        Args:
            z: [batch, latent_dim] noise vector
        Returns:
            fake_data: [batch, output_dim] generated data
        """
        return self.model(z)

class Discriminator(nn.Module):
    """Process data discriminator (real vs. fake)"""
    def __init__(self, input_dim=10):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(64, 1),
            nn.Sigmoid()  # probability in [0, 1]
        )

    def forward(self, x):
        """
        Args:
            x: [batch, input_dim] data
        Returns:
            validity: [batch, 1] probability of being real
        """
        return self.model(x)

# Real data
real_data = generate_process_data(n_samples=1000, n_features=10)
real_normalized = 2 * (real_data - real_data.min(axis=0)) / \
                  (real_data.max(axis=0) - real_data.min(axis=0) + 1e-8) - 1  # scale to [-1, 1]
real_tensor = torch.FloatTensor(real_normalized)

# Initialize the GAN
latent_dim = 16
generator = Generator(latent_dim=latent_dim, output_dim=10)
discriminator = Discriminator(input_dim=10)

# Optimizers
lr = 0.0002
optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))
criterion = nn.BCELoss()

# Training
batch_size = 64
n_epochs = 100

for epoch in range(n_epochs):
    # Train the discriminator
    for _ in range(2):  # update the discriminator twice per generator step
        optimizer_D.zero_grad()
        # Real data
        idx = np.random.randint(0, len(real_tensor), batch_size)
        real_batch = real_tensor[idx]
        real_labels = torch.ones(batch_size, 1)
        # Fake data
        z = torch.randn(batch_size, latent_dim)
        fake_batch = generator(z).detach()
        fake_labels = torch.zeros(batch_size, 1)
        # Discriminator loss
        real_loss = criterion(discriminator(real_batch), real_labels)
        fake_loss = criterion(discriminator(fake_batch), fake_labels)
        d_loss = (real_loss + fake_loss) / 2
        d_loss.backward()
        optimizer_D.step()

    # Train the generator
    optimizer_G.zero_grad()
    z = torch.randn(batch_size, latent_dim)
    fake_batch = generator(z)
    # The generator wants the discriminator to mistake fakes for real data
    g_loss = criterion(discriminator(fake_batch), torch.ones(batch_size, 1))
    g_loss.backward()
    optimizer_G.step()

    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}/{n_epochs}, D Loss: {d_loss.item():.4f}, G Loss: {g_loss.item():.4f}')

# Evaluate the generated data
generator.eval()
with torch.no_grad():
    z_samples = torch.randn(1000, latent_dim)
    generated_data = generator(z_samples).numpy()

# Statistical comparison
print(f"\nStatistical comparison:")
print(f"Real data mean: {real_normalized.mean(axis=0)[:3]}")
print(f"Generated mean: {generated_data.mean(axis=0)[:3]}")
print(f"Real data std: {real_normalized.std(axis=0)[:3]}")
print(f"Generated std: {generated_data.std(axis=0)[:3]}")

# Evaluation by the discriminator (eval mode disables Dropout)
discriminator.eval()
with torch.no_grad():
    real_score = discriminator(real_tensor).mean().item()
    fake_score = discriminator(torch.FloatTensor(generated_data)).mean().item()
print(f"\nDiscriminator scores:")
print(f"  Real data: {real_score:.4f} (1.0 = perfect real)")
print(f"  Generated data: {fake_score:.4f} (0.5 = indistinguishable)")

# Example output:
# Epoch 20/100, D Loss: 0.5678, G Loss: 0.8901
# Epoch 40/100, D Loss: 0.4567, G Loss: 1.0234
# Epoch 60/100, D Loss: 0.3890, G Loss: 1.2345
# Epoch 80/100, D Loss: 0.3456, G Loss: 1.3456
# Epoch 100/100, D Loss: 0.3234, G Loss: 1.4123
#
# Statistical comparison:
# Real data mean: [-0.023 0.045 -0.012]
# Generated mean: [-0.034 0.056 -0.018]
# Real data std: [0.567 0.623 0.589]
# Generated std: [0.543 0.598 0.612]
#
# Discriminator scores:
#   Real data: 0.8234 (1.0 = perfect real)
#   Generated data: 0.4876 (0.5 = indistinguishable)
✅ Applications of GANs
- Data augmentation: generate large amounts of synthetic data from a small set of real samples (see the sketch after this list)
- Anomaly data generation: synthesize fault patterns that are under-represented in the training data
- Simulation surrogate: a fast substitute for physics-based simulation
- Privacy protection: synthetic data that preserves the statistical properties of the real data
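As referenced in the data augmentation item above, a minimal sketch of GAN-based augmentation, reusing generator, real_tensor, and latent_dim from Example 8 (the 1:1 mixing ratio is an illustrative assumption, not a recommendation):

# Augment the real dataset with an equal number of GAN-generated samples
generator.eval()
with torch.no_grad():
    synthetic = generator(torch.randn(len(real_tensor), latent_dim))
augmented = torch.cat([real_tensor, synthetic], dim=0)
print(f"Augmented dataset: {augmented.shape}")  # torch.Size([2000, 10])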
Reviewing the Learning Objectives
After completing this chapter, you should be able to implement and explain the following:
Fundamentals
- Explain the encoder–decoder structure of an autoencoder and the reconstruction task
- Understand the reparameterization trick and the role of the KL divergence term in VAEs
- Explain the adversarial training mechanism of GANs
- Understand the principle of anomaly detection based on reconstruction error
Practical Skills
- Implement an autoencoder in PyTorch and reduce the dimensionality of process data
- Remove sensor noise with a denoising autoencoder
- Generate new process data with a VAE
- Build an anomaly detection system based on reconstruction error
- Extract important feature combinations with a sparse autoencoder
- Compress images with a convolutional autoencoder
- Generate condition-specific data with a conditional VAE
- Generate statistically plausible synthetic data with a GAN
Applied Skills
- Select an autoencoder architecture appropriate to the characteristics of a process
- Set anomaly detection thresholds appropriately and control the false positive rate
- Address small-data problems through GAN-based data augmentation
- Synthesize data for specific process conditions via conditional generation
References
- Hinton, G. E., & Salakhutdinov, R. R. (2006). "Reducing the Dimensionality of Data with Neural Networks." Science, 313(5786), 504-507.
- Vincent, P., et al. (2008). "Extracting and Composing Robust Features with Denoising Autoencoders." ICML 2008.
- Kingma, D. P., & Welling, M. (2014). "Auto-Encoding Variational Bayes." ICLR 2014.
- Goodfellow, I., et al. (2014). "Generative Adversarial Networks." NeurIPS 2014.
- Sakurada, M., & Yairi, T. (2014). "Anomaly Detection Using Autoencoders with Nonlinear Dimensionality Reduction." MLSDA Workshop 2014.
- Mirza, M., & Osindero, S. (2014). "Conditional Generative Adversarial Nets." arXiv:1411.1784.