
Chapter 4: Autoencoders and Generative Models

Dimensionality Reduction, Anomaly Detection, and Process Data Generation Applications

📖 Reading Time: 35-40 minutes 💡 Difficulty: Advanced 🔬 Examples: VAE, GAN, Anomaly Detection

This chapter covers autoencoders and generative models. You will learn the encoder-decoder structure of autoencoders, the reparameterization trick behind variational autoencoders (VAEs), and the adversarial training mechanism of GANs.

4.1 Vanilla Autoencoder (Basic Autoencoder)

An autoencoder is a neural network that compresses input data into a low-dimensional latent representation (encoding) and reconstructs the original data from it (decoding). It is effective for dimensionality reduction and feature extraction on process data.

💡 Basic Structure of Autoencoder

  • Encoder: High-dimensional data → Low-dimensional latent representation (bottleneck)
  • Decoder: Latent representation → Reconstructed data
  • Objective: Minimize reconstruction error \(\mathcal{L} = ||x - \hat{x}||^2\)

Example 1: Dimensionality Reduction of Process Data

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

class VanillaAutoencoder(nn.Module):
    """Basic autoencoder"""

    def __init__(self, input_dim, latent_dim=8):
        """
        Args:
            input_dim: Input dimension (number of process variables)
            latent_dim: Latent space dimension
        """
        super(VanillaAutoencoder, self).__init__()

        # Encoder: Input → Latent representation
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, latent_dim)
        )

        # Decoder: Latent representation → Reconstruction
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim)
        )

    def forward(self, x):
        """
        Args:
            x: [batch, input_dim] Input data
        Returns:
            reconstructed: [batch, input_dim] Reconstructed data
            latent: [batch, latent_dim] Latent representation
        """
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent

# Generate synthetic process data (10 variables → compress to 2D)
def generate_process_data(n_samples=1000, n_features=10):
    """Multivariate process data (actually determined by 2 principal components)"""
    # Two latent factors
    z1 = np.random.randn(n_samples)
    z2 = np.random.randn(n_samples)

    # Generate 10 variables (linear combination of z1, z2 + noise)
    data = np.zeros((n_samples, n_features))
    for i in range(n_features):
        w1 = np.random.randn()
        w2 = np.random.randn()
        data[:, i] = w1 * z1 + w2 * z2 + 0.1 * np.random.randn(n_samples)

    return data

# Data generation and normalization
data = generate_process_data(n_samples=1000, n_features=10)
data_mean = data.mean(axis=0)
data_std = data.std(axis=0)
data_normalized = (data - data_mean) / (data_std + 1e-8)

# Tensor conversion
data_tensor = torch.FloatTensor(data_normalized)

# Model training
model = VanillaAutoencoder(input_dim=10, latent_dim=2)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    model.train()
    optimizer.zero_grad()

    reconstructed, latent = model(data_tensor)
    loss = criterion(reconstructed, data_tensor)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}, Reconstruction Loss: {loss.item():.6f}')

# Visualize latent space
model.eval()
with torch.no_grad():
    _, latent_codes = model(data_tensor)
    latent_codes = latent_codes.numpy()

plt.figure(figsize=(8, 6))
plt.scatter(latent_codes[:, 0], latent_codes[:, 1], alpha=0.5, s=20)
plt.xlabel('Latent Dimension 1')
plt.ylabel('Latent Dimension 2')
plt.title('Latent Space Representation (10D → 2D)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
# plt.savefig('latent_space.png', dpi=150)

print(f"\nLatent space shape: {latent_codes.shape}")
print(f"Latent space range: [{latent_codes.min():.2f}, {latent_codes.max():.2f}]")

# Output example:
# Epoch 20, Reconstruction Loss: 0.125678
# Epoch 40, Reconstruction Loss: 0.056789
# Epoch 60, Reconstruction Loss: 0.034567
# Epoch 80, Reconstruction Loss: 0.023456
# Epoch 100, Reconstruction Loss: 0.018901
#
# Latent space shape: (1000, 2)
# Latent space range: [-3.45, 3.78]

4.2 Denoising Autoencoder (Noise Removal)

A denoising autoencoder recovers the underlying signal from process data corrupted by sensor noise. It is trained by adding noise to the input and reconstructing the original, clean signal.
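
Formally, with Gaussian corruption as in the example below, the network maps the noisy input back to the clean target:

$$\tilde{x} = x + \epsilon, \quad \epsilon \sim \mathcal{N}(0, \sigma^2 I), \qquad \mathcal{L} = ||x - g_\theta(f_\phi(\tilde{x}))||^2$$

where \(f_\phi\) is the encoder and \(g_\theta\) the decoder.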

Example 2: Sensor Data Denoising

class DenoisingAutoencoder(nn.Module):
    """Denoising autoencoder"""

    def __init__(self, input_dim, latent_dim=16):
        super(DenoisingAutoencoder, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed

def add_noise(data, noise_factor=0.3):
    """Add Gaussian noise

    Args:
        data: [samples, features] Clean data
        noise_factor: Noise intensity

    Returns:
        noisy_data: Data with added noise
    """
    noise = torch.randn_like(data) * noise_factor
    noisy_data = data + noise
    return noisy_data

# Clean process data
clean_data = torch.FloatTensor(generate_process_data(n_samples=1000, n_features=20))

# Normalization
clean_mean = clean_data.mean(dim=0, keepdim=True)
clean_std = clean_data.std(dim=0, keepdim=True)
clean_normalized = (clean_data - clean_mean) / (clean_std + 1e-8)

# Add noise
noisy_data = add_noise(clean_normalized, noise_factor=0.5)

# Model training
model = DenoisingAutoencoder(input_dim=20, latent_dim=16)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    model.train()
    optimizer.zero_grad()

    # Learn: Noisy input → Clean output
    denoised = model(noisy_data)
    loss = criterion(denoised, clean_normalized)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}, Denoising Loss: {loss.item():.6f}')

# Test: Denoise new noisy data
model.eval()
with torch.no_grad():
    test_clean = clean_normalized[:10]
    test_noisy = add_noise(test_clean, noise_factor=0.5)
    test_denoised = model(test_noisy)

    # Calculate SNR (Signal-to-Noise Ratio)
    noise_power = torch.mean((test_noisy - test_clean) ** 2)
    residual_power = torch.mean((test_denoised - test_clean) ** 2)

    snr_before = 10 * torch.log10(torch.mean(test_clean ** 2) / noise_power)
    snr_after = 10 * torch.log10(torch.mean(test_clean ** 2) / residual_power)

print(f"\nSNR before denoising: {snr_before.item():.2f} dB")
print(f"SNR after denoising:  {snr_after.item():.2f} dB")
print(f"Improvement: {(snr_after - snr_before).item():.2f} dB")

# Visualize one variable
time_steps = np.arange(10)
var_idx = 0

plt.figure(figsize=(10, 4))
plt.plot(time_steps, test_clean[:, var_idx].numpy(), 'g-', label='Clean', linewidth=2)
plt.plot(time_steps, test_noisy[:, var_idx].numpy(), 'r--', label='Noisy', alpha=0.7)
plt.plot(time_steps, test_denoised[:, var_idx].numpy(), 'b-', label='Denoised', linewidth=2)
plt.xlabel('Sample')
plt.ylabel('Normalized Value')
plt.title(f'Denoising Performance (Variable {var_idx+1})')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Output example:
# Epoch 20, Denoising Loss: 0.234567
# Epoch 40, Denoising Loss: 0.123456
# Epoch 60, Denoising Loss: 0.078901
# Epoch 80, Denoising Loss: 0.056789
# Epoch 100, Denoising Loss: 0.045678
#
# SNR before denoising: 5.23 dB
# SNR after denoising:  18.76 dB
# Improvement: 13.53 dB

4.3 Variational Autoencoder (VAE)

A VAE is a probabilistic generative model whose latent space can be sampled to generate new data. It is used for process data generation and interpolation.

💡 VAE Characteristics

  • Probabilistic Encoding: Latent variable \(z \sim \mathcal{N}(\mu, \sigma^2)\)
  • KL divergence term: Regularization to bring latent distribution closer to standard normal distribution
  • Generation capability: Generate new data by sampling from latent space

The VAE is trained by maximizing the evidence lower bound (ELBO); in practice, its negative is minimized as the loss:

$$\mathcal{L}_{\text{ELBO}} = \mathbb{E}_{q(z|x)}[\log p(x|z)] - D_{KL}(q(z|x) || p(z))$$
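
For a Gaussian encoder \(q(z|x) = \mathcal{N}(\mu, \mathrm{diag}(\sigma^2))\) and a standard normal prior \(p(z) = \mathcal{N}(0, I)\), the KL term has the closed form used in the code below:

$$D_{KL}(q(z|x) || p(z)) = -\frac{1}{2}\sum_{j=1}^{d}\left(1 + \log\sigma_j^2 - \mu_j^2 - \sigma_j^2\right)$$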

Example 3: Process Data Generation with VAE

class VAE(nn.Module):
    """Variational Autoencoder"""

    def __init__(self, input_dim, latent_dim=8):
        super(VAE, self).__init__()

        # Encoder
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)

        # Output mean and variance
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_logvar = nn.Linear(64, latent_dim)

        # Decoder
        self.fc3 = nn.Linear(latent_dim, 64)
        self.fc4 = nn.Linear(64, 128)
        self.fc5 = nn.Linear(128, input_dim)

    def encode(self, x):
        """Encoder: Output mean and log variance

        Returns:
            mu: [batch, latent_dim] Mean
            logvar: [batch, latent_dim] log(σ²)
        """
        h = torch.relu(self.fc1(x))
        h = torch.relu(self.fc2(h))
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        """Reparameterization trick

        z = μ + σ * ε, where ε ~ N(0, 1)
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mu + eps * std
        return z

    def decode(self, z):
        """Decoder: Reconstruct from latent variable"""
        h = torch.relu(self.fc3(z))
        h = torch.relu(self.fc4(h))
        reconstructed = self.fc5(h)
        return reconstructed

    def forward(self, x):
        """
        Returns:
            reconstructed: Reconstructed data
            mu: Mean of latent variable
            logvar: Log variance of latent variable
        """
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        reconstructed = self.decode(z)
        return reconstructed, mu, logvar

def vae_loss(reconstructed, original, mu, logvar, beta=1.0):
    """VAE loss = Reconstruction + KL divergence

    Args:
        beta: Weight of KL term (β-VAE)
    """
    # Reconstruction loss (MSE)
    recon_loss = nn.functional.mse_loss(reconstructed, original, reduction='sum')

    # KL divergence: -0.5 * Σ(1 + log(σ²) - μ² - σ²)
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return recon_loss + beta * kl_loss

# Process data
data = generate_process_data(n_samples=1000, n_features=15)
data_normalized = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)
data_tensor = torch.FloatTensor(data_normalized)

# VAE training
model = VAE(input_dim=15, latent_dim=4)
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(150):
    model.train()
    optimizer.zero_grad()

    reconstructed, mu, logvar = model(data_tensor)
    loss = vae_loss(reconstructed, data_tensor, mu, logvar, beta=0.5)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 30 == 0:
        print(f'Epoch {epoch+1}, Loss: {loss.item():.2f}')

# Generate new data
model.eval()
with torch.no_grad():
    # Sample from latent space
    z_samples = torch.randn(10, 4)  # 10 samples from standard normal distribution
    generated_data = model.decode(z_samples)

    print(f"\nGenerated data shape: {generated_data.shape}")
    print(f"Generated data range: [{generated_data.min():.2f}, {generated_data.max():.2f}]")

    # Statistical comparison with original data
    original_mean = data_tensor.mean(dim=0)
    generated_mean = generated_data.mean(dim=0)

    print(f"\nOriginal data mean (first 5 vars): {original_mean[:5].numpy()}")
    print(f"Generated data mean (first 5 vars): {generated_mean[:5].numpy()}")

# Output example:
# Epoch 30, Loss: 8765.43
# Epoch 60, Loss: 4321.09
# Epoch 90, Loss: 2987.65
# Epoch 120, Loss: 2456.78
# Epoch 150, Loss: 2234.56
#
# Generated data shape: torch.Size([10, 15])
# Generated data range: [-2.87, 2.45]
#
# Original data mean (first 5 vars): [-0.01  0.02 -0.00  0.01 -0.02]
# Generated data mean (first 5 vars): [-0.15  0.23 -0.08  0.12 -0.18]

4.4 Anomaly Detection Using Reconstruction Error

An autoencoder trained only on normal data reconstructs abnormal data poorly; this property is exploited to detect anomalies.
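
Concretely, the anomaly score of a sample is its reconstruction error, and a sample is flagged when the score exceeds a threshold \(\tau\) estimated from normal data (e.g., a high percentile of the normal reconstruction errors, as in the example below):

$$s(x) = ||x - \hat{x}||^2, \qquad \text{anomaly if } s(x) > \tau$$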

Example 4: Process Anomaly Detection System

class AnomalyDetector:
    """Autoencoder-based anomaly detection"""

    def __init__(self, autoencoder, threshold_percentile=95):
        """
        Args:
            autoencoder: Trained autoencoder
            threshold_percentile: Anomaly decision threshold (percentile of normal data reconstruction error)
        """
        self.autoencoder = autoencoder
        self.threshold = None
        self.threshold_percentile = threshold_percentile

    def fit_threshold(self, normal_data):
        """Determine threshold from normal data

        Args:
            normal_data: [samples, features] Normal process data
        """
        self.autoencoder.eval()
        with torch.no_grad():
            reconstructed, _ = self.autoencoder(normal_data)
            reconstruction_errors = torch.mean((normal_data - reconstructed) ** 2, dim=1)

        # Set threshold by percentile
        self.threshold = torch.quantile(reconstruction_errors, self.threshold_percentile / 100.0)

        print(f"Anomaly threshold set to: {self.threshold.item():.6f}")
        print(f"Based on {self.threshold_percentile}th percentile of normal data errors")

        return reconstruction_errors

    def detect(self, data):
        """Anomaly detection

        Args:
            data: [samples, features] Test data

        Returns:
            is_anomaly: [samples] Boolean array (True = anomaly)
            scores: [samples] Anomaly score (reconstruction error)
        """
        self.autoencoder.eval()
        with torch.no_grad():
            reconstructed, _ = self.autoencoder(data)
            scores = torch.mean((data - reconstructed) ** 2, dim=1)

        is_anomaly = scores > self.threshold

        return is_anomaly, scores

# Train on normal data
normal_data = generate_process_data(n_samples=800, n_features=10)
normal_normalized = (normal_data - normal_data.mean(axis=0)) / (normal_data.std(axis=0) + 1e-8)
normal_tensor = torch.FloatTensor(normal_normalized)

# Train autoencoder
ae_model = VanillaAutoencoder(input_dim=10, latent_dim=3)
optimizer = optim.Adam(ae_model.parameters(), lr=0.001)
criterion = nn.MSELoss()

for epoch in range(100):
    ae_model.train()
    optimizer.zero_grad()
    reconstructed, _ = ae_model(normal_tensor)
    loss = criterion(reconstructed, normal_tensor)
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 25 == 0:
        print(f'Training Epoch {epoch+1}, Loss: {loss.item():.6f}')

# Initialize anomaly detector
detector = AnomalyDetector(ae_model, threshold_percentile=95)
normal_errors = detector.fit_threshold(normal_tensor)

# Generate anomaly data (add large deviations to some variables)
anomaly_data = generate_process_data(n_samples=200, n_features=10)
# Anomaly pattern 1: Specific variable has abnormal value
anomaly_data[:100, 0] += 5.0  # Variable 1 is abnormal
# Anomaly pattern 2: Abnormal correlation of multiple variables
anomaly_data[100:, [2, 5, 7]] += 3.0

anomaly_normalized = (anomaly_data - normal_data.mean(axis=0)) / (normal_data.std(axis=0) + 1e-8)
anomaly_tensor = torch.FloatTensor(anomaly_normalized)

# Anomaly detection
is_anomaly, anomaly_scores = detector.detect(anomaly_tensor)

# Evaluation
n_detected = is_anomaly.sum().item()
detection_rate = 100 * n_detected / len(anomaly_tensor)

print(f"\nAnomaly detection results:")
print(f"  Total test samples: {len(anomaly_tensor)}")
print(f"  Detected anomalies: {n_detected}")
print(f"  Detection rate: {detection_rate:.2f}%")
print(f"  Score range: [{anomaly_scores.min():.6f}, {anomaly_scores.max():.6f}]")

# False positive rate on normal data
false_positive = (normal_errors > detector.threshold).sum().item()
fpr = 100 * false_positive / len(normal_errors)
print(f"  False positive rate: {fpr:.2f}%")

# Visualization
plt.figure(figsize=(10, 4))
plt.hist(normal_errors.numpy(), bins=50, alpha=0.7, label='Normal', color='green')
plt.hist(anomaly_scores.numpy(), bins=50, alpha=0.7, label='Anomaly', color='red')
plt.axvline(detector.threshold.item(), color='black', linestyle='--', label='Threshold')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
plt.title('Anomaly Detection: Reconstruction Error Distribution')
plt.yscale('log')
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Output example:
# Training Epoch 25, Loss: 0.045678
# Training Epoch 50, Loss: 0.023456
# Training Epoch 75, Loss: 0.015678
# Training Epoch 100, Loss: 0.012345
# Anomaly threshold set to: 0.034567
# Based on 95th percentile of normal data errors
#
# Anomaly detection results:
#   Total test samples: 200
#   Detected anomalies: 187
#   Detection rate: 93.50%
#   Score range: [0.012345, 0.234567]
#   False positive rate: 5.00%

4.5 Sparse Autoencoder (Sparse Feature Extraction)

Sparsity regularization encourages only a small number of latent units to be active for any given input, which helps uncover the important combinations of process variables.
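
The sparsity penalty used in the example below is the KL divergence between a target activation rate \(\rho\) and the average activation \(\hat{\rho}_j\) of each latent unit, added to the reconstruction loss with a weight \(\lambda\):

$$\mathcal{L} = ||x - \hat{x}||^2 + \lambda \sum_{j=1}^{d}\left[\rho \log\frac{\rho}{\hat{\rho}_j} + (1-\rho)\log\frac{1-\rho}{1-\hat{\rho}_j}\right]$$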

Example 5: Sparse Feature Extraction

class SparseAutoencoder(nn.Module):
    """Sparse autoencoder"""

    def __init__(self, input_dim, latent_dim=20):
        super(SparseAutoencoder, self).__init__()

        # Encoder (set latent layer larger and constrain with sparsity)
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, latent_dim),
            nn.ReLU()  # Non-negative constraint (ReLU)
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent

def sparse_loss(reconstructed, original, latent, sparsity_weight=0.01, sparsity_target=0.05):
    """Loss function considering sparsity

    Args:
        sparsity_weight: Weight of sparsity term
        sparsity_target: Target sparsity rate (e.g., 5% activation is ideal)
    """
    # Reconstruction loss
    recon_loss = nn.functional.mse_loss(reconstructed, original)

    # Sparsity loss: KL divergence between the target activation rate rho
    # and the average activation rho_hat of each latent unit
    rho = sparsity_target
    # Clamp to (0, 1) so the Bernoulli KL below is well defined for ReLU outputs
    rho_hat = torch.clamp(torch.mean(latent, dim=0), 1e-6, 1 - 1e-6)  # [latent_dim]

    # KL(rho || rho_hat)
    kl_div = rho * torch.log(rho / rho_hat) + \
             (1 - rho) * torch.log((1 - rho) / (1 - rho_hat))
    sparsity_loss = torch.sum(kl_div)

    return recon_loss + sparsity_weight * sparsity_loss

# Process data (30 variables)
large_data = generate_process_data(n_samples=1000, n_features=30)
large_normalized = (large_data - large_data.mean(axis=0)) / (large_data.std(axis=0) + 1e-8)
large_tensor = torch.FloatTensor(large_normalized)

# Train sparse autoencoder
sparse_model = SparseAutoencoder(input_dim=30, latent_dim=20)
optimizer = optim.Adam(sparse_model.parameters(), lr=0.001)

for epoch in range(100):
    sparse_model.train()
    optimizer.zero_grad()

    reconstructed, latent = sparse_model(large_tensor)
    loss = sparse_loss(reconstructed, large_tensor, latent,
                      sparsity_weight=0.05, sparsity_target=0.1)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        # Calculate sparsity rate
        with torch.no_grad():
            _, latent_codes = sparse_model(large_tensor)
            sparsity_rate = (latent_codes < 0.01).float().mean().item()

        print(f'Epoch {epoch+1}, Loss: {loss.item():.6f}, Sparsity: {100*sparsity_rate:.2f}%')

# Analyze important features
sparse_model.eval()
with torch.no_grad():
    _, latent_codes = sparse_model(large_tensor)

    # Activation frequency of each latent unit
    activation_freq = (latent_codes > 0.1).float().mean(dim=0).numpy()

    # Top-5 active units
    top_units = np.argsort(activation_freq)[-5:][::-1]

    print(f"\nTop 5 active latent units:")
    for i, unit_id in enumerate(top_units):
        print(f"  {i+1}. Unit {unit_id}: {100*activation_freq[unit_id]:.2f}% activation")

    # Approximate the importance of input variables for each latent unit by
    # multiplying the two encoder weight matrices (ignores the ReLU nonlinearity)
    w1 = sparse_model.encoder[0].weight.data.numpy()  # [128, input_dim]
    w2 = sparse_model.encoder[2].weight.data.numpy()  # [latent_dim, 128]
    effective_weights = w2 @ w1                       # [latent_dim, input_dim]

    print(f"\nLatent unit representations:")
    for unit_id in top_units[:3]:
        weights = effective_weights[unit_id]
        top_inputs = np.argsort(np.abs(weights))[-5:][::-1]
        print(f"  Unit {unit_id} → Input variables: {top_inputs}")

# Output example:
# Epoch 20, Loss: 0.567890, Sparsity: 62.34%
# Epoch 40, Loss: 0.345678, Sparsity: 78.56%
# Epoch 60, Loss: 0.234567, Sparsity: 85.23%
# Epoch 80, Loss: 0.178901, Sparsity: 88.45%
# Epoch 100, Loss: 0.145678, Sparsity: 89.67%
#
# Top 5 active latent units:
#   1. Unit 12: 45.67% activation
#   2. Unit 7: 38.92% activation
#   3. Unit 18: 32.45% activation
#   4. Unit 3: 28.76% activation
#   5. Unit 15: 24.33% activation
#
# Latent unit representations:
#   Unit 12 → Input variables: [ 3  7 12 18 23]
#   Unit 7 → Input variables: [ 1  5 14 19 27]
#   Unit 18 → Input variables: [ 2  9 11 16 25]

4.6 Convolutional Autoencoder (Image Compression)

An autoencoder using convolutional layers efficiently compresses and restores process images.

Example 6: Process Image Compression

class ConvAutoencoder(nn.Module):
    """Convolutional autoencoder"""

    def __init__(self, latent_dim=64):
        super(ConvAutoencoder, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            # 224x224 → 112x112
            nn.Conv2d(3, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),

            # 112x112 → 56x56
            nn.Conv2d(32, 64, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),

            # 56x56 → 28x28
            nn.Conv2d(64, 128, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),

            # 28x28 → 14x14
            nn.Conv2d(128, 256, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(256)
        )

        # Bottleneck: 256*14*14 → latent_dim
        self.fc_encode = nn.Linear(256 * 14 * 14, latent_dim)

        # Bottleneck: latent_dim → 256*14*14
        self.fc_decode = nn.Linear(latent_dim, 256 * 14 * 14)

        # Decoder
        self.decoder = nn.Sequential(
            # 14x14 → 28x28
            nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),

            # 28x28 → 56x56
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),

            # 56x56 → 112x112
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),

            # 112x112 → 224x224
            nn.ConvTranspose2d(32, 3, 3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()  # Normalize to 0-1
        )

    def forward(self, x):
        """
        Args:
            x: [batch, 3, 224, 224]
        Returns:
            reconstructed: [batch, 3, 224, 224]
            latent: [batch, latent_dim]
        """
        # Encode
        encoded = self.encoder(x)  # [batch, 256, 14, 14]
        encoded_flat = encoded.view(encoded.size(0), -1)  # [batch, 256*14*14]
        latent = self.fc_encode(encoded_flat)  # [batch, latent_dim]

        # Decode
        decoded_flat = self.fc_decode(latent)  # [batch, 256*14*14]
        decoded = decoded_flat.view(-1, 256, 14, 14)  # [batch, 256, 14, 14]
        reconstructed = self.decoder(decoded)  # [batch, 3, 224, 224]

        return reconstructed, latent

# Dummy image data (actually process images)
dummy_images = torch.rand(16, 3, 224, 224)  # Normalized to 0-1

# Model
conv_ae = ConvAutoencoder(latent_dim=128)

# Calculate compression ratio
original_size = 3 * 224 * 224  # 150,528
compressed_size = 128
compression_ratio = original_size / compressed_size

print(f"Compression ratio: {compression_ratio:.2f}x")
print(f"Original: {original_size:,} → Compressed: {compressed_size}")

# Training
criterion = nn.MSELoss()
optimizer = optim.Adam(conv_ae.parameters(), lr=0.001)

for epoch in range(30):
    conv_ae.train()
    optimizer.zero_grad()

    reconstructed, latent = conv_ae(dummy_images)
    loss = criterion(reconstructed, dummy_images)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        # Calculate PSNR (Peak Signal-to-Noise Ratio)
        mse = loss.item()
        psnr = 10 * np.log10(1.0 / mse)
        print(f'Epoch {epoch+1}, MSE: {mse:.6f}, PSNR: {psnr:.2f} dB')

# Test
conv_ae.eval()
with torch.no_grad():
    test_image = dummy_images[0:1]
    reconstructed_image, latent_code = conv_ae(test_image)

    # Compare images (first sample)
    original_np = test_image[0].permute(1, 2, 0).numpy()
    reconstructed_np = reconstructed_image[0].permute(1, 2, 0).numpy()

    # Error
    pixel_error = np.abs(original_np - reconstructed_np)
    mean_error = pixel_error.mean()

    print(f"\nReconstruction quality:")
    print(f"  Mean pixel error: {mean_error:.6f}")
    print(f"  Max pixel error: {pixel_error.max():.6f}")

# Output example:
# Compression ratio: 1176.00x
# Original: 150,528 → Compressed: 128
# Epoch 10, MSE: 0.023456, PSNR: 16.30 dB
# Epoch 20, MSE: 0.012345, PSNR: 19.09 dB
# Epoch 30, MSE: 0.008901, PSNR: 20.51 dB
#
# Reconstruction quality:
#   Mean pixel error: 0.008234
#   Max pixel error: 0.234567

4.7 Conditional VAE (Conditional Generation)

A conditional VAE (C-VAE) generates process data under specified conditions (process conditions, temperature setpoints, etc.).
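
The C-VAE optimizes the same evidence lower bound as the standard VAE, but both the encoder and the decoder are conditioned on \(c\); the prior over \(z\) is kept as the standard normal, matching the implementation below:

$$\mathcal{L} = \mathbb{E}_{q(z|x,c)}[\log p(x|z,c)] - D_{KL}(q(z|x,c) || p(z))$$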

Example 7: Conditional Process Data Generation

class ConditionalVAE(nn.Module):
    """Conditional VAE (C-VAE)"""

    def __init__(self, input_dim, condition_dim, latent_dim=8):
        """
        Args:
            input_dim: Data dimension
            condition_dim: Condition dimension (temperature, pressure setpoints, etc.)
            latent_dim: Latent dimension
        """
        super(ConditionalVAE, self).__init__()

        # Encoder: Data + Condition → Latent variable
        self.fc1 = nn.Linear(input_dim + condition_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_logvar = nn.Linear(64, latent_dim)

        # Decoder: Latent variable + Condition → Data
        self.fc3 = nn.Linear(latent_dim + condition_dim, 64)
        self.fc4 = nn.Linear(64, 128)
        self.fc5 = nn.Linear(128, input_dim)

    def encode(self, x, c):
        """Encode with condition

        Args:
            x: [batch, input_dim] Data
            c: [batch, condition_dim] Condition
        """
        h = torch.cat([x, c], dim=1)  # Concatenate condition
        h = torch.relu(self.fc1(h))
        h = torch.relu(self.fc2(h))
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def decode(self, z, c):
        """Decode with condition

        Args:
            z: [batch, latent_dim] Latent variable
            c: [batch, condition_dim] Condition
        """
        h = torch.cat([z, c], dim=1)  # Concatenate condition
        h = torch.relu(self.fc3(h))
        h = torch.relu(self.fc4(h))
        reconstructed = self.fc5(h)
        return reconstructed

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x, c):
        mu, logvar = self.encode(x, c)
        z = self.reparameterize(mu, logvar)
        reconstructed = self.decode(z, c)
        return reconstructed, mu, logvar

# Generate conditional process data
def generate_conditional_data(n_samples=1000):
    """Process data depending on condition (temperature)"""
    # Condition: Reaction temperature [300-500K]
    temperature = np.random.uniform(300, 500, n_samples)

    # Data: 5 variables dependent on temperature
    data = np.zeros((n_samples, 5))

    for i in range(n_samples):
        T = temperature[i]

        # Physicochemical relationships dependent on temperature
        data[i, 0] = 0.001 * T**2 - 0.3 * T + 50  # Reaction rate constant
        data[i, 1] = 100 * np.exp(-5000 / T)  # Equilibrium constant (Arrhenius type)
        data[i, 2] = 0.5 * T + 50  # Pressure
        data[i, 3] = -0.002 * T + 2.0  # pH
        data[i, 4] = 0.01 * T  # Concentration

        # Noise
        data[i] += np.random.randn(5) * 2

    # Normalization
    condition = (temperature - 300) / 200  # Normalize to 0-1
    condition = condition.reshape(-1, 1)

    return data, condition

# Generate data
data, conditions = generate_conditional_data(n_samples=1000)
data_normalized = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)

data_tensor = torch.FloatTensor(data_normalized)
condition_tensor = torch.FloatTensor(conditions)

# Train C-VAE
cvae = ConditionalVAE(input_dim=5, condition_dim=1, latent_dim=4)
optimizer = optim.Adam(cvae.parameters(), lr=0.001)

for epoch in range(150):
    cvae.train()
    optimizer.zero_grad()

    reconstructed, mu, logvar = cvae(data_tensor, condition_tensor)
    loss = vae_loss(reconstructed, data_tensor, mu, logvar, beta=0.5)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 30 == 0:
        print(f'Epoch {epoch+1}, Loss: {loss.item():.2f}')

# Generate with specified condition
cvae.eval()
with torch.no_grad():
    # Generate data at specific temperature (e.g., 400K)
    target_temp = 400  # K
    target_condition = torch.FloatTensor([[(target_temp - 300) / 200]])  # Normalize

    # Sample from latent space
    z_samples = torch.randn(10, 4)

    # Apply condition to all 10 samples
    conditions_repeated = target_condition.repeat(10, 1)

    # Generate
    generated_data = cvae.decode(z_samples, conditions_repeated)

    print(f"\nGenerated data for T={target_temp}K:")
    print(f"  Shape: {generated_data.shape}")
    print(f"  Mean (normalized): {generated_data.mean(dim=0).numpy()}")

    # Compare generation at different temperatures
    temps = [320, 380, 440, 500]
    print(f"\nData generation at different temperatures:")

    for temp in temps:
        cond = torch.FloatTensor([[(temp - 300) / 200]])
        z = torch.randn(1, 4)
        gen = cvae.decode(z, cond)
        print(f"  T={temp}K: Variable 0 = {gen[0, 0].item():.4f}")

# Output example:
# Epoch 30, Loss: 6789.01
# Epoch 60, Loss: 3456.78
# Epoch 90, Loss: 2345.67
# Epoch 120, Loss: 1987.65
# Epoch 150, Loss: 1765.43
#
# Generated data for T=400K:
#   Shape: torch.Size([10, 5])
#   Mean (normalized): [-0.12  0.34 -0.08  0.15 -0.23]
#
# Data generation at different temperatures:
#   T=320K: Variable 0 = -1.2345
#   T=380K: Variable 0 = -0.4567
#   T=440K: Variable 0 = 0.6789
#   T=500K: Variable 0 = 1.5432

4.8 Synthetic Process Data Generation with GAN

A Generative Adversarial Network (GAN) generates synthetic process data that is statistically indistinguishable from real data. It is used for data augmentation and for mitigating small-data problems.
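
Training is a two-player minimax game between the generator \(G\) and the discriminator \(D\):

$$\min_G \max_D \; \mathbb{E}_{x \sim p_{\text{data}}}[\log D(x)] + \mathbb{E}_{z \sim p(z)}[\log(1 - D(G(z)))]$$

In the example below, both networks are trained with binary cross-entropy, and the generator uses the common non-saturating variant: it maximizes \(\log D(G(z))\) by labeling its own samples as real.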

Example 8: Process GAN

class Generator(nn.Module):
    """Process data generator"""

    def __init__(self, latent_dim=16, output_dim=10):
        super(Generator, self).__init__()

        self.model = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(64),

            nn.Linear(64, 128),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(128),

            nn.Linear(128, 256),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(256),

            nn.Linear(256, output_dim),
            nn.Tanh()  # Normalize to -1~1
        )

    def forward(self, z):
        """
        Args:
            z: [batch, latent_dim] Noise vector
        Returns:
            fake_data: [batch, output_dim] Generated data
        """
        return self.model(z)

class Discriminator(nn.Module):
    """Process data discriminator (real vs fake)"""

    def __init__(self, input_dim=10):
        super(Discriminator, self).__init__()

        self.model = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),

            nn.Linear(256, 128),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),

            nn.Linear(128, 64),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),

            nn.Linear(64, 1),
            nn.Sigmoid()  # 0-1 probability
        )

    def forward(self, x):
        """
        Args:
            x: [batch, input_dim] Data
        Returns:
            validity: [batch, 1] Probability of being real
        """
        return self.model(x)

# Real data
real_data = generate_process_data(n_samples=1000, n_features=10)
real_normalized = 2 * (real_data - real_data.min(axis=0)) / \
                  (real_data.max(axis=0) - real_data.min(axis=0) + 1e-8) - 1  # -1~1
real_tensor = torch.FloatTensor(real_normalized)

# Initialize GAN
latent_dim = 16
generator = Generator(latent_dim=latent_dim, output_dim=10)
discriminator = Discriminator(input_dim=10)

# Optimization
lr = 0.0002
optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

criterion = nn.BCELoss()

# Training
batch_size = 64
n_epochs = 100

for epoch in range(n_epochs):
    # Train Discriminator
    for _ in range(2):  # Update Discriminator twice
        optimizer_D.zero_grad()

        # Real data
        idx = np.random.randint(0, len(real_tensor), batch_size)
        real_batch = real_tensor[idx]
        real_labels = torch.ones(batch_size, 1)

        # Fake data
        z = torch.randn(batch_size, latent_dim)
        fake_batch = generator(z).detach()
        fake_labels = torch.zeros(batch_size, 1)

        # Discriminator loss
        real_loss = criterion(discriminator(real_batch), real_labels)
        fake_loss = criterion(discriminator(fake_batch), fake_labels)
        d_loss = (real_loss + fake_loss) / 2

        d_loss.backward()
        optimizer_D.step()

    # Train Generator
    optimizer_G.zero_grad()

    z = torch.randn(batch_size, latent_dim)
    fake_batch = generator(z)

    # Generator wants to fool discriminator into thinking fake is real
    g_loss = criterion(discriminator(fake_batch), torch.ones(batch_size, 1))

    g_loss.backward()
    optimizer_G.step()

    if (epoch + 1) % 20 == 0:
        print(f'Epoch {epoch+1}/{n_epochs}, D Loss: {d_loss.item():.4f}, G Loss: {g_loss.item():.4f}')

# Evaluate generated data
generator.eval()
with torch.no_grad():
    z_samples = torch.randn(1000, latent_dim)
    generated_data = generator(z_samples).numpy()

# Statistical comparison
print(f"\nStatistical comparison:")
print(f"Real data mean: {real_normalized.mean(axis=0)[:3]}")
print(f"Generated mean: {generated_data.mean(axis=0)[:3]}")
print(f"Real data std:  {real_normalized.std(axis=0)[:3]}")
print(f"Generated std:  {generated_data.std(axis=0)[:3]}")

# Evaluation by Discriminator (dropout disabled, no gradients needed)
discriminator.eval()
with torch.no_grad():
    real_score = discriminator(real_tensor).mean().item()
    fake_score = discriminator(torch.FloatTensor(generated_data)).mean().item()

print(f"\nDiscriminator scores:")
print(f"  Real data: {real_score:.4f} (1.0 = perfect real)")
print(f"  Generated data: {fake_score:.4f} (0.5 = indistinguishable)")

# Output example:
# Epoch 20/100, D Loss: 0.5678, G Loss: 0.8901
# Epoch 40/100, D Loss: 0.4567, G Loss: 1.0234
# Epoch 60/100, D Loss: 0.3890, G Loss: 1.2345
# Epoch 80/100, D Loss: 0.3456, G Loss: 1.3456
# Epoch 100/100, D Loss: 0.3234, G Loss: 1.4123
#
# Statistical comparison:
# Real data mean: [-0.023  0.045 -0.012]
# Generated mean: [-0.034  0.056 -0.018]
# Real data std:  [0.567 0.623 0.589]
# Generated std:  [0.543 0.598 0.612]
#
# Discriminator scores:
#   Real data: 0.8234 (1.0 = perfect real)
#   Generated data: 0.4876 (0.5 = indistinguishable)

✅ GAN Applications

  • Data Augmentation: Generate large amounts of synthetic data from small amounts of real data (see the sketch after this list)
  • Anomaly Data Generation: Synthesize abnormal patterns lacking in training data
  • Simulation Alternative: Fast alternative to physical simulation
  • Privacy Protection: Synthetic data preserving statistical properties of real data
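
As a minimal sketch of the data-augmentation use case (assuming the generator, latent_dim, and real_tensor from Example 8 are in scope; the 50% augmentation ratio is an arbitrary choice for illustration):

# Augment the real training set with GAN-generated samples
# (assumes generator, latent_dim, and real_tensor from Example 8)
generator.eval()
with torch.no_grad():
    n_synthetic = len(real_tensor) // 2   # add 50% synthetic samples (illustrative)
    z = torch.randn(n_synthetic, latent_dim)
    synthetic = generator(z)              # still on the -1 to 1 normalized scale

augmented_tensor = torch.cat([real_tensor, synthetic], dim=0)
print(f"Augmented dataset: {len(real_tensor)} real + {n_synthetic} synthetic "
      f"= {len(augmented_tensor)} samples")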

Learning Objectives Checklist

After completing this chapter, you should be able to implement and explain the following:

Basic Understanding

Practical Skills

Application Skills

References

  1. Hinton, G. E., & Salakhutdinov, R. R. (2006). "Reducing the Dimensionality of Data with Neural Networks." Science, 313(5786), 504-507.
  2. Vincent, P., et al. (2008). "Extracting and Composing Robust Features with Denoising Autoencoders." ICML 2008.
  3. Kingma, D. P., & Welling, M. (2014). "Auto-Encoding Variational Bayes." ICLR 2014.
  4. Goodfellow, I., et al. (2014). "Generative Adversarial Networks." NeurIPS 2014.
  5. Sakurada, M., & Yairi, T. (2014). "Anomaly Detection Using Autoencoders with Nonlinear Dimensionality Reduction." MLSDA Workshop 2014.
  6. Mirza, M., & Osindero, S. (2014). "Conditional Generative Adversarial Nets." arXiv:1411.1784.
