
Chapter 3: CNN-Based Process Image Analysis

Visual Quality Control and Equipment Monitoring with Convolutional Neural Networks

📖 Reading time: 35-40 minutes 💡 Difficulty: Intermediate to Advanced 🔬 Examples: Defect Detection, Thermal Image Analysis

This chapter covers convolutional neural networks (CNNs). You will learn the roles of convolutional and pooling layers, how CNNs preserve the spatial structure of images, and the encoder-decoder structure of U-Net.

3.1 CNN Fundamentals (Convolutional and Pooling Layers)

Convolutional Neural Networks (CNNs) are networks that can extract features while preserving the spatial structure of images. In process industries, they are utilized for quality inspection using camera images, thermal image analysis, equipment degradation diagnosis, and more.

💡 Basic Components of CNN

  • Convolutional Layer: Extracts local features using filters
  • Pooling Layer: Achieves position invariance through downsampling
  • Fully Connected Layer: Uses extracted features for classification and regression

Convolution operation formula:

$$(I * K)(i, j) = \sum_m \sum_n I(i+m, j+n) \cdot K(m, n)$$
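
Deep learning frameworks implement this in the cross-correlation form shown above (the kernel is not flipped). For input size $I$, kernel size $K$, padding $P$, and stride $S$, each output dimension is $\lfloor (I + 2P - K)/S \rfloor + 1$. As a minimal NumPy sketch of the operation, independent of the PyTorch examples that follow:

import numpy as np

def conv2d_valid(image, kernel):
    """Naive 2D cross-correlation with 'valid' padding (no border handling)."""
    kh, kw = kernel.shape
    out_h = image.shape[0] - kh + 1
    out_w = image.shape[1] - kw + 1
    out = np.zeros((out_h, out_w))
    for i in range(out_h):
        for j in range(out_w):
            # Element-wise product of the kernel and the local image patch
            out[i, j] = np.sum(image[i:i+kh, j:j+kw] * kernel)
    return out

img = np.arange(25, dtype=float).reshape(5, 5)
sobel_x = np.array([[-1., 0., 1.], [-2., 0., 2.], [-1., 0., 1.]])  # horizontal-gradient filter
print(conv2d_valid(img, sobel_x).shape)  # (3, 3) = (5 - 3 + 1, 5 - 3 + 1)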

Example 1: Basic CNN Implementation (Process Image Classification)

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - torch>=2.0.0, <2.3.0
# - torchvision>=0.15.0

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torchvision import transforms

class SimpleCNN(nn.Module):
    """Basic CNN for process image classification"""

    def __init__(self, num_classes=3):
        """
        Args:
            num_classes: Number of classes (normal/minor anomaly/severe anomaly, etc.)
        """
        super(SimpleCNN, self).__init__()

        # Convolutional layer 1: 3 channels (RGB) → 32 channels
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)

        # Convolutional layer 2: 32 → 64
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)

        # Convolutional layer 3: 64 → 128
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)

        # Pooling layer (2x2, stride 2)
        self.pool = nn.MaxPool2d(2, 2)

        # Fully connected layers
        # For 224x224 input images, after 3 pooling operations: 28x28
        self.fc1 = nn.Linear(128 * 28 * 28, 512)
        self.fc2 = nn.Linear(512, num_classes)

        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """
        Args:
            x: [batch, 3, 224, 224] RGB image
        Returns:
            output: [batch, num_classes] Class probabilities
        """
        # Convolutional layer 1 + ReLU + Pooling
        x = self.pool(F.relu(self.bn1(self.conv1(x))))  # [batch, 32, 112, 112]

        # Convolutional layer 2 + ReLU + Pooling
        x = self.pool(F.relu(self.bn2(self.conv2(x))))  # [batch, 64, 56, 56]

        # Convolutional layer 3 + ReLU + Pooling
        x = self.pool(F.relu(self.bn3(self.conv3(x))))  # [batch, 128, 28, 28]

        # Flatten
        x = x.view(x.size(0), -1)  # [batch, 128*28*28]

        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

# Test with synthetic data (in practice, use camera images from production line)
batch_size = 8
model = SimpleCNN(num_classes=3)

# Dummy images (224x224 RGB images)
dummy_images = torch.randn(batch_size, 3, 224, 224)

# Forward pass
output = model(dummy_images)
print(f"Output shape: {output.shape}")  # [8, 3]
print(f"Class probabilities (sample 1): {F.softmax(output[0], dim=0)}")

# Check parameter count
total_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {total_params:,}")

# Output example:
# Output shape: torch.Size([8, 3])
# Class probabilities (sample 1): tensor([0.3456, 0.4123, 0.2421], grad_fn=<SoftmaxBackward0>)
# Total parameters: 51,475,971

3.2 Image Classification of Process States

Classify the state of the process (normal/abnormal) from camera images of production lines. For example, analyze images captured from observation windows inside reactors.

Example 2: Binary Classification (Normal/Abnormal)

# Requirements:
# - Python 3.9+
# - pillow>=10.0.0
# - torch>=2.0.0, <2.3.0

import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image

class ProcessImageDataset(Dataset):
    """Process image dataset"""

    def __init__(self, image_paths, labels, transform=None):
        """
        Args:
            image_paths: List of image file paths
            labels: Labels (0: normal, 1: abnormal)
            transform: Data augmentation
        """
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Load image
        image = Image.open(self.image_paths[idx]).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

# Data augmentation (training)
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),  # Horizontal flip
    transforms.RandomRotation(10),  # ±10 degree rotation
    transforms.ColorJitter(brightness=0.2, contrast=0.2),  # Brightness and contrast adjustment
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Test time (no augmentation)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Training function
def train_classifier(model, train_loader, val_loader, epochs=20, lr=0.001):
    """Train image classification model"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    best_acc = 0.0

    for epoch in range(epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_acc = 100 * correct / total

        # Validation phase
        model.eval()
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_acc = 100 * val_correct / val_total

        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), 'best_process_classifier.pth')

        print(f'Epoch {epoch+1}/{epochs}:')
        print(f'  Train Loss: {running_loss/len(train_loader):.4f}, Acc: {train_acc:.2f}%')
        print(f'  Val Acc: {val_acc:.2f}%')

        scheduler.step()

    print(f'\nBest validation accuracy: {best_acc:.2f}%')
    return model

# Usage example (replace with actual data paths)
# train_paths = ['path/to/normal1.jpg', 'path/to/abnormal1.jpg', ...]
# train_labels = [0, 1, ...]  # 0: normal, 1: abnormal
#
# train_dataset = ProcessImageDataset(train_paths, train_labels, train_transform)
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
#
# model = SimpleCNN(num_classes=2)
# trained_model = train_classifier(model, train_loader, val_loader, epochs=20)

# Output example:
# Epoch 1/20:
#   Train Loss: 0.6234, Acc: 65.34%
#   Val Acc: 68.21%
# Epoch 5/20:
#   Train Loss: 0.3456, Acc: 84.56%
#   Val Acc: 82.34%
# ...
# Best validation accuracy: 94.56%
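
Once trained, the saved checkpoint can be used for single-image inference. A minimal sketch reusing SimpleCNN, test_transform, and the checkpoint name from train_classifier above; 'sample.jpg' is a placeholder path:

# Single-image inference (sketch; assumes train_classifier has been run,
# so 'best_process_classifier.pth' exists; 'sample.jpg' is a placeholder)
model = SimpleCNN(num_classes=2)
model.load_state_dict(torch.load('best_process_classifier.pth', map_location='cpu'))
model.eval()

image = Image.open('sample.jpg').convert('RGB')
x = test_transform(image).unsqueeze(0)  # [1, 3, 224, 224]

with torch.no_grad():
    probs = F.softmax(model(x), dim=1)[0]

label = 'abnormal' if probs[1] > probs[0] else 'normal'
print(f"Prediction: {label} (P(abnormal) = {probs[1]:.3f})")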

3.3 Object Detection for Equipment Monitoring

We use the YOLO (You Only Look Once) architecture to detect equipment and anomalous regions in images, and implement a simplified version here.

Example 3: Simplified Object Detection (Bounding Box Prediction)

class SimpleYOLO(nn.Module):
    """Simplified YOLO-style object detection model"""

    def __init__(self, num_classes=1, num_boxes=2):
        """
        Args:
            num_classes: Number of detection target classes (anomalous locations, etc.)
            num_boxes: Number of boxes per grid cell
        """
        super(SimpleYOLO, self).__init__()

        self.num_classes = num_classes
        self.num_boxes = num_boxes

        # Backbone (feature extraction)
        self.backbone = nn.Sequential(
            # Conv1
            nn.Conv2d(3, 64, 7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.1),
            nn.MaxPool2d(2, 2),

            # Conv2
            nn.Conv2d(64, 192, 3, padding=1),
            nn.BatchNorm2d(192),
            nn.LeakyReLU(0.1),
            nn.MaxPool2d(2, 2),

            # Conv3-5
            nn.Conv2d(192, 256, 1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.1),
            nn.Conv2d(256, 512, 3, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.1),
            nn.MaxPool2d(2, 2)
        )

        # Detection head
        # Predict per grid cell: (x, y, w, h, confidence) * num_boxes + class_probs
        output_size = num_boxes * 5 + num_classes
        self.detection_head = nn.Sequential(
            nn.Conv2d(512, 1024, 3, padding=1),
            nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.1),
            nn.Conv2d(1024, output_size, 1)
        )

    def forward(self, x):
        """
        Args:
            x: [batch, 3, 448, 448] Input image
        Returns:
            output: [batch, grid_h, grid_w, num_boxes*5 + num_classes]
                    (x, y, w, h, confidence, class_probs)
        """
        features = self.backbone(x)  # [batch, 512, 28, 28]
        output = self.detection_head(features)  # [batch, output_size, 28, 28]

        # Convert to [batch, grid_h, grid_w, output_size]
        output = output.permute(0, 2, 3, 1)

        return output

# Model initialization
model = SimpleYOLO(num_classes=1, num_boxes=2)

# Test
dummy_image = torch.randn(4, 3, 448, 448)
output = model(dummy_image)

print(f"Output shape: {output.shape}")  # [4, 14, 14, 11]
# 11 = 2 boxes * 5 (x,y,w,h,conf) + 1 class

# Bounding box decoding example
def decode_predictions(output, grid_size=28, img_size=448, conf_threshold=0.5):
    """Convert predictions to bounding boxes

    Args:
        output: [grid_h, grid_w, num_boxes*5 + num_classes]

    Returns:
        boxes: List of (x, y, w, h, confidence, class_id)
    """
    boxes = []
    num_boxes = 2

    for i in range(grid_size):
        for j in range(grid_size):
            for b in range(num_boxes):
                # Extract box information
                box_idx = b * 5
                x = (j + torch.sigmoid(output[i, j, box_idx])) / grid_size
                y = (i + torch.sigmoid(output[i, j, box_idx+1])) / grid_size
                w = torch.sigmoid(output[i, j, box_idx+2])
                h = torch.sigmoid(output[i, j, box_idx+3])
                conf = torch.sigmoid(output[i, j, box_idx+4])

                if conf > conf_threshold:
                    # Class probabilities
                    class_probs = output[i, j, num_boxes*5:]
                    class_id = torch.argmax(class_probs)

                    # Convert to image coordinates
                    x_abs = x * img_size
                    y_abs = y * img_size
                    w_abs = w * img_size
                    h_abs = h * img_size

                    boxes.append([x_abs.item(), y_abs.item(),
                                 w_abs.item(), h_abs.item(),
                                 conf.item(), class_id.item()])

    return boxes

# Prediction example
sample_output = output[0].detach()  # First sample
detected_boxes = decode_predictions(sample_output, conf_threshold=0.3)
print(f"\nDetected {len(detected_boxes)} boxes with confidence > 0.3")

# Output example:
# Output shape: torch.Size([4, 28, 28, 11])
# Detected 37 boxes with confidence > 0.3
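
With an untrained or weakly trained model, many of these boxes overlap on the same region, so a standard post-processing step is non-maximum suppression (NMS). A minimal sketch using torchvision.ops.nms, which expects corner-format boxes, so the center-format boxes from decode_predictions are converted first:

from torchvision.ops import nms

def apply_nms(boxes, iou_threshold=0.5):
    """Filter overlapping detections with non-maximum suppression.

    Args:
        boxes: list of (cx, cy, w, h, confidence, class_id)
               as returned by decode_predictions
    """
    if len(boxes) == 0:
        return []
    t = torch.tensor(boxes)
    # nms expects corner format, so convert (cx, cy, w, h) to (x1, y1, x2, y2)
    corners = torch.stack([
        t[:, 0] - t[:, 2] / 2, t[:, 1] - t[:, 3] / 2,
        t[:, 0] + t[:, 2] / 2, t[:, 1] + t[:, 3] / 2
    ], dim=1)
    keep = nms(corners, t[:, 4], iou_threshold)
    return [boxes[i] for i in keep.tolist()]

filtered_boxes = apply_nms(detected_boxes, iou_threshold=0.5)
print(f"Boxes after NMS: {len(filtered_boxes)}")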

3.4 Manufacturing Defect Detection

Detect defects such as scratches, stains, and discoloration on product surfaces at the pixel level. This is framed as a segmentation task.

Example 4: Defect Segmentation with U-Net

class UNet(nn.Module):
    """U-Net architecture (for defect detection)"""

    def __init__(self, in_channels=3, out_channels=1):
        """
        Args:
            in_channels: Number of input channels (RGB=3)
            out_channels: Number of output channels (binary mask=1)
        """
        super(UNet, self).__init__()

        # Encoder (downsampling)
        self.enc1 = self.conv_block(in_channels, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)

        # Bottleneck
        self.bottleneck = self.conv_block(512, 1024)

        # Decoder (upsampling)
        self.upconv4 = nn.ConvTranspose2d(1024, 512, 2, stride=2)
        self.dec4 = self.conv_block(1024, 512)  # 512+512 (skip connection)

        self.upconv3 = nn.ConvTranspose2d(512, 256, 2, stride=2)
        self.dec3 = self.conv_block(512, 256)

        self.upconv2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.dec2 = self.conv_block(256, 128)

        self.upconv1 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.dec1 = self.conv_block(128, 64)

        # Output layer
        self.out_conv = nn.Conv2d(64, out_channels, 1)

        self.pool = nn.MaxPool2d(2, 2)

    def conv_block(self, in_channels, out_channels):
        """Block of two convolutional layers"""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        """
        Args:
            x: [batch, 3, H, W] Input image
        Returns:
            mask: [batch, 1, H, W] Defect mask (0-1)
        """
        # Encoder
        enc1 = self.enc1(x)
        enc2 = self.enc2(self.pool(enc1))
        enc3 = self.enc3(self.pool(enc2))
        enc4 = self.enc4(self.pool(enc3))

        # Bottleneck
        bottleneck = self.bottleneck(self.pool(enc4))

        # Decoder (with skip connections)
        dec4 = self.upconv4(bottleneck)
        dec4 = torch.cat([dec4, enc4], dim=1)  # Skip connection
        dec4 = self.dec4(dec4)

        dec3 = self.upconv3(dec4)
        dec3 = torch.cat([dec3, enc3], dim=1)
        dec3 = self.dec3(dec3)

        dec2 = self.upconv2(dec3)
        dec2 = torch.cat([dec2, enc2], dim=1)
        dec2 = self.dec2(dec2)

        dec1 = self.upconv1(dec2)
        dec1 = torch.cat([dec1, enc1], dim=1)
        dec1 = self.dec1(dec1)

        # Output (normalize to 0-1 with sigmoid)
        out = torch.sigmoid(self.out_conv(dec1))

        return out

# Dice Loss (for segmentation)
class DiceLoss(nn.Module):
    """Loss based on Dice coefficient"""

    def forward(self, pred, target, smooth=1.0):
        """
        Args:
            pred: [batch, 1, H, W] Predicted mask
            target: [batch, 1, H, W] True mask
        """
        pred = pred.view(-1)
        target = target.view(-1)

        intersection = (pred * target).sum()
        dice = (2.0 * intersection + smooth) / (pred.sum() + target.sum() + smooth)

        return 1 - dice

# Training example
model = UNet(in_channels=3, out_channels=1)
criterion = DiceLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Dummy data (in practice, use product images and defect masks)
dummy_images = torch.randn(4, 3, 256, 256)
dummy_masks = torch.randint(0, 2, (4, 1, 256, 256)).float()

# Training step
model.train()
optimizer.zero_grad()

pred_masks = model(dummy_images)
loss = criterion(pred_masks, dummy_masks)

loss.backward()
optimizer.step()

print(f"Predicted mask shape: {pred_masks.shape}")
print(f"Dice Loss: {loss.item():.4f}")

# Prediction statistics
print(f"Prediction range: [{pred_masks.min():.4f}, {pred_masks.max():.4f}]")
print(f"Defect pixels (>0.5): {(pred_masks > 0.5).sum().item()} / {pred_masks.numel()}")

# Output example:
# Predicted mask shape: torch.Size([4, 1, 256, 256])
# Dice Loss: 0.5234
# Prediction range: [0.0123, 0.9876]
# Defect pixels (>0.5): 31245 / 262144
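
Alongside the Dice coefficient, Intersection over Union (IoU) on the binarized mask is a common evaluation metric for segmentation. A minimal sketch using the tensors from the training step above (0.5 is a typical but arbitrary binarization threshold):

def mask_iou(pred, target, threshold=0.5):
    """IoU between a predicted probability mask and a binary ground-truth mask."""
    pred_bin = (pred > threshold).float()
    intersection = (pred_bin * target).sum()
    union = pred_bin.sum() + target.sum() - intersection
    return (intersection / union).item() if union > 0 else 1.0

with torch.no_grad():
    iou = mask_iou(pred_masks, dummy_masks)
print(f"IoU (threshold 0.5): {iou:.4f}")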

3.5 Temperature Distribution Estimation via Thermal Image Analysis

Estimate equipment temperature distribution from thermal images captured by infrared cameras and detect hot spots.

Example 5: Temperature Prediction from Thermal Images

class ThermalCNN(nn.Module):
    """CNN for thermal image analysis"""

    def __init__(self):
        super(ThermalCNN, self).__init__()

        # Thermal images are usually 1 channel (grayscale)
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 5, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(32, 64, 5, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        # Temperature estimation head (regression task)
        self.regressor = nn.Sequential(
            nn.Linear(128 * 28 * 28, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 3)  # Max temperature, average temperature, min temperature
        )

    def forward(self, x):
        """
        Args:
            x: [batch, 1, 224, 224] Thermal image
        Returns:
            temps: [batch, 3] (max_temp, avg_temp, min_temp)
        """
        features = self.features(x)
        features = features.view(features.size(0), -1)
        temps = self.regressor(features)
        return temps

# Generate synthetic thermal image data
def generate_thermal_image(base_temp=300, hotspot_temp=450, size=224):
    """Thermal image simulation

    Args:
        base_temp: Base temperature [K]
        hotspot_temp: Hot spot temperature [K]
        size: Image size

    Returns:
        image: [1, size, size] Temperature map
        stats: (max, avg, min) Temperature statistics
    """
    # Base temperature
    image = np.ones((size, size)) * base_temp

    # Add Gaussian distributed hot spots
    n_hotspots = np.random.randint(1, 4)
    for _ in range(n_hotspots):
        cx, cy = np.random.randint(20, size-20, 2)
        sigma = np.random.uniform(10, 30)

        y, x = np.ogrid[:size, :size]
        gaussian = np.exp(-((x - cx)**2 + (y - cy)**2) / (2 * sigma**2))
        image += gaussian * (hotspot_temp - base_temp)

    # Noise
    image += np.random.randn(size, size) * 5

    # Statistics
    stats = np.array([image.max(), image.mean(), image.min()])

    # Normalize (to 0-1)
    image_normalized = (image - 273) / 227  # 273-500K → 0-1

    return torch.FloatTensor(image_normalized).unsqueeze(0), stats

# Generate data
n_samples = 100
thermal_images = []
temperature_stats = []

for _ in range(n_samples):
    img, stats = generate_thermal_image()
    thermal_images.append(img)
    temperature_stats.append(stats)

thermal_images = torch.stack(thermal_images)
temperature_stats = torch.FloatTensor(np.array(temperature_stats))

# Model training
model = ThermalCNN()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(30):
    model.train()
    optimizer.zero_grad()

    pred_temps = model(thermal_images)
    loss = criterion(pred_temps, temperature_stats)

    loss.backward()
    optimizer.step()

    if (epoch + 1) % 5 == 0:
        mae = torch.mean(torch.abs(pred_temps - temperature_stats))
        print(f'Epoch {epoch+1}, Loss: {loss.item():.4f}, MAE: {mae.item():.2f}K')

# Test
model.eval()
with torch.no_grad():
    test_img, true_stats = generate_thermal_image(base_temp=320, hotspot_temp=480)
    pred_stats = model(test_img.unsqueeze(0))

print(f"\nTest prediction:")
print(f"  True:      Max={true_stats[0]:.1f}K, Avg={true_stats[1]:.1f}K, Min={true_stats[2]:.1f}K")
print(f"  Predicted: Max={pred_stats[0,0]:.1f}K, Avg={pred_stats[0,1]:.1f}K, Min={pred_stats[0,2]:.1f}K")

# Output example:
# Epoch 5, Loss: 1234.5678, MAE: 12.34K
# Epoch 10, Loss: 567.8901, MAE: 8.76K
# Epoch 15, Loss: 234.5678, MAE: 5.43K
# Epoch 20, Loss: 123.4567, MAE: 3.21K
# Epoch 25, Loss: 89.0123, MAE: 2.15K
# Epoch 30, Loss: 67.8901, MAE: 1.87K
#
# Test prediction:
#   True:      Max=489.3K, Avg=345.7K, Min=318.2K
#   Predicted: Max=485.6K, Avg=347.2K, Min=320.1K
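
Beyond global statistics, hot spots can also be localized directly by thresholding the temperature map. A minimal NumPy sketch on the synthetic generator above; the 400 K alarm threshold is an arbitrary example value:

def locate_hotspots(temp_map, threshold=400.0):
    """Count pixels above the alarm threshold and return their centroid.

    Args:
        temp_map: [H, W] temperature map in Kelvin
        threshold: alarm temperature [K] (arbitrary example value)
    """
    hot = temp_map > threshold
    n_hot = int(hot.sum())
    if n_hot == 0:
        return 0, None
    rows, cols = np.nonzero(hot)
    return n_hot, (rows.mean(), cols.mean())

# Invert the 273-500 K -> 0-1 scaling used in generate_thermal_image
temp_map = test_img.squeeze(0).numpy() * 227 + 273
n_hot, centroid = locate_hotspots(temp_map)
print(f"Pixels above 400 K: {n_hot}, centroid (row, col): {centroid}")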

3.6 Transfer Learning (ResNet, EfficientNet)

By utilizing models pretrained on ImageNet, high accuracy can be achieved even with a small amount of process data.

Example 6: Transfer Learning with ResNet50

# Requirements:
# - Python 3.9+
# - torchvision>=0.15.0

import torchvision.models as models

class ProcessClassifierWithTransferLearning(nn.Module):
    """Process classifier using transfer learning"""

    def __init__(self, num_classes=3, pretrained=True, freeze_backbone=True):
        """
        Args:
            num_classes: Number of classification classes
            pretrained: Use ImageNet pretrained weights
            freeze_backbone: Freeze backbone (do not fine-tune)
        """
        super(ProcessClassifierWithTransferLearning, self).__init__()

        # Load ResNet50 (torchvision >= 0.13 replaced pretrained= with the weights API)
        weights = models.ResNet50_Weights.DEFAULT if pretrained else None
        self.backbone = models.resnet50(weights=weights)

        # Freeze backbone
        if freeze_backbone:
            for param in self.backbone.parameters():
                param.requires_grad = False

        # Replace final layer (ImageNet 1000 classes → process num_classes)
        num_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Linear(num_features, num_classes)

    def forward(self, x):
        return self.backbone(x)

# Model initialization
model = ProcessClassifierWithTransferLearning(num_classes=3, pretrained=True, freeze_backbone=True)

# Check trainable parameters
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
total_params = sum(p.numel() for p in model.parameters())
print(f"Trainable parameters: {trainable_params:,} / {total_params:,} ({100*trainable_params/total_params:.2f}%)")

# Check operation with dummy data
dummy_batch = torch.randn(8, 3, 224, 224)
output = model(dummy_batch)
print(f"Output shape: {output.shape}")

# Transfer learning with EfficientNet-B0
class EfficientNetClassifier(nn.Module):
    """Transfer learning with EfficientNet"""

    def __init__(self, num_classes=3):
        super(EfficientNetClassifier, self).__init__()

        # EfficientNet-B0 (lightweight variant; weights API as above)
        self.backbone = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)

        # Replace final layer
        num_features = self.backbone.classifier[1].in_features
        self.backbone.classifier[1] = nn.Linear(num_features, num_classes)

    def forward(self, x):
        return self.backbone(x)

efficient_model = EfficientNetClassifier(num_classes=3)
efficient_output = efficient_model(dummy_batch)

# Parameter count comparison
resnet_params = sum(p.numel() for p in model.parameters())
efficient_params = sum(p.numel() for p in efficient_model.parameters())

print(f"\nModel comparison:")
print(f"  ResNet50:       {resnet_params:,} parameters")
print(f"  EfficientNet-B0: {efficient_params:,} parameters")
print(f"  Reduction:       {100*(1 - efficient_params/resnet_params):.1f}%")

# Output example:
# Trainable parameters: 6,147 / 23,514,179 (0.03%)
# Output shape: torch.Size([8, 3])
#
# Model comparison:
#   ResNet50:        23,514,179 parameters
#   EfficientNet-B0:  4,011,391 parameters
#   Reduction:        82.9%
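
When more labeled process data is available, a common next step is to unfreeze the deepest backbone stage and fine-tune it with a smaller learning rate than the new classification head. A minimal sketch (the layer choice and learning rates are typical values, not prescriptions):

# Stage-wise fine-tuning (sketch): unfreeze ResNet50's deepest block and
# train it with a smaller learning rate than the freshly initialized head
for param in model.backbone.layer4.parameters():
    param.requires_grad = True

optimizer = optim.Adam([
    {'params': model.backbone.layer4.parameters(), 'lr': 1e-4},
    {'params': model.backbone.fc.parameters(), 'lr': 1e-3},
])

trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters after unfreezing layer4: {trainable:,}")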

3.7 Visual Explanation with Grad-CAM

We use Gradient-weighted Class Activation Mapping (Grad-CAM) to visualize which image regions the model relies on when making its decisions.

Example 7: Grad-CAM Implementation

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - opencv-python>=4.8.0

class GradCAM:
    """Grad-CAM: Visual explanation generation"""

    def __init__(self, model, target_layer):
        """
        Args:
            model: CNN model
            target_layer: Layer to visualize (usually the final convolutional layer)
        """
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None

        # Register hooks (register_full_backward_hook replaces the
        # deprecated register_backward_hook in PyTorch >= 1.8)
        target_layer.register_forward_hook(self.save_activation)
        target_layer.register_full_backward_hook(self.save_gradient)

    def save_activation(self, module, input, output):
        """Save activations during forward pass"""
        self.activations = output.detach()

    def save_gradient(self, module, grad_input, grad_output):
        """Save gradients during backward pass"""
        self.gradients = grad_output[0].detach()

    def generate_cam(self, input_image, target_class=None):
        """
        Generate CAM

        Args:
            input_image: [1, 3, H, W] Input image
            target_class: Target class (if None, use max probability class)

        Returns:
            cam: [h, w] Class Activation Map (numpy array)
            target_class: Class index used for the backward pass
        """
        # Forward pass
        self.model.eval()
        output = self.model(input_image)

        if target_class is None:
            target_class = output.argmax(dim=1).item()

        # Backward pass with target class score
        self.model.zero_grad()
        class_score = output[0, target_class]
        class_score.backward()

        # Calculate weights from gradients and activations
        pooled_gradients = torch.mean(self.gradients, dim=[0, 2, 3])  # [channels]

        # Weighted sum
        cam = torch.zeros(self.activations.shape[2:], dtype=torch.float32)
        for i, w in enumerate(pooled_gradients):
            cam += w * self.activations[0, i, :, :]

        # ReLU + normalization
        cam = F.relu(cam)
        cam = cam - cam.min()
        cam = cam / cam.max() if cam.max() > 0 else cam

        return cam.numpy(), target_class

# Usage example
model = SimpleCNN(num_classes=3)
model.eval()

# Get final convolutional layer
target_layer = model.conv3

# Initialize Grad-CAM
grad_cam = GradCAM(model, target_layer)

# Test image
test_image = torch.randn(1, 3, 224, 224)

# Generate CAM
cam, predicted_class = grad_cam.generate_cam(test_image)

print(f"Predicted class: {predicted_class}")
print(f"CAM shape: {cam.shape}")
print(f"CAM range: [{cam.min():.4f}, {cam.max():.4f}]")

# Visualization function
import matplotlib.pyplot as plt
import cv2

def visualize_gradcam(original_image, cam, alpha=0.5):
    """Visualize Grad-CAM overlaid on original image

    Args:
        original_image: [3, H, W] Tensor
        cam: [h, w] numpy array
        alpha: Transparency
    """
    # Convert Tensor to numpy array
    img = original_image.permute(1, 2, 0).numpy()
    img = (img - img.min()) / (img.max() - img.min())  # Normalize to 0-1

    # Resize CAM to image size
    cam_resized = cv2.resize(cam, (img.shape[1], img.shape[0]))

    # Create heatmap
    heatmap = plt.cm.jet(cam_resized)[:, :, :3]  # RGB only

    # Overlay
    overlayed = heatmap * alpha + img * (1 - alpha)

    # Plot
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    axes[0].imshow(img)
    axes[0].set_title('Original Image')
    axes[0].axis('off')

    axes[1].imshow(cam_resized, cmap='jet')
    axes[1].set_title('Grad-CAM')
    axes[1].axis('off')

    axes[2].imshow(overlayed)
    axes[2].set_title('Overlay')
    axes[2].axis('off')

    plt.tight_layout()
    # plt.savefig('gradcam_visualization.png', dpi=150)
    print("Grad-CAM visualization created")

# Visualization
visualize_gradcam(test_image[0], cam)

# Output example:
# Predicted class: 1
# CAM shape: (56, 56)
# CAM range: [0.0000, 1.0000]
# Grad-CAM visualization created

✅ Applications of Grad-CAM

  • Misclassification diagnosis: Check if the model is looking at incorrect locations
  • Domain knowledge validation: Verify if chemically plausible regions are being observed
  • Model improvement: Apply data augmentation so important regions are correctly recognized
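
The same GradCAM class also works with the transfer-learning models from Section 3.6; only the target layer changes. A minimal sketch with ResNet50, where the final convolutional stage backbone.layer4 is the usual choice:

# Grad-CAM with the transfer-learning model from Section 3.6 (sketch).
# Gradients must reach the target layer, so the backbone is not frozen;
# pretrained=False here only avoids a weight download for the demo.
resnet_model = ProcessClassifierWithTransferLearning(
    num_classes=3, pretrained=False, freeze_backbone=False
)
resnet_model.eval()

resnet_cam = GradCAM(resnet_model, resnet_model.backbone.layer4[-1])

cam, pred_class = resnet_cam.generate_cam(torch.randn(1, 3, 224, 224))
print(f"Predicted class: {pred_class}, CAM shape: {cam.shape}")  # (7, 7) for 224x224 input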

3.8 Real-Time Image Processing Pipeline

Assuming practical application in production lines, construct a pipeline that processes continuous images from cameras at high speed.

Example 8: Inference Pipeline and Optimization

import time
import threading
from queue import Queue, Empty
from collections import deque

class RealTimeInferencePipeline:
    """Real-time image processing pipeline"""

    def __init__(self, model, device='cuda', batch_size=4, buffer_size=100):
        """
        Args:
            model: Inference model
            device: 'cuda' or 'cpu'
            batch_size: Batch processing size
            buffer_size: Frame buffer size
        """
        self.model = model.to(device).eval()
        self.device = device
        self.batch_size = batch_size

        self.frame_queue = Queue(maxsize=buffer_size)
        self.result_queue = Queue()

        self.fps_buffer = deque(maxlen=30)
        self.running = False

        # Model optimization (TorchScript)
        self.optimize_model()

    def optimize_model(self):
        """Convert model to TorchScript for speedup"""
        dummy_input = torch.randn(1, 3, 224, 224).to(self.device)
        try:
            self.model = torch.jit.trace(self.model, dummy_input)
            print("Model optimized with TorchScript")
        except Exception as e:
            print(f"TorchScript optimization failed: {e}")

    def preprocess_frame(self, frame):
        """Frame preprocessing

        Args:
            frame: numpy array [H, W, 3] (BGR)
        Returns:
            tensor: [1, 3, 224, 224]
        """
        # Convert BGR to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Resize
        frame_resized = cv2.resize(frame_rgb, (224, 224))

        # Tensor conversion and normalization
        frame_tensor = torch.FloatTensor(frame_resized).permute(2, 0, 1) / 255.0
        frame_tensor = transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )(frame_tensor)

        return frame_tensor.unsqueeze(0)

    def inference_worker(self):
        """Inference worker thread (batch processing)"""
        batch_frames = []
        batch_ids = []

        while self.running:
            try:
                # Collect frames without blocking (Empty is raised if the
                # queue is drained before the batch fills)
                while len(batch_frames) < self.batch_size:
                    try:
                        frame_id, frame = self.frame_queue.get_nowait()
                    except Empty:
                        break
                    batch_frames.append(frame)
                    batch_ids.append(frame_id)

                if len(batch_frames) > 0:
                    # Batch inference
                    start_time = time.time()

                    batch_tensor = torch.cat(batch_frames, dim=0).to(self.device)

                    with torch.no_grad():
                        outputs = self.model(batch_tensor)
                        predictions = torch.argmax(outputs, dim=1).cpu().numpy()
                        confidences = torch.softmax(outputs, dim=1).cpu().numpy()

                    inference_time = time.time() - start_time

                    # Store results
                    for i, frame_id in enumerate(batch_ids):
                        self.result_queue.put({
                            'frame_id': frame_id,
                            'prediction': predictions[i],
                            'confidence': confidences[i],
                            'inference_time': inference_time / len(batch_frames)
                        })

                    # Calculate FPS
                    fps = len(batch_frames) / inference_time
                    self.fps_buffer.append(fps)

                    # Clear
                    batch_frames.clear()
                    batch_ids.clear()

                else:
                    time.sleep(0.001)  # Release CPU

            except Exception as e:
                print(f"Inference error: {e}")

    def start(self):
        """Start pipeline"""
        self.running = True
        self.inference_thread = threading.Thread(target=self.inference_worker)
        self.inference_thread.start()
        print("Inference pipeline started")

    def stop(self):
        """Stop pipeline"""
        self.running = False
        self.inference_thread.join()
        print("Inference pipeline stopped")

    def process_frame(self, frame_id, frame):
        """Add frame to processing queue

        Args:
            frame_id: Frame ID
            frame: numpy array [H, W, 3]
        """
        preprocessed = self.preprocess_frame(frame)

        if not self.frame_queue.full():
            self.frame_queue.put((frame_id, preprocessed))
        else:
            print(f"Warning: Frame buffer full, dropping frame {frame_id}")

    def get_result(self):
        """Get processing result"""
        if not self.result_queue.empty():
            return self.result_queue.get()
        return None

    def get_fps(self):
        """Get current FPS"""
        if len(self.fps_buffer) > 0:
            return sum(self.fps_buffer) / len(self.fps_buffer)
        return 0.0

# Usage example
model = SimpleCNN(num_classes=3)
device = 'cuda' if torch.cuda.is_available() else 'cpu'

pipeline = RealTimeInferencePipeline(model, device=device, batch_size=8)
pipeline.start()

# Simulation: Process continuous frames
n_frames = 100
results = []

start_time = time.time()

for frame_id in range(n_frames):
    # Generate dummy frame (in practice, images from camera)
    dummy_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)

    # Process
    pipeline.process_frame(frame_id, dummy_frame)

    # Get result
    result = pipeline.get_result()
    if result:
        results.append(result)

    time.sleep(0.01)  # Simulate 100 FPS

# Get remaining results
time.sleep(1)
while True:
    result = pipeline.get_result()
    if result is None:
        break
    results.append(result)

pipeline.stop()

# Statistics
total_time = time.time() - start_time
avg_fps = pipeline.get_fps()

print(f"\nProcessing statistics:")
print(f"  Total frames: {n_frames}")
print(f"  Processed frames: {len(results)}")
print(f"  Total time: {total_time:.2f}s")
print(f"  Average FPS: {avg_fps:.2f}")
print(f"  Average inference time: {np.mean([r['inference_time'] for r in results])*1000:.2f}ms")

# Output example:
# Model optimized with TorchScript
# Inference pipeline started
# Inference pipeline stopped
#
# Processing statistics:
#   Total frames: 100
#   Processed frames: 100
#   Total time: 1.23s
#   Average FPS: 87.34
#   Average inference time: 11.47ms

💡 Real-Time Processing Optimization

  • TorchScript conversion: Typically improves inference speed by 20-40%
  • Quantization: INT8 quantization can shrink model size to roughly 1/4 and substantially cut inference time (see the sketch after this list)
  • ONNX conversion: Further speedup with engines such as TensorRT
  • GPU parallelization: Adjust batch size to maximize GPU utilization
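
As a sketch of the quantization point above: PyTorch's dynamic quantization converts nn.Linear layers to INT8 in a single call, which helps SimpleCNN in particular because fc1 dominates its parameter count. Convolution layers would require static quantization with calibration data, which is not shown here:

# Dynamic INT8 quantization (sketch): converts the nn.Linear layers and
# runs on CPU; the convolutional layers stay in FP32
cpu_model = SimpleCNN(num_classes=3).eval()
quantized_model = torch.quantization.quantize_dynamic(
    cpu_model, {nn.Linear}, dtype=torch.qint8
)

with torch.no_grad():
    out = quantized_model(torch.randn(1, 3, 224, 224))
print(f"Quantized model output shape: {out.shape}")

# For TensorRT-style engines, export to ONNX first:
# torch.onnx.export(cpu_model, torch.randn(1, 3, 224, 224), "process_cnn.onnx",
#                   input_names=['image'], output_names=['logits'])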

Learning Objectives Check

Upon completing this chapter, you will be able to implement and explain the following:

Basic Understanding

  • The roles of convolutional, pooling, and fully connected layers, and how CNNs preserve the spatial structure of images
  • The encoder-decoder structure of U-Net and the purpose of skip connections

Practical Skills

  • Training image classifiers for process states with data augmentation and transfer learning (ResNet50, EfficientNet-B0)
  • Implementing defect segmentation with U-Net and Dice loss, and temperature regression from thermal images

Application Ability

  • Diagnosing model decisions with Grad-CAM and validating them against domain knowledge
  • Building a real-time batched inference pipeline and optimizing it with TorchScript, quantization, and ONNX

References

  1. LeCun, Y., et al. (1998). "Gradient-Based Learning Applied to Document Recognition." Proceedings of the IEEE, 86(11), 2278-2324.
  2. Ronneberger, O., et al. (2015). "U-Net: Convolutional Networks for Biomedical Image Segmentation." MICCAI 2015.
  3. Redmon, J., et al. (2016). "You Only Look Once: Unified, Real-Time Object Detection." CVPR 2016.
  4. He, K., et al. (2016). "Deep Residual Learning for Image Recognition." CVPR 2016.
  5. Tan, M., & Le, Q. V. (2019). "EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks." ICML 2019.
  6. Selvaraju, R. R., et al. (2017). "Grad-CAM: Visual Explanations from Deep Networks via Gradient-based Localization." ICCV 2017.
