3.1 CNN基礎(畳み込み層とプーリング層)
畳み込みニューラルネットワーク(CNN)は、画像の空間的な構造を保持しながら特徴を抽出できるネットワークです。プロセス産業では、カメラ画像による品質検査、熱画像解析、設備の劣化診断などに活用されます。
💡 CNNの基本構成要素
- 畳み込み層: フィルタで局所的な特徴を抽出
- プーリング層: ダウンサンプリングで位置不変性を獲得
- 全結合層: 抽出した特徴を分類・回帰に利用
畳み込み演算の式:
$$(I * K)(i, j) = \sum_m \sum_n I(i+m, j+n) \cdot K(m, n)$$
例1: 基本的なCNN実装(プロセス画像分類)
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torchvision import transforms
class SimpleCNN(nn.Module):
"""プロセス画像分類用の基本CNN"""
def __init__(self, num_classes=3):
"""
Args:
num_classes: クラス数(正常/軽度異常/重度異常など)
"""
super(SimpleCNN, self).__init__()
# 畳み込み層1: 3チャンネル(RGB) → 32チャンネル
self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
self.bn1 = nn.BatchNorm2d(32)
# 畳み込み層2: 32 → 64
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
self.bn2 = nn.BatchNorm2d(64)
# 畳み込み層3: 64 → 128
self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
self.bn3 = nn.BatchNorm2d(128)
# プーリング層(2x2、ストライド2)
self.pool = nn.MaxPool2d(2, 2)
# 全結合層
# 入力画像が224x224の場合、3回のpooling後は28x28
self.fc1 = nn.Linear(128 * 28 * 28, 512)
self.fc2 = nn.Linear(512, num_classes)
self.dropout = nn.Dropout(0.5)
def forward(self, x):
"""
Args:
x: [batch, 3, 224, 224] RGB画像
Returns:
output: [batch, num_classes] クラス確率
"""
# 畳み込み層1 + ReLU + プーリング
x = self.pool(F.relu(self.bn1(self.conv1(x)))) # [batch, 32, 112, 112]
# 畳み込み層2 + ReLU + プーリング
x = self.pool(F.relu(self.bn2(self.conv2(x)))) # [batch, 64, 56, 56]
# 畳み込み層3 + ReLU + プーリング
x = self.pool(F.relu(self.bn3(self.conv3(x)))) # [batch, 128, 28, 28]
# 平坦化
x = x.view(x.size(0), -1) # [batch, 128*28*28]
# 全結合層
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
# 合成データでテスト(実際は製造ラインのカメラ画像を使用)
batch_size = 8
model = SimpleCNN(num_classes=3)
# ダミー画像(224x224のRGB画像)
dummy_images = torch.randn(batch_size, 3, 224, 224)
# 順伝播
output = model(dummy_images)
print(f"Output shape: {output.shape}") # [8, 3]
print(f"Class probabilities (sample 1): {F.softmax(output[0], dim=0)}")
# パラメータ数の確認
total_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {total_params:,}")
# 出力例:
# Output shape: torch.Size([8, 3])
# Class probabilities (sample 1): tensor([0.3456, 0.4123, 0.2421], grad_fn=)
# Total parameters: 40,363,971
3.2 プロセス状態の画像分類
製造ラインのカメラ画像から、プロセスの状態(正常/異常)を分類します。例えば、反応器内部の観察窓から撮影した画像を解析します。
例2: 2クラス分類(正常/異常)
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
class ProcessImageDataset(Dataset):
"""プロセス画像データセット"""
def __init__(self, image_paths, labels, transform=None):
"""
Args:
image_paths: 画像ファイルパスのリスト
labels: ラベル(0: 正常, 1: 異常)
transform: データ拡張
"""
self.image_paths = image_paths
self.labels = labels
self.transform = transform
def __len__(self):
return len(self.image_paths)
def __getitem__(self, idx):
# 画像読み込み
image = Image.open(self.image_paths[idx]).convert('RGB')
label = self.labels[idx]
if self.transform:
image = self.transform(image)
return image, label
# データ拡張(訓練時)
train_transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.RandomHorizontalFlip(), # 左右反転
transforms.RandomRotation(10), # ±10度回転
transforms.ColorJitter(brightness=0.2, contrast=0.2), # 明度・コントラスト調整
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# テスト時(拡張なし)
test_transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# 訓練関数
def train_classifier(model, train_loader, val_loader, epochs=20, lr=0.001):
"""画像分類モデルの訓練"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
best_acc = 0.0
for epoch in range(epochs):
# 訓練フェーズ
model.train()
running_loss = 0.0
correct = 0
total = 0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
_, predicted = torch.max(outputs, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
train_acc = 100 * correct / total
# 検証フェーズ
model.eval()
val_correct = 0
val_total = 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs, 1)
val_total += labels.size(0)
val_correct += (predicted == labels).sum().item()
val_acc = 100 * val_correct / val_total
if val_acc > best_acc:
best_acc = val_acc
torch.save(model.state_dict(), 'best_process_classifier.pth')
print(f'Epoch {epoch+1}/{epochs}:')
print(f' Train Loss: {running_loss/len(train_loader):.4f}, Acc: {train_acc:.2f}%')
print(f' Val Acc: {val_acc:.2f}%')
scheduler.step()
print(f'\nBest validation accuracy: {best_acc:.2f}%')
return model
# 使用例(実際のデータパスに置き換える)
# train_paths = ['path/to/normal1.jpg', 'path/to/abnormal1.jpg', ...]
# train_labels = [0, 1, ...] # 0: 正常, 1: 異常
#
# train_dataset = ProcessImageDataset(train_paths, train_labels, train_transform)
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
#
# model = SimpleCNN(num_classes=2)
# trained_model = train_classifier(model, train_loader, val_loader, epochs=20)
# 出力例:
# Epoch 1/20:
# Train Loss: 0.6234, Acc: 65.34%
# Val Acc: 68.21%
# Epoch 5/20:
# Train Loss: 0.3456, Acc: 84.56%
# Val Acc: 82.34%
# ...
# Best validation accuracy: 94.56%
3.3 物体検出による設備監視
YOLO(You Only Look Once)アーキテクチャを使用して、画像内の設備や異常箇所を検出します。簡易版を実装します。
例3: 簡易物体検出(バウンディングボックス予測)
class SimpleYOLO(nn.Module):
"""簡易YOLO風の物体検出モデル"""
def __init__(self, num_classes=1, num_boxes=2):
"""
Args:
num_classes: 検出対象クラス数(異常箇所など)
num_boxes: グリッドセルあたりのボックス数
"""
super(SimpleYOLO, self).__init__()
self.num_classes = num_classes
self.num_boxes = num_boxes
# バックボーン(特徴抽出)
self.backbone = nn.Sequential(
# Conv1
nn.Conv2d(3, 64, 7, stride=2, padding=3),
nn.BatchNorm2d(64),
nn.LeakyReLU(0.1),
nn.MaxPool2d(2, 2),
# Conv2
nn.Conv2d(64, 192, 3, padding=1),
nn.BatchNorm2d(192),
nn.LeakyReLU(0.1),
nn.MaxPool2d(2, 2),
# Conv3-5
nn.Conv2d(192, 256, 1),
nn.BatchNorm2d(256),
nn.LeakyReLU(0.1),
nn.Conv2d(256, 512, 3, padding=1),
nn.BatchNorm2d(512),
nn.LeakyReLU(0.1),
nn.MaxPool2d(2, 2)
)
# 検出ヘッド
# 各グリッドセルで予測: (x, y, w, h, confidence) * num_boxes + class_probs
output_size = num_boxes * 5 + num_classes
self.detection_head = nn.Sequential(
nn.Conv2d(512, 1024, 3, padding=1),
nn.BatchNorm2d(1024),
nn.LeakyReLU(0.1),
nn.Conv2d(1024, output_size, 1)
)
def forward(self, x):
"""
Args:
x: [batch, 3, 448, 448] 入力画像
Returns:
output: [batch, grid_h, grid_w, num_boxes*5 + num_classes]
(x, y, w, h, confidence, class_probs)
"""
features = self.backbone(x) # [batch, 512, 14, 14]
output = self.detection_head(features) # [batch, output_size, 14, 14]
# [batch, grid_h, grid_w, output_size] に変換
output = output.permute(0, 2, 3, 1)
return output
# モデル初期化
model = SimpleYOLO(num_classes=1, num_boxes=2)
# テスト
dummy_image = torch.randn(4, 3, 448, 448)
output = model(dummy_image)
print(f"Output shape: {output.shape}") # [4, 14, 14, 11]
# 11 = 2 boxes * 5 (x,y,w,h,conf) + 1 class
# バウンディングボックスのデコード例
def decode_predictions(output, grid_size=14, img_size=448, conf_threshold=0.5):
"""予測をバウンディングボックスに変換
Args:
output: [grid_h, grid_w, num_boxes*5 + num_classes]
Returns:
boxes: List of (x, y, w, h, confidence, class_id)
"""
boxes = []
num_boxes = 2
for i in range(grid_size):
for j in range(grid_size):
for b in range(num_boxes):
# ボックス情報を抽出
box_idx = b * 5
x = (j + torch.sigmoid(output[i, j, box_idx])) / grid_size
y = (i + torch.sigmoid(output[i, j, box_idx+1])) / grid_size
w = torch.sigmoid(output[i, j, box_idx+2])
h = torch.sigmoid(output[i, j, box_idx+3])
conf = torch.sigmoid(output[i, j, box_idx+4])
if conf > conf_threshold:
# クラス確率
class_probs = output[i, j, num_boxes*5:]
class_id = torch.argmax(class_probs)
# 画像座標に変換
x_abs = x * img_size
y_abs = y * img_size
w_abs = w * img_size
h_abs = h * img_size
boxes.append([x_abs.item(), y_abs.item(),
w_abs.item(), h_abs.item(),
conf.item(), class_id.item()])
return boxes
# 予測例
sample_output = output[0].detach() # 最初のサンプル
detected_boxes = decode_predictions(sample_output, conf_threshold=0.3)
print(f"\nDetected {len(detected_boxes)} boxes with confidence > 0.3")
# 出力例:
# Output shape: torch.Size([4, 14, 14, 11])
# Detected 37 boxes with confidence > 0.3
3.4 製造欠陥検出
製品表面の傷、汚れ、変色などの欠陥をピクセル単位で検出します。セグメンテーションタスクとして実装します。
例4: U-Netによる欠陥セグメンテーション
class UNet(nn.Module):
"""U-Net アーキテクチャ(欠陥検出用)"""
def __init__(self, in_channels=3, out_channels=1):
"""
Args:
in_channels: 入力チャンネル数(RGB=3)
out_channels: 出力チャンネル数(二値マスク=1)
"""
super(UNet, self).__init__()
# Encoder(ダウンサンプリング)
self.enc1 = self.conv_block(in_channels, 64)
self.enc2 = self.conv_block(64, 128)
self.enc3 = self.conv_block(128, 256)
self.enc4 = self.conv_block(256, 512)
# Bottleneck
self.bottleneck = self.conv_block(512, 1024)
# Decoder(アップサンプリング)
self.upconv4 = nn.ConvTranspose2d(1024, 512, 2, stride=2)
self.dec4 = self.conv_block(1024, 512) # 512+512(skip connection)
self.upconv3 = nn.ConvTranspose2d(512, 256, 2, stride=2)
self.dec3 = self.conv_block(512, 256)
self.upconv2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
self.dec2 = self.conv_block(256, 128)
self.upconv1 = nn.ConvTranspose2d(128, 64, 2, stride=2)
self.dec1 = self.conv_block(128, 64)
# 出力層
self.out_conv = nn.Conv2d(64, out_channels, 1)
self.pool = nn.MaxPool2d(2, 2)
def conv_block(self, in_channels, out_channels):
"""2つの畳み込み層のブロック"""
return nn.Sequential(
nn.Conv2d(in_channels, out_channels, 3, padding=1),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
nn.Conv2d(out_channels, out_channels, 3, padding=1),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True)
)
def forward(self, x):
"""
Args:
x: [batch, 3, H, W] 入力画像
Returns:
mask: [batch, 1, H, W] 欠陥マスク(0-1)
"""
# Encoder
enc1 = self.enc1(x)
enc2 = self.enc2(self.pool(enc1))
enc3 = self.enc3(self.pool(enc2))
enc4 = self.enc4(self.pool(enc3))
# Bottleneck
bottleneck = self.bottleneck(self.pool(enc4))
# Decoder(skip connectionあり)
dec4 = self.upconv4(bottleneck)
dec4 = torch.cat([dec4, enc4], dim=1) # Skip connection
dec4 = self.dec4(dec4)
dec3 = self.upconv3(dec4)
dec3 = torch.cat([dec3, enc3], dim=1)
dec3 = self.dec3(dec3)
dec2 = self.upconv2(dec3)
dec2 = torch.cat([dec2, enc2], dim=1)
dec2 = self.dec2(dec2)
dec1 = self.upconv1(dec2)
dec1 = torch.cat([dec1, enc1], dim=1)
dec1 = self.dec1(dec1)
# 出力(シグモイドで0-1に正規化)
out = torch.sigmoid(self.out_conv(dec1))
return out
# Dice Loss(セグメンテーション用)
class DiceLoss(nn.Module):
"""Dice係数に基づくloss"""
def forward(self, pred, target, smooth=1.0):
"""
Args:
pred: [batch, 1, H, W] 予測マスク
target: [batch, 1, H, W] 真のマスク
"""
pred = pred.view(-1)
target = target.view(-1)
intersection = (pred * target).sum()
dice = (2.0 * intersection + smooth) / (pred.sum() + target.sum() + smooth)
return 1 - dice
# 訓練例
model = UNet(in_channels=3, out_channels=1)
criterion = DiceLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# ダミーデータ(実際は製品画像と欠陥マスク)
dummy_images = torch.randn(4, 3, 256, 256)
dummy_masks = torch.randint(0, 2, (4, 1, 256, 256)).float()
# 訓練ステップ
model.train()
optimizer.zero_grad()
pred_masks = model(dummy_images)
loss = criterion(pred_masks, dummy_masks)
loss.backward()
optimizer.step()
print(f"Predicted mask shape: {pred_masks.shape}")
print(f"Dice Loss: {loss.item():.4f}")
# 予測値の統計
print(f"Prediction range: [{pred_masks.min():.4f}, {pred_masks.max():.4f}]")
print(f"Defect pixels (>0.5): {(pred_masks > 0.5).sum().item()} / {pred_masks.numel()}")
# 出力例:
# Predicted mask shape: torch.Size([4, 1, 256, 256])
# Dice Loss: 0.5234
# Prediction range: [0.0123, 0.9876]
# Defect pixels (>0.5): 31245 / 262144
3.5 熱画像解析による温度分布推定
赤外線カメラで撮影した熱画像から、設備の温度分布を推定し、ホットスポットを検出します。
例5: 熱画像からの温度予測
class ThermalCNN(nn.Module):
"""熱画像解析用CNN"""
def __init__(self):
super(ThermalCNN, self).__init__()
# 熱画像は通常1チャンネル(グレースケール)
self.features = nn.Sequential(
nn.Conv2d(1, 32, 5, padding=2),
nn.ReLU(),
nn.MaxPool2d(2, 2),
nn.Conv2d(32, 64, 5, padding=2),
nn.ReLU(),
nn.MaxPool2d(2, 2),
nn.Conv2d(64, 128, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2, 2),
)
# 温度推定ヘッド(回帰タスク)
self.regressor = nn.Sequential(
nn.Linear(128 * 28 * 28, 512),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(512, 3) # 最高温度、平均温度、最低温度
)
def forward(self, x):
"""
Args:
x: [batch, 1, 224, 224] 熱画像
Returns:
temps: [batch, 3] (max_temp, avg_temp, min_temp)
"""
features = self.features(x)
features = features.view(features.size(0), -1)
temps = self.regressor(features)
return temps
# 合成熱画像データ生成
def generate_thermal_image(base_temp=300, hotspot_temp=450, size=224):
"""熱画像シミュレーション
Args:
base_temp: ベース温度 [K]
hotspot_temp: ホットスポット温度 [K]
size: 画像サイズ
Returns:
image: [1, size, size] 温度マップ
stats: (max, avg, min) 温度統計
"""
# ベース温度
image = np.ones((size, size)) * base_temp
# ガウス分布のホットスポットを追加
n_hotspots = np.random.randint(1, 4)
for _ in range(n_hotspots):
cx, cy = np.random.randint(20, size-20, 2)
sigma = np.random.uniform(10, 30)
y, x = np.ogrid[:size, :size]
gaussian = np.exp(-((x - cx)**2 + (y - cy)**2) / (2 * sigma**2))
image += gaussian * (hotspot_temp - base_temp)
# ノイズ
image += np.random.randn(size, size) * 5
# 統計
stats = np.array([image.max(), image.mean(), image.min()])
# 正規化(0-1に)
image_normalized = (image - 273) / 227 # 273-500K → 0-1
return torch.FloatTensor(image_normalized).unsqueeze(0), stats
# データ生成
n_samples = 100
thermal_images = []
temperature_stats = []
for _ in range(n_samples):
img, stats = generate_thermal_image()
thermal_images.append(img)
temperature_stats.append(stats)
thermal_images = torch.stack(thermal_images)
temperature_stats = torch.FloatTensor(np.array(temperature_stats))
# モデル訓練
model = ThermalCNN()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(30):
model.train()
optimizer.zero_grad()
pred_temps = model(thermal_images)
loss = criterion(pred_temps, temperature_stats)
loss.backward()
optimizer.step()
if (epoch + 1) % 5 == 0:
mae = torch.mean(torch.abs(pred_temps - temperature_stats))
print(f'Epoch {epoch+1}, Loss: {loss.item():.4f}, MAE: {mae.item():.2f}K')
# テスト
model.eval()
with torch.no_grad():
test_img, true_stats = generate_thermal_image(base_temp=320, hotspot_temp=480)
pred_stats = model(test_img.unsqueeze(0))
print(f"\nTest prediction:")
print(f" True: Max={true_stats[0]:.1f}K, Avg={true_stats[1]:.1f}K, Min={true_stats[2]:.1f}K")
print(f" Predicted: Max={pred_stats[0,0]:.1f}K, Avg={pred_stats[0,1]:.1f}K, Min={pred_stats[0,2]:.1f}K")
# 出力例:
# Epoch 5, Loss: 1234.5678, MAE: 12.34K
# Epoch 10, Loss: 567.8901, MAE: 8.76K
# Epoch 15, Loss: 234.5678, MAE: 5.43K
# Epoch 20, Loss: 123.4567, MAE: 3.21K
# Epoch 25, Loss: 89.0123, MAE: 2.15K
# Epoch 30, Loss: 67.8901, MAE: 1.87K
#
# Test prediction:
# True: Max=489.3K, Avg=345.7K, Min=318.2K
# Predicted: Max=485.6K, Avg=347.2K, Min=320.1K
3.6 転移学習(ResNet, EfficientNet)
ImageNetで事前学習済みのモデルを利用することで、少量のプロセスデータでも高精度を実現できます。
例6: ResNet50による転移学習
import torchvision.models as models
class ProcessClassifierWithTransferLearning(nn.Module):
"""転移学習を使用したプロセス分類器"""
def __init__(self, num_classes=3, pretrained=True, freeze_backbone=True):
"""
Args:
num_classes: 分類クラス数
pretrained: ImageNet事前学習済み重みを使用
freeze_backbone: バックボーンを凍結(ファインチューニングしない)
"""
super(ProcessClassifierWithTransferLearning, self).__init__()
# ResNet50をロード
self.backbone = models.resnet50(pretrained=pretrained)
# バックボーンの凍結
if freeze_backbone:
for param in self.backbone.parameters():
param.requires_grad = False
# 最終層を置き換え(ImageNetの1000クラス → プロセスのnum_classes)
num_features = self.backbone.fc.in_features
self.backbone.fc = nn.Linear(num_features, num_classes)
def forward(self, x):
return self.backbone(x)
# モデル初期化
model = ProcessClassifierWithTransferLearning(num_classes=3, pretrained=True, freeze_backbone=True)
# 訓練可能パラメータの確認
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
total_params = sum(p.numel() for p in model.parameters())
print(f"Trainable parameters: {trainable_params:,} / {total_params:,} ({100*trainable_params/total_params:.2f}%)")
# ダミーデータで動作確認
dummy_batch = torch.randn(8, 3, 224, 224)
output = model(dummy_batch)
print(f"Output shape: {output.shape}")
# EfficientNet-B0での転移学習
class EfficientNetClassifier(nn.Module):
"""EfficientNetによる転移学習"""
def __init__(self, num_classes=3):
super(EfficientNetClassifier, self).__init__()
# EfficientNet-B0(軽量版)
self.backbone = models.efficientnet_b0(pretrained=True)
# 最終層を置き換え
num_features = self.backbone.classifier[1].in_features
self.backbone.classifier[1] = nn.Linear(num_features, num_classes)
def forward(self, x):
return self.backbone(x)
efficient_model = EfficientNetClassifier(num_classes=3)
efficient_output = efficient_model(dummy_batch)
# パラメータ数比較
resnet_params = sum(p.numel() for p in model.parameters())
efficient_params = sum(p.numel() for p in efficient_model.parameters())
print(f"\nModel comparison:")
print(f" ResNet50: {resnet_params:,} parameters")
print(f" EfficientNet-B0: {efficient_params:,} parameters")
print(f" Reduction: {100*(1 - efficient_params/resnet_params):.1f}%")
# 出力例:
# Trainable parameters: 6,147 / 23,520,835 (0.03%)
# Output shape: torch.Size([8, 3])
#
# Model comparison:
# ResNet50: 23,520,835 parameters
# EfficientNet-B0: 4,011,391 parameters
# Reduction: 82.9%
3.7 Grad-CAMによる視覚的説明
Gradient-weighted Class Activation Mapping(Grad-CAM)を使用して、モデルがどこを見て判断したかを可視化します。
例7: Grad-CAM実装
class GradCAM:
"""Grad-CAM: 視覚的説明の生成"""
def __init__(self, model, target_layer):
"""
Args:
model: CNNモデル
target_layer: 可視化対象の層(通常は最終畳み込み層)
"""
self.model = model
self.target_layer = target_layer
self.gradients = None
self.activations = None
# フックを登録
target_layer.register_forward_hook(self.save_activation)
target_layer.register_backward_hook(self.save_gradient)
def save_activation(self, module, input, output):
"""順伝播時の活性化を保存"""
self.activations = output.detach()
def save_gradient(self, module, grad_input, grad_output):
"""逆伝播時の勾配を保存"""
self.gradients = grad_output[0].detach()
def generate_cam(self, input_image, target_class=None):
"""
CAMを生成
Args:
input_image: [1, 3, H, W] 入力画像
target_class: 対象クラス(Noneなら最大確率クラス)
Returns:
cam: [H, W] Class Activation Map
"""
# 順伝播
self.model.eval()
output = self.model(input_image)
if target_class is None:
target_class = output.argmax(dim=1).item()
# 対象クラスのスコアで逆伝播
self.model.zero_grad()
class_score = output[0, target_class]
class_score.backward()
# 勾配とactivationから重みを計算
pooled_gradients = torch.mean(self.gradients, dim=[0, 2, 3]) # [channels]
# 重み付き和
cam = torch.zeros(self.activations.shape[2:], dtype=torch.float32)
for i, w in enumerate(pooled_gradients):
cam += w * self.activations[0, i, :, :]
# ReLU + 正規化
cam = F.relu(cam)
cam = cam - cam.min()
cam = cam / cam.max() if cam.max() > 0 else cam
return cam.numpy(), target_class
# 使用例
model = SimpleCNN(num_classes=3)
model.eval()
# 最終畳み込み層を取得
target_layer = model.conv3
# Grad-CAM初期化
grad_cam = GradCAM(model, target_layer)
# テスト画像
test_image = torch.randn(1, 3, 224, 224)
# CAM生成
cam, predicted_class = grad_cam.generate_cam(test_image)
print(f"Predicted class: {predicted_class}")
print(f"CAM shape: {cam.shape}")
print(f"CAM range: [{cam.min():.4f}, {cam.max():.4f}]")
# 可視化関数
import matplotlib.pyplot as plt
import cv2
def visualize_gradcam(original_image, cam, alpha=0.5):
"""Grad-CAMを元画像に重ねて可視化
Args:
original_image: [3, H, W] Tensor
cam: [h, w] numpy array
alpha: 透明度
"""
# Tensorをnumpy配列に変換
img = original_image.permute(1, 2, 0).numpy()
img = (img - img.min()) / (img.max() - img.min()) # 0-1に正規化
# CAMを画像サイズにリサイズ
cam_resized = cv2.resize(cam, (img.shape[1], img.shape[0]))
# ヒートマップ作成
heatmap = plt.cm.jet(cam_resized)[:, :, :3] # RGBのみ
# 重ね合わせ
overlayed = heatmap * alpha + img * (1 - alpha)
# プロット
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
axes[0].imshow(img)
axes[0].set_title('Original Image')
axes[0].axis('off')
axes[1].imshow(cam_resized, cmap='jet')
axes[1].set_title('Grad-CAM')
axes[1].axis('off')
axes[2].imshow(overlayed)
axes[2].set_title('Overlay')
axes[2].axis('off')
plt.tight_layout()
# plt.savefig('gradcam_visualization.png', dpi=150)
print("Grad-CAM visualization created")
# 可視化
visualize_gradcam(test_image[0], cam)
# 出力例:
# Predicted class: 1
# CAM shape: (28, 28)
# CAM range: [0.0000, 1.0000]
# Grad-CAM visualization created
✅ Grad-CAMの応用
- 誤判定の診断: モデルが誤った箇所を見ていないか確認
- ドメイン知識の検証: 化学的に妥当な領域を見ているか
- モデルの改善: 重要領域が正しく認識されるようデータ拡張
3.8 リアルタイム画像処理パイプライン
製造ラインでの実用化を想定し、カメラからの連続画像を高速に処理するパイプラインを構築します。
例8: 推論パイプラインと最適化
import time
import threading
from queue import Queue
from collections import deque
class RealTimeInferencePipeline:
"""リアルタイム画像処理パイプライン"""
def __init__(self, model, device='cuda', batch_size=4, buffer_size=100):
"""
Args:
model: 推論用モデル
device: 'cuda' or 'cpu'
batch_size: バッチ処理サイズ
buffer_size: フレームバッファサイズ
"""
self.model = model.to(device).eval()
self.device = device
self.batch_size = batch_size
self.frame_queue = Queue(maxsize=buffer_size)
self.result_queue = Queue()
self.fps_buffer = deque(maxlen=30)
self.running = False
# モデル最適化(TorchScript)
self.optimize_model()
def optimize_model(self):
"""モデルをTorchScriptに変換して高速化"""
dummy_input = torch.randn(1, 3, 224, 224).to(self.device)
try:
self.model = torch.jit.trace(self.model, dummy_input)
print("Model optimized with TorchScript")
except Exception as e:
print(f"TorchScript optimization failed: {e}")
def preprocess_frame(self, frame):
"""フレームの前処理
Args:
frame: numpy array [H, W, 3] (BGR)
Returns:
tensor: [1, 3, 224, 224]
"""
# BGRからRGBに変換
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# リサイズ
frame_resized = cv2.resize(frame_rgb, (224, 224))
# Tensor変換と正規化
frame_tensor = torch.FloatTensor(frame_resized).permute(2, 0, 1) / 255.0
frame_tensor = transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)(frame_tensor)
return frame_tensor.unsqueeze(0)
def inference_worker(self):
"""推論ワーカースレッド(バッチ処理)"""
batch_frames = []
batch_ids = []
while self.running:
try:
# フレームを収集
while len(batch_frames) < self.batch_size and not self.frame_queue.empty():
frame_id, frame = self.frame_queue.get(timeout=0.01)
batch_frames.append(frame)
batch_ids.append(frame_id)
if len(batch_frames) > 0:
# バッチ推論
start_time = time.time()
batch_tensor = torch.cat(batch_frames, dim=0).to(self.device)
with torch.no_grad():
outputs = self.model(batch_tensor)
predictions = torch.argmax(outputs, dim=1).cpu().numpy()
confidences = torch.softmax(outputs, dim=1).cpu().numpy()
inference_time = time.time() - start_time
# 結果を保存
for i, frame_id in enumerate(batch_ids):
self.result_queue.put({
'frame_id': frame_id,
'prediction': predictions[i],
'confidence': confidences[i],
'inference_time': inference_time / len(batch_frames)
})
# FPS計算
fps = len(batch_frames) / inference_time
self.fps_buffer.append(fps)
# クリア
batch_frames.clear()
batch_ids.clear()
else:
time.sleep(0.001) # CPUを開放
except Exception as e:
print(f"Inference error: {e}")
def start(self):
"""パイプライン開始"""
self.running = True
self.inference_thread = threading.Thread(target=self.inference_worker)
self.inference_thread.start()
print("Inference pipeline started")
def stop(self):
"""パイプライン停止"""
self.running = False
self.inference_thread.join()
print("Inference pipeline stopped")
def process_frame(self, frame_id, frame):
"""フレームを処理キューに追加
Args:
frame_id: フレームID
frame: numpy array [H, W, 3]
"""
preprocessed = self.preprocess_frame(frame)
if not self.frame_queue.full():
self.frame_queue.put((frame_id, preprocessed))
else:
print(f"Warning: Frame buffer full, dropping frame {frame_id}")
def get_result(self):
"""処理結果を取得"""
if not self.result_queue.empty():
return self.result_queue.get()
return None
def get_fps(self):
"""現在のFPSを取得"""
if len(self.fps_buffer) > 0:
return sum(self.fps_buffer) / len(self.fps_buffer)
return 0.0
# 使用例
model = SimpleCNN(num_classes=3)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
pipeline = RealTimeInferencePipeline(model, device=device, batch_size=8)
pipeline.start()
# シミュレーション:連続フレームを処理
n_frames = 100
results = []
start_time = time.time()
for frame_id in range(n_frames):
# ダミーフレーム生成(実際はカメラからの画像)
dummy_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
# 処理
pipeline.process_frame(frame_id, dummy_frame)
# 結果を取得
result = pipeline.get_result()
if result:
results.append(result)
time.sleep(0.01) # 100 FPSを模擬
# 残りの結果を取得
time.sleep(1)
while True:
result = pipeline.get_result()
if result is None:
break
results.append(result)
pipeline.stop()
# 統計
total_time = time.time() - start_time
avg_fps = pipeline.get_fps()
print(f"\nProcessing statistics:")
print(f" Total frames: {n_frames}")
print(f" Processed frames: {len(results)}")
print(f" Total time: {total_time:.2f}s")
print(f" Average FPS: {avg_fps:.2f}")
print(f" Average inference time: {np.mean([r['inference_time'] for r in results])*1000:.2f}ms")
# 出力例:
# Model optimized with TorchScript
# Inference pipeline started
# Inference pipeline stopped
#
# Processing statistics:
# Total frames: 100
# Processed frames: 100
# Total time: 1.23s
# Average FPS: 87.34
# Average inference time: 11.47ms
💡 リアルタイム処理の最適化
- TorchScript変換: 推論速度を20-40%向上
- 量子化: INT8量子化でモデルサイズと推論時間を1/4に削減
- ONNX変換: TensorRTなどのエンジンで更なる高速化
- GPU並列化: バッチサイズを調整してGPU使用率最大化
学習目標の確認
この章を完了すると、以下を実装・説明できるようになります:
基本理解
- 畳み込み層とプーリング層の役割を説明できる
- CNNが画像の空間的構造を保持する仕組みを理解している
- U-Netのエンコーダ・デコーダ構造とskip connectionの意義を説明できる
- 転移学習がデータ効率を改善する理由を理解している
実践スキル
- PyTorchでCNNを実装し、プロセス画像を分類できる
- U-Netで製品欠陥をピクセル単位で検出できる
- YOLOアーキテクチャで設備の物体検出ができる
- 熱画像から温度分布を推定できる
- ResNet/EfficientNetで転移学習を実装できる
- Grad-CAMでモデルの判断根拠を可視化できる
- リアルタイム推論パイプラインを構築できる
応用力
- プロセスの特性に応じて適切なCNNアーキテクチャを選択できる
- Grad-CAM解析から異常検知の精度向上策を導出できる
- 製造ラインのスループット要件に応じてモデルを最適化できる
- 少量データで転移学習を活用し、実用レベルの精度を達成できる
参考文献
- LeCun, Y., et al. (1998). "Gradient-Based Learning Applied to Document Recognition." Proceedings of the IEEE, 86(11), 2278-2324.
- Ronneberger, O., et al. (2015). "U-Net: Convolutional Networks for Biomedical Image Segmentation." MICCAI 2015.
- Redmon, J., et al. (2016). "You Only Look Once: Unified, Real-Time Object Detection." CVPR 2016.
- He, K., et al. (2016). "Deep Residual Learning for Image Recognition." CVPR 2016.
- Tan, M., & Le, Q. V. (2019). "EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks." ICML 2019.
- Selvaraju, R. R., et al. (2017). "Grad-CAM: Visual Explanations from Deep Networks via Gradient-based Localization." ICCV 2017.