This chapter covers autoencoders and generative models. You will learn the encoder-decoder structure of autoencoders, the reparameterization trick used in variational autoencoders, and the adversarial training mechanism of GANs.
4.1 Vanilla Autoencoder (Basic Autoencoder)
An autoencoder is a neural network that compresses input data into a low-dimensional latent representation (encoding) and then reconstructs the original data from that representation (decoding). It is effective for dimensionality reduction and feature extraction on process data.
💡 Basic Structure of Autoencoder
- Encoder: High-dimensional data → Low-dimensional latent representation (bottleneck)
- Decoder: Latent representation → Reconstructed data
- Objective: Minimize reconstruction error \(\mathcal{L} = ||x - \hat{x}||^2\)
Example 1: Dimensionality Reduction of Process Data
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
class VanillaAutoencoder(nn.Module):
"""Basic autoencoder"""
def __init__(self, input_dim, latent_dim=8):
"""
Args:
input_dim: Input dimension (number of process variables)
latent_dim: Latent space dimension
"""
super(VanillaAutoencoder, self).__init__()
# Encoder: Input → Latent representation
self.encoder = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, latent_dim)
)
# Decoder: Latent representation → Reconstruction
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 32),
nn.ReLU(),
nn.Linear(32, 64),
nn.ReLU(),
nn.Linear(64, input_dim)
)
def forward(self, x):
"""
Args:
x: [batch, input_dim] Input data
Returns:
reconstructed: [batch, input_dim] Reconstructed data
latent: [batch, latent_dim] Latent representation
"""
latent = self.encoder(x)
reconstructed = self.decoder(latent)
return reconstructed, latent
# Generate synthetic process data (10 variables → compress to 2D)
def generate_process_data(n_samples=1000, n_features=10):
"""Multivariate process data (actually determined by 2 principal components)"""
# Two latent factors
z1 = np.random.randn(n_samples)
z2 = np.random.randn(n_samples)
# Generate 10 variables (linear combination of z1, z2 + noise)
data = np.zeros((n_samples, n_features))
for i in range(n_features):
w1 = np.random.randn()
w2 = np.random.randn()
data[:, i] = w1 * z1 + w2 * z2 + 0.1 * np.random.randn(n_samples)
return data
# Data generation and normalization
data = generate_process_data(n_samples=1000, n_features=10)
data_mean = data.mean(axis=0)
data_std = data.std(axis=0)
data_normalized = (data - data_mean) / (data_std + 1e-8)
# Tensor conversion
data_tensor = torch.FloatTensor(data_normalized)
# Model training
model = VanillaAutoencoder(input_dim=10, latent_dim=2)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(100):
model.train()
optimizer.zero_grad()
reconstructed, latent = model(data_tensor)
loss = criterion(reconstructed, data_tensor)
loss.backward()
optimizer.step()
if (epoch + 1) % 20 == 0:
print(f'Epoch {epoch+1}, Reconstruction Loss: {loss.item():.6f}')
# Visualize latent space
model.eval()
with torch.no_grad():
_, latent_codes = model(data_tensor)
latent_codes = latent_codes.numpy()
plt.figure(figsize=(8, 6))
plt.scatter(latent_codes[:, 0], latent_codes[:, 1], alpha=0.5, s=20)
plt.xlabel('Latent Dimension 1')
plt.ylabel('Latent Dimension 2')
plt.title('Latent Space Representation (10D → 2D)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
# plt.savefig('latent_space.png', dpi=150)
print(f"\nLatent space shape: {latent_codes.shape}")
print(f"Latent space range: [{latent_codes.min():.2f}, {latent_codes.max():.2f}]")
# Output example:
# Epoch 20, Reconstruction Loss: 0.125678
# Epoch 40, Reconstruction Loss: 0.056789
# Epoch 60, Reconstruction Loss: 0.034567
# Epoch 80, Reconstruction Loss: 0.023456
# Epoch 100, Reconstruction Loss: 0.018901
#
# Latent space shape: (1000, 2)
# Latent space range: [-3.45, 3.78]
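In practice, the trained encoder alone can be reused as a feature extractor for new measurements, as long as they are normalized with the training statistics. Below is a minimal sketch reusing model, data_mean, and data_std from Example 1; the "new" measurements are simply freshly generated synthetic data standing in for fresh plant data.
# Feature extraction for new process samples (sketch; reuses objects from Example 1)
new_samples = generate_process_data(n_samples=5, n_features=10)   # stand-in for fresh plant data
new_normalized = (new_samples - data_mean) / (data_std + 1e-8)    # apply the training statistics
new_tensor = torch.FloatTensor(new_normalized)
model.eval()
with torch.no_grad():
    new_latent = model.encoder(new_tensor)                        # encoder only: 10D → 2D features
print(f"Extracted features shape: {new_latent.shape}")            # torch.Size([5, 2])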
4.2 Denoising Autoencoder (Noise Removal)
A denoising autoencoder recovers the underlying signal from process data corrupted by sensor noise. During training, noise is added to the input and the network learns to reconstruct the clean original.
Example 2: Sensor Data Denoising
class DenoisingAutoencoder(nn.Module):
"""Denoising autoencoder"""
def __init__(self, input_dim, latent_dim=16):
super(DenoisingAutoencoder, self).__init__()
# Encoder
self.encoder = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, latent_dim)
)
# Decoder
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 64),
nn.ReLU(),
nn.Linear(64, 128),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(128, input_dim)
)
def forward(self, x):
latent = self.encoder(x)
reconstructed = self.decoder(latent)
return reconstructed
def add_noise(data, noise_factor=0.3):
"""Add Gaussian noise
Args:
data: [samples, features] Clean data
noise_factor: Noise intensity
Returns:
noisy_data: Data with added noise
"""
noise = torch.randn_like(data) * noise_factor
noisy_data = data + noise
return noisy_data
# Clean process data
clean_data = torch.FloatTensor(generate_process_data(n_samples=1000, n_features=20))
# Normalization
clean_mean = clean_data.mean(dim=0, keepdim=True)
clean_std = clean_data.std(dim=0, keepdim=True)
clean_normalized = (clean_data - clean_mean) / (clean_std + 1e-8)
# Add noise
noisy_data = add_noise(clean_normalized, noise_factor=0.5)
# Model training
model = DenoisingAutoencoder(input_dim=20, latent_dim=16)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(100):
model.train()
optimizer.zero_grad()
# Learn: Noisy input → Clean output
denoised = model(noisy_data)
loss = criterion(denoised, clean_normalized)
loss.backward()
optimizer.step()
if (epoch + 1) % 20 == 0:
print(f'Epoch {epoch+1}, Denoising Loss: {loss.item():.6f}')
# Test: Denoise new noisy data
model.eval()
with torch.no_grad():
test_clean = clean_normalized[:10]
test_noisy = add_noise(test_clean, noise_factor=0.5)
test_denoised = model(test_noisy)
# Calculate SNR (Signal-to-Noise Ratio)
noise_power = torch.mean((test_noisy - test_clean) ** 2)
residual_power = torch.mean((test_denoised - test_clean) ** 2)
snr_before = 10 * torch.log10(torch.mean(test_clean ** 2) / noise_power)
snr_after = 10 * torch.log10(torch.mean(test_clean ** 2) / residual_power)
print(f"\nSNR before denoising: {snr_before.item():.2f} dB")
print(f"SNR after denoising: {snr_after.item():.2f} dB")
print(f"Improvement: {(snr_after - snr_before).item():.2f} dB")
# Visualize one variable
time_steps = np.arange(10)
var_idx = 0
plt.figure(figsize=(10, 4))
plt.plot(time_steps, test_clean[:, var_idx].numpy(), 'g-', label='Clean', linewidth=2)
plt.plot(time_steps, test_noisy[:, var_idx].numpy(), 'r--', label='Noisy', alpha=0.7)
plt.plot(time_steps, test_denoised[:, var_idx].numpy(), 'b-', label='Denoised', linewidth=2)
plt.xlabel('Sample')
plt.ylabel('Normalized Value')
plt.title(f'Denoising Performance (Variable {var_idx+1})')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
# Output example:
# Epoch 20, Denoising Loss: 0.234567
# Epoch 40, Denoising Loss: 0.123456
# Epoch 60, Denoising Loss: 0.078901
# Epoch 80, Denoising Loss: 0.056789
# Epoch 100, Denoising Loss: 0.045678
#
# SNR before denoising: 5.23 dB
# SNR after denoising: 18.76 dB
# Improvement: 13.53 dB
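In deployment only the noisy measurements are available, so the trained model is applied directly after normalizing with the training statistics. The following minimal sketch reuses model, clean_mean, clean_std, and add_noise from Example 2; the clean reference exists only because the data are synthetic and is used purely to report the residual error.
# Denoising a new batch (sketch; reuses objects from Example 2)
new_clean = torch.FloatTensor(generate_process_data(n_samples=50, n_features=20))
new_normalized = (new_clean - clean_mean) / (clean_std + 1e-8)   # same statistics as training
new_noisy = add_noise(new_normalized, noise_factor=0.5)
model.eval()
with torch.no_grad():
    new_denoised = model(new_noisy)
residual_mse = torch.mean((new_denoised - new_normalized) ** 2).item()
print(f"Residual MSE on new batch: {residual_mse:.6f}")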
4.3 Variational Autoencoder (VAE)
A VAE is a probabilistic generative model whose latent space can be sampled to generate new data. It is used for process data generation and interpolation.
💡 VAE Characteristics
- Probabilistic Encoding: Latent variable \(z \sim \mathcal{N}(\mu, \sigma^2)\)
- KL divergence term: Regularization that pulls the latent distribution toward the standard normal prior
- Generation capability: Generate new data by sampling from latent space
The VAE is trained by maximizing the evidence lower bound (ELBO); equivalently, the loss that is minimized is the negative ELBO, a reconstruction term plus a KL regularization term:
$$\mathcal{L} = -\mathbb{E}_{q(z|x)}[\log p(x|z)] + D_{KL}\big(q(z|x)\,\|\,p(z)\big)$$
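The KL term has a closed form for a diagonal Gaussian posterior and a standard normal prior, which is what the implementation in Example 3 uses. As a quick sanity check, the closed form can be compared against torch.distributions (a minimal sketch with arbitrary example values):
# Closed-form KL(N(μ, σ²) || N(0, 1)) vs. torch.distributions (sanity-check sketch)
from torch.distributions import Normal, kl_divergence
mu = torch.tensor([0.5, -1.0])
logvar = torch.tensor([0.2, -0.3])
sigma = torch.exp(0.5 * logvar)
kl_closed_form = -0.5 * (1 + logvar - mu.pow(2) - logvar.exp())   # per-dimension KL
kl_reference = kl_divergence(Normal(mu, sigma), Normal(torch.zeros(2), torch.ones(2)))
print(kl_closed_form)   # matches kl_reference up to floating-point precision
print(kl_reference)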
Example 3: Process Data Generation with VAE
class VAE(nn.Module):
"""Variational Autoencoder"""
def __init__(self, input_dim, latent_dim=8):
super(VAE, self).__init__()
# Encoder
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 64)
# Output mean and variance
self.fc_mu = nn.Linear(64, latent_dim)
self.fc_logvar = nn.Linear(64, latent_dim)
# Decoder
self.fc3 = nn.Linear(latent_dim, 64)
self.fc4 = nn.Linear(64, 128)
self.fc5 = nn.Linear(128, input_dim)
def encode(self, x):
"""Encoder: Output mean and log variance
Returns:
mu: [batch, latent_dim] Mean
logvar: [batch, latent_dim] log(σ²)
"""
h = torch.relu(self.fc1(x))
h = torch.relu(self.fc2(h))
mu = self.fc_mu(h)
logvar = self.fc_logvar(h)
return mu, logvar
def reparameterize(self, mu, logvar):
"""Reparameterization trick
z = μ + σ * ε, where ε ~ N(0, 1)
"""
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
z = mu + eps * std
return z
def decode(self, z):
"""Decoder: Reconstruct from latent variable"""
h = torch.relu(self.fc3(z))
h = torch.relu(self.fc4(h))
reconstructed = self.fc5(h)
return reconstructed
def forward(self, x):
"""
Returns:
reconstructed: Reconstructed data
mu: Mean of latent variable
logvar: Log variance of latent variable
"""
mu, logvar = self.encode(x)
z = self.reparameterize(mu, logvar)
reconstructed = self.decode(z)
return reconstructed, mu, logvar
def vae_loss(reconstructed, original, mu, logvar, beta=1.0):
"""VAE loss = Reconstruction + KL divergence
Args:
beta: Weight of KL term (β-VAE)
"""
# Reconstruction loss (MSE)
recon_loss = nn.functional.mse_loss(reconstructed, original, reduction='sum')
# KL divergence: -0.5 * Σ(1 + log(σ²) - μ² - σ²)
kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
return recon_loss + beta * kl_loss
# Process data
data = generate_process_data(n_samples=1000, n_features=15)
data_normalized = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)
data_tensor = torch.FloatTensor(data_normalized)
# VAE training
model = VAE(input_dim=15, latent_dim=4)
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(150):
model.train()
optimizer.zero_grad()
reconstructed, mu, logvar = model(data_tensor)
loss = vae_loss(reconstructed, data_tensor, mu, logvar, beta=0.5)
loss.backward()
optimizer.step()
if (epoch + 1) % 30 == 0:
print(f'Epoch {epoch+1}, Loss: {loss.item():.2f}')
# Generate new data
model.eval()
with torch.no_grad():
# Sample from latent space
z_samples = torch.randn(10, 4) # 10 samples from standard normal distribution
generated_data = model.decode(z_samples)
print(f"\nGenerated data shape: {generated_data.shape}")
print(f"Generated data range: [{generated_data.min():.2f}, {generated_data.max():.2f}]")
# Statistical comparison with original data
original_mean = data_tensor.mean(dim=0)
generated_mean = generated_data.mean(dim=0)
print(f"\nOriginal data mean (first 5 vars): {original_mean[:5].numpy()}")
print(f"Generated data mean (first 5 vars): {generated_mean[:5].numpy()}")
# Output example:
# Epoch 30, Loss: 8765.43
# Epoch 60, Loss: 4321.09
# Epoch 90, Loss: 2987.65
# Epoch 120, Loss: 2456.78
# Epoch 150, Loss: 2234.56
#
# Generated data shape: torch.Size([10, 15])
# Generated data range: [-2.87, 2.45]
#
# Original data mean (first 5 vars): [-0.01 0.02 -0.00 0.01 -0.02]
# Generated data mean (first 5 vars): [-0.15 0.23 -0.08 0.12 -0.18]
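Because the latent space is regularized toward a standard normal distribution, interpolating between the latent codes of two real samples yields plausible intermediate operating states, which is the interpolation use mentioned above. A minimal sketch reusing model and data_tensor from Example 3:
# Latent-space interpolation between two process samples (sketch; reuses Example 3 objects)
model.eval()
with torch.no_grad():
    mu_a, _ = model.encode(data_tensor[0:1])              # latent mean of sample A
    mu_b, _ = model.encode(data_tensor[1:2])              # latent mean of sample B
    alphas = torch.linspace(0, 1, steps=5).unsqueeze(1)   # interpolation weights, shape [5, 1]
    z_interp = (1 - alphas) * mu_a + alphas * mu_b        # straight line in latent space
    x_interp = model.decode(z_interp)                     # decoded intermediate states
print(f"Interpolated data shape: {x_interp.shape}")       # torch.Size([5, 15])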
4.4 Anomaly Detection Using Reconstruction Error
An autoencoder trained only on normal data reconstructs abnormal data poorly. This property is exploited to detect anomalies.
Example 4: Process Anomaly Detection System
class AnomalyDetector:
"""Autoencoder-based anomaly detection"""
def __init__(self, autoencoder, threshold_percentile=95):
"""
Args:
autoencoder: Trained autoencoder
threshold_percentile: Anomaly decision threshold (percentile of normal data reconstruction error)
"""
self.autoencoder = autoencoder
self.threshold = None
self.threshold_percentile = threshold_percentile
def fit_threshold(self, normal_data):
"""Determine threshold from normal data
Args:
normal_data: [samples, features] Normal process data
"""
self.autoencoder.eval()
with torch.no_grad():
reconstructed, _ = self.autoencoder(normal_data)
reconstruction_errors = torch.mean((normal_data - reconstructed) ** 2, dim=1)
# Set threshold by percentile
self.threshold = torch.quantile(reconstruction_errors, self.threshold_percentile / 100.0)
print(f"Anomaly threshold set to: {self.threshold.item():.6f}")
print(f"Based on {self.threshold_percentile}th percentile of normal data errors")
return reconstruction_errors
def detect(self, data):
"""Anomaly detection
Args:
data: [samples, features] Test data
Returns:
is_anomaly: [samples] Boolean array (True = anomaly)
scores: [samples] Anomaly score (reconstruction error)
"""
self.autoencoder.eval()
with torch.no_grad():
reconstructed, _ = self.autoencoder(data)
scores = torch.mean((data - reconstructed) ** 2, dim=1)
is_anomaly = scores > self.threshold
return is_anomaly, scores
# Train on normal data
normal_data = generate_process_data(n_samples=800, n_features=10)
normal_normalized = (normal_data - normal_data.mean(axis=0)) / (normal_data.std(axis=0) + 1e-8)
normal_tensor = torch.FloatTensor(normal_normalized)
# Train autoencoder
ae_model = VanillaAutoencoder(input_dim=10, latent_dim=3)
optimizer = optim.Adam(ae_model.parameters(), lr=0.001)
criterion = nn.MSELoss()
for epoch in range(100):
ae_model.train()
optimizer.zero_grad()
reconstructed, _ = ae_model(normal_tensor)
loss = criterion(reconstructed, normal_tensor)
loss.backward()
optimizer.step()
if (epoch + 1) % 25 == 0:
print(f'Training Epoch {epoch+1}, Loss: {loss.item():.6f}')
# Initialize anomaly detector
detector = AnomalyDetector(ae_model, threshold_percentile=95)
normal_errors = detector.fit_threshold(normal_tensor)
# Generate anomaly data (add large deviations to some variables)
anomaly_data = generate_process_data(n_samples=200, n_features=10)
# Anomaly pattern 1: Specific variable has abnormal value
anomaly_data[:100, 0] += 5.0 # Variable 1 is abnormal
# Anomaly pattern 2: Abnormal correlation of multiple variables
anomaly_data[100:, [2, 5, 7]] += 3.0
anomaly_normalized = (anomaly_data - normal_data.mean(axis=0)) / (normal_data.std(axis=0) + 1e-8)
anomaly_tensor = torch.FloatTensor(anomaly_normalized)
# Anomaly detection
is_anomaly, anomaly_scores = detector.detect(anomaly_tensor)
# Evaluation
n_detected = is_anomaly.sum().item()
detection_rate = 100 * n_detected / len(anomaly_tensor)
print(f"\nAnomaly detection results:")
print(f" Total test samples: {len(anomaly_tensor)}")
print(f" Detected anomalies: {n_detected}")
print(f" Detection rate: {detection_rate:.2f}%")
print(f" Score range: [{anomaly_scores.min():.6f}, {anomaly_scores.max():.6f}]")
# False positive rate on normal data
false_positive = (normal_errors > detector.threshold).sum().item()
fpr = 100 * false_positive / len(normal_errors)
print(f" False positive rate: {fpr:.2f}%")
# Visualization
plt.figure(figsize=(10, 4))
plt.hist(normal_errors.numpy(), bins=50, alpha=0.7, label='Normal', color='green')
plt.hist(anomaly_scores.numpy(), bins=50, alpha=0.7, label='Anomaly', color='red')
plt.axvline(detector.threshold.item(), color='black', linestyle='--', label='Threshold')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
plt.title('Anomaly Detection: Reconstruction Error Distribution')
plt.yscale('log')
plt.grid(True, alpha=0.3)
plt.tight_layout()
# Output example:
# Training Epoch 25, Loss: 0.045678
# Training Epoch 50, Loss: 0.023456
# Training Epoch 75, Loss: 0.015678
# Training Epoch 100, Loss: 0.012345
# Anomaly threshold set to: 0.034567
# Based on 95th percentile of normal data errors
#
# Anomaly detection results:
# Total test samples: 200
# Detected anomalies: 187
# Detection rate: 93.50%
# Score range: [0.012345, 0.234567]
# False positive rate: 5.00%
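Once a sample is flagged, the per-variable reconstruction error is a simple way to indicate which sensors contributed most to the anomaly. A minimal diagnostic sketch reusing ae_model and anomaly_tensor from Example 4:
# Per-variable contribution to the reconstruction error (sketch; reuses Example 4 objects)
ae_model.eval()
with torch.no_grad():
    recon, _ = ae_model(anomaly_tensor)
    per_variable_error = ((anomaly_tensor - recon) ** 2).mean(dim=0)   # [n_features]
ranked = torch.argsort(per_variable_error, descending=True)
print("Variables ranked by mean reconstruction error:")
for rank, var_idx in enumerate(ranked[:3]):
    print(f"  {rank + 1}. Variable {var_idx.item()}: {per_variable_error[var_idx].item():.6f}")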
4.5 Sparse Autoencoder (Sparse Feature Extraction)
Sparsity regularization encourages only a small number of latent units to be active for any given input. This makes it possible to discover important combinations of process variables.
Example 5: Sparse Feature Extraction
class SparseAutoencoder(nn.Module):
"""Sparse autoencoder"""
def __init__(self, input_dim, latent_dim=20):
super(SparseAutoencoder, self).__init__()
# Encoder (set latent layer larger and constrain with sparsity)
self.encoder = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Linear(128, latent_dim),
nn.ReLU() # Non-negative constraint (ReLU)
)
# Decoder
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 128),
nn.ReLU(),
nn.Linear(128, input_dim)
)
def forward(self, x):
latent = self.encoder(x)
reconstructed = self.decoder(latent)
return reconstructed, latent
def sparse_loss(reconstructed, original, latent, sparsity_weight=0.01, sparsity_target=0.05):
"""Loss function considering sparsity
Args:
sparsity_weight: Weight of sparsity term
sparsity_target: Target sparsity rate (e.g., 5% activation is ideal)
"""
# Reconstruction loss
recon_loss = nn.functional.mse_loss(reconstructed, original)
# Sparsity loss: KL divergence between target and actual activation
    # Average activation of each latent unit, clamped to (0, 1) so the KL term below stays well-defined
    rho = torch.clamp(torch.mean(latent, dim=0), 1e-6, 1 - 1e-6)  # [latent_dim]
    # KL divergence between the target sparsity and the mean activation, KL(target || rho)
kl_div = sparsity_target * torch.log(sparsity_target / (rho + 1e-8)) + \
(1 - sparsity_target) * torch.log((1 - sparsity_target) / (1 - rho + 1e-8))
sparsity_loss = torch.sum(kl_div)
return recon_loss + sparsity_weight * sparsity_loss
# Process data (30 variables)
large_data = generate_process_data(n_samples=1000, n_features=30)
large_normalized = (large_data - large_data.mean(axis=0)) / (large_data.std(axis=0) + 1e-8)
large_tensor = torch.FloatTensor(large_normalized)
# Train sparse autoencoder
sparse_model = SparseAutoencoder(input_dim=30, latent_dim=20)
optimizer = optim.Adam(sparse_model.parameters(), lr=0.001)
for epoch in range(100):
sparse_model.train()
optimizer.zero_grad()
reconstructed, latent = sparse_model(large_tensor)
loss = sparse_loss(reconstructed, large_tensor, latent,
sparsity_weight=0.05, sparsity_target=0.1)
loss.backward()
optimizer.step()
if (epoch + 1) % 20 == 0:
# Calculate sparsity rate
with torch.no_grad():
_, latent_codes = sparse_model(large_tensor)
sparsity_rate = (latent_codes < 0.01).float().mean().item()
print(f'Epoch {epoch+1}, Loss: {loss.item():.6f}, Sparsity: {100*sparsity_rate:.2f}%')
# Analyze important features
sparse_model.eval()
with torch.no_grad():
_, latent_codes = sparse_model(large_tensor)
# Activation frequency of each latent unit
activation_freq = (latent_codes > 0.1).float().mean(dim=0).numpy()
# Top-5 active units
top_units = np.argsort(activation_freq)[-5:][::-1]
print(f"\nTop 5 active latent units:")
for i, unit_id in enumerate(top_units):
print(f" {i+1}. Unit {unit_id}: {100*activation_freq[unit_id]:.2f}% activation")
# Approximate which input variables each latent unit represents by composing the two
# encoder weight matrices (a linear approximation that ignores the ReLU nonlinearity)
w_first = sparse_model.encoder[0].weight.data.numpy()    # [128, input_dim]
w_latent = sparse_model.encoder[2].weight.data.numpy()   # [latent_dim, 128]
effective_weights = np.abs(w_latent) @ np.abs(w_first)   # [latent_dim, input_dim]
print(f"\nLatent unit representations:")
for unit_id in top_units[:3]:
    top_inputs = np.argsort(effective_weights[unit_id])[-5:][::-1]
    print(f"  Unit {unit_id} → Input variables: {top_inputs}")
# Output example:
# Epoch 20, Loss: 0.567890, Sparsity: 62.34%
# Epoch 40, Loss: 0.345678, Sparsity: 78.56%
# Epoch 60, Loss: 0.234567, Sparsity: 85.23%
# Epoch 80, Loss: 0.178901, Sparsity: 88.45%
# Epoch 100, Loss: 0.145678, Sparsity: 89.67%
#
# Top 5 active latent units:
# 1. Unit 12: 45.67% activation
# 2. Unit 7: 38.92% activation
# 3. Unit 18: 32.45% activation
# 4. Unit 3: 28.76% activation
# 5. Unit 15: 24.33% activation
#
# Latent unit representations:
# Unit 12 → Input variables: [ 3 7 12 18 23]
# Unit 7 → Input variables: [ 1 5 14 19 27]
# Unit 18 → Input variables: [ 2 9 11 16 25]
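The sparsity_weight argument controls the trade-off between reconstruction quality and sparsity. A short sketch that retrains the model for a few weight values (with a reduced epoch count to keep it quick) makes the trade-off visible, reusing large_tensor and sparse_loss from Example 5:
# Reconstruction vs. sparsity for different sparsity weights (sketch; reuses Example 5 objects)
for weight in [0.0, 0.05, 0.2]:
    trial_model = SparseAutoencoder(input_dim=30, latent_dim=20)
    trial_opt = optim.Adam(trial_model.parameters(), lr=0.001)
    for _ in range(50):
        trial_opt.zero_grad()
        recon, lat = trial_model(large_tensor)
        loss = sparse_loss(recon, large_tensor, lat, sparsity_weight=weight, sparsity_target=0.1)
        loss.backward()
        trial_opt.step()
    with torch.no_grad():
        recon, lat = trial_model(large_tensor)
        mse = nn.functional.mse_loss(recon, large_tensor).item()
        sparsity = (lat < 0.01).float().mean().item()
    print(f"sparsity_weight={weight}: reconstruction MSE={mse:.4f}, sparsity={100 * sparsity:.1f}%")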
4.6 Convolutional Autoencoder (Image Compression)
An autoencoder built from convolutional layers compresses and reconstructs process images efficiently.
Example 6: Process Image Compression
class ConvAutoencoder(nn.Module):
"""Convolutional autoencoder"""
def __init__(self, latent_dim=64):
super(ConvAutoencoder, self).__init__()
# Encoder
self.encoder = nn.Sequential(
# 224x224 → 112x112
nn.Conv2d(3, 32, 3, stride=2, padding=1),
nn.ReLU(),
nn.BatchNorm2d(32),
# 112x112 → 56x56
nn.Conv2d(32, 64, 3, stride=2, padding=1),
nn.ReLU(),
nn.BatchNorm2d(64),
# 56x56 → 28x28
nn.Conv2d(64, 128, 3, stride=2, padding=1),
nn.ReLU(),
nn.BatchNorm2d(128),
# 28x28 → 14x14
nn.Conv2d(128, 256, 3, stride=2, padding=1),
nn.ReLU(),
nn.BatchNorm2d(256)
)
# Bottleneck: 256*14*14 → latent_dim
self.fc_encode = nn.Linear(256 * 14 * 14, latent_dim)
# Bottleneck: latent_dim → 256*14*14
self.fc_decode = nn.Linear(latent_dim, 256 * 14 * 14)
# Decoder
self.decoder = nn.Sequential(
# 14x14 → 28x28
nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1),
nn.ReLU(),
nn.BatchNorm2d(128),
# 28x28 → 56x56
nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
nn.ReLU(),
nn.BatchNorm2d(64),
# 56x56 → 112x112
nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),
nn.ReLU(),
nn.BatchNorm2d(32),
# 112x112 → 224x224
nn.ConvTranspose2d(32, 3, 3, stride=2, padding=1, output_padding=1),
nn.Sigmoid() # Normalize to 0-1
)
def forward(self, x):
"""
Args:
x: [batch, 3, 224, 224]
Returns:
reconstructed: [batch, 3, 224, 224]
latent: [batch, latent_dim]
"""
# Encode
encoded = self.encoder(x) # [batch, 256, 14, 14]
encoded_flat = encoded.view(encoded.size(0), -1) # [batch, 256*14*14]
latent = self.fc_encode(encoded_flat) # [batch, latent_dim]
# Decode
decoded_flat = self.fc_decode(latent) # [batch, 256*14*14]
decoded = decoded_flat.view(-1, 256, 14, 14) # [batch, 256, 14, 14]
reconstructed = self.decoder(decoded) # [batch, 3, 224, 224]
return reconstructed, latent
# Dummy image data (stand-in for real process images)
dummy_images = torch.rand(16, 3, 224, 224) # Normalized to 0-1
# Model
conv_ae = ConvAutoencoder(latent_dim=128)
# Calculate compression ratio
original_size = 3 * 224 * 224 # 150,528
compressed_size = 128
compression_ratio = original_size / compressed_size
print(f"Compression ratio: {compression_ratio:.2f}x")
print(f"Original: {original_size:,} → Compressed: {compressed_size}")
# Training
criterion = nn.MSELoss()
optimizer = optim.Adam(conv_ae.parameters(), lr=0.001)
for epoch in range(30):
conv_ae.train()
optimizer.zero_grad()
reconstructed, latent = conv_ae(dummy_images)
loss = criterion(reconstructed, dummy_images)
loss.backward()
optimizer.step()
if (epoch + 1) % 10 == 0:
# Calculate PSNR (Peak Signal-to-Noise Ratio)
mse = loss.item()
psnr = 10 * np.log10(1.0 / mse)
print(f'Epoch {epoch+1}, MSE: {mse:.6f}, PSNR: {psnr:.2f} dB')
# Test
conv_ae.eval()
with torch.no_grad():
test_image = dummy_images[0:1]
reconstructed_image, latent_code = conv_ae(test_image)
# Compare images (first sample)
original_np = test_image[0].permute(1, 2, 0).numpy()
reconstructed_np = reconstructed_image[0].permute(1, 2, 0).numpy()
# Error
pixel_error = np.abs(original_np - reconstructed_np)
mean_error = pixel_error.mean()
print(f"\nReconstruction quality:")
print(f" Mean pixel error: {mean_error:.6f}")
print(f" Max pixel error: {pixel_error.max():.6f}")
# Output example:
# Compression ratio: 1176.00x
# Original: 150,528 → Compressed: 128
# Epoch 10, MSE: 0.023456, PSNR: 16.30 dB
# Epoch 20, MSE: 0.012345, PSNR: 19.09 dB
# Epoch 30, MSE: 0.008901, PSNR: 20.51 dB
#
# Reconstruction quality:
# Mean pixel error: 0.008234
# Max pixel error: 0.234567
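For archiving or transmission, only the latent codes need to be stored; images can be reconstructed on demand by running the decoder half of the network. A rough sketch of that workflow, reusing conv_ae and dummy_images from Example 6:
# Store latent codes only and reconstruct on demand (sketch; reuses Example 6 objects)
conv_ae.eval()
with torch.no_grad():
    _, latent_codes = conv_ae(dummy_images)            # [16, 128] codes to archive
    decoded_flat = conv_ae.fc_decode(latent_codes)     # later: run the decoder path only
    decoded = decoded_flat.view(-1, 256, 14, 14)
    restored = conv_ae.decoder(decoded)                # [16, 3, 224, 224]
bytes_raw = dummy_images.numel() * 4                   # float32 images
bytes_codes = latent_codes.numel() * 4                 # float32 latent codes
print(f"Raw batch: {bytes_raw / 1e6:.1f} MB, stored codes: {bytes_codes / 1e3:.1f} KB")
print(f"Restored shape: {restored.shape}")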
4.7 Conditional VAE (Conditional Generation)
A conditional VAE (C-VAE) generates process data under specified conditions (operating setpoints such as temperature or pressure).
Example 7: Conditional Process Data Generation
class ConditionalVAE(nn.Module):
"""Conditional VAE (C-VAE)"""
def __init__(self, input_dim, condition_dim, latent_dim=8):
"""
Args:
input_dim: Data dimension
condition_dim: Condition dimension (temperature, pressure setpoints, etc.)
latent_dim: Latent dimension
"""
super(ConditionalVAE, self).__init__()
# Encoder: Data + Condition → Latent variable
self.fc1 = nn.Linear(input_dim + condition_dim, 128)
self.fc2 = nn.Linear(128, 64)
self.fc_mu = nn.Linear(64, latent_dim)
self.fc_logvar = nn.Linear(64, latent_dim)
# Decoder: Latent variable + Condition → Data
self.fc3 = nn.Linear(latent_dim + condition_dim, 64)
self.fc4 = nn.Linear(64, 128)
self.fc5 = nn.Linear(128, input_dim)
def encode(self, x, c):
"""Encode with condition
Args:
x: [batch, input_dim] Data
c: [batch, condition_dim] Condition
"""
h = torch.cat([x, c], dim=1) # Concatenate condition
h = torch.relu(self.fc1(h))
h = torch.relu(self.fc2(h))
mu = self.fc_mu(h)
logvar = self.fc_logvar(h)
return mu, logvar
def decode(self, z, c):
"""Decode with condition
Args:
z: [batch, latent_dim] Latent variable
c: [batch, condition_dim] Condition
"""
h = torch.cat([z, c], dim=1) # Concatenate condition
h = torch.relu(self.fc3(h))
h = torch.relu(self.fc4(h))
reconstructed = self.fc5(h)
return reconstructed
def reparameterize(self, mu, logvar):
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
return mu + eps * std
def forward(self, x, c):
mu, logvar = self.encode(x, c)
z = self.reparameterize(mu, logvar)
reconstructed = self.decode(z, c)
return reconstructed, mu, logvar
# Generate conditional process data
def generate_conditional_data(n_samples=1000):
"""Process data depending on condition (temperature)"""
# Condition: Reaction temperature [300-500K]
temperature = np.random.uniform(300, 500, n_samples)
# Data: 5 variables dependent on temperature
data = np.zeros((n_samples, 5))
for i in range(n_samples):
T = temperature[i]
# Physicochemical relationships dependent on temperature
data[i, 0] = 0.001 * T**2 - 0.3 * T + 50 # Reaction rate constant
data[i, 1] = 100 * np.exp(-5000 / T) # Equilibrium constant (Arrhenius type)
data[i, 2] = 0.5 * T + 50 # Pressure
data[i, 3] = -0.002 * T + 2.0 # pH
data[i, 4] = 0.01 * T # Concentration
# Noise
data[i] += np.random.randn(5) * 2
# Normalization
condition = (temperature - 300) / 200 # Normalize to 0-1
condition = condition.reshape(-1, 1)
return data, condition
# Generate data
data, conditions = generate_conditional_data(n_samples=1000)
data_normalized = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)
data_tensor = torch.FloatTensor(data_normalized)
condition_tensor = torch.FloatTensor(conditions)
# Train C-VAE
cvae = ConditionalVAE(input_dim=5, condition_dim=1, latent_dim=4)
optimizer = optim.Adam(cvae.parameters(), lr=0.001)
for epoch in range(150):
cvae.train()
optimizer.zero_grad()
reconstructed, mu, logvar = cvae(data_tensor, condition_tensor)
loss = vae_loss(reconstructed, data_tensor, mu, logvar, beta=0.5)
loss.backward()
optimizer.step()
if (epoch + 1) % 30 == 0:
print(f'Epoch {epoch+1}, Loss: {loss.item():.2f}')
# Generate with specified condition
cvae.eval()
with torch.no_grad():
# Generate data at specific temperature (e.g., 400K)
target_temp = 400 # K
target_condition = torch.FloatTensor([[(target_temp - 300) / 200]]) # Normalize
# Sample from latent space
z_samples = torch.randn(10, 4)
# Apply condition to all 10 samples
conditions_repeated = target_condition.repeat(10, 1)
# Generate
generated_data = cvae.decode(z_samples, conditions_repeated)
print(f"\nGenerated data for T={target_temp}K:")
print(f" Shape: {generated_data.shape}")
print(f" Mean (normalized): {generated_data.mean(dim=0).numpy()}")
# Compare generation at different temperatures
temps = [320, 380, 440, 500]
print(f"\nData generation at different temperatures:")
for temp in temps:
cond = torch.FloatTensor([[(temp - 300) / 200]])
z = torch.randn(1, 4)
gen = cvae.decode(z, cond)
print(f" T={temp}K: Variable 0 = {gen[0, 0].item():.4f}")
# Output example:
# Epoch 30, Loss: 6789.01
# Epoch 60, Loss: 3456.78
# Epoch 90, Loss: 2345.67
# Epoch 120, Loss: 1987.65
# Epoch 150, Loss: 1765.43
#
# Generated data for T=400K:
# Shape: torch.Size([10, 5])
# Mean (normalized): [-0.12 0.34 -0.08 0.15 -0.23]
#
# Data generation at different temperatures:
# T=320K: Variable 0 = -1.2345
# T=380K: Variable 0 = -0.4567
# T=440K: Variable 0 = 0.6789
# T=500K: Variable 0 = 1.5432
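The generated samples are in normalized units; to interpret them as physical quantities they must be mapped back with the statistics of the training data. A sketch of that back-transform, reusing data and generated_data from Example 7 (the variable names follow the comments in generate_conditional_data; printed values depend on the random seed):
# Map C-VAE output from normalized units back to physical units (sketch; reuses Example 7 objects)
cond_mean = data.mean(axis=0)          # statistics of the conditional training data
cond_std = data.std(axis=0)
generated_physical = generated_data.numpy() * (cond_std + 1e-8) + cond_mean
variable_names = ['rate constant', 'equilibrium constant', 'pressure', 'pH', 'concentration']
print("Generated sample at T=400K (physical units, first sample):")
for name, value in zip(variable_names, generated_physical[0]):
    print(f"  {name}: {value:.3f}")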
4.8 Synthetic Process Data Generation with GAN
A Generative Adversarial Network (GAN) learns to generate process data that is difficult to distinguish from real data. It is used for data augmentation and for mitigating small-data problems.
Example 8: Process GAN
class Generator(nn.Module):
"""Process data generator"""
def __init__(self, latent_dim=16, output_dim=10):
super(Generator, self).__init__()
self.model = nn.Sequential(
nn.Linear(latent_dim, 64),
nn.LeakyReLU(0.2),
nn.BatchNorm1d(64),
nn.Linear(64, 128),
nn.LeakyReLU(0.2),
nn.BatchNorm1d(128),
nn.Linear(128, 256),
nn.LeakyReLU(0.2),
nn.BatchNorm1d(256),
nn.Linear(256, output_dim),
nn.Tanh() # Normalize to -1~1
)
def forward(self, z):
"""
Args:
z: [batch, latent_dim] Noise vector
Returns:
fake_data: [batch, output_dim] Generated data
"""
return self.model(z)
class Discriminator(nn.Module):
"""Process data discriminator (real vs fake)"""
def __init__(self, input_dim=10):
super(Discriminator, self).__init__()
self.model = nn.Sequential(
nn.Linear(input_dim, 256),
nn.LeakyReLU(0.2),
nn.Dropout(0.3),
nn.Linear(256, 128),
nn.LeakyReLU(0.2),
nn.Dropout(0.3),
nn.Linear(128, 64),
nn.LeakyReLU(0.2),
nn.Dropout(0.3),
nn.Linear(64, 1),
nn.Sigmoid() # 0-1 probability
)
def forward(self, x):
"""
Args:
x: [batch, input_dim] Data
Returns:
validity: [batch, 1] Probability of being real
"""
return self.model(x)
# Real data
real_data = generate_process_data(n_samples=1000, n_features=10)
real_normalized = 2 * (real_data - real_data.min(axis=0)) / \
(real_data.max(axis=0) - real_data.min(axis=0) + 1e-8) - 1 # -1~1
real_tensor = torch.FloatTensor(real_normalized)
# Initialize GAN
latent_dim = 16
generator = Generator(latent_dim=latent_dim, output_dim=10)
discriminator = Discriminator(input_dim=10)
# Optimization
lr = 0.0002
optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))
criterion = nn.BCELoss()
# Training
batch_size = 64
n_epochs = 100
for epoch in range(n_epochs):
# Train Discriminator
for _ in range(2): # Update Discriminator twice
optimizer_D.zero_grad()
# Real data
idx = np.random.randint(0, len(real_tensor), batch_size)
real_batch = real_tensor[idx]
real_labels = torch.ones(batch_size, 1)
# Fake data
z = torch.randn(batch_size, latent_dim)
fake_batch = generator(z).detach()
fake_labels = torch.zeros(batch_size, 1)
# Discriminator loss
real_loss = criterion(discriminator(real_batch), real_labels)
fake_loss = criterion(discriminator(fake_batch), fake_labels)
d_loss = (real_loss + fake_loss) / 2
d_loss.backward()
optimizer_D.step()
# Train Generator
optimizer_G.zero_grad()
z = torch.randn(batch_size, latent_dim)
fake_batch = generator(z)
# Generator wants to fool discriminator into thinking fake is real
g_loss = criterion(discriminator(fake_batch), torch.ones(batch_size, 1))
g_loss.backward()
optimizer_G.step()
if (epoch + 1) % 20 == 0:
print(f'Epoch {epoch+1}/{n_epochs}, D Loss: {d_loss.item():.4f}, G Loss: {g_loss.item():.4f}')
# Evaluate generated data
generator.eval()
with torch.no_grad():
z_samples = torch.randn(1000, latent_dim)
generated_data = generator(z_samples).numpy()
# Statistical comparison
print(f"\nStatistical comparison:")
print(f"Real data mean: {real_normalized.mean(axis=0)[:3]}")
print(f"Generated mean: {generated_data.mean(axis=0)[:3]}")
print(f"Real data std: {real_normalized.std(axis=0)[:3]}")
print(f"Generated std: {generated_data.std(axis=0)[:3]}")
# Evaluation by Discriminator (eval mode so Dropout is disabled; no gradients needed)
discriminator.eval()
with torch.no_grad():
    real_score = discriminator(real_tensor).mean().item()
    fake_score = discriminator(torch.FloatTensor(generated_data)).mean().item()
print(f"\nDiscriminator scores:")
print(f" Real data: {real_score:.4f} (1.0 = perfect real)")
print(f" Generated data: {fake_score:.4f} (0.5 = indistinguishable)")
# Output example:
# Epoch 20/100, D Loss: 0.5678, G Loss: 0.8901
# Epoch 40/100, D Loss: 0.4567, G Loss: 1.0234
# Epoch 60/100, D Loss: 0.3890, G Loss: 1.2345
# Epoch 80/100, D Loss: 0.3456, G Loss: 1.3456
# Epoch 100/100, D Loss: 0.3234, G Loss: 1.4123
#
# Statistical comparison:
# Real data mean: [-0.023 0.045 -0.012]
# Generated mean: [-0.034 0.056 -0.018]
# Real data std: [0.567 0.623 0.589]
# Generated std: [0.543 0.598 0.612]
#
# Discriminator scores:
# Real data: 0.8234 (1.0 = perfect real)
# Generated data: 0.4876 (0.5 = indistinguishable)
✅ GAN Applications
- Data Augmentation: Generate large amounts of synthetic data from small amounts of real data (see the sketch after this list)
- Anomaly Data Generation: Synthesize abnormal patterns lacking in training data
- Simulation Alternative: Fast alternative to physical simulation
- Privacy Protection: Synthetic data preserving statistical properties of real data
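For the data-augmentation use case above, generated samples can be mapped back from the Tanh output range to the original scale and appended to the real dataset. A minimal sketch reusing generator, latent_dim, and real_data from Example 8:
# GAN-based data augmentation (sketch; reuses Example 8 objects)
generator.eval()
with torch.no_grad():
    z = torch.randn(500, latent_dim)
    synthetic_normalized = generator(z).numpy()        # values in [-1, 1]
# Invert the min-max scaling that was applied to the real data
data_min = real_data.min(axis=0)
data_max = real_data.max(axis=0)
synthetic = (synthetic_normalized + 1) / 2 * (data_max - data_min + 1e-8) + data_min
augmented = np.vstack([real_data, synthetic])          # 1000 real + 500 synthetic samples
print(f"Augmented dataset shape: {augmented.shape}")   # (1500, 10)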
Learning Objectives Checklist
After completing this chapter, you should be able to implement and explain the following:
Basic Understanding
- Explain the encoder-decoder structure of autoencoders and the reconstruction task
- Understand the reparameterization trick and the role of the KL divergence term in VAE
- Explain the adversarial learning mechanism of GAN
- Understand the principle of anomaly detection based on reconstruction error
Practical Skills
- Implement Autoencoder in PyTorch and perform dimensionality reduction on process data
- Remove sensor noise with Denoising Autoencoder
- Generate new process data with VAE
- Build anomaly detection system using reconstruction error
- Extract important feature combinations with Sparse Autoencoder
- Compress images with Convolutional Autoencoder
- Perform conditional data generation with Conditional VAE
- Generate statistically valid synthetic data with GAN
Application Skills
- Select appropriate Autoencoder architecture according to process characteristics
- Set appropriate thresholds for anomaly detection and control false positive rate
- Solve small data problems with GAN data augmentation
- Synthesize data under specific process conditions with conditional generation
Disclaimer
- This content is provided solely for educational, research, and informational purposes and does not constitute professional advice (legal, accounting, technical warranty, etc.).
- This content and accompanying code examples are provided "AS IS" without any warranty, express or implied, including but not limited to merchantability, fitness for a particular purpose, non-infringement, accuracy, completeness, operation, or safety.
- The author and Tohoku University assume no responsibility for the content, availability, or safety of external links, third-party data, tools, libraries, etc.
- To the maximum extent permitted by applicable law, the author and Tohoku University shall not be liable for any direct, indirect, incidental, special, consequential, or punitive damages arising from the use, execution, or interpretation of this content.
- The content may be changed, updated, or discontinued without notice.
- The copyright and license of this content are subject to the stated conditions (e.g., CC BY 4.0). Such licenses typically include no-warranty clauses.