This chapter covers convolutional neural networks (CNNs). You will learn the roles of convolutional and pooling layers, how CNNs preserve the spatial structure of images, and the encoder-decoder structure of U-Net.
3.1 CNN Fundamentals (Convolutional and Pooling Layers)
Convolutional Neural Networks (CNNs) extract features while preserving the spatial structure of images. In the process industries, they are used for quality inspection with camera images, thermal image analysis, equipment degradation diagnosis, and more.
💡 Basic Components of CNN
- Convolutional Layer: Extracts local features using filters
- Pooling Layer: Downsamples feature maps, providing a degree of local translation invariance
- Fully Connected Layer: Uses extracted features for classification and regression
Convolution operation formula (in the cross-correlation form used by most deep learning frameworks, i.e., the kernel is not flipped):
$$(I * K)(i, j) = \sum_m \sum_n I(i+m, j+n) \cdot K(m, n)$$
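A minimal NumPy sketch of this operation on a small dummy input (the 5x5 image and 3x3 edge filter are illustrative values, not from the text):
import numpy as np

def conv2d_valid(I, K):
    """Apply (I * K)(i, j) = sum_m sum_n I(i+m, j+n) * K(m, n) without padding."""
    kh, kw = K.shape
    out_h = I.shape[0] - kh + 1
    out_w = I.shape[1] - kw + 1
    out = np.zeros((out_h, out_w))
    for i in range(out_h):
        for j in range(out_w):
            # Element-wise product of the local patch and the kernel, then sum
            out[i, j] = np.sum(I[i:i+kh, j:j+kw] * K)
    return out

I = np.arange(25, dtype=float).reshape(5, 5)   # dummy 5x5 "image"
K = np.array([[1., 0., -1.],
              [1., 0., -1.],
              [1., 0., -1.]])                   # simple vertical-edge filter
print(conv2d_valid(I, K).shape)  # (3, 3): a 5x5 input and 3x3 kernel give a 3x3 feature map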
Example 1: Basic CNN Implementation (Process Image Classification)
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# - torchvision>=0.15.0
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torchvision import transforms
class SimpleCNN(nn.Module):
"""Basic CNN for process image classification"""
def __init__(self, num_classes=3):
"""
Args:
num_classes: Number of classes (normal/minor anomaly/severe anomaly, etc.)
"""
super(SimpleCNN, self).__init__()
# Convolutional layer 1: 3 channels (RGB) → 32 channels
self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
self.bn1 = nn.BatchNorm2d(32)
# Convolutional layer 2: 32 → 64
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
self.bn2 = nn.BatchNorm2d(64)
# Convolutional layer 3: 64 → 128
self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
self.bn3 = nn.BatchNorm2d(128)
# Pooling layer (2x2, stride 2)
self.pool = nn.MaxPool2d(2, 2)
# Fully connected layers
# For 224x224 input images, after 3 pooling operations: 28x28
self.fc1 = nn.Linear(128 * 28 * 28, 512)
self.fc2 = nn.Linear(512, num_classes)
self.dropout = nn.Dropout(0.5)
def forward(self, x):
"""
Args:
x: [batch, 3, 224, 224] RGB image
Returns:
            output: [batch, num_classes] Class scores (logits; apply softmax to obtain probabilities)
"""
# Convolutional layer 1 + ReLU + Pooling
x = self.pool(F.relu(self.bn1(self.conv1(x)))) # [batch, 32, 112, 112]
# Convolutional layer 2 + ReLU + Pooling
x = self.pool(F.relu(self.bn2(self.conv2(x)))) # [batch, 64, 56, 56]
# Convolutional layer 3 + ReLU + Pooling
x = self.pool(F.relu(self.bn3(self.conv3(x)))) # [batch, 128, 28, 28]
# Flatten
x = x.view(x.size(0), -1) # [batch, 128*28*28]
# Fully connected layers
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
# Test with synthetic data (in practice, use camera images from production line)
batch_size = 8
model = SimpleCNN(num_classes=3)
# Dummy images (224x224 RGB images)
dummy_images = torch.randn(batch_size, 3, 224, 224)
# Forward pass
output = model(dummy_images)
print(f"Output shape: {output.shape}") # [8, 3]
print(f"Class probabilities (sample 1): {F.softmax(output[0], dim=0)}")
# Check parameter count
total_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {total_params:,}")
# Output example:
# Output shape: torch.Size([8, 3])
# Class probabilities (sample 1): tensor([0.3456, 0.4123, 0.2421], grad_fn=<SoftmaxBackward0>)
# Total parameters: 51,475,971
3.2 Image Classification of Process States
Classify the state of the process (normal/abnormal) from camera images of production lines. For example, analyze images captured from observation windows inside reactors.
Example 2: Binary Classification (Normal/Abnormal)
# Requirements:
# - Python 3.9+
# - pillow>=10.0.0
# - torch>=2.0.0, <2.3.0
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
class ProcessImageDataset(Dataset):
"""Process image dataset"""
def __init__(self, image_paths, labels, transform=None):
"""
Args:
image_paths: List of image file paths
labels: Labels (0: normal, 1: abnormal)
transform: Data augmentation
"""
self.image_paths = image_paths
self.labels = labels
self.transform = transform
def __len__(self):
return len(self.image_paths)
def __getitem__(self, idx):
# Load image
image = Image.open(self.image_paths[idx]).convert('RGB')
label = self.labels[idx]
if self.transform:
image = self.transform(image)
return image, label
# Data augmentation (training)
train_transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.RandomHorizontalFlip(), # Horizontal flip
transforms.RandomRotation(10), # ±10 degree rotation
transforms.ColorJitter(brightness=0.2, contrast=0.2), # Brightness and contrast adjustment
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Test time (no augmentation)
test_transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Training function
def train_classifier(model, train_loader, val_loader, epochs=20, lr=0.001):
"""Train image classification model"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
best_acc = 0.0
for epoch in range(epochs):
# Training phase
model.train()
running_loss = 0.0
correct = 0
total = 0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
_, predicted = torch.max(outputs, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
train_acc = 100 * correct / total
# Validation phase
model.eval()
val_correct = 0
val_total = 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs, 1)
val_total += labels.size(0)
val_correct += (predicted == labels).sum().item()
val_acc = 100 * val_correct / val_total
if val_acc > best_acc:
best_acc = val_acc
torch.save(model.state_dict(), 'best_process_classifier.pth')
print(f'Epoch {epoch+1}/{epochs}:')
print(f' Train Loss: {running_loss/len(train_loader):.4f}, Acc: {train_acc:.2f}%')
print(f' Val Acc: {val_acc:.2f}%')
scheduler.step()
print(f'\nBest validation accuracy: {best_acc:.2f}%')
return model
# Usage example (replace with actual data paths)
# train_paths = ['path/to/normal1.jpg', 'path/to/abnormal1.jpg', ...]
# train_labels = [0, 1, ...] # 0: normal, 1: abnormal
#
# train_dataset = ProcessImageDataset(train_paths, train_labels, train_transform)
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
#
# model = SimpleCNN(num_classes=2)
# trained_model = train_classifier(model, train_loader, val_loader, epochs=20)
# Output example:
# Epoch 1/20:
# Train Loss: 0.6234, Acc: 65.34%
# Val Acc: 68.21%
# Epoch 5/20:
# Train Loss: 0.3456, Acc: 84.56%
# Val Acc: 82.34%
# ...
# Best validation accuracy: 94.56%
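After training, the saved weights can be used for single-image inference. A minimal sketch, reusing SimpleCNN and test_transform from above; the image path and class names are placeholders:
def predict_image(model, image_path, transform, device='cpu', class_names=('normal', 'abnormal')):
    """Classify a single image and return the predicted label with its probability."""
    model.eval()
    image = Image.open(image_path).convert('RGB')
    x = transform(image).unsqueeze(0).to(device)   # [1, 3, 224, 224]
    with torch.no_grad():
        probs = F.softmax(model(x), dim=1)[0]
    pred = int(torch.argmax(probs))
    return class_names[pred], probs[pred].item()

# Hypothetical usage (paths and class names are placeholders):
# model = SimpleCNN(num_classes=2)
# model.load_state_dict(torch.load('best_process_classifier.pth', map_location='cpu'))
# label, conf = predict_image(model, 'path/to/frame.jpg', test_transform)
# print(f"{label} ({conf:.2%})")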
3.3 Object Detection for Equipment Monitoring
We use the YOLO (You Only Look Once) architecture to detect equipment and anomalous regions in images, implementing a simplified version here.
Example 3: Simplified Object Detection (Bounding Box Prediction)
class SimpleYOLO(nn.Module):
"""Simplified YOLO-style object detection model"""
def __init__(self, num_classes=1, num_boxes=2):
"""
Args:
num_classes: Number of detection target classes (anomalous locations, etc.)
num_boxes: Number of boxes per grid cell
"""
super(SimpleYOLO, self).__init__()
self.num_classes = num_classes
self.num_boxes = num_boxes
# Backbone (feature extraction)
self.backbone = nn.Sequential(
# Conv1
nn.Conv2d(3, 64, 7, stride=2, padding=3),
nn.BatchNorm2d(64),
nn.LeakyReLU(0.1),
nn.MaxPool2d(2, 2),
# Conv2
nn.Conv2d(64, 192, 3, padding=1),
nn.BatchNorm2d(192),
nn.LeakyReLU(0.1),
nn.MaxPool2d(2, 2),
# Conv3-5
nn.Conv2d(192, 256, 1),
nn.BatchNorm2d(256),
nn.LeakyReLU(0.1),
nn.Conv2d(256, 512, 3, padding=1),
nn.BatchNorm2d(512),
nn.LeakyReLU(0.1),
            nn.MaxPool2d(2, 2),
            # Additional pooling so a 448x448 input yields the 14x14 grid used below (448 / 32 = 14)
            nn.MaxPool2d(2, 2)
        )
# Detection head
# Predict per grid cell: (x, y, w, h, confidence) * num_boxes + class_probs
output_size = num_boxes * 5 + num_classes
self.detection_head = nn.Sequential(
nn.Conv2d(512, 1024, 3, padding=1),
nn.BatchNorm2d(1024),
nn.LeakyReLU(0.1),
nn.Conv2d(1024, output_size, 1)
)
def forward(self, x):
"""
Args:
x: [batch, 3, 448, 448] Input image
Returns:
output: [batch, grid_h, grid_w, num_boxes*5 + num_classes]
(x, y, w, h, confidence, class_probs)
"""
features = self.backbone(x) # [batch, 512, 14, 14]
output = self.detection_head(features) # [batch, output_size, 14, 14]
# Convert to [batch, grid_h, grid_w, output_size]
output = output.permute(0, 2, 3, 1)
return output
# Model initialization
model = SimpleYOLO(num_classes=1, num_boxes=2)
# Test
dummy_image = torch.randn(4, 3, 448, 448)
output = model(dummy_image)
print(f"Output shape: {output.shape}") # [4, 14, 14, 11]
# 11 = 2 boxes * 5 (x,y,w,h,conf) + 1 class
# Bounding box decoding example
def decode_predictions(output, grid_size=14, img_size=448, conf_threshold=0.5):
"""Convert predictions to bounding boxes
Args:
output: [grid_h, grid_w, num_boxes*5 + num_classes]
Returns:
boxes: List of (x, y, w, h, confidence, class_id)
"""
boxes = []
num_boxes = 2
for i in range(grid_size):
for j in range(grid_size):
for b in range(num_boxes):
# Extract box information
box_idx = b * 5
x = (j + torch.sigmoid(output[i, j, box_idx])) / grid_size
y = (i + torch.sigmoid(output[i, j, box_idx+1])) / grid_size
w = torch.sigmoid(output[i, j, box_idx+2])
h = torch.sigmoid(output[i, j, box_idx+3])
conf = torch.sigmoid(output[i, j, box_idx+4])
if conf > conf_threshold:
# Class probabilities
class_probs = output[i, j, num_boxes*5:]
class_id = torch.argmax(class_probs)
# Convert to image coordinates
x_abs = x * img_size
y_abs = y * img_size
w_abs = w * img_size
h_abs = h * img_size
boxes.append([x_abs.item(), y_abs.item(),
w_abs.item(), h_abs.item(),
conf.item(), class_id.item()])
return boxes
# Prediction example
sample_output = output[0].detach() # First sample
detected_boxes = decode_predictions(sample_output, conf_threshold=0.3)
print(f"\nDetected {len(detected_boxes)} boxes with confidence > 0.3")
# Output example:
# Output shape: torch.Size([4, 14, 14, 11])
# Detected 37 boxes with confidence > 0.3
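The raw decoded boxes overlap heavily (37 boxes above for a single image). A full detection pipeline would filter them with non-maximum suppression (NMS); a minimal sketch using torchvision.ops.nms, assuming the (center x, center y, width, height) box format produced by decode_predictions:
from torchvision.ops import nms

def apply_nms(boxes, iou_threshold=0.5):
    """boxes: list of [cx, cy, w, h, conf, class_id] as returned by decode_predictions."""
    if len(boxes) == 0:
        return []
    t = torch.tensor(boxes, dtype=torch.float32)
    cx, cy, w, h = t[:, 0], t[:, 1], t[:, 2], t[:, 3]
    # Convert to corner format (x1, y1, x2, y2) expected by torchvision.ops.nms
    xyxy = torch.stack([cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2], dim=1)
    keep = nms(xyxy, t[:, 4], iou_threshold)   # indices of boxes to keep, sorted by confidence
    return [boxes[i] for i in keep.tolist()]

filtered = apply_nms(detected_boxes, iou_threshold=0.5)
print(f"Boxes after NMS: {len(filtered)}")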
3.4 Manufacturing Defect Detection
Detect defects such as scratches, stains, and discoloration on product surfaces at the pixel level; this is formulated as a segmentation task.
Example 4: Defect Segmentation with U-Net
class UNet(nn.Module):
"""U-Net architecture (for defect detection)"""
def __init__(self, in_channels=3, out_channels=1):
"""
Args:
in_channels: Number of input channels (RGB=3)
out_channels: Number of output channels (binary mask=1)
"""
super(UNet, self).__init__()
# Encoder (downsampling)
self.enc1 = self.conv_block(in_channels, 64)
self.enc2 = self.conv_block(64, 128)
self.enc3 = self.conv_block(128, 256)
self.enc4 = self.conv_block(256, 512)
# Bottleneck
self.bottleneck = self.conv_block(512, 1024)
# Decoder (upsampling)
self.upconv4 = nn.ConvTranspose2d(1024, 512, 2, stride=2)
self.dec4 = self.conv_block(1024, 512) # 512+512 (skip connection)
self.upconv3 = nn.ConvTranspose2d(512, 256, 2, stride=2)
self.dec3 = self.conv_block(512, 256)
self.upconv2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
self.dec2 = self.conv_block(256, 128)
self.upconv1 = nn.ConvTranspose2d(128, 64, 2, stride=2)
self.dec1 = self.conv_block(128, 64)
# Output layer
self.out_conv = nn.Conv2d(64, out_channels, 1)
self.pool = nn.MaxPool2d(2, 2)
def conv_block(self, in_channels, out_channels):
"""Block of two convolutional layers"""
return nn.Sequential(
nn.Conv2d(in_channels, out_channels, 3, padding=1),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
nn.Conv2d(out_channels, out_channels, 3, padding=1),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True)
)
def forward(self, x):
"""
Args:
x: [batch, 3, H, W] Input image
Returns:
mask: [batch, 1, H, W] Defect mask (0-1)
"""
# Encoder
enc1 = self.enc1(x)
enc2 = self.enc2(self.pool(enc1))
enc3 = self.enc3(self.pool(enc2))
enc4 = self.enc4(self.pool(enc3))
# Bottleneck
bottleneck = self.bottleneck(self.pool(enc4))
# Decoder (with skip connections)
dec4 = self.upconv4(bottleneck)
dec4 = torch.cat([dec4, enc4], dim=1) # Skip connection
dec4 = self.dec4(dec4)
dec3 = self.upconv3(dec4)
dec3 = torch.cat([dec3, enc3], dim=1)
dec3 = self.dec3(dec3)
dec2 = self.upconv2(dec3)
dec2 = torch.cat([dec2, enc2], dim=1)
dec2 = self.dec2(dec2)
dec1 = self.upconv1(dec2)
dec1 = torch.cat([dec1, enc1], dim=1)
dec1 = self.dec1(dec1)
# Output (normalize to 0-1 with sigmoid)
out = torch.sigmoid(self.out_conv(dec1))
return out
# Dice Loss (for segmentation)
class DiceLoss(nn.Module):
"""Loss based on Dice coefficient"""
def forward(self, pred, target, smooth=1.0):
"""
Args:
pred: [batch, 1, H, W] Predicted mask
target: [batch, 1, H, W] True mask
"""
pred = pred.view(-1)
target = target.view(-1)
intersection = (pred * target).sum()
dice = (2.0 * intersection + smooth) / (pred.sum() + target.sum() + smooth)
return 1 - dice
# Training example
model = UNet(in_channels=3, out_channels=1)
criterion = DiceLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Dummy data (in practice, use product images and defect masks)
dummy_images = torch.randn(4, 3, 256, 256)
dummy_masks = torch.randint(0, 2, (4, 1, 256, 256)).float()
# Training step
model.train()
optimizer.zero_grad()
pred_masks = model(dummy_images)
loss = criterion(pred_masks, dummy_masks)
loss.backward()
optimizer.step()
print(f"Predicted mask shape: {pred_masks.shape}")
print(f"Dice Loss: {loss.item():.4f}")
# Prediction statistics
print(f"Prediction range: [{pred_masks.min():.4f}, {pred_masks.max():.4f}]")
print(f"Defect pixels (>0.5): {(pred_masks > 0.5).sum().item()} / {pred_masks.numel()}")
# Output example:
# Predicted mask shape: torch.Size([4, 1, 256, 256])
# Dice Loss: 0.5234
# Prediction range: [0.0123, 0.9876]
# Defect pixels (>0.5): 31245 / 262144
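To evaluate predicted masks against ground truth, IoU and the Dice coefficient are the standard metrics. A minimal sketch computed on the dummy tensors above (the 0.5 binarization threshold is an assumed value):
def segmentation_metrics(pred_mask, true_mask, threshold=0.5, eps=1e-7):
    """Return (IoU, Dice) for a binarized predicted mask against a binary ground-truth mask."""
    pred_bin = (pred_mask > threshold).float()
    intersection = (pred_bin * true_mask).sum()
    union = pred_bin.sum() + true_mask.sum() - intersection
    iou = (intersection + eps) / (union + eps)
    dice = (2 * intersection + eps) / (pred_bin.sum() + true_mask.sum() + eps)
    return iou.item(), dice.item()

with torch.no_grad():
    iou, dice = segmentation_metrics(pred_masks, dummy_masks)
print(f"IoU: {iou:.4f}, Dice: {dice:.4f}")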
3.5 Temperature Distribution Estimation via Thermal Image Analysis
Estimate equipment temperature distribution from thermal images captured by infrared cameras and detect hot spots.
Example 5: Temperature Prediction from Thermal Images
class ThermalCNN(nn.Module):
"""CNN for thermal image analysis"""
def __init__(self):
super(ThermalCNN, self).__init__()
# Thermal images are usually 1 channel (grayscale)
self.features = nn.Sequential(
nn.Conv2d(1, 32, 5, padding=2),
nn.ReLU(),
nn.MaxPool2d(2, 2),
nn.Conv2d(32, 64, 5, padding=2),
nn.ReLU(),
nn.MaxPool2d(2, 2),
nn.Conv2d(64, 128, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2, 2),
)
# Temperature estimation head (regression task)
self.regressor = nn.Sequential(
nn.Linear(128 * 28 * 28, 512),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(512, 3) # Max temperature, average temperature, min temperature
)
def forward(self, x):
"""
Args:
x: [batch, 1, 224, 224] Thermal image
Returns:
temps: [batch, 3] (max_temp, avg_temp, min_temp)
"""
features = self.features(x)
features = features.view(features.size(0), -1)
temps = self.regressor(features)
return temps
# Generate synthetic thermal image data
def generate_thermal_image(base_temp=300, hotspot_temp=450, size=224):
"""Thermal image simulation
Args:
base_temp: Base temperature [K]
hotspot_temp: Hot spot temperature [K]
size: Image size
Returns:
image: [1, size, size] Temperature map
stats: (max, avg, min) Temperature statistics
"""
# Base temperature
image = np.ones((size, size)) * base_temp
# Add Gaussian distributed hot spots
n_hotspots = np.random.randint(1, 4)
for _ in range(n_hotspots):
cx, cy = np.random.randint(20, size-20, 2)
sigma = np.random.uniform(10, 30)
y, x = np.ogrid[:size, :size]
gaussian = np.exp(-((x - cx)**2 + (y - cy)**2) / (2 * sigma**2))
image += gaussian * (hotspot_temp - base_temp)
# Noise
image += np.random.randn(size, size) * 5
# Statistics
stats = np.array([image.max(), image.mean(), image.min()])
# Normalize (to 0-1)
image_normalized = (image - 273) / 227 # 273-500K → 0-1
return torch.FloatTensor(image_normalized).unsqueeze(0), stats
# Generate data
n_samples = 100
thermal_images = []
temperature_stats = []
for _ in range(n_samples):
img, stats = generate_thermal_image()
thermal_images.append(img)
temperature_stats.append(stats)
thermal_images = torch.stack(thermal_images)
temperature_stats = torch.FloatTensor(np.array(temperature_stats))
# Model training
model = ThermalCNN()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(30):
model.train()
optimizer.zero_grad()
pred_temps = model(thermal_images)
loss = criterion(pred_temps, temperature_stats)
loss.backward()
optimizer.step()
if (epoch + 1) % 5 == 0:
mae = torch.mean(torch.abs(pred_temps - temperature_stats))
print(f'Epoch {epoch+1}, Loss: {loss.item():.4f}, MAE: {mae.item():.2f}K')
# Test
model.eval()
with torch.no_grad():
test_img, true_stats = generate_thermal_image(base_temp=320, hotspot_temp=480)
pred_stats = model(test_img.unsqueeze(0))
print(f"\nTest prediction:")
print(f" True: Max={true_stats[0]:.1f}K, Avg={true_stats[1]:.1f}K, Min={true_stats[2]:.1f}K")
print(f" Predicted: Max={pred_stats[0,0]:.1f}K, Avg={pred_stats[0,1]:.1f}K, Min={pred_stats[0,2]:.1f}K")
# Output example:
# Epoch 5, Loss: 1234.5678, MAE: 12.34K
# Epoch 10, Loss: 567.8901, MAE: 8.76K
# Epoch 15, Loss: 234.5678, MAE: 5.43K
# Epoch 20, Loss: 123.4567, MAE: 3.21K
# Epoch 25, Loss: 89.0123, MAE: 2.15K
# Epoch 30, Loss: 67.8901, MAE: 1.87K
#
# Test prediction:
# True: Max=489.3K, Avg=345.7K, Min=318.2K
# Predicted: Max=485.6K, Avg=347.2K, Min=320.1K
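The regression head returns only global statistics; hot spots themselves can also be located directly from the thermal image by thresholding. A minimal sketch based on the 273-500 K normalization used in generate_thermal_image; the 400 K alarm threshold is an assumed value, not from the text:
def find_hotspots(thermal_tensor, alarm_temp_k=400.0):
    """thermal_tensor: [1, H, W] normalized as (T - 273) / 227, i.e. 273-500 K -> 0-1."""
    temp_k = thermal_tensor.squeeze(0) * 227 + 273   # convert back to Kelvin
    mask = temp_k > alarm_temp_k                      # boolean hot-spot mask (threshold is an assumption)
    if int(mask.sum()) == 0:
        return mask, None
    ys, xs = torch.nonzero(mask, as_tuple=True)
    centroid = (xs.float().mean().item(), ys.float().mean().item())
    return mask, centroid

hotspot_mask, centroid = find_hotspots(test_img, alarm_temp_k=400.0)
print(f"Hot-spot pixels: {int(hotspot_mask.sum())}, centroid (x, y): {centroid}")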
3.6 Transfer Learning (ResNet, EfficientNet)
By utilizing models pretrained on ImageNet, high accuracy can be achieved even with a small amount of process data.
Example 6: Transfer Learning with ResNet50
# Requirements:
# - Python 3.9+
# - torchvision>=0.15.0
import torchvision.models as models
class ProcessClassifierWithTransferLearning(nn.Module):
"""Process classifier using transfer learning"""
def __init__(self, num_classes=3, pretrained=True, freeze_backbone=True):
"""
Args:
num_classes: Number of classification classes
pretrained: Use ImageNet pretrained weights
freeze_backbone: Freeze backbone (do not fine-tune)
"""
super(ProcessClassifierWithTransferLearning, self).__init__()
        # Load ResNet50 (torchvision >= 0.13: the weights argument replaces the deprecated pretrained flag)
        self.backbone = models.resnet50(
            weights=models.ResNet50_Weights.DEFAULT if pretrained else None)
# Freeze backbone
if freeze_backbone:
for param in self.backbone.parameters():
param.requires_grad = False
# Replace final layer (ImageNet 1000 classes → process num_classes)
num_features = self.backbone.fc.in_features
self.backbone.fc = nn.Linear(num_features, num_classes)
def forward(self, x):
return self.backbone(x)
# Model initialization
model = ProcessClassifierWithTransferLearning(num_classes=3, pretrained=True, freeze_backbone=True)
# Check trainable parameters
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
total_params = sum(p.numel() for p in model.parameters())
print(f"Trainable parameters: {trainable_params:,} / {total_params:,} ({100*trainable_params/total_params:.2f}%)")
# Check operation with dummy data
dummy_batch = torch.randn(8, 3, 224, 224)
output = model(dummy_batch)
print(f"Output shape: {output.shape}")
# Transfer learning with EfficientNet-B0
class EfficientNetClassifier(nn.Module):
"""Transfer learning with EfficientNet"""
def __init__(self, num_classes=3):
super(EfficientNetClassifier, self).__init__()
        # EfficientNet-B0 (lightweight version; the weights argument replaces the deprecated pretrained flag)
        self.backbone = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
# Replace final layer
num_features = self.backbone.classifier[1].in_features
self.backbone.classifier[1] = nn.Linear(num_features, num_classes)
def forward(self, x):
return self.backbone(x)
efficient_model = EfficientNetClassifier(num_classes=3)
efficient_output = efficient_model(dummy_batch)
# Parameter count comparison
resnet_params = sum(p.numel() for p in model.parameters())
efficient_params = sum(p.numel() for p in efficient_model.parameters())
print(f"\nModel comparison:")
print(f" ResNet50: {resnet_params:,} parameters")
print(f" EfficientNet-B0: {efficient_params:,} parameters")
print(f" Reduction: {100*(1 - efficient_params/resnet_params):.1f}%")
# Output example:
# Trainable parameters: 6,147 / 23,520,835 (0.03%)
# Output shape: torch.Size([8, 3])
#
# Model comparison:
# ResNet50: 23,520,835 parameters
# EfficientNet-B0: 4,011,391 parameters
# Reduction: 82.9%
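With more labeled process images available, the frozen backbone can be partially unfrozen and fine-tuned with a smaller learning rate than the new head. A minimal sketch; the choice of layer4 and the learning rates are illustrative assumptions, not a tuned recipe:
model_ft = ProcessClassifierWithTransferLearning(num_classes=3, pretrained=True, freeze_backbone=True)

# Unfreeze only the last residual stage (layer4) in addition to the replaced fc layer
for param in model_ft.backbone.layer4.parameters():
    param.requires_grad = True

# Two parameter groups: small learning rate for pretrained weights, larger for the new head
optimizer = optim.Adam([
    {'params': model_ft.backbone.layer4.parameters(), 'lr': 1e-4},
    {'params': model_ft.backbone.fc.parameters(),     'lr': 1e-3},
])
trainable = sum(p.numel() for p in model_ft.parameters() if p.requires_grad)
print(f"Trainable parameters after partial unfreezing: {trainable:,}")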
3.7 Visual Explanation with Grad-CAM
We use Gradient-weighted Class Activation Mapping (Grad-CAM) to visualize which image regions the model relies on when making decisions.
Example 7: Grad-CAM Implementation
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - opencv-python>=4.8.0
class GradCAM:
"""Grad-CAM: Visual explanation generation"""
def __init__(self, model, target_layer):
"""
Args:
model: CNN model
target_layer: Layer to visualize (usually the final convolutional layer)
"""
self.model = model
self.target_layer = target_layer
self.gradients = None
self.activations = None
        # Register hooks (register_full_backward_hook replaces the deprecated register_backward_hook)
        target_layer.register_forward_hook(self.save_activation)
        target_layer.register_full_backward_hook(self.save_gradient)
def save_activation(self, module, input, output):
"""Save activations during forward pass"""
self.activations = output.detach()
def save_gradient(self, module, grad_input, grad_output):
"""Save gradients during backward pass"""
self.gradients = grad_output[0].detach()
def generate_cam(self, input_image, target_class=None):
"""
Generate CAM
Args:
input_image: [1, 3, H, W] Input image
target_class: Target class (if None, use max probability class)
Returns:
cam: [H, W] Class Activation Map
"""
# Forward pass
self.model.eval()
output = self.model(input_image)
if target_class is None:
target_class = output.argmax(dim=1).item()
# Backward pass with target class score
self.model.zero_grad()
class_score = output[0, target_class]
class_score.backward()
# Calculate weights from gradients and activations
pooled_gradients = torch.mean(self.gradients, dim=[0, 2, 3]) # [channels]
# Weighted sum
cam = torch.zeros(self.activations.shape[2:], dtype=torch.float32)
for i, w in enumerate(pooled_gradients):
cam += w * self.activations[0, i, :, :]
# ReLU + normalization
cam = F.relu(cam)
cam = cam - cam.min()
cam = cam / cam.max() if cam.max() > 0 else cam
return cam.numpy(), target_class
# Usage example
model = SimpleCNN(num_classes=3)
model.eval()
# Get final convolutional layer
target_layer = model.conv3
# Initialize Grad-CAM
grad_cam = GradCAM(model, target_layer)
# Test image
test_image = torch.randn(1, 3, 224, 224)
# Generate CAM
cam, predicted_class = grad_cam.generate_cam(test_image)
print(f"Predicted class: {predicted_class}")
print(f"CAM shape: {cam.shape}")
print(f"CAM range: [{cam.min():.4f}, {cam.max():.4f}]")
# Visualization function
import matplotlib.pyplot as plt
import cv2
def visualize_gradcam(original_image, cam, alpha=0.5):
"""Visualize Grad-CAM overlaid on original image
Args:
original_image: [3, H, W] Tensor
cam: [h, w] numpy array
alpha: Transparency
"""
# Convert Tensor to numpy array
img = original_image.permute(1, 2, 0).numpy()
img = (img - img.min()) / (img.max() - img.min()) # Normalize to 0-1
# Resize CAM to image size
cam_resized = cv2.resize(cam, (img.shape[1], img.shape[0]))
# Create heatmap
heatmap = plt.cm.jet(cam_resized)[:, :, :3] # RGB only
# Overlay
overlayed = heatmap * alpha + img * (1 - alpha)
# Plot
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
axes[0].imshow(img)
axes[0].set_title('Original Image')
axes[0].axis('off')
axes[1].imshow(cam_resized, cmap='jet')
axes[1].set_title('Grad-CAM')
axes[1].axis('off')
axes[2].imshow(overlayed)
axes[2].set_title('Overlay')
axes[2].axis('off')
plt.tight_layout()
# plt.savefig('gradcam_visualization.png', dpi=150)
print("Grad-CAM visualization created")
# Visualization
visualize_gradcam(test_image[0], cam)
# Output example:
# Predicted class: 1
# CAM shape: (56, 56)
# CAM range: [0.0000, 1.0000]
# Grad-CAM visualization created
✅ Applications of Grad-CAM
- Misclassification diagnosis: Check if the model is looking at incorrect locations
- Domain knowledge validation: Verify if chemically plausible regions are being observed
- Model improvement: Apply data augmentation so important regions are correctly recognized
3.8 Real-Time Image Processing Pipeline
Assuming deployment on a production line, we build a pipeline that processes a continuous stream of camera frames at high speed.
Example 8: Inference Pipeline and Optimization
import time
import threading
from queue import Queue
from collections import deque
class RealTimeInferencePipeline:
"""Real-time image processing pipeline"""
def __init__(self, model, device='cuda', batch_size=4, buffer_size=100):
"""
Args:
model: Inference model
device: 'cuda' or 'cpu'
batch_size: Batch processing size
buffer_size: Frame buffer size
"""
self.model = model.to(device).eval()
self.device = device
self.batch_size = batch_size
self.frame_queue = Queue(maxsize=buffer_size)
self.result_queue = Queue()
self.fps_buffer = deque(maxlen=30)
self.running = False
# Model optimization (TorchScript)
self.optimize_model()
def optimize_model(self):
"""Convert model to TorchScript for speedup"""
dummy_input = torch.randn(1, 3, 224, 224).to(self.device)
try:
self.model = torch.jit.trace(self.model, dummy_input)
print("Model optimized with TorchScript")
except Exception as e:
print(f"TorchScript optimization failed: {e}")
def preprocess_frame(self, frame):
"""Frame preprocessing
Args:
frame: numpy array [H, W, 3] (BGR)
Returns:
tensor: [1, 3, 224, 224]
"""
# Convert BGR to RGB
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Resize
frame_resized = cv2.resize(frame_rgb, (224, 224))
# Tensor conversion and normalization
frame_tensor = torch.FloatTensor(frame_resized).permute(2, 0, 1) / 255.0
frame_tensor = transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)(frame_tensor)
return frame_tensor.unsqueeze(0)
def inference_worker(self):
"""Inference worker thread (batch processing)"""
batch_frames = []
batch_ids = []
while self.running:
try:
# Collect frames
while len(batch_frames) < self.batch_size and not self.frame_queue.empty():
frame_id, frame = self.frame_queue.get(timeout=0.01)
batch_frames.append(frame)
batch_ids.append(frame_id)
if len(batch_frames) > 0:
# Batch inference
start_time = time.time()
batch_tensor = torch.cat(batch_frames, dim=0).to(self.device)
with torch.no_grad():
outputs = self.model(batch_tensor)
predictions = torch.argmax(outputs, dim=1).cpu().numpy()
confidences = torch.softmax(outputs, dim=1).cpu().numpy()
inference_time = time.time() - start_time
# Store results
for i, frame_id in enumerate(batch_ids):
self.result_queue.put({
'frame_id': frame_id,
'prediction': predictions[i],
'confidence': confidences[i],
'inference_time': inference_time / len(batch_frames)
})
# Calculate FPS
fps = len(batch_frames) / inference_time
self.fps_buffer.append(fps)
# Clear
batch_frames.clear()
batch_ids.clear()
else:
time.sleep(0.001) # Release CPU
except Exception as e:
print(f"Inference error: {e}")
def start(self):
"""Start pipeline"""
self.running = True
self.inference_thread = threading.Thread(target=self.inference_worker)
self.inference_thread.start()
print("Inference pipeline started")
def stop(self):
"""Stop pipeline"""
self.running = False
self.inference_thread.join()
print("Inference pipeline stopped")
def process_frame(self, frame_id, frame):
"""Add frame to processing queue
Args:
frame_id: Frame ID
frame: numpy array [H, W, 3]
"""
preprocessed = self.preprocess_frame(frame)
if not self.frame_queue.full():
self.frame_queue.put((frame_id, preprocessed))
else:
print(f"Warning: Frame buffer full, dropping frame {frame_id}")
def get_result(self):
"""Get processing result"""
if not self.result_queue.empty():
return self.result_queue.get()
return None
def get_fps(self):
"""Get current FPS"""
if len(self.fps_buffer) > 0:
return sum(self.fps_buffer) / len(self.fps_buffer)
return 0.0
# Usage example
model = SimpleCNN(num_classes=3)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
pipeline = RealTimeInferencePipeline(model, device=device, batch_size=8)
pipeline.start()
# Simulation: Process continuous frames
n_frames = 100
results = []
start_time = time.time()
for frame_id in range(n_frames):
# Generate dummy frame (in practice, images from camera)
dummy_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
# Process
pipeline.process_frame(frame_id, dummy_frame)
# Get result
result = pipeline.get_result()
if result:
results.append(result)
time.sleep(0.01) # Simulate 100 FPS
# Get remaining results
time.sleep(1)
while True:
result = pipeline.get_result()
if result is None:
break
results.append(result)
pipeline.stop()
# Statistics
total_time = time.time() - start_time
avg_fps = pipeline.get_fps()
print(f"\nProcessing statistics:")
print(f" Total frames: {n_frames}")
print(f" Processed frames: {len(results)}")
print(f" Total time: {total_time:.2f}s")
print(f" Average FPS: {avg_fps:.2f}")
print(f" Average inference time: {np.mean([r['inference_time'] for r in results])*1000:.2f}ms")
# Output example:
# Model optimized with TorchScript
# Inference pipeline started
# Inference pipeline stopped
#
# Processing statistics:
# Total frames: 100
# Processed frames: 100
# Total time: 1.23s
# Average FPS: 87.34
# Average inference time: 11.47ms
💡 Real-Time Processing Optimization
- TorchScript conversion: Typically improves inference speed, often by roughly 20-40% depending on the model and hardware
- Quantization: INT8 quantization shrinks the model to roughly a quarter of its size and can substantially cut inference time (see the sketch after this list)
- ONNX conversion: Further speedup with inference engines such as TensorRT (also sketched below)
- GPU parallelization: Adjust batch size to maximize GPU utilization
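As an illustration of the first three items, a minimal sketch of dynamic INT8 quantization and ONNX export for the SimpleCNN defined earlier (the file name and opset version are arbitrary choices, not from the text):
model = SimpleCNN(num_classes=3).eval()

# Dynamic quantization: nn.Linear weights stored as INT8, activations quantized at runtime
quantized = torch.quantization.quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8)

# ONNX export of the FP32 model (the exported file can then be run with ONNX Runtime or TensorRT)
dummy = torch.randn(1, 3, 224, 224)
torch.onnx.export(model, dummy, "process_classifier.onnx",
                  input_names=["image"], output_names=["logits"],
                  opset_version=17)
print("Quantized model created and ONNX file exported")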
Learning Objectives Check
Upon completing this chapter, you will be able to implement and explain the following:
Basic Understanding
- Explain the roles of convolutional and pooling layers
- Understand how CNNs preserve spatial structure of images
- Explain the encoder-decoder structure of U-Net and the significance of skip connections
- Understand why transfer learning improves data efficiency
Practical Skills
- Implement CNNs in PyTorch and classify process images
- Detect product defects at pixel level using U-Net
- Perform equipment object detection with YOLO architecture
- Estimate temperature distribution from thermal images
- Implement transfer learning with ResNet/EfficientNet
- Visualize model decision reasoning with Grad-CAM
- Build real-time inference pipelines
Application Ability
- Select appropriate CNN architectures according to process characteristics
- Derive accuracy improvement strategies from Grad-CAM analysis for anomaly detection
- Optimize models according to production line throughput requirements
- Leverage transfer learning with limited data to achieve practical accuracy levels
References
- LeCun, Y., et al. (1998). "Gradient-Based Learning Applied to Document Recognition." Proceedings of the IEEE, 86(11), 2278-2324.
- Ronneberger, O., et al. (2015). "U-Net: Convolutional Networks for Biomedical Image Segmentation." MICCAI 2015.
- Redmon, J., et al. (2016). "You Only Look Once: Unified, Real-Time Object Detection." CVPR 2016.
- He, K., et al. (2016). "Deep Residual Learning for Image Recognition." CVPR 2016.
- Tan, M., & Le, Q. V. (2019). "EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks." ICML 2019.
- Selvaraju, R. R., et al. (2017). "Grad-CAM: Visual Explanations from Deep Networks via Gradient-based Localization." ICCV 2017.
Disclaimer
- This content is provided solely for educational, research, and informational purposes and does not constitute professional advice (legal, accounting, technical warranty, etc.).
- This content and accompanying code examples are provided "AS IS" without any warranty, express or implied, including but not limited to merchantability, fitness for a particular purpose, non-infringement, accuracy, completeness, operation, or safety.
- The author and Tohoku University assume no responsibility for the content, availability, or safety of external links, third-party data, tools, libraries, etc.
- To the maximum extent permitted by applicable law, the author and Tohoku University shall not be liable for any direct, indirect, incidental, special, consequential, or punitive damages arising from the use, execution, or interpretation of this content.
- The content may be changed, updated, or discontinued without notice.
- The copyright and license of this content are subject to the stated conditions (e.g., CC BY 4.0). Such licenses typically include no-warranty clauses.