Predictive Maintenance and Troubleshooting
食品製造プロセスにおいて、設備の突然の故障や品質トラブルは、生産停止や製品廃棄につながる重大な問題です。 本章では、AIを活用した予知保全(Predictive Maintenance)技術により、設備故障を事前に予測し、 計画的な保全を実現する方法を学びます。また、異常が発生した際の根本原因分析やトラブルシューティングの 意思決定支援システムについても解説します。
予知保全には主に以下の3つのアプローチがあります:
ここで、\( T_{\text{failure}} \) は故障発生時刻、\( t \) は現在時刻
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import warnings
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# 設備センサデータの生成(1000サンプル)
np.random.seed(42)
n_samples = 1000
# 正常運転データ(70%)
n_normal = int(n_samples * 0.7)
temp_normal = np.random.normal(75, 5, n_normal) # 温度(℃)
vibration_normal = np.random.normal(0.3, 0.05, n_normal) # 振動(mm/s)
pressure_normal = np.random.normal(2.0, 0.2, n_normal) # 圧力(MPa)
current_normal = np.random.normal(15, 2, n_normal) # 電流(A)
runtime_normal = np.random.uniform(0, 5000, n_normal) # 稼働時間(h)
# 故障前兆データ(30%)
n_failure = n_samples - n_normal
temp_failure = np.random.normal(90, 8, n_failure) # 高温傾向
vibration_failure = np.random.normal(0.6, 0.1, n_failure) # 振動増加
pressure_failure = np.random.normal(2.5, 0.3, n_failure) # 圧力上昇
current_failure = np.random.normal(20, 3, n_failure) # 電流増加
runtime_failure = np.random.uniform(4000, 8000, n_failure) # 長時間稼働
# データフレーム作成
data = pd.DataFrame({
'temperature': np.concatenate([temp_normal, temp_failure]),
'vibration': np.concatenate([vibration_normal, vibration_failure]),
'pressure': np.concatenate([pressure_normal, pressure_failure]),
'current': np.concatenate([current_normal, current_failure]),
'runtime_hours': np.concatenate([runtime_normal, runtime_failure]),
'failure_risk': np.concatenate([np.zeros(n_normal), np.ones(n_failure)])
})
# 派生特徴量の作成
data['temp_vibration_ratio'] = data['temperature'] / (data['vibration'] * 100)
data['power_consumption'] = data['current'] * data['pressure'] # 仮想的な電力消費量
data['runtime_category'] = pd.cut(data['runtime_hours'], bins=5, labels=['Very Low', 'Low', 'Medium', 'High', 'Very High'])
data['runtime_category'] = data['runtime_category'].cat.codes
# 訓練データとテストデータの分割
X = data.drop('failure_risk', axis=1)
y = data['failure_risk']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)
# Random Forestモデルの訓練
rf_model = RandomForestClassifier(
n_estimators=100,
max_depth=10,
min_samples_split=10,
min_samples_leaf=5,
random_state=42,
class_weight='balanced'
)
rf_model.fit(X_train, y_train)
# 予測と評価
y_pred = rf_model.predict(X_test)
y_pred_proba = rf_model.predict_proba(X_test)[:, 1]
print("=" * 60)
print("設備故障予測モデル評価")
print("=" * 60)
print("\n分類レポート:")
print(classification_report(y_test, y_pred, target_names=['正常', '故障リスク']))
# 可視化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 混同行列
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Greens', ax=axes[0, 0],
xticklabels=['正常', '故障リスク'], yticklabels=['正常', '故障リスク'])
axes[0, 0].set_title('混同行列', fontsize=12, fontweight='bold')
axes[0, 0].set_ylabel('真のラベル')
axes[0, 0].set_xlabel('予測ラベル')
# ROC曲線
fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba)
roc_auc = auc(fpr, tpr)
axes[0, 1].plot(fpr, tpr, color='#11998e', lw=2, label=f'ROC曲線 (AUC = {roc_auc:.3f})')
axes[0, 1].plot([0, 1], [0, 1], color='gray', lw=1, linestyle='--', label='ランダム')
axes[0, 1].set_xlim([0.0, 1.0])
axes[0, 1].set_ylim([0.0, 1.05])
axes[0, 1].set_xlabel('偽陽性率 (False Positive Rate)')
axes[0, 1].set_ylabel('真陽性率 (True Positive Rate)')
axes[0, 1].set_title('ROC曲線', fontsize=12, fontweight='bold')
axes[0, 1].legend(loc='lower right')
axes[0, 1].grid(alpha=0.3)
# 特徴量重要度
feature_importance = pd.DataFrame({
'feature': X.columns,
'importance': rf_model.feature_importances_
}).sort_values('importance', ascending=True)
axes[1, 0].barh(feature_importance['feature'], feature_importance['importance'], color='#38ef7d')
axes[1, 0].set_xlabel('重要度')
axes[1, 0].set_title('特徴量重要度', fontsize=12, fontweight='bold')
axes[1, 0].grid(axis='x', alpha=0.3)
# 故障リスクスコア分布
axes[1, 1].hist(y_pred_proba[y_test == 0], bins=30, alpha=0.6, label='正常', color='green')
axes[1, 1].hist(y_pred_proba[y_test == 1], bins=30, alpha=0.6, label='故障リスク', color='red')
axes[1, 1].axvline(x=0.5, color='black', linestyle='--', linewidth=1, label='閾値 (0.5)')
axes[1, 1].set_xlabel('故障リスクスコア')
axes[1, 1].set_ylabel('頻度')
axes[1, 1].set_title('故障リスクスコア分布', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.savefig('failure_prediction_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
print(f"\nROC-AUC スコア: {roc_auc:.4f}")
print("\n特徴量重要度トップ3:")
print(feature_importance.tail(3).to_string(index=False))
実装のポイント:
故障を「いつ起こるか」まで予測できれば、最適な保全タイミングを計画できます。 RUL推定は、現在の設備状態から故障までの残り時間を推定する技術です。
ここで、\( \tau \) は劣化時定数、健全度が閾値(例: 20%)を下回ると故障と判定
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import warnings
warnings.filterwarnings('ignore')
# TensorFlow/Kerasのインポート(実際の実装では必要)
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import LSTM, Dense, Dropout
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# 設備劣化データの生成(模擬的なRULデータ)
np.random.seed(42)
n_cycles = 200 # ライフサイクル数
time_steps = 100 # 各サイクルのタイムステップ数
# 3台の設備の劣化パターンを生成
def generate_degradation_data(n_cycles, time_steps, failure_threshold=20):
"""設備劣化データの生成"""
data_list = []
for cycle_id in range(n_cycles):
# ランダムな劣化速度
tau = np.random.uniform(60, 120) # 劣化時定数
noise_level = np.random.uniform(2, 5)
# 健全度の計算(指数関数的劣化 + ノイズ)
t = np.arange(time_steps)
health_index = 100 * np.exp(-t / tau) + np.random.normal(0, noise_level, time_steps)
health_index = np.clip(health_index, 0, 100)
# RUL計算(健全度が閾値を下回るまでの時間)
failure_time = np.where(health_index < failure_threshold)[0]
if len(failure_time) > 0:
failure_step = failure_time[0]
else:
failure_step = time_steps
rul = np.maximum(0, failure_step - t)
# センサデータ(健全度に応じて変化)
temperature = 60 + (100 - health_index) * 0.5 + np.random.normal(0, 2, time_steps)
vibration = 0.2 + (100 - health_index) * 0.008 + np.random.normal(0, 0.05, time_steps)
pressure = 1.8 + (100 - health_index) * 0.01 + np.random.normal(0, 0.1, time_steps)
for step in range(time_steps):
data_list.append({
'cycle_id': cycle_id,
'time_step': step,
'temperature': temperature[step],
'vibration': vibration[step],
'pressure': pressure[step],
'health_index': health_index[step],
'RUL': rul[step]
})
return pd.DataFrame(data_list)
# データ生成
degradation_data = generate_degradation_data(n_cycles, time_steps)
# データの前処理
feature_columns = ['temperature', 'vibration', 'pressure', 'health_index']
scaler = MinMaxScaler()
degradation_data[feature_columns] = scaler.fit_transform(degradation_data[feature_columns])
# RULの正規化(0-1範囲)
max_rul = degradation_data['RUL'].max()
degradation_data['RUL_normalized'] = degradation_data['RUL'] / max_rul
# 訓練データとテストデータの分割
train_cycles = int(n_cycles * 0.8)
train_data = degradation_data[degradation_data['cycle_id'] < train_cycles]
test_data = degradation_data[degradation_data['cycle_id'] >= train_cycles]
print("=" * 60)
print("残存有用寿命(RUL)推定システム")
print("=" * 60)
print(f"\n総サイクル数: {n_cycles}")
print(f"訓練サイクル: {train_cycles}")
print(f"テストサイクル: {n_cycles - train_cycles}")
print(f"各サイクルのタイムステップ数: {time_steps}")
print(f"\nRUL範囲: 0 ~ {max_rul:.0f} ステップ")
# 簡易的な予測(実際のLSTMの代わりに移動平均ベースの予測を使用)
def simple_rul_prediction(data, window=10):
"""移動平均ベースの簡易RUL予測"""
predictions = []
actuals = []
for cycle_id in data['cycle_id'].unique():
cycle_data = data[data['cycle_id'] == cycle_id].copy()
for i in range(window, len(cycle_data)):
# 過去windowステップの健全度の平均的な低下率から予測
recent_health = cycle_data.iloc[i-window:i]['health_index'].values
health_decline_rate = (recent_health[0] - recent_health[-1]) / window
current_health = cycle_data.iloc[i]['health_index']
# 健全度が20%に達するまでのステップ数を推定
if health_decline_rate > 0.0001:
estimated_rul = max(0, (current_health - 0.2) / health_decline_rate)
else:
estimated_rul = max_rul # 劣化が見られない場合
estimated_rul = min(estimated_rul, max_rul) # 上限制約
predictions.append(estimated_rul / max_rul) # 正規化
actuals.append(cycle_data.iloc[i]['RUL_normalized'])
return np.array(predictions), np.array(actuals)
# テストデータで予測
y_pred, y_true = simple_rul_prediction(test_data, window=10)
# 評価指標
mae = mean_absolute_error(y_true * max_rul, y_pred * max_rul)
rmse = np.sqrt(mean_squared_error(y_true * max_rul, y_pred * max_rul))
print(f"\n予測性能:")
print(f"MAE (平均絶対誤差): {mae:.2f} ステップ")
print(f"RMSE (二乗平均平方根誤差): {rmse:.2f} ステップ")
# 可視化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# サンプルサイクルの劣化曲線
sample_cycles = [0, 1, 2]
for i, cycle_id in enumerate(sample_cycles):
cycle_data = degradation_data[degradation_data['cycle_id'] == cycle_id]
axes[0, 0].plot(cycle_data['time_step'], cycle_data['health_index'],
label=f'サイクル {cycle_id}', alpha=0.7)
axes[0, 0].axhline(y=0.2, color='red', linestyle='--', linewidth=1, label='故障閾値 (20%)')
axes[0, 0].set_xlabel('タイムステップ')
axes[0, 0].set_ylabel('健全度(正規化)')
axes[0, 0].set_title('設備劣化曲線(サンプル)', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# RUL予測 vs 実測
axes[0, 1].scatter(y_true * max_rul, y_pred * max_rul, alpha=0.3, s=10, color='#11998e')
axes[0, 1].plot([0, max_rul], [0, max_rul], 'r--', lw=2, label='理想的な予測')
axes[0, 1].set_xlabel('実測RUL(ステップ)')
axes[0, 1].set_ylabel('予測RUL(ステップ)')
axes[0, 1].set_title('RUL予測精度', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# 予測誤差の分布
errors = (y_pred - y_true) * max_rul
axes[1, 0].hist(errors, bins=50, color='#38ef7d', alpha=0.7, edgecolor='black')
axes[1, 0].axvline(x=0, color='red', linestyle='--', linewidth=2, label='誤差ゼロ')
axes[1, 0].set_xlabel('予測誤差(ステップ)')
axes[1, 0].set_ylabel('頻度')
axes[1, 0].set_title('RUL予測誤差分布', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# 特定サイクルのRUL時系列
test_cycle = test_data['cycle_id'].unique()[0]
test_cycle_data = test_data[test_data['cycle_id'] == test_cycle].iloc[10:] # windowサイズ分スキップ
cycle_pred, cycle_true = simple_rul_prediction(
test_data[test_data['cycle_id'] == test_cycle], window=10
)
axes[1, 1].plot(test_cycle_data['time_step'].values, cycle_true * max_rul,
label='実測RUL', color='blue', linewidth=2)
axes[1, 1].plot(test_cycle_data['time_step'].values, cycle_pred * max_rul,
label='予測RUL', color='orange', linewidth=2, linestyle='--')
axes[1, 1].fill_between(test_cycle_data['time_step'].values,
cycle_pred * max_rul - 10, cycle_pred * max_rul + 10,
alpha=0.2, color='orange', label='予測誤差範囲 (±10)')
axes[1, 1].set_xlabel('タイムステップ')
axes[1, 1].set_ylabel('RUL(ステップ)')
axes[1, 1].set_title(f'RUL時系列予測(サイクル {test_cycle})', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.savefig('rul_estimation_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
実装のポイント:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import warnings
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# 正常運転データの生成
np.random.seed(42)
n_normal = 800
n_anomaly = 50
# 正常データ(多変量正規分布)
mean_normal = [70, 0.25, 2.0, 14, 50]
cov_normal = [[25, 0, 0, 0, 0],
[0, 0.0025, 0, 0, 0],
[0, 0, 0.04, 0, 0],
[0, 0, 0, 4, 0],
[0, 0, 0, 0, 100]]
normal_data = np.random.multivariate_normal(mean_normal, cov_normal, n_normal)
# 異常データ(様々な異常パターン)
anomaly_patterns = []
# パターン1: 高温異常
high_temp = np.random.multivariate_normal([95, 0.25, 2.0, 14, 50], cov_normal, n_anomaly // 5)
anomaly_patterns.append(high_temp)
# パターン2: 振動異常
high_vibration = np.random.multivariate_normal([70, 0.7, 2.0, 14, 50], cov_normal, n_anomaly // 5)
anomaly_patterns.append(high_vibration)
# パターン3: 圧力異常
high_pressure = np.random.multivariate_normal([70, 0.25, 3.5, 14, 50], cov_normal, n_anomaly // 5)
anomaly_patterns.append(high_pressure)
# パターン4: 電流異常
high_current = np.random.multivariate_normal([70, 0.25, 2.0, 25, 50], cov_normal, n_anomaly // 5)
anomaly_patterns.append(high_current)
# パターン5: 複合異常
combined = np.random.multivariate_normal([90, 0.6, 2.8, 20, 80], cov_normal, n_anomaly // 5)
anomaly_patterns.append(combined)
anomaly_data = np.vstack(anomaly_patterns)
# データフレーム作成
columns = ['temperature', 'vibration', 'pressure', 'current', 'humidity']
df_normal = pd.DataFrame(normal_data, columns=columns)
df_normal['label'] = 0 # 正常
df_anomaly = pd.DataFrame(anomaly_data, columns=columns)
df_anomaly['label'] = 1 # 異常
df = pd.concat([df_normal, df_anomaly], ignore_index=True)
df = df.sample(frac=1, random_state=42).reset_index(drop=True) # シャッフル
# データの標準化
X = df[columns].values
y_true = df['label'].values
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Isolation Forest モデル
iso_forest = IsolationForest(
contamination=0.1, # 異常データの割合の推定値
random_state=42,
n_estimators=100,
max_samples='auto'
)
# 異常スコアの計算
y_pred = iso_forest.fit_predict(X_scaled)
anomaly_scores = iso_forest.score_samples(X_scaled)
# 予測ラベルを0/1に変換(-1→1(異常), 1→0(正常))
y_pred_binary = np.where(y_pred == -1, 1, 0)
# 評価指標の計算
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
precision = precision_score(y_true, y_pred_binary)
recall = recall_score(y_true, y_pred_binary)
f1 = f1_score(y_true, y_pred_binary)
cm = confusion_matrix(y_true, y_pred_binary)
print("=" * 60)
print("異常検出システム評価(Isolation Forest)")
print("=" * 60)
print(f"\n総サンプル数: {len(df)}")
print(f"正常データ: {n_normal} ({n_normal/len(df)*100:.1f}%)")
print(f"異常データ: {n_anomaly} ({n_anomaly/len(df)*100:.1f}%)")
print(f"\n検出性能:")
print(f"適合率 (Precision): {precision:.3f}")
print(f"再現率 (Recall): {recall:.3f}")
print(f"F1スコア: {f1:.3f}")
print(f"\n混同行列:")
print(f"真陰性: {cm[0,0]}, 偽陽性: {cm[0,1]}")
print(f"偽陰性: {cm[1,0]}, 真陽性: {cm[1,1]}")
# PCAで2次元に次元削減(可視化用)
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)
# 可視化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# PCA空間での異常検出結果
scatter1 = axes[0, 0].scatter(X_pca[y_pred_binary==0, 0], X_pca[y_pred_binary==0, 1],
c='green', s=20, alpha=0.6, label='正常と判定')
scatter2 = axes[0, 0].scatter(X_pca[y_pred_binary==1, 0], X_pca[y_pred_binary==1, 1],
c='red', s=40, alpha=0.8, marker='X', label='異常と判定')
axes[0, 0].set_xlabel(f'第1主成分 (寄与率: {pca.explained_variance_ratio_[0]:.1%})')
axes[0, 0].set_ylabel(f'第2主成分 (寄与率: {pca.explained_variance_ratio_[1]:.1%})')
axes[0, 0].set_title('異常検出結果(PCA空間)', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# 異常スコアの分布
axes[0, 1].hist(anomaly_scores[y_true==0], bins=50, alpha=0.6, label='正常', color='green')
axes[0, 1].hist(anomaly_scores[y_true==1], bins=50, alpha=0.6, label='異常', color='red')
axes[0, 1].set_xlabel('異常スコア(小さいほど異常)')
axes[0, 1].set_ylabel('頻度')
axes[0, 1].set_title('異常スコア分布', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# 混同行列ヒートマップ
import seaborn as sns
sns.heatmap(cm, annot=True, fmt='d', cmap='RdYlGn', ax=axes[1, 0],
xticklabels=['正常', '異常'], yticklabels=['正常', '異常'])
axes[1, 0].set_title('混同行列', fontsize=12, fontweight='bold')
axes[1, 0].set_ylabel('真のラベル')
axes[1, 0].set_xlabel('予測ラベル')
# 時系列での異常検出(模擬的なリアルタイム監視)
time_series_length = 200
time_indices = np.arange(time_series_length)
# 時系列データの生成(正常→異常→正常)
ts_data = []
for i in range(time_series_length):
if 80 <= i <= 120: # 異常期間
sample = np.random.multivariate_normal([90, 0.6, 2.8, 20, 80], cov_normal)
else:
sample = np.random.multivariate_normal(mean_normal, cov_normal)
ts_data.append(sample)
ts_data = np.array(ts_data)
ts_scaled = scaler.transform(ts_data)
ts_scores = iso_forest.score_samples(ts_scaled)
ts_pred = iso_forest.predict(ts_scaled)
# 異常閾値の計算(異常スコアの下位10%点)
threshold = np.percentile(anomaly_scores, 10)
axes[1, 1].plot(time_indices, ts_scores, color='#11998e', linewidth=1.5, label='異常スコア')
axes[1, 1].axhline(y=threshold, color='red', linestyle='--', linewidth=2, label=f'異常閾値 ({threshold:.3f})')
axes[1, 1].fill_between(time_indices, threshold, ts_scores.min(),
where=(ts_scores < threshold), alpha=0.3, color='red', label='異常検出')
axes[1, 1].set_xlabel('時刻')
axes[1, 1].set_ylabel('異常スコア')
axes[1, 1].set_title('リアルタイム異常検出(模擬)', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.savefig('anomaly_detection_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
# 異常検出時の警報メッセージ生成
detected_anomalies = np.where(ts_pred == -1)[0]
if len(detected_anomalies) > 0:
print(f"\n⚠️ 警報: {len(detected_anomalies)}件の異常を検出")
print(f"異常検出時刻: {detected_anomalies[:10]}..." if len(detected_anomalies) > 10 else f"異常検出時刻: {detected_anomalies}")
実装のポイント:
大量のプロセスデータから、品質異常や設備故障に寄与した要因を特定するには、 統計的手法や機械学習が有効です。相関分析、決定木、SHAP値などを用いて、 影響度の高いパラメータを可視化します。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# 品質不良データの生成(根本原因が明確なシミュレーションデータ)
np.random.seed(42)
n_samples = 500
# 正常品(60%)
n_normal = int(n_samples * 0.6)
temp_normal = np.random.normal(75, 3, n_normal)
ph_normal = np.random.normal(6.5, 0.2, n_normal)
humidity_normal = np.random.normal(50, 5, n_normal)
mixing_time_normal = np.random.normal(120, 10, n_normal)
additive_amount_normal = np.random.normal(2.0, 0.1, n_normal)
# 不良品パターン1: 高温起因(20%)
n_defect1 = int(n_samples * 0.2)
temp_defect1 = np.random.normal(90, 5, n_defect1)
ph_defect1 = np.random.normal(6.5, 0.2, n_defect1)
humidity_defect1 = np.random.normal(50, 5, n_defect1)
mixing_time_defect1 = np.random.normal(120, 10, n_defect1)
additive_amount_defect1 = np.random.normal(2.0, 0.1, n_defect1)
# 不良品パターン2: pH異常起因(10%)
n_defect2 = int(n_samples * 0.1)
temp_defect2 = np.random.normal(75, 3, n_defect2)
ph_defect2 = np.random.normal(7.5, 0.3, n_defect2)
humidity_defect2 = np.random.normal(50, 5, n_defect2)
mixing_time_defect2 = np.random.normal(120, 10, n_defect2)
additive_amount_defect2 = np.random.normal(2.0, 0.1, n_defect2)
# 不良品パターン3: 添加剤量不足起因(10%)
n_defect3 = n_samples - n_normal - n_defect1 - n_defect2
temp_defect3 = np.random.normal(75, 3, n_defect3)
ph_defect3 = np.random.normal(6.5, 0.2, n_defect3)
humidity_defect3 = np.random.normal(50, 5, n_defect3)
mixing_time_defect3 = np.random.normal(120, 10, n_defect3)
additive_amount_defect3 = np.random.normal(1.5, 0.15, n_defect3)
# データフレーム作成
data = pd.DataFrame({
'temperature': np.concatenate([temp_normal, temp_defect1, temp_defect2, temp_defect3]),
'pH': np.concatenate([ph_normal, ph_defect1, ph_defect2, ph_defect3]),
'humidity': np.concatenate([humidity_normal, humidity_defect1, humidity_defect2, humidity_defect3]),
'mixing_time': np.concatenate([mixing_time_normal, mixing_time_defect1, mixing_time_defect2, mixing_time_defect3]),
'additive_amount': np.concatenate([additive_amount_normal, additive_amount_defect1, additive_amount_defect2, additive_amount_defect3]),
'quality': np.concatenate([
np.zeros(n_normal),
np.ones(n_defect1),
np.ones(n_defect2),
np.ones(n_defect3)
])
})
# データのシャッフル
data = data.sample(frac=1, random_state=42).reset_index(drop=True)
# 訓練データとテストデータの分割
X = data.drop('quality', axis=1)
y = data['quality']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)
# 決定木モデルの訓練(深さを制限して解釈性を重視)
dt_model = DecisionTreeClassifier(
max_depth=4,
min_samples_split=20,
min_samples_leaf=10,
random_state=42
)
dt_model.fit(X_train, y_train)
# 予測と評価
y_pred = dt_model.predict(X_test)
from sklearn.metrics import classification_report, accuracy_score
accuracy = accuracy_score(y_test, y_pred)
print("=" * 60)
print("根本原因分析(Decision Tree)")
print("=" * 60)
print(f"\nモデル精度: {accuracy:.3f}")
print("\n分類レポート:")
print(classification_report(y_test, y_pred, target_names=['正常', '不良']))
# 特徴量重要度
feature_importance = pd.DataFrame({
'feature': X.columns,
'importance': dt_model.feature_importances_
}).sort_values('importance', ascending=False)
print("\n特徴量重要度(品質不良への寄与度):")
print(feature_importance.to_string(index=False))
# 可視化
fig = plt.figure(figsize=(16, 10))
# 決定木の可視化
ax1 = plt.subplot(2, 2, (1, 2))
plot_tree(dt_model, feature_names=X.columns, class_names=['正常', '不良'],
filled=True, rounded=True, fontsize=9, ax=ax1)
ax1.set_title('決定木による根本原因分析', fontsize=14, fontweight='bold')
# 特徴量重要度
ax2 = plt.subplot(2, 2, 3)
bars = ax2.barh(feature_importance['feature'], feature_importance['importance'], color='#38ef7d')
ax2.set_xlabel('重要度スコア')
ax2.set_title('品質不良への寄与度', fontsize=12, fontweight='bold')
ax2.grid(axis='x', alpha=0.3)
# 重要な特徴量の分布比較
ax3 = plt.subplot(2, 2, 4)
top_feature = feature_importance.iloc[0]['feature']
normal_values = data[data['quality'] == 0][top_feature]
defect_values = data[data['quality'] == 1][top_feature]
ax3.hist(normal_values, bins=30, alpha=0.6, label='正常', color='green')
ax3.hist(defect_values, bins=30, alpha=0.6, label='不良', color='red')
ax3.set_xlabel(f'{top_feature}')
ax3.set_ylabel('頻度')
ax3.set_title(f'最重要因子の分布比較: {top_feature}', fontsize=12, fontweight='bold')
ax3.legend()
ax3.grid(alpha=0.3)
plt.tight_layout()
plt.savefig('root_cause_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
# RCA レポート生成
print("\n" + "=" * 60)
print("根本原因分析レポート")
print("=" * 60)
# 決定木のルール抽出(トップノードのみ)
from sklearn.tree import _tree
def extract_rules(tree, feature_names, node=0, depth=0, max_depth=2):
"""決定木からルールを抽出"""
if depth > max_depth:
return
feature = tree.feature[node]
threshold = tree.threshold[node]
if feature != _tree.TREE_UNDEFINED:
name = feature_names[feature]
print(f"{' ' * depth}├─ {name} <= {threshold:.2f}の場合:")
extract_rules(tree, feature_names, tree.children_left[node], depth + 1, max_depth)
print(f"{' ' * depth}└─ {name} > {threshold:.2f}の場合:")
extract_rules(tree, feature_names, tree.children_right[node], depth + 1, max_depth)
else:
value = tree.value[node][0]
total = value.sum()
defect_rate = value[1] / total if total > 0 else 0
print(f"{' ' * depth} → 不良率: {defect_rate:.1%} (サンプル数: {int(total)})")
print("\n主要な判定ルール:")
extract_rules(dt_model.tree_, X.columns)
print("\n推奨される対策:")
if feature_importance.iloc[0]['feature'] == 'temperature':
print("・温度管理の強化: 75±5℃の範囲に制御")
print("・冷却システムの点検と改善")
elif feature_importance.iloc[0]['feature'] == 'pH':
print("・pH調整プロセスの見直し")
print("・pH計の校正頻度の増加")
elif feature_importance.iloc[0]['feature'] == 'additive_amount':
print("・添加剤の投入量管理の厳格化")
print("・自動投入システムの導入検討")
実装のポイント:
本章では、AIを活用した予知保全とトラブルシューティングの技術を学びました。