第4章 予知保全とトラブルシューティング

Predictive Maintenance and Troubleshooting

← シリーズ目次に戻る

📖 本章の概要

食品製造プロセスにおいて、設備の突然の故障や品質トラブルは、生産停止や製品廃棄につながる重大な問題です。 本章では、AIを活用した予知保全(Predictive Maintenance)技術により、設備故障を事前に予測し、 計画的な保全を実現する方法を学びます。また、異常が発生した際の根本原因分析やトラブルシューティングの 意思決定支援システムについても解説します。

🎯 学習目標

🔧 4.1 設備故障予測の基礎

予知保全の3つのアプローチ

予知保全には主に以下の3つのアプローチがあります:

💡 予知保全のメリット
・計画外停止時間の削減(30-50%減)
・保全コストの削減(20-30%減)
・設備寿命の延長(20-40%増)
・部品在庫の最適化

残存有用寿命(RUL)の定義

$$ \text{RUL}(t) = T_{\text{failure}} - t $$

ここで、\( T_{\text{failure}} \) は故障発生時刻、\( t \) は現在時刻

💻 コード例4.1: Random Forestによる設備故障予測

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import warnings
warnings.filterwarnings('ignore')

plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 設備センサデータの生成(1000サンプル)
np.random.seed(42)
n_samples = 1000

# 正常運転データ(70%)
n_normal = int(n_samples * 0.7)
temp_normal = np.random.normal(75, 5, n_normal)  # 温度(℃)
vibration_normal = np.random.normal(0.3, 0.05, n_normal)  # 振動(mm/s)
pressure_normal = np.random.normal(2.0, 0.2, n_normal)  # 圧力(MPa)
current_normal = np.random.normal(15, 2, n_normal)  # 電流(A)
runtime_normal = np.random.uniform(0, 5000, n_normal)  # 稼働時間(h)

# 故障前兆データ(30%)
n_failure = n_samples - n_normal
temp_failure = np.random.normal(90, 8, n_failure)  # 高温傾向
vibration_failure = np.random.normal(0.6, 0.1, n_failure)  # 振動増加
pressure_failure = np.random.normal(2.5, 0.3, n_failure)  # 圧力上昇
current_failure = np.random.normal(20, 3, n_failure)  # 電流増加
runtime_failure = np.random.uniform(4000, 8000, n_failure)  # 長時間稼働

# データフレーム作成
data = pd.DataFrame({
    'temperature': np.concatenate([temp_normal, temp_failure]),
    'vibration': np.concatenate([vibration_normal, vibration_failure]),
    'pressure': np.concatenate([pressure_normal, pressure_failure]),
    'current': np.concatenate([current_normal, current_failure]),
    'runtime_hours': np.concatenate([runtime_normal, runtime_failure]),
    'failure_risk': np.concatenate([np.zeros(n_normal), np.ones(n_failure)])
})

# 派生特徴量の作成
data['temp_vibration_ratio'] = data['temperature'] / (data['vibration'] * 100)
data['power_consumption'] = data['current'] * data['pressure']  # 仮想的な電力消費量
data['runtime_category'] = pd.cut(data['runtime_hours'], bins=5, labels=['Very Low', 'Low', 'Medium', 'High', 'Very High'])
data['runtime_category'] = data['runtime_category'].cat.codes

# 訓練データとテストデータの分割
X = data.drop('failure_risk', axis=1)
y = data['failure_risk']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# Random Forestモデルの訓練
rf_model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    min_samples_split=10,
    min_samples_leaf=5,
    random_state=42,
    class_weight='balanced'
)
rf_model.fit(X_train, y_train)

# 予測と評価
y_pred = rf_model.predict(X_test)
y_pred_proba = rf_model.predict_proba(X_test)[:, 1]

print("=" * 60)
print("設備故障予測モデル評価")
print("=" * 60)
print("\n分類レポート:")
print(classification_report(y_test, y_pred, target_names=['正常', '故障リスク']))

# 可視化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 混同行列
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Greens', ax=axes[0, 0],
            xticklabels=['正常', '故障リスク'], yticklabels=['正常', '故障リスク'])
axes[0, 0].set_title('混同行列', fontsize=12, fontweight='bold')
axes[0, 0].set_ylabel('真のラベル')
axes[0, 0].set_xlabel('予測ラベル')

# ROC曲線
fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba)
roc_auc = auc(fpr, tpr)
axes[0, 1].plot(fpr, tpr, color='#11998e', lw=2, label=f'ROC曲線 (AUC = {roc_auc:.3f})')
axes[0, 1].plot([0, 1], [0, 1], color='gray', lw=1, linestyle='--', label='ランダム')
axes[0, 1].set_xlim([0.0, 1.0])
axes[0, 1].set_ylim([0.0, 1.05])
axes[0, 1].set_xlabel('偽陽性率 (False Positive Rate)')
axes[0, 1].set_ylabel('真陽性率 (True Positive Rate)')
axes[0, 1].set_title('ROC曲線', fontsize=12, fontweight='bold')
axes[0, 1].legend(loc='lower right')
axes[0, 1].grid(alpha=0.3)

# 特徴量重要度
feature_importance = pd.DataFrame({
    'feature': X.columns,
    'importance': rf_model.feature_importances_
}).sort_values('importance', ascending=True)

axes[1, 0].barh(feature_importance['feature'], feature_importance['importance'], color='#38ef7d')
axes[1, 0].set_xlabel('重要度')
axes[1, 0].set_title('特徴量重要度', fontsize=12, fontweight='bold')
axes[1, 0].grid(axis='x', alpha=0.3)

# 故障リスクスコア分布
axes[1, 1].hist(y_pred_proba[y_test == 0], bins=30, alpha=0.6, label='正常', color='green')
axes[1, 1].hist(y_pred_proba[y_test == 1], bins=30, alpha=0.6, label='故障リスク', color='red')
axes[1, 1].axvline(x=0.5, color='black', linestyle='--', linewidth=1, label='閾値 (0.5)')
axes[1, 1].set_xlabel('故障リスクスコア')
axes[1, 1].set_ylabel('頻度')
axes[1, 1].set_title('故障リスクスコア分布', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)

plt.tight_layout()
plt.savefig('failure_prediction_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

print(f"\nROC-AUC スコア: {roc_auc:.4f}")
print("\n特徴量重要度トップ3:")
print(feature_importance.tail(3).to_string(index=False))

実装のポイント:

⏱️ 4.2 残存有用寿命(RUL)推定

RUL推定の重要性

故障を「いつ起こるか」まで予測できれば、最適な保全タイミングを計画できます。 RUL推定は、現在の設備状態から故障までの残り時間を推定する技術です。

劣化曲線モデル

$$ \text{Health Index}(t) = 100 \times \exp\left(-\frac{t}{\tau}\right) $$

ここで、\( \tau \) は劣化時定数、健全度が閾値(例: 20%)を下回ると故障と判定

💻 コード例4.2: LSTMによる残存有用寿命推定

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import warnings
warnings.filterwarnings('ignore')

# TensorFlow/Kerasのインポート(実際の実装では必要)
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import LSTM, Dense, Dropout

plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 設備劣化データの生成(模擬的なRULデータ)
np.random.seed(42)
n_cycles = 200  # ライフサイクル数
time_steps = 100  # 各サイクルのタイムステップ数

# 3台の設備の劣化パターンを生成
def generate_degradation_data(n_cycles, time_steps, failure_threshold=20):
    """設備劣化データの生成"""
    data_list = []

    for cycle_id in range(n_cycles):
        # ランダムな劣化速度
        tau = np.random.uniform(60, 120)  # 劣化時定数
        noise_level = np.random.uniform(2, 5)

        # 健全度の計算(指数関数的劣化 + ノイズ)
        t = np.arange(time_steps)
        health_index = 100 * np.exp(-t / tau) + np.random.normal(0, noise_level, time_steps)
        health_index = np.clip(health_index, 0, 100)

        # RUL計算(健全度が閾値を下回るまでの時間)
        failure_time = np.where(health_index < failure_threshold)[0]
        if len(failure_time) > 0:
            failure_step = failure_time[0]
        else:
            failure_step = time_steps

        rul = np.maximum(0, failure_step - t)

        # センサデータ(健全度に応じて変化)
        temperature = 60 + (100 - health_index) * 0.5 + np.random.normal(0, 2, time_steps)
        vibration = 0.2 + (100 - health_index) * 0.008 + np.random.normal(0, 0.05, time_steps)
        pressure = 1.8 + (100 - health_index) * 0.01 + np.random.normal(0, 0.1, time_steps)

        for step in range(time_steps):
            data_list.append({
                'cycle_id': cycle_id,
                'time_step': step,
                'temperature': temperature[step],
                'vibration': vibration[step],
                'pressure': pressure[step],
                'health_index': health_index[step],
                'RUL': rul[step]
            })

    return pd.DataFrame(data_list)

# データ生成
degradation_data = generate_degradation_data(n_cycles, time_steps)

# データの前処理
feature_columns = ['temperature', 'vibration', 'pressure', 'health_index']
scaler = MinMaxScaler()
degradation_data[feature_columns] = scaler.fit_transform(degradation_data[feature_columns])

# RULの正規化(0-1範囲)
max_rul = degradation_data['RUL'].max()
degradation_data['RUL_normalized'] = degradation_data['RUL'] / max_rul

# 訓練データとテストデータの分割
train_cycles = int(n_cycles * 0.8)
train_data = degradation_data[degradation_data['cycle_id'] < train_cycles]
test_data = degradation_data[degradation_data['cycle_id'] >= train_cycles]

print("=" * 60)
print("残存有用寿命(RUL)推定システム")
print("=" * 60)
print(f"\n総サイクル数: {n_cycles}")
print(f"訓練サイクル: {train_cycles}")
print(f"テストサイクル: {n_cycles - train_cycles}")
print(f"各サイクルのタイムステップ数: {time_steps}")
print(f"\nRUL範囲: 0 ~ {max_rul:.0f} ステップ")

# 簡易的な予測(実際のLSTMの代わりに移動平均ベースの予測を使用)
def simple_rul_prediction(data, window=10):
    """移動平均ベースの簡易RUL予測"""
    predictions = []
    actuals = []

    for cycle_id in data['cycle_id'].unique():
        cycle_data = data[data['cycle_id'] == cycle_id].copy()

        for i in range(window, len(cycle_data)):
            # 過去windowステップの健全度の平均的な低下率から予測
            recent_health = cycle_data.iloc[i-window:i]['health_index'].values
            health_decline_rate = (recent_health[0] - recent_health[-1]) / window

            current_health = cycle_data.iloc[i]['health_index']

            # 健全度が20%に達するまでのステップ数を推定
            if health_decline_rate > 0.0001:
                estimated_rul = max(0, (current_health - 0.2) / health_decline_rate)
            else:
                estimated_rul = max_rul  # 劣化が見られない場合

            estimated_rul = min(estimated_rul, max_rul)  # 上限制約

            predictions.append(estimated_rul / max_rul)  # 正規化
            actuals.append(cycle_data.iloc[i]['RUL_normalized'])

    return np.array(predictions), np.array(actuals)

# テストデータで予測
y_pred, y_true = simple_rul_prediction(test_data, window=10)

# 評価指標
mae = mean_absolute_error(y_true * max_rul, y_pred * max_rul)
rmse = np.sqrt(mean_squared_error(y_true * max_rul, y_pred * max_rul))

print(f"\n予測性能:")
print(f"MAE (平均絶対誤差): {mae:.2f} ステップ")
print(f"RMSE (二乗平均平方根誤差): {rmse:.2f} ステップ")

# 可視化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# サンプルサイクルの劣化曲線
sample_cycles = [0, 1, 2]
for i, cycle_id in enumerate(sample_cycles):
    cycle_data = degradation_data[degradation_data['cycle_id'] == cycle_id]
    axes[0, 0].plot(cycle_data['time_step'], cycle_data['health_index'],
                    label=f'サイクル {cycle_id}', alpha=0.7)

axes[0, 0].axhline(y=0.2, color='red', linestyle='--', linewidth=1, label='故障閾値 (20%)')
axes[0, 0].set_xlabel('タイムステップ')
axes[0, 0].set_ylabel('健全度(正規化)')
axes[0, 0].set_title('設備劣化曲線(サンプル)', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)

# RUL予測 vs 実測
axes[0, 1].scatter(y_true * max_rul, y_pred * max_rul, alpha=0.3, s=10, color='#11998e')
axes[0, 1].plot([0, max_rul], [0, max_rul], 'r--', lw=2, label='理想的な予測')
axes[0, 1].set_xlabel('実測RUL(ステップ)')
axes[0, 1].set_ylabel('予測RUL(ステップ)')
axes[0, 1].set_title('RUL予測精度', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)

# 予測誤差の分布
errors = (y_pred - y_true) * max_rul
axes[1, 0].hist(errors, bins=50, color='#38ef7d', alpha=0.7, edgecolor='black')
axes[1, 0].axvline(x=0, color='red', linestyle='--', linewidth=2, label='誤差ゼロ')
axes[1, 0].set_xlabel('予測誤差(ステップ)')
axes[1, 0].set_ylabel('頻度')
axes[1, 0].set_title('RUL予測誤差分布', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)

# 特定サイクルのRUL時系列
test_cycle = test_data['cycle_id'].unique()[0]
test_cycle_data = test_data[test_data['cycle_id'] == test_cycle].iloc[10:]  # windowサイズ分スキップ
cycle_pred, cycle_true = simple_rul_prediction(
    test_data[test_data['cycle_id'] == test_cycle], window=10
)

axes[1, 1].plot(test_cycle_data['time_step'].values, cycle_true * max_rul,
                label='実測RUL', color='blue', linewidth=2)
axes[1, 1].plot(test_cycle_data['time_step'].values, cycle_pred * max_rul,
                label='予測RUL', color='orange', linewidth=2, linestyle='--')
axes[1, 1].fill_between(test_cycle_data['time_step'].values,
                         cycle_pred * max_rul - 10, cycle_pred * max_rul + 10,
                         alpha=0.2, color='orange', label='予測誤差範囲 (±10)')
axes[1, 1].set_xlabel('タイムステップ')
axes[1, 1].set_ylabel('RUL(ステップ)')
axes[1, 1].set_title(f'RUL時系列予測(サイクル {test_cycle})', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)

plt.tight_layout()
plt.savefig('rul_estimation_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

実装のポイント:

🚨 4.3 異常検出と早期警報システム

異常検出の2つのアプローチ

  1. 教師あり学習: 過去の故障事例から学習(ラベル付きデータ必要)
  2. 教師なし学習: 正常パターンからの逸脱を検出(異常データ不要)
💡 教師なし異常検出の利点
・未知の異常パターンも検出可能
・故障事例の少ない新設備にも適用可能
・ラベル付けコストの削減

💻 コード例4.3: Isolation Forestによる異常検出

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import warnings
warnings.filterwarnings('ignore')

plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 正常運転データの生成
np.random.seed(42)
n_normal = 800
n_anomaly = 50

# 正常データ(多変量正規分布)
mean_normal = [70, 0.25, 2.0, 14, 50]
cov_normal = [[25, 0, 0, 0, 0],
              [0, 0.0025, 0, 0, 0],
              [0, 0, 0.04, 0, 0],
              [0, 0, 0, 4, 0],
              [0, 0, 0, 0, 100]]

normal_data = np.random.multivariate_normal(mean_normal, cov_normal, n_normal)

# 異常データ(様々な異常パターン)
anomaly_patterns = []

# パターン1: 高温異常
high_temp = np.random.multivariate_normal([95, 0.25, 2.0, 14, 50], cov_normal, n_anomaly // 5)
anomaly_patterns.append(high_temp)

# パターン2: 振動異常
high_vibration = np.random.multivariate_normal([70, 0.7, 2.0, 14, 50], cov_normal, n_anomaly // 5)
anomaly_patterns.append(high_vibration)

# パターン3: 圧力異常
high_pressure = np.random.multivariate_normal([70, 0.25, 3.5, 14, 50], cov_normal, n_anomaly // 5)
anomaly_patterns.append(high_pressure)

# パターン4: 電流異常
high_current = np.random.multivariate_normal([70, 0.25, 2.0, 25, 50], cov_normal, n_anomaly // 5)
anomaly_patterns.append(high_current)

# パターン5: 複合異常
combined = np.random.multivariate_normal([90, 0.6, 2.8, 20, 80], cov_normal, n_anomaly // 5)
anomaly_patterns.append(combined)

anomaly_data = np.vstack(anomaly_patterns)

# データフレーム作成
columns = ['temperature', 'vibration', 'pressure', 'current', 'humidity']
df_normal = pd.DataFrame(normal_data, columns=columns)
df_normal['label'] = 0  # 正常

df_anomaly = pd.DataFrame(anomaly_data, columns=columns)
df_anomaly['label'] = 1  # 異常

df = pd.concat([df_normal, df_anomaly], ignore_index=True)
df = df.sample(frac=1, random_state=42).reset_index(drop=True)  # シャッフル

# データの標準化
X = df[columns].values
y_true = df['label'].values

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Isolation Forest モデル
iso_forest = IsolationForest(
    contamination=0.1,  # 異常データの割合の推定値
    random_state=42,
    n_estimators=100,
    max_samples='auto'
)

# 異常スコアの計算
y_pred = iso_forest.fit_predict(X_scaled)
anomaly_scores = iso_forest.score_samples(X_scaled)

# 予測ラベルを0/1に変換(-1→1(異常), 1→0(正常))
y_pred_binary = np.where(y_pred == -1, 1, 0)

# 評価指標の計算
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix

precision = precision_score(y_true, y_pred_binary)
recall = recall_score(y_true, y_pred_binary)
f1 = f1_score(y_true, y_pred_binary)
cm = confusion_matrix(y_true, y_pred_binary)

print("=" * 60)
print("異常検出システム評価(Isolation Forest)")
print("=" * 60)
print(f"\n総サンプル数: {len(df)}")
print(f"正常データ: {n_normal} ({n_normal/len(df)*100:.1f}%)")
print(f"異常データ: {n_anomaly} ({n_anomaly/len(df)*100:.1f}%)")
print(f"\n検出性能:")
print(f"適合率 (Precision): {precision:.3f}")
print(f"再現率 (Recall): {recall:.3f}")
print(f"F1スコア: {f1:.3f}")
print(f"\n混同行列:")
print(f"真陰性: {cm[0,0]}, 偽陽性: {cm[0,1]}")
print(f"偽陰性: {cm[1,0]}, 真陽性: {cm[1,1]}")

# PCAで2次元に次元削減(可視化用)
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

# 可視化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# PCA空間での異常検出結果
scatter1 = axes[0, 0].scatter(X_pca[y_pred_binary==0, 0], X_pca[y_pred_binary==0, 1],
                               c='green', s=20, alpha=0.6, label='正常と判定')
scatter2 = axes[0, 0].scatter(X_pca[y_pred_binary==1, 0], X_pca[y_pred_binary==1, 1],
                               c='red', s=40, alpha=0.8, marker='X', label='異常と判定')
axes[0, 0].set_xlabel(f'第1主成分 (寄与率: {pca.explained_variance_ratio_[0]:.1%})')
axes[0, 0].set_ylabel(f'第2主成分 (寄与率: {pca.explained_variance_ratio_[1]:.1%})')
axes[0, 0].set_title('異常検出結果(PCA空間)', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)

# 異常スコアの分布
axes[0, 1].hist(anomaly_scores[y_true==0], bins=50, alpha=0.6, label='正常', color='green')
axes[0, 1].hist(anomaly_scores[y_true==1], bins=50, alpha=0.6, label='異常', color='red')
axes[0, 1].set_xlabel('異常スコア(小さいほど異常)')
axes[0, 1].set_ylabel('頻度')
axes[0, 1].set_title('異常スコア分布', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)

# 混同行列ヒートマップ
import seaborn as sns
sns.heatmap(cm, annot=True, fmt='d', cmap='RdYlGn', ax=axes[1, 0],
            xticklabels=['正常', '異常'], yticklabels=['正常', '異常'])
axes[1, 0].set_title('混同行列', fontsize=12, fontweight='bold')
axes[1, 0].set_ylabel('真のラベル')
axes[1, 0].set_xlabel('予測ラベル')

# 時系列での異常検出(模擬的なリアルタイム監視)
time_series_length = 200
time_indices = np.arange(time_series_length)

# 時系列データの生成(正常→異常→正常)
ts_data = []
for i in range(time_series_length):
    if 80 <= i <= 120:  # 異常期間
        sample = np.random.multivariate_normal([90, 0.6, 2.8, 20, 80], cov_normal)
    else:
        sample = np.random.multivariate_normal(mean_normal, cov_normal)
    ts_data.append(sample)

ts_data = np.array(ts_data)
ts_scaled = scaler.transform(ts_data)
ts_scores = iso_forest.score_samples(ts_scaled)
ts_pred = iso_forest.predict(ts_scaled)

# 異常閾値の計算(異常スコアの下位10%点)
threshold = np.percentile(anomaly_scores, 10)

axes[1, 1].plot(time_indices, ts_scores, color='#11998e', linewidth=1.5, label='異常スコア')
axes[1, 1].axhline(y=threshold, color='red', linestyle='--', linewidth=2, label=f'異常閾値 ({threshold:.3f})')
axes[1, 1].fill_between(time_indices, threshold, ts_scores.min(),
                         where=(ts_scores < threshold), alpha=0.3, color='red', label='異常検出')
axes[1, 1].set_xlabel('時刻')
axes[1, 1].set_ylabel('異常スコア')
axes[1, 1].set_title('リアルタイム異常検出(模擬)', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)

plt.tight_layout()
plt.savefig('anomaly_detection_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

# 異常検出時の警報メッセージ生成
detected_anomalies = np.where(ts_pred == -1)[0]
if len(detected_anomalies) > 0:
    print(f"\n⚠️ 警報: {len(detected_anomalies)}件の異常を検出")
    print(f"異常検出時刻: {detected_anomalies[:10]}..." if len(detected_anomalies) > 10 else f"異常検出時刻: {detected_anomalies}")

実装のポイント:

🔍 4.4 根本原因分析(RCA)

トラブル発生時の分析フロー

  1. 症状の特定: 何が起こったか(品質不良、設備停止など)
  2. 直接原因の調査: 直前の変化・イベントの確認
  3. 根本原因の特定: なぜそれが起こったか(5回のWhy)
  4. 対策の立案: 再発防止策の検討

データ駆動型RCA

大量のプロセスデータから、品質異常や設備故障に寄与した要因を特定するには、 統計的手法や機械学習が有効です。相関分析、決定木、SHAP値などを用いて、 影響度の高いパラメータを可視化します。

💻 コード例4.4: 決定木とSHAP値による根本原因分析

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')

plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 品質不良データの生成(根本原因が明確なシミュレーションデータ)
np.random.seed(42)
n_samples = 500

# 正常品(60%)
n_normal = int(n_samples * 0.6)
temp_normal = np.random.normal(75, 3, n_normal)
ph_normal = np.random.normal(6.5, 0.2, n_normal)
humidity_normal = np.random.normal(50, 5, n_normal)
mixing_time_normal = np.random.normal(120, 10, n_normal)
additive_amount_normal = np.random.normal(2.0, 0.1, n_normal)

# 不良品パターン1: 高温起因(20%)
n_defect1 = int(n_samples * 0.2)
temp_defect1 = np.random.normal(90, 5, n_defect1)
ph_defect1 = np.random.normal(6.5, 0.2, n_defect1)
humidity_defect1 = np.random.normal(50, 5, n_defect1)
mixing_time_defect1 = np.random.normal(120, 10, n_defect1)
additive_amount_defect1 = np.random.normal(2.0, 0.1, n_defect1)

# 不良品パターン2: pH異常起因(10%)
n_defect2 = int(n_samples * 0.1)
temp_defect2 = np.random.normal(75, 3, n_defect2)
ph_defect2 = np.random.normal(7.5, 0.3, n_defect2)
humidity_defect2 = np.random.normal(50, 5, n_defect2)
mixing_time_defect2 = np.random.normal(120, 10, n_defect2)
additive_amount_defect2 = np.random.normal(2.0, 0.1, n_defect2)

# 不良品パターン3: 添加剤量不足起因(10%)
n_defect3 = n_samples - n_normal - n_defect1 - n_defect2
temp_defect3 = np.random.normal(75, 3, n_defect3)
ph_defect3 = np.random.normal(6.5, 0.2, n_defect3)
humidity_defect3 = np.random.normal(50, 5, n_defect3)
mixing_time_defect3 = np.random.normal(120, 10, n_defect3)
additive_amount_defect3 = np.random.normal(1.5, 0.15, n_defect3)

# データフレーム作成
data = pd.DataFrame({
    'temperature': np.concatenate([temp_normal, temp_defect1, temp_defect2, temp_defect3]),
    'pH': np.concatenate([ph_normal, ph_defect1, ph_defect2, ph_defect3]),
    'humidity': np.concatenate([humidity_normal, humidity_defect1, humidity_defect2, humidity_defect3]),
    'mixing_time': np.concatenate([mixing_time_normal, mixing_time_defect1, mixing_time_defect2, mixing_time_defect3]),
    'additive_amount': np.concatenate([additive_amount_normal, additive_amount_defect1, additive_amount_defect2, additive_amount_defect3]),
    'quality': np.concatenate([
        np.zeros(n_normal),
        np.ones(n_defect1),
        np.ones(n_defect2),
        np.ones(n_defect3)
    ])
})

# データのシャッフル
data = data.sample(frac=1, random_state=42).reset_index(drop=True)

# 訓練データとテストデータの分割
X = data.drop('quality', axis=1)
y = data['quality']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# 決定木モデルの訓練(深さを制限して解釈性を重視)
dt_model = DecisionTreeClassifier(
    max_depth=4,
    min_samples_split=20,
    min_samples_leaf=10,
    random_state=42
)
dt_model.fit(X_train, y_train)

# 予測と評価
y_pred = dt_model.predict(X_test)
from sklearn.metrics import classification_report, accuracy_score

accuracy = accuracy_score(y_test, y_pred)

print("=" * 60)
print("根本原因分析(Decision Tree)")
print("=" * 60)
print(f"\nモデル精度: {accuracy:.3f}")
print("\n分類レポート:")
print(classification_report(y_test, y_pred, target_names=['正常', '不良']))

# 特徴量重要度
feature_importance = pd.DataFrame({
    'feature': X.columns,
    'importance': dt_model.feature_importances_
}).sort_values('importance', ascending=False)

print("\n特徴量重要度(品質不良への寄与度):")
print(feature_importance.to_string(index=False))

# 可視化
fig = plt.figure(figsize=(16, 10))

# 決定木の可視化
ax1 = plt.subplot(2, 2, (1, 2))
plot_tree(dt_model, feature_names=X.columns, class_names=['正常', '不良'],
          filled=True, rounded=True, fontsize=9, ax=ax1)
ax1.set_title('決定木による根本原因分析', fontsize=14, fontweight='bold')

# 特徴量重要度
ax2 = plt.subplot(2, 2, 3)
bars = ax2.barh(feature_importance['feature'], feature_importance['importance'], color='#38ef7d')
ax2.set_xlabel('重要度スコア')
ax2.set_title('品質不良への寄与度', fontsize=12, fontweight='bold')
ax2.grid(axis='x', alpha=0.3)

# 重要な特徴量の分布比較
ax3 = plt.subplot(2, 2, 4)
top_feature = feature_importance.iloc[0]['feature']

normal_values = data[data['quality'] == 0][top_feature]
defect_values = data[data['quality'] == 1][top_feature]

ax3.hist(normal_values, bins=30, alpha=0.6, label='正常', color='green')
ax3.hist(defect_values, bins=30, alpha=0.6, label='不良', color='red')
ax3.set_xlabel(f'{top_feature}')
ax3.set_ylabel('頻度')
ax3.set_title(f'最重要因子の分布比較: {top_feature}', fontsize=12, fontweight='bold')
ax3.legend()
ax3.grid(alpha=0.3)

plt.tight_layout()
plt.savefig('root_cause_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

# RCA レポート生成
print("\n" + "=" * 60)
print("根本原因分析レポート")
print("=" * 60)

# 決定木のルール抽出(トップノードのみ)
from sklearn.tree import _tree

def extract_rules(tree, feature_names, node=0, depth=0, max_depth=2):
    """決定木からルールを抽出"""
    if depth > max_depth:
        return

    feature = tree.feature[node]
    threshold = tree.threshold[node]

    if feature != _tree.TREE_UNDEFINED:
        name = feature_names[feature]
        print(f"{'  ' * depth}├─ {name} <= {threshold:.2f}の場合:")
        extract_rules(tree, feature_names, tree.children_left[node], depth + 1, max_depth)
        print(f"{'  ' * depth}└─ {name} > {threshold:.2f}の場合:")
        extract_rules(tree, feature_names, tree.children_right[node], depth + 1, max_depth)
    else:
        value = tree.value[node][0]
        total = value.sum()
        defect_rate = value[1] / total if total > 0 else 0
        print(f"{'  ' * depth}   → 不良率: {defect_rate:.1%} (サンプル数: {int(total)})")

print("\n主要な判定ルール:")
extract_rules(dt_model.tree_, X.columns)

print("\n推奨される対策:")
if feature_importance.iloc[0]['feature'] == 'temperature':
    print("・温度管理の強化: 75±5℃の範囲に制御")
    print("・冷却システムの点検と改善")
elif feature_importance.iloc[0]['feature'] == 'pH':
    print("・pH調整プロセスの見直し")
    print("・pH計の校正頻度の増加")
elif feature_importance.iloc[0]['feature'] == 'additive_amount':
    print("・添加剤の投入量管理の厳格化")
    print("・自動投入システムの導入検討")

実装のポイント:

📚 まとめ

本章では、AIを活用した予知保全とトラブルシューティングの技術を学びました。

主要なポイント

実務での応用

🎯 次章予告
第5章では、これまで学んだ技術を統合した実際のケーススタディを紹介します。 具体的な食品製造プロセス(乳製品、飲料、スナック食品など)における AIシステムの導入事例と、その効果について詳しく解説します。