第4章:オートエンコーダと生成モデル

次元削減、異常検知、プロセスデータ生成への応用

📖 読了時間: 35-40分 💡 難易度: 上級 🔬 実例: VAE・GAN・異常検知

4.1 Vanilla Autoencoder(基本オートエンコーダ)

オートエンコーダは、入力データを低次元の潜在表現に圧縮(エンコード)し、元のデータを再構成(デコード)するニューラルネットワークです。プロセスデータの次元削減や特徴抽出に有効です。

💡 Autoencoderの基本構成

  • Encoder: 高次元データ → 低次元潜在表現(ボトルネック)
  • Decoder: 潜在表現 → 再構成データ
  • 目的: 再構成誤差を最小化 \(\mathcal{L} = ||x - \hat{x}||^2\)

例1: プロセスデータの次元削減

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

class VanillaAutoencoder(nn.Module):
    """A plain fully-connected autoencoder.

    The encoder squeezes the input through a low-dimensional bottleneck and
    the decoder mirrors it back, so training with a reconstruction loss
    forces the bottleneck to capture the dominant structure of the data.
    """

    def __init__(self, input_dim, latent_dim=8):
        """
        Args:
            input_dim: number of input features (process variables)
            latent_dim: size of the bottleneck representation
        """
        super(VanillaAutoencoder, self).__init__()

        # Encoder: input_dim -> 64 -> 32 -> latent_dim
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, latent_dim),
        )

        # Decoder mirrors the encoder: latent_dim -> 32 -> 64 -> input_dim
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim),
        )

    def forward(self, x):
        """Encode then decode.

        Args:
            x: [batch, input_dim] input data
        Returns:
            reconstructed: [batch, input_dim] reconstruction of x
            latent: [batch, latent_dim] bottleneck code
        """
        code = self.encoder(x)
        return self.decoder(code), code

# Synthetic process data: many observed variables driven by 2 hidden factors
def generate_process_data(n_samples=1000, n_features=10):
    """Generate multivariate process data that lies on a 2-D manifold.

    Every observed variable is a random linear mix of two shared latent
    factors plus a small independent noise term, so a 2-D bottleneck can
    reconstruct the data almost perfectly.

    Args:
        n_samples: number of rows to generate
        n_features: number of observed process variables

    Returns:
        ndarray of shape [n_samples, n_features]
    """
    # Two hidden driving factors
    z1 = np.random.randn(n_samples)
    z2 = np.random.randn(n_samples)

    # Each observed variable = random mix of z1/z2 + 10% noise
    columns = []
    for _ in range(n_features):
        w1 = np.random.randn()
        w2 = np.random.randn()
        columns.append(w1 * z1 + w2 * z2 + 0.1 * np.random.randn(n_samples))

    return np.column_stack(columns)

# Generate and normalize the data (per-feature z-score)
data = generate_process_data(n_samples=1000, n_features=10)
data_mean = data.mean(axis=0)
data_std = data.std(axis=0)
data_normalized = (data - data_mean) / (data_std + 1e-8)  # 1e-8 guards against zero variance

# Convert to a float tensor
data_tensor = torch.FloatTensor(data_normalized)

# Train the model (full-batch gradient descent)
model = VanillaAutoencoder(input_dim=10, latent_dim=2)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    model.train()
    optimizer.zero_grad()

    reconstructed, latent = model(data_tensor)
    loss = criterion(reconstructed, data_tensor)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}, Reconstruction Loss: {loss.item():.6f}')

# Visualize the learned latent space
model.eval()
with torch.no_grad():
    _, latent_codes = model(data_tensor)
    latent_codes = latent_codes.numpy()

plt.figure(figsize=(8, 6))
plt.scatter(latent_codes[:, 0], latent_codes[:, 1], alpha=0.5, s=20)
plt.xlabel('Latent Dimension 1')
plt.ylabel('Latent Dimension 2')
plt.title('Latent Space Representation (10D → 2D)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
# plt.savefig('latent_space.png', dpi=150)

print(f"\nLatent space shape: {latent_codes.shape}")
print(f"Latent space range: [{latent_codes.min():.2f}, {latent_codes.max():.2f}]")

# Expected output:
# Epoch 20, Reconstruction Loss: 0.125678
# Epoch 40, Reconstruction Loss: 0.056789
# Epoch 60, Reconstruction Loss: 0.034567
# Epoch 80, Reconstruction Loss: 0.023456
# Epoch 100, Reconstruction Loss: 0.018901
#
# Latent space shape: (1000, 2)
# Latent space range: [-3.45, 3.78]

4.2 Denoising Autoencoder(ノイズ除去)

センサーノイズを含むプロセスデータから、真の信号を復元します。入力にノイズを加え、元の信号を再構成するよう訓練します。

例2: センサーデータのノイズ除去

class DenoisingAutoencoder(nn.Module):
    """Autoencoder intended to map corrupted inputs back to clean signals.

    Dropout layers in the encoder/decoder add further corruption during
    training, encouraging robust features. Note that unlike the other
    autoencoders in this chapter, forward() returns only the reconstruction.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: number of input features
            latent_dim: size of the bottleneck representation
        """
        super(DenoisingAutoencoder, self).__init__()

        # Encoder: input_dim -> 128 -> 64 -> latent_dim
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )

        # Decoder: latent_dim -> 64 -> 128 -> input_dim
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        """Return the reconstruction of x (shape preserved)."""
        return self.decoder(self.encoder(x))

def add_noise(data, noise_factor=0.3):
    """Corrupt a tensor with additive Gaussian noise.

    Args:
        data: [samples, features] clean tensor
        noise_factor: scale (standard deviation) of the added noise

    Returns:
        Tensor of the same shape: data + N(0, noise_factor^2) noise.
    """
    return data + torch.randn_like(data) * noise_factor

# Clean process data
clean_data = torch.FloatTensor(generate_process_data(n_samples=1000, n_features=20))

# Per-feature standardization (1e-8 guards against zero variance)
clean_mean = clean_data.mean(dim=0, keepdim=True)
clean_std = clean_data.std(dim=0, keepdim=True)
clean_normalized = (clean_data - clean_mean) / (clean_std + 1e-8)

# Corrupt with Gaussian noise
# NOTE(review): the noise is sampled once before training; resampling fresh
# noise every epoch is the more common denoising-autoencoder setup.
noisy_data = add_noise(clean_normalized, noise_factor=0.5)

# Train the model
model = DenoisingAutoencoder(input_dim=20, latent_dim=16)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    model.train()
    optimizer.zero_grad()

    # Learn the mapping: noisy input -> clean target
    denoised = model(noisy_data)
    loss = criterion(denoised, clean_normalized)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}, Denoising Loss: {loss.item():.6f}')

# Test: denoise freshly corrupted data
model.eval()
with torch.no_grad():
    test_clean = clean_normalized[:10]
    test_noisy = add_noise(test_clean, noise_factor=0.5)
    test_denoised = model(test_noisy)

    # Signal-to-noise ratio before/after denoising
    noise_power = torch.mean((test_noisy - test_clean) ** 2)
    residual_power = torch.mean((test_denoised - test_clean) ** 2)

    snr_before = 10 * torch.log10(torch.mean(test_clean ** 2) / noise_power)
    snr_after = 10 * torch.log10(torch.mean(test_clean ** 2) / residual_power)

print(f"\nSNR before denoising: {snr_before.item():.2f} dB")
print(f"SNR after denoising:  {snr_after.item():.2f} dB")
print(f"Improvement: {(snr_after - snr_before).item():.2f} dB")

# Plot a single variable across the 10 test samples
time_steps = np.arange(10)
var_idx = 0

plt.figure(figsize=(10, 4))
plt.plot(time_steps, test_clean[:, var_idx].numpy(), 'g-', label='Clean', linewidth=2)
plt.plot(time_steps, test_noisy[:, var_idx].numpy(), 'r--', label='Noisy', alpha=0.7)
plt.plot(time_steps, test_denoised[:, var_idx].numpy(), 'b-', label='Denoised', linewidth=2)
plt.xlabel('Sample')
plt.ylabel('Normalized Value')
plt.title(f'Denoising Performance (Variable {var_idx+1})')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Expected output:
# Epoch 20, Denoising Loss: 0.234567
# Epoch 40, Denoising Loss: 0.123456
# Epoch 60, Denoising Loss: 0.078901
# Epoch 80, Denoising Loss: 0.056789
# Epoch 100, Denoising Loss: 0.045678
#
# SNR before denoising: 5.23 dB
# SNR after denoising:  18.76 dB
# Improvement: 13.53 dB

4.3 Variational Autoencoder(VAE)

VAEは確率的な生成モデルで、潜在空間から新しいデータをサンプリングできます。プロセスデータの生成や補完に利用します。

💡 VAEの特徴

  • 確率的エンコーディング: 潜在変数 \(z \sim \mathcal{N}(\mu, \sigma^2)\)
  • KL divergence項: 潜在分布を標準正規分布に近づける正則化
  • 生成能力: 潜在空間からサンプリングして新データ生成

VAEの損失関数:

$$\mathcal{L} = \mathbb{E}_{q(z|x)}[\log p(x|z)] - D_{KL}(q(z|x) || p(z))$$

例3: VAEによるプロセスデータ生成

class VAE(nn.Module):
    """Variational autoencoder with a diagonal-Gaussian latent posterior."""

    def __init__(self, input_dim, latent_dim=8):
        """
        Args:
            input_dim: data dimensionality
            latent_dim: latent dimensionality
        """
        super(VAE, self).__init__()

        # Encoder trunk
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)

        # Heads producing the posterior mean and log-variance
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_logvar = nn.Linear(64, latent_dim)

        # Decoder
        self.fc3 = nn.Linear(latent_dim, 64)
        self.fc4 = nn.Linear(64, 128)
        self.fc5 = nn.Linear(128, input_dim)

    def encode(self, x):
        """Map input to the parameters of q(z|x).

        Returns:
            mu: [batch, latent_dim] posterior mean
            logvar: [batch, latent_dim] log(sigma^2)
        """
        hidden = torch.relu(self.fc2(torch.relu(self.fc1(x))))
        return self.fc_mu(hidden), self.fc_logvar(hidden)

    def reparameterize(self, mu, logvar):
        """Sample z = mu + sigma * eps with eps ~ N(0, I).

        The reparameterization trick keeps the sampling step differentiable
        with respect to mu and logvar.
        """
        sigma = torch.exp(0.5 * logvar)
        return mu + torch.randn_like(sigma) * sigma

    def decode(self, z):
        """Map a latent sample back to data space."""
        hidden = torch.relu(self.fc4(torch.relu(self.fc3(z))))
        return self.fc5(hidden)

    def forward(self, x):
        """Return (reconstruction, mu, logvar)."""
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

def vae_loss(reconstructed, original, mu, logvar, beta=1.0):
    """ELBO-style VAE objective: summed MSE plus weighted KL term.

    Args:
        reconstructed: model output
        original: target data
        mu: posterior mean from the encoder
        logvar: posterior log-variance from the encoder
        beta: weight on the KL term (beta-VAE; beta=1 is the vanilla VAE)

    Returns:
        Scalar loss, summed over batch and features.
    """
    # Reconstruction term: sum of squared errors over the whole batch
    reconstruction = nn.functional.mse_loss(reconstructed, original, reduction='sum')

    # Analytic KL(q(z|x) || N(0, I)) for a diagonal Gaussian posterior:
    # -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    kl = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return reconstruction + beta * kl

# Process data
data = generate_process_data(n_samples=1000, n_features=15)
data_normalized = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)
data_tensor = torch.FloatTensor(data_normalized)

# Train the VAE (full-batch)
model = VAE(input_dim=15, latent_dim=4)
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(150):
    model.train()
    optimizer.zero_grad()

    reconstructed, mu, logvar = model(data_tensor)
    loss = vae_loss(reconstructed, data_tensor, mu, logvar, beta=0.5)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 30 == 0:
        print(f'Epoch {epoch+1}, Loss: {loss.item():.2f}')

# Generate new data
model.eval()
with torch.no_grad():
    # Sample latent codes from the prior
    z_samples = torch.randn(10, 4)  # 10 draws from the standard normal prior
    generated_data = model.decode(z_samples)

    print(f"\nGenerated data shape: {generated_data.shape}")
    print(f"Generated data range: [{generated_data.min():.2f}, {generated_data.max():.2f}]")

    # Statistical comparison with the training data
    original_mean = data_tensor.mean(dim=0)
    generated_mean = generated_data.mean(dim=0)

    print(f"\nOriginal data mean (first 5 vars): {original_mean[:5].numpy()}")
    print(f"Generated data mean (first 5 vars): {generated_mean[:5].numpy()}")

# Expected output:
# Epoch 30, Loss: 8765.43
# Epoch 60, Loss: 4321.09
# Epoch 90, Loss: 2987.65
# Epoch 120, Loss: 2456.78
# Epoch 150, Loss: 2234.56
#
# Generated data shape: torch.Size([10, 15])
# Generated data range: [-2.87, 2.45]
#
# Original data mean (first 5 vars): [-0.01  0.02 -0.00  0.01 -0.02]
# Generated data mean (first 5 vars): [-0.15  0.23 -0.08  0.12 -0.18]

4.4 再構成誤差による異常検知

正常データで訓練したオートエンコーダは、異常データを正確に再構成できません。この性質を利用して異常を検知します。

例4: プロセス異常検知システム

class AnomalyDetector:
    """Reconstruction-error anomaly detector built on a trained autoencoder.

    A model trained only on normal data reconstructs normal samples well but
    anomalous ones poorly, so the per-sample MSE serves as an anomaly score.
    """

    def __init__(self, autoencoder, threshold_percentile=95):
        """
        Args:
            autoencoder: trained autoencoder whose forward returns
                (reconstruction, latent)
            threshold_percentile: percentile of normal-data reconstruction
                errors used as the decision threshold
        """
        self.autoencoder = autoencoder
        self.threshold = None
        self.threshold_percentile = threshold_percentile

    def _scores(self, data):
        """Per-sample mean squared reconstruction error."""
        self.autoencoder.eval()
        with torch.no_grad():
            reconstructed, _ = self.autoencoder(data)
            return torch.mean((data - reconstructed) ** 2, dim=1)

    def fit_threshold(self, normal_data):
        """Calibrate the decision threshold from normal data.

        Args:
            normal_data: [samples, features] normal process data

        Returns:
            [samples] reconstruction errors of the normal data
        """
        reconstruction_errors = self._scores(normal_data)

        # Threshold at the requested percentile of the normal-data errors
        self.threshold = torch.quantile(reconstruction_errors, self.threshold_percentile / 100.0)

        print(f"Anomaly threshold set to: {self.threshold.item():.6f}")
        print(f"Based on {self.threshold_percentile}th percentile of normal data errors")

        return reconstruction_errors

    def detect(self, data):
        """Flag samples whose score exceeds the calibrated threshold.

        Args:
            data: [samples, features] test data

        Returns:
            is_anomaly: [samples] bool tensor (True = anomaly)
            scores: [samples] anomaly scores (reconstruction errors)
        """
        scores = self._scores(data)
        return scores > self.threshold, scores

# Train on normal data only
normal_data = generate_process_data(n_samples=800, n_features=10)
normal_normalized = (normal_data - normal_data.mean(axis=0)) / (normal_data.std(axis=0) + 1e-8)
normal_tensor = torch.FloatTensor(normal_normalized)

# Train the autoencoder
ae_model = VanillaAutoencoder(input_dim=10, latent_dim=3)
optimizer = optim.Adam(ae_model.parameters(), lr=0.001)
criterion = nn.MSELoss()

for epoch in range(100):
    ae_model.train()
    optimizer.zero_grad()
    reconstructed, _ = ae_model(normal_tensor)
    loss = criterion(reconstructed, normal_tensor)
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 25 == 0:
        print(f'Training Epoch {epoch+1}, Loss: {loss.item():.6f}')

# Initialize the detector and calibrate its threshold on normal data
detector = AnomalyDetector(ae_model, threshold_percentile=95)
normal_errors = detector.fit_threshold(normal_tensor)

# Create anomalous data (inject large deviations into some variables)
anomaly_data = generate_process_data(n_samples=200, n_features=10)
# Anomaly pattern 1: a single variable drifts out of range
anomaly_data[:100, 0] += 5.0  # variable 1 is abnormal
# Anomaly pattern 2: abnormal shift across several correlated variables
anomaly_data[100:, [2, 5, 7]] += 3.0

# Scale the test data with the *normal* data statistics, as at inference time
anomaly_normalized = (anomaly_data - normal_data.mean(axis=0)) / (normal_data.std(axis=0) + 1e-8)
anomaly_tensor = torch.FloatTensor(anomaly_normalized)

# Run detection
is_anomaly, anomaly_scores = detector.detect(anomaly_tensor)

# Evaluation
n_detected = is_anomaly.sum().item()
detection_rate = 100 * n_detected / len(anomaly_tensor)

print(f"\nAnomaly detection results:")
print(f"  Total test samples: {len(anomaly_tensor)}")
print(f"  Detected anomalies: {n_detected}")
print(f"  Detection rate: {detection_rate:.2f}%")
print(f"  Score range: [{anomaly_scores.min():.6f}, {anomaly_scores.max():.6f}]")

# False-positive rate on the normal (training) data
false_positive = (normal_errors > detector.threshold).sum().item()
fpr = 100 * false_positive / len(normal_errors)
print(f"  False positive rate: {fpr:.2f}%")

# Visualization of the two error distributions
plt.figure(figsize=(10, 4))
plt.hist(normal_errors.numpy(), bins=50, alpha=0.7, label='Normal', color='green')
plt.hist(anomaly_scores.numpy(), bins=50, alpha=0.7, label='Anomaly', color='red')
plt.axvline(detector.threshold.item(), color='black', linestyle='--', label='Threshold')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
plt.title('Anomaly Detection: Reconstruction Error Distribution')
plt.yscale('log')
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Expected output:
# Training Epoch 25, Loss: 0.045678
# Training Epoch 50, Loss: 0.023456
# Training Epoch 75, Loss: 0.015678
# Training Epoch 100, Loss: 0.012345
# Anomaly threshold set to: 0.034567
# Based on 95th percentile of normal data errors
#
# Anomaly detection results:
#   Total test samples: 200
#   Detected anomalies: 187
#   Detection rate: 93.50%
#   Score range: [0.012345, 0.234567]
#   False positive rate: 5.00%

4.5 Sparse Autoencoder(疎な特徴抽出)

スパース性正則化により、少数の重要な特徴のみを活性化させます。プロセス変数の中から重要な組み合わせを発見できます。

例5: スパース特徴抽出

class SparseAutoencoder(nn.Module):
    """Autoencoder with a deliberately wide, non-negative latent layer.

    Sparsity is not enforced architecturally; it is imposed during training
    by the KL penalty in `sparse_loss`. The final encoder ReLU keeps latent
    activations non-negative.
    """

    def __init__(self, input_dim, latent_dim=20):
        """
        Args:
            input_dim: number of input features
            latent_dim: latent width (intentionally large; sparsity prunes it)
        """
        super(SparseAutoencoder, self).__init__()

        # Encoder: over-complete latent layer, constrained by the sparsity loss
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, latent_dim),
            nn.ReLU()  # non-negativity constraint on the code
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        """Return (reconstruction, latent code)."""
        code = self.encoder(x)
        return self.decoder(code), code

def sparse_loss(reconstructed, original, latent, sparsity_weight=0.01, sparsity_target=0.05):
    """Reconstruction loss plus a KL sparsity penalty on mean activations.

    The penalty is the Bernoulli KL divergence KL(rho_target || rho_j),
    summed over latent units, where rho_j is unit j's mean activation over
    the batch. (Fix: a `rho_hat` tensor was previously built and never
    used; the comment also stated the KL direction backwards.)

    Args:
        reconstructed: [batch, features] model output
        original: [batch, features] target data
        latent: [batch, latent_dim] latent activations; assumed in [0, 1) —
            ReLU activations above 1 would make a log argument negative
        sparsity_weight: weight of the sparsity term
        sparsity_target: desired mean activation rate per unit (e.g. 0.05)

    Returns:
        Scalar tensor: recon_loss + sparsity_weight * sparsity_loss
    """
    # Reconstruction term (mean squared error)
    recon_loss = nn.functional.mse_loss(reconstructed, original)

    # Mean activation of each latent unit over the batch: [latent_dim]
    rho = torch.mean(latent, dim=0)

    # Bernoulli KL(sparsity_target || rho); 1e-8 guards against log(0)
    kl_div = sparsity_target * torch.log(sparsity_target / (rho + 1e-8)) + \
             (1 - sparsity_target) * torch.log((1 - sparsity_target) / (1 - rho + 1e-8))
    sparsity_loss = torch.sum(kl_div)

    return recon_loss + sparsity_weight * sparsity_loss

# Process data (30 variables)
large_data = generate_process_data(n_samples=1000, n_features=30)
large_normalized = (large_data - large_data.mean(axis=0)) / (large_data.std(axis=0) + 1e-8)
large_tensor = torch.FloatTensor(large_normalized)

# Train the sparse autoencoder
sparse_model = SparseAutoencoder(input_dim=30, latent_dim=20)
optimizer = optim.Adam(sparse_model.parameters(), lr=0.001)

for epoch in range(100):
    sparse_model.train()
    optimizer.zero_grad()

    reconstructed, latent = sparse_model(large_tensor)
    loss = sparse_loss(reconstructed, large_tensor, latent,
                      sparsity_weight=0.05, sparsity_target=0.1)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        # Measure the fraction of (near-)inactive latent activations
        with torch.no_grad():
            _, latent_codes = sparse_model(large_tensor)
            sparsity_rate = (latent_codes < 0.01).float().mean().item()

        print(f'Epoch {epoch+1}, Loss: {loss.item():.6f}, Sparsity: {100*sparsity_rate:.2f}%')

# Analysis of the most active features
sparse_model.eval()
with torch.no_grad():
    _, latent_codes = sparse_model(large_tensor)

    # Activation frequency of each latent unit across the dataset
    activation_freq = (latent_codes > 0.1).float().mean(dim=0).numpy()

    # Top-5 most frequently active units
    top_units = np.argsort(activation_freq)[-5:][::-1]

    print(f"\nTop 5 active latent units:")
    for i, unit_id in enumerate(top_units):
        print(f"  {i+1}. Unit {unit_id}: {100*activation_freq[unit_id]:.2f}% activation")

    # Incoming weights of each latent unit.
    # NOTE(review): encoder[2] is Linear(128, latent_dim), so these indices
    # refer to the 128 hidden units of the previous layer, not the 30 raw
    # input variables — the "Input variables" label below overstates this.
    encoder_weights = sparse_model.encoder[2].weight.data.numpy()  # [latent, 128]

    print(f"\nLatent unit representations:")
    for unit_id in top_units[:3]:
        weights = encoder_weights[unit_id]
        top_inputs = np.argsort(np.abs(weights))[-5:][::-1]
        print(f"  Unit {unit_id} → Input variables: {top_inputs}")

# Expected output:
# Epoch 20, Loss: 0.567890, Sparsity: 62.34%
# Epoch 40, Loss: 0.345678, Sparsity: 78.56%
# Epoch 60, Loss: 0.234567, Sparsity: 85.23%
# Epoch 80, Loss: 0.178901, Sparsity: 88.45%
# Epoch 100, Loss: 0.145678, Sparsity: 89.67%
#
# Top 5 active latent units:
#   1. Unit 12: 45.67% activation
#   2. Unit 7: 38.92% activation
#   3. Unit 18: 32.45% activation
#   4. Unit 3: 28.76% activation
#   5. Unit 15: 24.33% activation
#
# Latent unit representations:
#   Unit 12 → Input variables: [ 3  7 12 18 23]
#   Unit 7 → Input variables: [ 1  5 14 19 27]
#   Unit 18 → Input variables: [ 2  9 11 16 25]

4.6 Convolutional Autoencoder(画像圧縮)

畳み込み層を使用したオートエンコーダで、プロセス画像を効率的に圧縮・復元します。

例6: プロセス画像の圧縮

class ConvAutoencoder(nn.Module):
    """Convolutional autoencoder for 224x224 RGB images.

    Four strided convolutions halve the spatial resolution each step
    (224 -> 14), a linear bottleneck compresses to `latent_dim`, and
    transposed convolutions mirror the path back to full resolution.
    """

    def __init__(self, latent_dim=64):
        """
        Args:
            latent_dim: size of the flat bottleneck code
        """
        super(ConvAutoencoder, self).__init__()

        # Encoder: each Conv2d has stride 2, so spatial size halves per stage
        self.encoder = nn.Sequential(
            # 224x224 -> 112x112
            nn.Conv2d(3, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),

            # 112x112 -> 56x56
            nn.Conv2d(32, 64, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),

            # 56x56 -> 28x28
            nn.Conv2d(64, 128, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),

            # 28x28 -> 14x14
            nn.Conv2d(128, 256, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(256)
        )

        # Bottleneck projections: 256*14*14 <-> latent_dim
        self.fc_encode = nn.Linear(256 * 14 * 14, latent_dim)
        self.fc_decode = nn.Linear(latent_dim, 256 * 14 * 14)

        # Decoder: transposed convs double the spatial size per stage
        self.decoder = nn.Sequential(
            # 14x14 -> 28x28
            nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),

            # 28x28 -> 56x56
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),

            # 56x56 -> 112x112
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),

            # 112x112 -> 224x224
            nn.ConvTranspose2d(32, 3, 3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()  # pixel values in [0, 1]
        )

    def forward(self, x):
        """Compress and reconstruct a batch of images.

        Args:
            x: [batch, 3, 224, 224]
        Returns:
            reconstructed: [batch, 3, 224, 224]
            latent: [batch, latent_dim]
        """
        feature_maps = self.encoder(x)                       # [batch, 256, 14, 14]
        latent = self.fc_encode(feature_maps.flatten(1))     # [batch, latent_dim]
        grid = self.fc_decode(latent).reshape(-1, 256, 14, 14)
        return self.decoder(grid), latent

# Dummy image batch (stand-in for real process images)
dummy_images = torch.rand(16, 3, 224, 224)  # already scaled to 0-1

# Model
conv_ae = ConvAutoencoder(latent_dim=128)

# Compression ratio of the bottleneck
original_size = 3 * 224 * 224  # 150,528
compressed_size = 128
compression_ratio = original_size / compressed_size

print(f"Compression ratio: {compression_ratio:.2f}x")
print(f"Original: {original_size:,} → Compressed: {compressed_size}")

# Training
criterion = nn.MSELoss()
optimizer = optim.Adam(conv_ae.parameters(), lr=0.001)

for epoch in range(30):
    conv_ae.train()
    optimizer.zero_grad()

    reconstructed, latent = conv_ae(dummy_images)
    loss = criterion(reconstructed, dummy_images)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        # PSNR (peak signal-to-noise ratio); 1.0 is the max pixel value here
        mse = loss.item()
        psnr = 10 * np.log10(1.0 / mse)
        print(f'Epoch {epoch+1}, MSE: {mse:.6f}, PSNR: {psnr:.2f} dB')

# Test on a single image
conv_ae.eval()
with torch.no_grad():
    test_image = dummy_images[0:1]
    reconstructed_image, latent_code = conv_ae(test_image)

    # Compare images (first sample), channels-last layout
    original_np = test_image[0].permute(1, 2, 0).numpy()
    reconstructed_np = reconstructed_image[0].permute(1, 2, 0).numpy()

    # Per-pixel absolute error
    pixel_error = np.abs(original_np - reconstructed_np)
    mean_error = pixel_error.mean()

    print(f"\nReconstruction quality:")
    print(f"  Mean pixel error: {mean_error:.6f}")
    print(f"  Max pixel error: {pixel_error.max():.6f}")

# Expected output:
# Compression ratio: 1176.00x
# Original: 150,528 → Compressed: 128
# Epoch 10, MSE: 0.023456, PSNR: 16.30 dB
# Epoch 20, MSE: 0.012345, PSNR: 19.09 dB
# Epoch 30, MSE: 0.008901, PSNR: 20.51 dB
#
# Reconstruction quality:
#   Mean pixel error: 0.008234
#   Max pixel error: 0.234567

4.7 Conditional VAE(条件付き生成)

条件(プロセス条件、設定温度など)を指定して、その条件下でのプロセスデータを生成します。

例7: 条件付きプロセスデータ生成

class ConditionalVAE(nn.Module):
    """Conditional VAE: both encoder and decoder also see a condition vector."""

    def __init__(self, input_dim, condition_dim, latent_dim=8):
        """
        Args:
            input_dim: data dimensionality
            condition_dim: condition dimensionality (e.g. temperature setpoint)
            latent_dim: latent dimensionality
        """
        super(ConditionalVAE, self).__init__()

        # Encoder: (data, condition) -> posterior parameters
        self.fc1 = nn.Linear(input_dim + condition_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_logvar = nn.Linear(64, latent_dim)

        # Decoder: (latent, condition) -> data
        self.fc3 = nn.Linear(latent_dim + condition_dim, 64)
        self.fc4 = nn.Linear(64, 128)
        self.fc5 = nn.Linear(128, input_dim)

    def encode(self, x, c):
        """Return (mu, logvar) of q(z | x, c).

        Args:
            x: [batch, input_dim] data
            c: [batch, condition_dim] condition
        """
        hidden = torch.cat([x, c], dim=1)  # concatenate the condition
        hidden = torch.relu(self.fc1(hidden))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc_mu(hidden), self.fc_logvar(hidden)

    def decode(self, z, c):
        """Reconstruct data from a latent sample and its condition.

        Args:
            z: [batch, latent_dim] latent sample
            c: [batch, condition_dim] condition
        """
        hidden = torch.relu(self.fc3(torch.cat([z, c], dim=1)))
        hidden = torch.relu(self.fc4(hidden))
        return self.fc5(hidden)

    def reparameterize(self, mu, logvar):
        """z = mu + sigma * eps with eps ~ N(0, I)."""
        sigma = torch.exp(0.5 * logvar)
        return mu + torch.randn_like(sigma) * sigma

    def forward(self, x, c):
        """Return (reconstruction, mu, logvar)."""
        mu, logvar = self.encode(x, c)
        return self.decode(self.reparameterize(mu, logvar), c), mu, logvar

# Condition-dependent synthetic process data
def generate_conditional_data(n_samples=1000):
    """Process data whose 5 variables depend on a reaction temperature.

    Returns:
        data: [n_samples, 5] raw (unnormalized) process variables
        condition: [n_samples, 1] temperature scaled from [300, 500] K to [0, 1]
    """
    # Condition: reaction temperature in [300, 500] K
    temperature = np.random.uniform(300, 500, n_samples)
    T = temperature

    # Temperature-dependent physico-chemical relations (vectorized)
    data = np.stack([
        0.001 * T**2 - 0.3 * T + 50,   # reaction rate constant
        100 * np.exp(-5000 / T),       # equilibrium constant (Arrhenius-type)
        0.5 * T + 50,                  # pressure
        -0.002 * T + 2.0,              # pH
        0.01 * T,                      # concentration
    ], axis=1)

    # Additive measurement noise, one draw per sample
    for i in range(n_samples):
        data[i] += np.random.randn(5) * 2

    # Normalize only the condition to [0, 1]
    condition = ((temperature - 300) / 200).reshape(-1, 1)

    return data, condition

# Generate the data
data, conditions = generate_conditional_data(n_samples=1000)
data_normalized = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)

data_tensor = torch.FloatTensor(data_normalized)
condition_tensor = torch.FloatTensor(conditions)

# Train the C-VAE (reuses vae_loss from the plain-VAE example)
cvae = ConditionalVAE(input_dim=5, condition_dim=1, latent_dim=4)
optimizer = optim.Adam(cvae.parameters(), lr=0.001)

for epoch in range(150):
    cvae.train()
    optimizer.zero_grad()

    reconstructed, mu, logvar = cvae(data_tensor, condition_tensor)
    loss = vae_loss(reconstructed, data_tensor, mu, logvar, beta=0.5)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 30 == 0:
        print(f'Epoch {epoch+1}, Loss: {loss.item():.2f}')

# Generate data for a chosen condition
cvae.eval()
with torch.no_grad():
    # Generate at a specific temperature (e.g. 400 K)
    target_temp = 400  # K
    target_condition = torch.FloatTensor([[(target_temp - 300) / 200]])  # same normalization as training

    # Sample latent codes from the prior
    z_samples = torch.randn(10, 4)

    # Broadcast the condition to all 10 samples
    conditions_repeated = target_condition.repeat(10, 1)

    # Generate
    generated_data = cvae.decode(z_samples, conditions_repeated)

    print(f"\nGenerated data for T={target_temp}K:")
    print(f"  Shape: {generated_data.shape}")
    print(f"  Mean (normalized): {generated_data.mean(dim=0).numpy()}")

    # Compare generation across several temperatures
    temps = [320, 380, 440, 500]
    print(f"\nData generation at different temperatures:")

    for temp in temps:
        cond = torch.FloatTensor([[(temp - 300) / 200]])
        z = torch.randn(1, 4)
        gen = cvae.decode(z, cond)
        print(f"  T={temp}K: Variable 0 = {gen[0, 0].item():.4f}")

# Expected output:
# Epoch 30, Loss: 6789.01
# Epoch 60, Loss: 3456.78
# Epoch 90, Loss: 2345.67
# Epoch 120, Loss: 1987.65
# Epoch 150, Loss: 1765.43
#
# Generated data for T=400K:
#   Shape: torch.Size([10, 5])
#   Mean (normalized): [-0.12  0.34 -0.08  0.15 -0.23]
#
# Data generation at different temperatures:
#   T=320K: Variable 0 = -1.2345
#   T=380K: Variable 0 = -0.4567
#   T=440K: Variable 0 = 0.6789
#   T=500K: Variable 0 = 1.5432

4.8 GANによる合成プロセスデータ生成

Generative Adversarial Network(GAN)を使用して、実データと見分けがつかないプロセスデータを生成します。データ拡張や少量データ問題の解決に利用します。

例8: Process GAN

class Generator(nn.Module):
    """GAN generator mapping a noise vector to a synthetic process sample."""

    def __init__(self, latent_dim=16, output_dim=10):
        """
        Args:
            latent_dim: dimensionality of the input noise vector
            output_dim: number of generated process variables
        """
        super(Generator, self).__init__()

        # Widening MLP: each stage is Linear -> LeakyReLU -> BatchNorm1d
        layers = []
        for in_features, out_features in ((latent_dim, 64), (64, 128), (128, 256)):
            layers += [
                nn.Linear(in_features, out_features),
                nn.LeakyReLU(0.2),
                nn.BatchNorm1d(out_features),
            ]
        # Output head; Tanh bounds samples to [-1, 1]
        layers += [nn.Linear(256, output_dim), nn.Tanh()]
        self.model = nn.Sequential(*layers)

    def forward(self, z):
        """Generate samples.

        Args:
            z: [batch, latent_dim] noise vectors
        Returns:
            fake_data: [batch, output_dim] generated samples in [-1, 1]
        """
        return self.model(z)

class Discriminator(nn.Module):
    """GAN discriminator scoring how likely a sample is real (vs generated)."""

    def __init__(self, input_dim=10):
        """
        Args:
            input_dim: number of process variables per sample
        """
        super(Discriminator, self).__init__()

        # Narrowing MLP: each stage is Linear -> LeakyReLU -> Dropout
        layers = []
        for in_features, out_features in ((input_dim, 256), (256, 128), (128, 64)):
            layers += [
                nn.Linear(in_features, out_features),
                nn.LeakyReLU(0.2),
                nn.Dropout(0.3),
            ]
        # Sigmoid head: probability that the input is real
        layers += [nn.Linear(64, 1), nn.Sigmoid()]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Score a batch of samples.

        Args:
            x: [batch, input_dim] data
        Returns:
            validity: [batch, 1] probability of being real, in (0, 1)
        """
        return self.model(x)

# Real data, min-max scaled to [-1, 1] to match the generator's Tanh output
real_data = generate_process_data(n_samples=1000, n_features=10)
real_normalized = 2 * (real_data - real_data.min(axis=0)) / \
                  (real_data.max(axis=0) - real_data.min(axis=0) + 1e-8) - 1  # scale to [-1, 1]
real_tensor = torch.FloatTensor(real_normalized)

# GAN setup
latent_dim = 16
generator = Generator(latent_dim=latent_dim, output_dim=10)
discriminator = Discriminator(input_dim=10)

# Optimizers (betas=(0.5, 0.999) is the common GAN recommendation)
lr = 0.0002
optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

criterion = nn.BCELoss()

# Training loop
batch_size = 64
n_epochs = 100

for epoch in range(n_epochs):
    # Train the discriminator
    for _ in range(2):  # two discriminator updates per generator update
        optimizer_D.zero_grad()

        # Real minibatch (sampled with replacement)
        idx = np.random.randint(0, len(real_tensor), batch_size)
        real_batch = real_tensor[idx]
        real_labels = torch.ones(batch_size, 1)

        # Fake minibatch (detached so no gradient reaches the generator)
        z = torch.randn(batch_size, latent_dim)
        fake_batch = generator(z).detach()
        fake_labels = torch.zeros(batch_size, 1)

        # Discriminator loss: classify real as 1 and fake as 0
        real_loss = criterion(discriminator(real_batch), real_labels)
        fake_loss = criterion(discriminator(fake_batch), fake_labels)
        d_loss = (real_loss + fake_loss) / 2

        d_loss.backward()
        optimizer_D.step()

    # Train the generator
    optimizer_G.zero_grad()

    z = torch.randn(batch_size, latent_dim)
    fake_batch = generator(z)

    # The generator tries to make fakes be classified as real
    g_loss = criterion(discriminator(fake_batch), torch.ones(batch_size, 1))

    g_loss.backward()
    optimizer_G.step()

    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}/{n_epochs}, D Loss: {d_loss.item():.4f}, G Loss: {g_loss.item():.4f}')

# Evaluate the generated data
generator.eval()
with torch.no_grad():
    z_samples = torch.randn(1000, latent_dim)
    generated_data = generator(z_samples).numpy()

# Statistical comparison against the real data
print(f"\nStatistical comparison:")
print(f"Real data mean: {real_normalized.mean(axis=0)[:3]}")
print(f"Generated mean: {generated_data.mean(axis=0)[:3]}")
print(f"Real data std:  {real_normalized.std(axis=0)[:3]}")
print(f"Generated std:  {generated_data.std(axis=0)[:3]}")

# Discriminator-based evaluation
# NOTE(review): the discriminator is still in train mode and this runs
# outside torch.no_grad(), so dropout makes these scores stochastic —
# consider discriminator.eval() and a no_grad block here.
real_score = discriminator(real_tensor).mean().item()
fake_score = discriminator(torch.FloatTensor(generated_data)).mean().item()

print(f"\nDiscriminator scores:")
print(f"  Real data: {real_score:.4f} (1.0 = perfect real)")
print(f"  Generated data: {fake_score:.4f} (0.5 = indistinguishable)")

# Expected output:
# Epoch 20/100, D Loss: 0.5678, G Loss: 0.8901
# Epoch 40/100, D Loss: 0.4567, G Loss: 1.0234
# Epoch 60/100, D Loss: 0.3890, G Loss: 1.2345
# Epoch 80/100, D Loss: 0.3456, G Loss: 1.3456
# Epoch 100/100, D Loss: 0.3234, G Loss: 1.4123
#
# Statistical comparison:
# Real data mean: [-0.023  0.045 -0.012]
# Generated mean: [-0.034  0.056 -0.018]
# Real data std:  [0.567 0.623 0.589]
# Generated std:  [0.543 0.598 0.612]
#
# Discriminator scores:
#   Real data: 0.8234 (1.0 = perfect real)
#   Generated data: 0.4876 (0.5 = indistinguishable)

✅ GANの応用

  • データ拡張: 少量の実データから大量の合成データを生成
  • 異常データ生成: 訓練データに不足する異常パターンを合成
  • シミュレーション代替: 物理シミュレーションの高速代替
  • プライバシー保護: 実データの統計的性質を保持した合成データ

学習目標の確認

この章を完了すると、以下を実装・説明できるようになります:

基本理解

実践スキル

応用力

参考文献

  1. Hinton, G. E., & Salakhutdinov, R. R. (2006). "Reducing the Dimensionality of Data with Neural Networks." Science, 313(5786), 504-507.
  2. Vincent, P., et al. (2008). "Extracting and Composing Robust Features with Denoising Autoencoders." ICML 2008.
  3. Kingma, D. P., & Welling, M. (2014). "Auto-Encoding Variational Bayes." ICLR 2014.
  4. Goodfellow, I., et al. (2014). "Generative Adversarial Networks." NeurIPS 2014.
  5. Sakurada, M., & Yairi, T. (2014). "Anomaly Detection Using Autoencoders with Nonlinear Dimensionality Reduction." MLSDA Workshop 2014.
  6. Mirza, M., & Osindero, S. (2014). "Conditional Generative Adversarial Nets." arXiv:1411.1784.