学習目標
この章を読むことで、以下を習得できます:
- ✅ ルールベース異常検知の実装と限界を理解する
- ✅ 統計的異常検知手法(Z-score, 修正Z-score)を適用できる
- ✅ 機械学習による異常検知(Isolation Forest, One-Class SVM)を実装できる
- ✅ ディープラーニング(LSTM Autoencoder)で時系列異常を検知できる
- ✅ アラーム管理システムを設計し誤報を削減できる
3.1 異常検知の基礎
異常検知とは
異常検知(Anomaly Detection)は、正常なパターンから大きく逸脱したデータポイントやパターンを識別するプロセスです。プロセス産業では、設備故障、品質異常、プロセス異常の早期発見に不可欠です。
| 異常検知手法 | 特徴 | 利点 | 欠点 | 適用例 |
|---|---|---|---|---|
| ルールベース | 閾値による判定 | シンプル、解釈容易 | 複雑なパターン検出困難 | 温度・圧力監視 |
| 統計的手法 | 統計分布からの逸脱 | 理論的根拠あり | 正規分布仮定が必要 | プロセス変数監視 |
| 機械学習 | データからパターン学習 | 複雑なパターン検出可能 | 解釈性低い、データ必要 | 多変量異常検知 |
| ディープラーニング | 深層特徴抽出 | 高次元データ対応 | 計算コスト高、データ大量 | 時系列異常検知 |
異常検知システムのアーキテクチャ
3.2 コード例:異常検知手法の実装
コード例1: ルールベース異常検知(閾値ベース)
目的: シンプルな閾値ルールによる異常検知システムを実装する。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# 日本語フォント設定
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
np.random.seed(42)
# プロセスデータ生成(反応器温度、200サンプル)
n_samples = 200
time_index = pd.date_range('2025-01-01', periods=n_samples, freq='1min')
# 正常データ(平均175°C、標準偏差2°C)
temperature = np.random.normal(175, 2, n_samples)
# 意図的な異常を挿入
# 異常1: スパイク(サンプル50)
temperature[50] = 190
# 異常2: 徐々に上昇(サンプル120-140)
temperature[120:141] = 175 + np.linspace(0, 10, 21)
# 異常3: 低温異常(サンプル180-190)
temperature[180:191] = np.random.normal(165, 1.5, 11)
# DataFrameに格納
df = pd.DataFrame({
'timestamp': time_index,
'temperature': temperature
})
df.set_index('timestamp', inplace=True)
class RuleBasedAnomalyDetector:
"""ルールベース異常検知システム"""
def __init__(self, rules):
"""
Parameters:
-----------
rules : dict
異常検知ルールの辞書
例: {'upper_limit': 180, 'lower_limit': 170, ...}
"""
self.rules = rules
self.anomalies = []
def detect(self, data):
"""異常検知を実行"""
anomaly_flags = np.zeros(len(data), dtype=bool)
anomaly_types = [''] * len(data)
for i, value in enumerate(data):
# ルール1: 上限閾値チェック
if value > self.rules.get('upper_limit', float('inf')):
anomaly_flags[i] = True
anomaly_types[i] = '上限超過'
self.anomalies.append({
'index': i,
'value': value,
'type': '上限超過',
'severity': 'HIGH'
})
# ルール2: 下限閾値チェック
elif value < self.rules.get('lower_limit', float('-inf')):
anomaly_flags[i] = True
anomaly_types[i] = '下限未満'
self.anomalies.append({
'index': i,
'value': value,
'type': '下限未満',
'severity': 'HIGH'
})
# ルール3: 変化率チェック(前サンプルとの差)
if i > 0:
rate_of_change = abs(value - data[i-1])
if rate_of_change > self.rules.get('max_change_rate', float('inf')):
anomaly_flags[i] = True
anomaly_types[i] = '急激な変化'
self.anomalies.append({
'index': i,
'value': value,
'type': '急激な変化',
'severity': 'MEDIUM'
})
# ルール4: 連続逸脱チェック(警告範囲)
# 実装は次のループで実行(連続性の判定が必要)
# ルール4の実装: 連続5サンプルが警告範囲を超える
warning_upper = self.rules.get('warning_upper', float('inf'))
warning_lower = self.rules.get('warning_lower', float('-inf'))
consecutive_warning = 0
for i, value in enumerate(data):
if value > warning_upper or value < warning_lower:
consecutive_warning += 1
if consecutive_warning >= 5 and not anomaly_flags[i]:
anomaly_flags[i] = True
anomaly_types[i] = '連続警告'
self.anomalies.append({
'index': i,
'value': value,
'type': '連続警告',
'severity': 'MEDIUM'
})
else:
consecutive_warning = 0
return anomaly_flags, anomaly_types
# ルールベース異常検知器の設定
rules = {
'upper_limit': 180, # 上限閾値
'lower_limit': 170, # 下限閾値
'warning_upper': 178, # 警告上限
'warning_lower': 172, # 警告下限
'max_change_rate': 8 # 最大変化率(°C/分)
}
detector = RuleBasedAnomalyDetector(rules)
anomaly_flags, anomaly_types = detector.detect(df['temperature'].values)
# 可視化
fig, ax = plt.subplots(figsize=(16, 6))
# 正常データ
normal_mask = ~anomaly_flags
ax.plot(df.index[normal_mask], df['temperature'][normal_mask],
'o', color='#11998e', markersize=4, alpha=0.6, label='正常')
# 異常データ
anomaly_mask = anomaly_flags
ax.scatter(df.index[anomaly_mask], df['temperature'][anomaly_mask],
color='red', s=80, marker='o', zorder=5, label='異常')
# 閾値線
ax.axhline(y=rules['upper_limit'], color='red', linestyle='--',
linewidth=2, label=f'上限閾値 ({rules["upper_limit"]}°C)')
ax.axhline(y=rules['lower_limit'], color='red', linestyle='--',
linewidth=2, label=f'下限閾値 ({rules["lower_limit"]}°C)')
ax.axhline(y=rules['warning_upper'], color='orange', linestyle=':',
linewidth=1.5, alpha=0.7, label='警告範囲')
ax.axhline(y=rules['warning_lower'], color='orange', linestyle=':',
linewidth=1.5, alpha=0.7)
# 目標値
target = 175
ax.axhline(y=target, color='blue', linestyle='-', linewidth=2,
label=f'目標値 ({target}°C)')
ax.set_xlabel('時刻', fontsize=12)
ax.set_ylabel('温度 (°C)', fontsize=12)
ax.set_title('ルールベース異常検知', fontsize=14, fontweight='bold')
ax.legend(loc='upper left')
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 異常検知結果のサマリー
print("=== ルールベース異常検知 結果 ===")
print(f"\n総データポイント数: {len(df)}")
print(f"異常検出数: {anomaly_flags.sum()} ({anomaly_flags.sum()/len(df)*100:.1f}%)")
# 異常タイプ別の集計
df_anomalies = pd.DataFrame(detector.anomalies)
if len(df_anomalies) > 0:
print(f"\n【異常タイプ別集計】")
type_counts = df_anomalies['type'].value_counts()
for anomaly_type, count in type_counts.items():
print(f" {anomaly_type}: {count}件")
print(f"\n【重要度別集計】")
severity_counts = df_anomalies['severity'].value_counts()
for severity, count in severity_counts.items():
print(f" {severity}: {count}件")
# 最初の5件の異常を表示
print(f"\n【検出された異常(最初の5件)】")
for i, anomaly in enumerate(df_anomalies.head(5).to_dict('records')):
print(f"{i+1}. サンプル {anomaly['index']+1} | "
f"値: {anomaly['value']:.2f}°C | "
f"タイプ: {anomaly['type']} | "
f"重要度: {anomaly['severity']}")
解説: ルールベース異常検知は、明確な閾値や変化率に基づいて異常を判定する最もシンプルな手法です。実装が容易で解釈性が高い反面、複雑なパターンや文脈依存の異常を検出することが困難です。プロセス産業では、安全上重要な変数(温度、圧力)の一次監視として広く使用されています。
コード例2: 統計的異常検知(Z-scoreと修正Z-score)
目的: Z-scoreおよび修正Z-score(Median Absolute Deviation)を用いた統計的異常検知を実装する。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
np.random.seed(42)
# プロセスデータ生成
n_samples = 300
data = np.random.normal(100, 5, n_samples)
# 外れ値を追加
outlier_indices = [50, 100, 150, 200, 250]
data[outlier_indices] = [130, 70, 125, 65, 135]
def z_score_detection(data, threshold=3):
"""
Z-scoreによる異常検知
Parameters:
-----------
data : array-like
入力データ
threshold : float
異常判定の閾値(標準偏差の倍数)
Returns:
--------
anomalies : boolean array
異常フラグ
z_scores : array
Z-score値
"""
mean = np.mean(data)
std = np.std(data, ddof=1)
z_scores = np.abs((data - mean) / std)
anomalies = z_scores > threshold
return anomalies, z_scores
def modified_z_score_detection(data, threshold=3.5):
"""
修正Z-score(MADベース)による異常検知
外れ値に頑健な手法
Parameters:
-----------
data : array-like
入力データ
threshold : float
異常判定の閾値(デフォルト3.5)
Returns:
--------
anomalies : boolean array
異常フラグ
modified_z_scores : array
修正Z-score値
"""
median = np.median(data)
# MAD(Median Absolute Deviation)の計算
mad = np.median(np.abs(data - median))
# 修正Z-scoreの計算
# 定数0.6745はMADを標準偏差に変換するための係数
modified_z_scores = 0.6745 * (data - median) / mad
anomalies = np.abs(modified_z_scores) > threshold
return anomalies, modified_z_scores
# Z-score法による検出
anomalies_z, z_scores = z_score_detection(data, threshold=3)
# 修正Z-score法による検出
anomalies_modified, modified_z_scores = modified_z_score_detection(data, threshold=3.5)
# 可視化
fig, axes = plt.subplots(3, 1, figsize=(16, 12))
# 元データと異常検出結果(Z-score法)
axes[0].plot(range(n_samples), data, 'o', color='lightgray',
markersize=4, alpha=0.5, label='全データ')
axes[0].scatter(np.where(anomalies_z)[0], data[anomalies_z],
color='red', s=100, marker='o', zorder=5, label='異常(Z-score法)')
axes[0].axhline(y=np.mean(data), color='blue', linestyle='-', linewidth=2,
label=f'平均 = {np.mean(data):.2f}')
axes[0].axhline(y=np.mean(data) + 3*np.std(data), color='orange',
linestyle='--', linewidth=2, label='±3σ')
axes[0].axhline(y=np.mean(data) - 3*np.std(data), color='orange',
linestyle='--', linewidth=2)
axes[0].set_ylabel('データ値', fontsize=11)
axes[0].set_title('Z-score法による異常検知', fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# Z-scoreの可視化
axes[1].plot(range(n_samples), z_scores, 'o-', color='#11998e',
markersize=4, linewidth=1, alpha=0.7, label='Z-score')
axes[1].axhline(y=3, color='red', linestyle='--', linewidth=2,
label='閾値 = 3')
axes[1].scatter(np.where(anomalies_z)[0], z_scores[anomalies_z],
color='red', s=100, marker='o', zorder=5)
axes[1].set_ylabel('Z-score', fontsize=11)
axes[1].set_title('Z-scoreの分布', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
# 修正Z-scoreの可視化
axes[2].plot(range(n_samples), np.abs(modified_z_scores), 'o-', color='#7b2cbf',
markersize=4, linewidth=1, alpha=0.7, label='修正Z-score')
axes[2].axhline(y=3.5, color='red', linestyle='--', linewidth=2,
label='閾値 = 3.5')
axes[2].scatter(np.where(anomalies_modified)[0],
np.abs(modified_z_scores[anomalies_modified]),
color='red', s=100, marker='o', zorder=5)
axes[2].set_xlabel('サンプル番号', fontsize=11)
axes[2].set_ylabel('修正Z-score(絶対値)', fontsize=11)
axes[2].set_title('修正Z-scoreの分布(MADベース)', fontsize=13, fontweight='bold')
axes[2].legend()
axes[2].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 統計サマリー
print("=== 統計的異常検知 結果 ===")
print(f"\nデータ統計:")
print(f" 平均: {np.mean(data):.2f}")
print(f" 中央値: {np.median(data):.2f}")
print(f" 標準偏差: {np.std(data, ddof=1):.2f}")
print(f" MAD: {np.median(np.abs(data - np.median(data))):.2f}")
print(f"\n【Z-score法】")
print(f" 検出された異常数: {anomalies_z.sum()} ({anomalies_z.sum()/n_samples*100:.1f}%)")
if anomalies_z.any():
print(f" 異常サンプル: {np.where(anomalies_z)[0] + 1}")
print(f" 異常値: {data[anomalies_z]}")
print(f"\n【修正Z-score法(MADベース)】")
print(f" 検出された異常数: {anomalies_modified.sum()} "
f"({anomalies_modified.sum()/n_samples*100:.1f}%)")
if anomalies_modified.any():
print(f" 異常サンプル: {np.where(anomalies_modified)[0] + 1}")
print(f" 異常値: {data[anomalies_modified]}")
# 手法の比較
print(f"\n【手法比較】")
print(f" 両手法で検出: {np.sum(anomalies_z & anomalies_modified)}件")
print(f" Z-scoreのみ検出: {np.sum(anomalies_z & ~anomalies_modified)}件")
print(f" 修正Z-scoreのみ検出: {np.sum(~anomalies_z & anomalies_modified)}件")
print(f"\n【推奨】")
print(" - Z-score法: データが正規分布に従う場合に有効")
print(" - 修正Z-score法: 外れ値が多い場合に頑健(MADは外れ値の影響を受けにくい)")
解説: Z-score法は、データが正規分布に従うと仮定して、平均から何標準偏差離れているかを計算します。修正Z-score法は、平均の代わりに中央値、標準偏差の代わりにMAD(Median Absolute Deviation)を使用するため、外れ値の影響を受けにくい頑健な手法です。プロセスデータに外れ値が含まれる場合は、修正Z-score法が推奨されます。
コード例3: Isolation Forest(孤立森林法)による異常検知
目的: 機械学習ベースの異常検知手法であるIsolation Forestを実装する。
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
np.random.seed(42)
# 2変量プロセスデータ生成(温度と圧力)
n_normal = 300
n_anomalies = 20
# 正常データ(相関のある2変数)
mean_normal = [100, 50]
cov_normal = [[25, 10], [10, 16]]
normal_data = np.random.multivariate_normal(mean_normal, cov_normal, n_normal)
# 異常データ(正常範囲から外れた点)
anomaly_data = np.array([
np.random.uniform(85, 95, n_anomalies), # 低温
np.random.uniform(60, 70, n_anomalies) # 高圧
]).T
# データの結合
X = np.vstack([normal_data, anomaly_data])
labels_true = np.hstack([np.zeros(n_normal), np.ones(n_anomalies)])
# データの標準化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Isolation Forestモデルの訓練
clf = IsolationForest(
contamination=0.1, # 異常データの割合(10%と仮定)
random_state=42,
n_estimators=100 # 決定木の数
)
# 異常スコアの計算
y_pred = clf.fit_predict(X_scaled) # -1: 異常, 1: 正常
anomaly_scores = clf.score_samples(X_scaled) # 異常スコア(低いほど異常)
# 可視化
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# 散布図(予測結果)
normal_pred = y_pred == 1
anomaly_pred = y_pred == -1
axes[0].scatter(X[normal_pred, 0], X[normal_pred, 1],
c='#11998e', s=40, alpha=0.6, label='正常(予測)')
axes[0].scatter(X[anomaly_pred, 0], X[anomaly_pred, 1],
c='red', s=80, marker='x', linewidths=2, label='異常(予測)')
# 真の異常データを強調
axes[0].scatter(X[n_normal:, 0], X[n_normal:, 1],
facecolors='none', edgecolors='orange', s=150,
linewidths=2, label='真の異常')
axes[0].set_xlabel('温度 (°C)', fontsize=12)
axes[0].set_ylabel('圧力 (kPa)', fontsize=12)
axes[0].set_title('Isolation Forest - 異常検知結果', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# 異常スコアの分布
axes[1].hist(anomaly_scores[labels_true == 0], bins=30, alpha=0.6,
color='#11998e', label='正常データ', density=True)
axes[1].hist(anomaly_scores[labels_true == 1], bins=30, alpha=0.6,
color='red', label='異常データ', density=True)
axes[1].axvline(x=np.percentile(anomaly_scores, 10), color='orange',
linestyle='--', linewidth=2, label='決定境界(10パーセンタイル)')
axes[1].set_xlabel('異常スコア', fontsize=12)
axes[1].set_ylabel('確率密度', fontsize=12)
axes[1].set_title('異常スコアの分布', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 性能評価
from sklearn.metrics import confusion_matrix, classification_report, roc_auc_score
# 予測ラベルを0/1に変換
y_pred_binary = (y_pred == -1).astype(int)
# 混同行列
cm = confusion_matrix(labels_true, y_pred_binary)
print("=== Isolation Forest 異常検知結果 ===")
print(f"\nデータ構成:")
print(f" 正常データ: {n_normal}サンプル")
print(f" 異常データ: {n_anomalies}サンプル")
print(f" 汚染率(contamination): {clf.contamination}")
print(f"\n混同行列:")
print(f" 予測")
print(f" 正常 異常")
print(f"実際 正常 {cm[0,0]:3d} {cm[0,1]:3d}")
print(f" 異常 {cm[1,0]:3d} {cm[1,1]:3d}")
# 評価指標
TP = cm[1, 1] # True Positive
TN = cm[0, 0] # True Negative
FP = cm[0, 1] # False Positive
FN = cm[1, 0] # False Negative
precision = TP / (TP + FP) if (TP + FP) > 0 else 0
recall = TP / (TP + FN) if (TP + FN) > 0 else 0
f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
accuracy = (TP + TN) / (TP + TN + FP + FN)
print(f"\n性能指標:")
print(f" 精度(Precision): {precision:.3f}")
print(f" 再現率(Recall): {recall:.3f}")
print(f" F1スコア: {f1_score:.3f}")
print(f" 正解率(Accuracy): {accuracy:.3f}")
# AUC-ROC
try:
auc_roc = roc_auc_score(labels_true, -anomaly_scores) # スコアが低いほど異常なので符号反転
print(f" AUC-ROC: {auc_roc:.3f}")
except:
print(" AUC-ROC: 計算できません")
print(f"\n異常スコア統計:")
print(f" 正常データの平均スコア: {np.mean(anomaly_scores[labels_true == 0]):.4f}")
print(f" 異常データの平均スコア: {np.mean(anomaly_scores[labels_true == 1]):.4f}")
解説: Isolation Forestは、異常データは正常データよりも「孤立しやすい」という原理に基づく教師なし学習手法です。ランダムに特徴と分割点を選び、異常データが少ない分割で孤立することを利用します。多変量データの異常検知に適しており、計算コストが低く、高次元データにも対応できる利点があります。プロセス産業では、複数のプロセス変数の組み合わせによる異常を検出する際に有効です。
コード例4: One-Class SVM(正常運転モデリング)
目的: 正常運転データのみから学習し、異常を検出するOne-Class SVMを実装する。
import numpy as np
import matplotlib.pyplot as plt
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
np.random.seed(42)
# 正常運転データの生成(訓練データ)
n_train = 200
train_data = np.random.multivariate_normal(
mean=[100, 50],
cov=[[20, 8], [8, 15]],
size=n_train
)
# テストデータの生成(正常 + 異常)
n_test_normal = 100
n_test_anomaly = 30
test_normal = np.random.multivariate_normal(
mean=[100, 50],
cov=[[20, 8], [8, 15]],
size=n_test_normal
)
# 異常データ(複数のパターン)
test_anomaly_1 = np.random.multivariate_normal(
mean=[85, 60], cov=[[10, 0], [0, 10]], size=n_test_anomaly//3
)
test_anomaly_2 = np.random.multivariate_normal(
mean=[115, 45], cov=[[10, 0], [0, 10]], size=n_test_anomaly//3
)
test_anomaly_3 = np.random.uniform(
low=[80, 35], high=[120, 65], size=(n_test_anomaly//3, 2)
)
test_data = np.vstack([test_normal, test_anomaly_1, test_anomaly_2, test_anomaly_3])
test_labels = np.hstack([
np.zeros(n_test_normal),
np.ones(n_test_anomaly)
])
# データの標準化
scaler = StandardScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)
# One-Class SVMモデルの訓練
oc_svm = OneClassSVM(
kernel='rbf', # RBFカーネル(放射基底関数)
gamma='auto', # カーネル係数
nu=0.05 # 異常データの上限割合(5%)
)
oc_svm.fit(train_scaled)
# テストデータでの予測
y_pred = oc_svm.predict(test_scaled) # 1: 正常, -1: 異常
decision_scores = oc_svm.decision_function(test_scaled) # 決定関数の値
# 可視化
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# 訓練データと決定境界
xx, yy = np.meshgrid(
np.linspace(scaler.inverse_transform(train_scaled)[:, 0].min() - 10,
scaler.inverse_transform(train_scaled)[:, 0].max() + 10, 100),
np.linspace(scaler.inverse_transform(train_scaled)[:, 1].min() - 10,
scaler.inverse_transform(train_scaled)[:, 1].max() + 10, 100)
)
grid_scaled = scaler.transform(np.c_[xx.ravel(), yy.ravel()])
Z = oc_svm.decision_function(grid_scaled)
Z = Z.reshape(xx.shape)
# 決定境界のプロット
axes[0].contour(xx, yy, Z, levels=[0], linewidths=2, colors='orange',
linestyles='--', label='決定境界')
axes[0].contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7),
cmap='Reds', alpha=0.3)
# 訓練データ
axes[0].scatter(train_data[:, 0], train_data[:, 1],
c='#11998e', s=30, alpha=0.6, label='訓練データ(正常)')
# テストデータ
test_normal_pred = (y_pred == 1)
test_anomaly_pred = (y_pred == -1)
axes[0].scatter(test_data[test_normal_pred, 0], test_data[test_normal_pred, 1],
c='green', s=40, alpha=0.6, marker='o', label='正常(予測)')
axes[0].scatter(test_data[test_anomaly_pred, 0], test_data[test_anomaly_pred, 1],
c='red', s=60, alpha=0.8, marker='x', linewidths=2, label='異常(予測)')
axes[0].set_xlabel('温度 (°C)', fontsize=12)
axes[0].set_ylabel('圧力 (kPa)', fontsize=12)
axes[0].set_title('One-Class SVM - 決定境界と異常検知',
fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# 決定関数スコアの分布
axes[1].hist(decision_scores[test_labels == 0], bins=30, alpha=0.6,
color='#11998e', label='正常データ', density=True)
axes[1].hist(decision_scores[test_labels == 1], bins=30, alpha=0.6,
color='red', label='異常データ', density=True)
axes[1].axvline(x=0, color='orange', linestyle='--', linewidth=2,
label='決定境界')
axes[1].set_xlabel('決定関数スコア', fontsize=12)
axes[1].set_ylabel('確率密度', fontsize=12)
axes[1].set_title('決定関数スコアの分布', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 性能評価
from sklearn.metrics import confusion_matrix, classification_report
# 予測ラベルを0/1に変換
y_pred_binary = (y_pred == -1).astype(int)
cm = confusion_matrix(test_labels, y_pred_binary)
print("=== One-Class SVM 異常検知結果 ===")
print(f"\n訓練データ:")
print(f" サンプル数: {n_train}")
print(f" 特徴: 温度、圧力(2次元)")
print(f"\nテストデータ:")
print(f" 正常: {n_test_normal}サンプル")
print(f" 異常: {n_test_anomaly}サンプル")
print(f"\nハイパーパラメータ:")
print(f" カーネル: {oc_svm.kernel}")
print(f" nu(異常率上限): {oc_svm.nu}")
print(f"\n混同行列:")
print(f" 予測")
print(f" 正常 異常")
print(f"実際 正常 {cm[0,0]:3d} {cm[0,1]:3d}")
print(f" 異常 {cm[1,0]:3d} {cm[1,1]:3d}")
# 評価指標
TP = cm[1, 1]
TN = cm[0, 0]
FP = cm[0, 1]
FN = cm[1, 0]
precision = TP / (TP + FP) if (TP + FP) > 0 else 0
recall = TP / (TP + FN) if (TP + FN) > 0 else 0
f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
accuracy = (TP + TN) / len(test_labels)
print(f"\n性能指標:")
print(f" 精度(Precision): {precision:.3f}")
print(f" 再現率(Recall): {recall:.3f}")
print(f" F1スコア: {f1_score:.3f}")
print(f" 正解率(Accuracy): {accuracy:.3f}")
print(f"\n決定関数スコア統計:")
print(f" 正常データの平均スコア: {np.mean(decision_scores[test_labels == 0]):.4f}")
print(f" 異常データの平均スコア: {np.mean(decision_scores[test_labels == 1]):.4f}")
print(f"\n【特徴】")
print(" - 正常運転データのみで学習可能(教師なし学習)")
print(" - 非線形な決定境界を学習可能(RBFカーネル)")
print(" - nuパラメータで異常率を調整可能")
解説: One-Class SVMは、正常運転データのみから正常領域の境界を学習する教師なし学習手法です。RBFカーネルにより非線形な決定境界を学習でき、複雑な正常領域を表現できます。プロセス産業では、正常運転データは豊富だが異常データが少ない場合に有効です。nuパラメータで異常率の上限を設定でき、誤報率の調整が可能です。
コード例5: LSTM Autoencoderによる時系列異常検知
目的: ディープラーニング(LSTM Autoencoder)を用いた時系列データの異常検知を実装する。
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# 注: 実際のLSTM Autoencoderの訓練は省略し、概念を示すコードを提供します
# TensorFlow/Kerasが必要ですが、ここでは概念的な実装を示します
# 時系列データ生成
n_samples = 500
time = np.linspace(0, 100, n_samples)
# 正常な周期的パターン
normal_signal = 50 + 10 * np.sin(2 * np.pi * time / 20) + \
5 * np.sin(2 * np.pi * time / 5) + \
np.random.normal(0, 1, n_samples)
# 異常を挿入
abnormal_signal = normal_signal.copy()
# 異常1: スパイク(サンプル150-160)
abnormal_signal[150:161] += 20
# 異常2: ドリフト(サンプル300-400)
abnormal_signal[300:401] += np.linspace(0, 15, 101)
# 異常3: 周期性の消失(サンプル450-480)
abnormal_signal[450:481] = np.random.normal(50, 5, 31)
# LSTM Autoencoderの概念的なシミュレーション
# 実際には、TensorFlow/Kerasで実装します
def simulate_autoencoder_reconstruction(signal, window_size=50):
"""
Autoencoderによる再構成をシミュレート
(実際には、LSTM Autoencoderで実装)
"""
reconstructed = np.zeros_like(signal)
# 移動平均による擬似的な再構成
for i in range(len(signal)):
start = max(0, i - window_size // 2)
end = min(len(signal), i + window_size // 2)
reconstructed[i] = np.mean(signal[start:end])
# ノイズを追加して不完全な再構成をシミュレート
reconstructed += np.random.normal(0, 0.5, len(signal))
return reconstructed
# 正常データでの再構成(訓練フェーズをシミュレート)
reconstructed = simulate_autoencoder_reconstruction(abnormal_signal)
# 再構成誤差の計算
reconstruction_error = np.abs(abnormal_signal - reconstructed)
# 異常判定閾値(再構成誤差の99パーセンタイル)
threshold = np.percentile(reconstruction_error[:200], 99) # 正常部分から閾値を推定
# 異常検知
anomalies = reconstruction_error > threshold
# 可視化
fig, axes = plt.subplots(3, 1, figsize=(16, 12))
# 元信号と再構成信号
axes[0].plot(time, abnormal_signal, 'b-', linewidth=1, alpha=0.7,
label='元信号(異常を含む)')
axes[0].plot(time, reconstructed, 'r-', linewidth=1, alpha=0.7,
label='再構成信号')
axes[0].set_ylabel('信号値', fontsize=11)
axes[0].set_title('LSTM Autoencoder - 元信号と再構成信号',
fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# 再構成誤差
axes[1].plot(time, reconstruction_error, 'g-', linewidth=1.5,
label='再構成誤差')
axes[1].axhline(y=threshold, color='red', linestyle='--', linewidth=2,
label=f'閾値 = {threshold:.2f}')
axes[1].fill_between(time, 0, threshold, alpha=0.2, color='green',
label='正常範囲')
axes[1].set_ylabel('再構成誤差', fontsize=11)
axes[1].set_title('再構成誤差の時系列', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
# 異常検知結果
normal_mask = ~anomalies
anomaly_mask = anomalies
axes[2].plot(time[normal_mask], abnormal_signal[normal_mask],
'o', color='#11998e', markersize=3, alpha=0.6, label='正常')
axes[2].scatter(time[anomaly_mask], abnormal_signal[anomaly_mask],
color='red', s=30, marker='o', zorder=5, label='異常')
axes[2].set_xlabel('時間', fontsize=11)
axes[2].set_ylabel('信号値', fontsize=11)
axes[2].set_title('異常検知結果', fontsize=13, fontweight='bold')
axes[2].legend()
axes[2].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 結果サマリー
print("=== LSTM Autoencoder 時系列異常検知 ===")
print(f"\nデータ:")
print(f" サンプル数: {n_samples}")
print(f" 時間範囲: {time[0]:.1f} ~ {time[-1]:.1f}")
print(f"\nAutoencoder設定(概念):")
print(" - エンコーダ: LSTM層で時系列パターンを圧縮")
print(" - デコーダ: LSTM層で元の時系列を再構成")
print(" - 損失関数: 平均二乗誤差(MSE)")
print(f"\n再構成誤差:")
print(f" 平均誤差: {np.mean(reconstruction_error):.3f}")
print(f" 標準偏差: {np.std(reconstruction_error):.3f}")
print(f" 最大誤差: {np.max(reconstruction_error):.3f}")
print(f" 閾値(99パーセンタイル): {threshold:.3f}")
print(f"\n異常検知結果:")
print(f" 検出された異常: {anomalies.sum()}サンプル "
f"({anomalies.sum()/n_samples*100:.1f}%)")
# 異常区間の特定
anomaly_periods = []
in_anomaly = False
start_idx = 0
for i in range(len(anomalies)):
if anomalies[i] and not in_anomaly:
start_idx = i
in_anomaly = True
elif not anomalies[i] and in_anomaly:
anomaly_periods.append((start_idx, i-1))
in_anomaly = False
if in_anomaly:
anomaly_periods.append((start_idx, len(anomalies)-1))
print(f"\n検出された異常期間:")
for idx, (start, end) in enumerate(anomaly_periods[:5]): # 最初の5期間
duration = end - start + 1
print(f" {idx+1}. サンプル {start+1}~{end+1} ({duration}サンプル)")
print(f"\n【LSTM Autoencoderの利点】")
print(" - 時系列の複雑なパターンを学習可能")
print(" - 正常データのみで学習可能(教師なし学習)")
print(" - 長期依存関係を考慮した異常検知")
print(" - 多変量時系列にも拡張可能")
解説: LSTM Autoencoderは、LSTMネットワークを用いてエンコーダ-デコーダ構造を構築し、正常な時系列パターンを学習します。異常データは正常パターンからずれているため、再構成誤差が大きくなります。この再構成誤差を閾値判定することで異常を検出します。プロセス産業では、複雑な周期性や長期依存性を持つ時系列データの異常検知に有効です。実際の実装にはTensorFlow/Kerasを使用します。
コード例6: アラーム管理システムの設計
目的: 優先順位付けとデバウンシングを含む実践的なアラーム管理システムを設計する。
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from collections import deque
class AlarmManager:
"""高度なアラーム管理システム"""
def __init__(self, debounce_time=5, priority_rules=None):
"""
Parameters:
-----------
debounce_time : int
デバウンス時間(秒)- この時間内の連続アラームを1つにまとめる
priority_rules : dict
優先順位ルール
"""
self.debounce_time = debounce_time
self.priority_rules = priority_rules or {
'CRITICAL': 1,
'HIGH': 2,
'MEDIUM': 3,
'LOW': 4
}
self.active_alarms = {} # アラームID -> アラーム情報
self.alarm_history = [] # 全アラーム履歴
self.alarm_counter = 0
def raise_alarm(self, sensor_id, value, alarm_type, severity='MEDIUM',
message=''):
"""
アラームを発生させる
Parameters:
-----------
sensor_id : str
センサーID
value : float
異常値
alarm_type : str
アラームタイプ
severity : str
重要度(CRITICAL, HIGH, MEDIUM, LOW)
message : str
アラームメッセージ
"""
current_time = datetime.now()
# デバウンシング: 同じセンサーの直近のアラームをチェック
alarm_key = f"{sensor_id}_{alarm_type}"
if alarm_key in self.active_alarms:
last_alarm = self.active_alarms[alarm_key]
time_diff = (current_time - last_alarm['timestamp']).total_seconds()
# デバウンス時間内であれば、カウンタを増やすのみ
if time_diff < self.debounce_time:
last_alarm['count'] += 1
last_alarm['last_occurrence'] = current_time
return None # 新規アラームは発生させない
# 新規アラームの作成
self.alarm_counter += 1
alarm_id = f"ALM{self.alarm_counter:05d}"
alarm = {
'alarm_id': alarm_id,
'sensor_id': sensor_id,
'value': value,
'alarm_type': alarm_type,
'severity': severity,
'priority': self.priority_rules.get(severity, 99),
'message': message,
'timestamp': current_time,
'last_occurrence': current_time,
'count': 1,
'acknowledged': False,
'resolved': False
}
self.active_alarms[alarm_key] = alarm
self.alarm_history.append(alarm.copy())
return alarm
def acknowledge_alarm(self, alarm_key):
"""アラームを確認する"""
if alarm_key in self.active_alarms:
self.active_alarms[alarm_key]['acknowledged'] = True
return True
return False
def resolve_alarm(self, alarm_key):
"""アラームを解決する"""
if alarm_key in self.active_alarms:
alarm = self.active_alarms.pop(alarm_key)
alarm['resolved'] = True
alarm['resolved_time'] = datetime.now()
return True
return False
def get_active_alarms_by_priority(self):
"""優先順位順にアクティブアラームを取得"""
alarms = list(self.active_alarms.values())
alarms.sort(key=lambda x: (x['priority'], x['timestamp']))
return alarms
def get_alarm_summary(self):
"""アラームサマリーを取得"""
if not self.active_alarms:
return "アクティブアラーム: なし"
summary = {}
for alarm in self.active_alarms.values():
severity = alarm['severity']
summary[severity] = summary.get(severity, 0) + 1
return summary
# デモンストレーション
print("=== アラーム管理システム デモンストレーション ===\n")
# アラームマネージャーの初期化
alarm_mgr = AlarmManager(debounce_time=3)
# シミュレーション: プロセス監視とアラーム発生
np.random.seed(42)
# センサーデータのシミュレーション
sensors = {
'TEMP_101': {'target': 175, 'std': 2, 'UCL': 180, 'LCL': 170},
'PRES_201': {'target': 1.5, 'std': 0.05, 'UCL': 1.6, 'LCL': 1.4},
'FLOW_301': {'target': 50, 'std': 3, 'UCL': 58, 'LCL': 42}
}
print("プロセス監視を開始...\n")
# 60秒間のシミュレーション
for t in range(60):
for sensor_id, config in sensors.items():
# センサー値の生成
value = np.random.normal(config['target'], config['std'])
# 意図的な異常を挿入
if t == 10 and sensor_id == 'TEMP_101':
value = 185 # 高温異常
elif t in [20, 21, 22] and sensor_id == 'PRES_201':
value = 1.7 # 高圧異常(連続)
elif t == 40 and sensor_id == 'FLOW_301':
value = 30 # 低流量異常
# 異常チェックとアラーム発生
if value > config['UCL']:
alarm = alarm_mgr.raise_alarm(
sensor_id=sensor_id,
value=value,
alarm_type='HIGH_LIMIT',
severity='HIGH' if (value - config['UCL']) > config['std'] else 'MEDIUM',
message=f"{sensor_id}が上限を超過: {value:.2f}"
)
if alarm:
print(f"[{t:2d}秒] 🚨 {alarm['severity']:8s} | "
f"{alarm['sensor_id']:10s} | {alarm['message']}")
elif value < config['LCL']:
alarm = alarm_mgr.raise_alarm(
sensor_id=sensor_id,
value=value,
alarm_type='LOW_LIMIT',
severity='HIGH' if (config['LCL'] - value) > config['std'] else 'MEDIUM',
message=f"{sensor_id}が下限を下回る: {value:.2f}"
)
if alarm:
print(f"[{t:2d}秒] 🚨 {alarm['severity']:8s} | "
f"{alarm['sensor_id']:10s} | {alarm['message']}")
# アラームサマリー
print("\n" + "="*70)
print("アラーム管理サマリー")
print("="*70)
summary = alarm_mgr.get_alarm_summary()
print(f"\nアクティブアラーム数: {len(alarm_mgr.active_alarms)}")
for severity, count in sorted(summary.items(),
key=lambda x: alarm_mgr.priority_rules.get(x[0], 99)):
print(f" {severity}: {count}件")
print(f"\nアラーム履歴総数: {len(alarm_mgr.alarm_history)}")
# 優先順位順にアクティブアラームを表示
print(f"\n【優先順位順アクティブアラーム】")
active_alarms = alarm_mgr.get_active_alarms_by_priority()
for idx, alarm in enumerate(active_alarms[:5], 1): # 最初の5件
print(f"\n{idx}. [{alarm['severity']}] {alarm['alarm_id']}")
print(f" センサー: {alarm['sensor_id']}")
print(f" 値: {alarm['value']:.2f}")
print(f" 発生時刻: {alarm['timestamp'].strftime('%H:%M:%S')}")
print(f" 発生回数: {alarm['count']}")
print(f" 確認済み: {'はい' if alarm['acknowledged'] else 'いいえ'}")
print("\n" + "="*70)
# デバウンシング効果の検証
print(f"\n【デバウンシング効果】")
print(f" アラーム履歴総数: {len(alarm_mgr.alarm_history)}件")
print(f" アクティブアラーム数: {len(alarm_mgr.active_alarms)}件")
print(f" 削減率: {(1 - len(alarm_mgr.active_alarms)/len(alarm_mgr.alarm_history))*100:.1f}%")
print(f" デバウンス時間: {alarm_mgr.debounce_time}秒")
解説: 実践的なアラーム管理システムは、単に異常を検出するだけでなく、優先順位付け、デバウンシング(短時間の連続アラームを1つにまとめる)、アラームの確認・解決管理を行います。デバウンシングにより、ノイズによる誤報を削減し、オペレーターの負担を軽減できます。優先順位付けにより、重要なアラームに注意を集中できます。
コード例7: 誤報削減技法(ヒステリシスとフィルタリング)
目的: ヒステリシスとフィルタリングにより、誤報を削減する技法を実装する。
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
np.random.seed(42)
# ノイズを含むプロセスデータ生成
n_samples = 500
time = np.arange(n_samples)
# 基本信号 + ノイズ
base_signal = 50 + 5 * np.sin(2 * np.pi * time / 100)
noise = np.random.normal(0, 2, n_samples)
noisy_signal = base_signal + noise
# 閾値
upper_threshold = 55
lower_threshold = 45
# 手法1: シンプルな閾値判定(誤報が多い)
def simple_threshold_detection(data, upper, lower):
"""シンプルな閾値判定"""
alarms = (data > upper) | (data < lower)
return alarms
# 手法2: ヒステリシスを用いた閾値判定
def hysteresis_detection(data, upper, lower, hysteresis_band=1):
"""
ヒステリシスを用いた閾値判定
Parameters:
-----------
data : array-like
入力データ
upper : float
上限閾値
lower : float
下限閾値
hysteresis_band : float
ヒステリシスバンド幅
"""
alarms = np.zeros(len(data), dtype=bool)
alarm_active = False
for i in range(len(data)):
if not alarm_active:
# アラーム非活性状態: 外側閾値で判定
if data[i] > upper or data[i] < lower:
alarm_active = True
alarms[i] = True
else:
# アラーム活性状態: 内側閾値(ヒステリシス)で解除判定
inner_upper = upper - hysteresis_band
inner_lower = lower + hysteresis_band
if inner_lower < data[i] < inner_upper:
alarm_active = False
else:
alarms[i] = True
return alarms
# 手法3: ローパスフィルタ + 閾値判定
def filtered_detection(data, upper, lower, filter_order=3, cutoff_freq=0.05):
"""
ローパスフィルタリング後の閾値判定
Parameters:
-----------
data : array-like
入力データ
upper : float
上限閾値
lower : float
下限閾値
filter_order : int
フィルタ次数
cutoff_freq : float
カットオフ周波数(正規化周波数、0~1)
"""
# Butterworthローパスフィルタの設計
b, a = signal.butter(filter_order, cutoff_freq, btype='low')
# フィルタリング
filtered_data = signal.filtfilt(b, a, data)
# 閾値判定
alarms = (filtered_data > upper) | (filtered_data < lower)
return alarms, filtered_data
# 各手法を適用
simple_alarms = simple_threshold_detection(noisy_signal, upper_threshold, lower_threshold)
hysteresis_alarms = hysteresis_detection(noisy_signal, upper_threshold, lower_threshold,
hysteresis_band=2)
filtered_alarms, filtered_signal = filtered_detection(noisy_signal, upper_threshold,
lower_threshold)
# 可視化
fig, axes = plt.subplots(4, 1, figsize=(16, 14))
# 元データ
axes[0].plot(time, noisy_signal, 'b-', linewidth=0.8, alpha=0.7, label='ノイズ含む信号')
axes[0].plot(time, base_signal, 'g--', linewidth=1.5, label='真の信号')
axes[0].axhline(y=upper_threshold, color='red', linestyle='--', linewidth=2,
label='閾値')
axes[0].axhline(y=lower_threshold, color='red', linestyle='--', linewidth=2)
axes[0].set_ylabel('信号値', fontsize=11)
axes[0].set_title('元データ(ノイズを含む)', fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# 手法1: シンプルな閾値判定
axes[1].plot(time, noisy_signal, 'b-', linewidth=0.8, alpha=0.5, label='信号')
axes[1].scatter(time[simple_alarms], noisy_signal[simple_alarms],
color='red', s=20, marker='o', zorder=5, label='アラーム')
axes[1].axhline(y=upper_threshold, color='red', linestyle='--', linewidth=2)
axes[1].axhline(y=lower_threshold, color='red', linestyle='--', linewidth=2)
axes[1].set_ylabel('信号値', fontsize=11)
axes[1].set_title(f'手法1: シンプルな閾値判定(アラーム数: {simple_alarms.sum()})',
fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
# 手法2: ヒステリシス
axes[2].plot(time, noisy_signal, 'b-', linewidth=0.8, alpha=0.5, label='信号')
axes[2].scatter(time[hysteresis_alarms], noisy_signal[hysteresis_alarms],
color='orange', s=20, marker='o', zorder=5, label='アラーム')
axes[2].axhline(y=upper_threshold, color='red', linestyle='--', linewidth=2,
label='外側閾値')
axes[2].axhline(y=lower_threshold, color='red', linestyle='--', linewidth=2)
axes[2].axhline(y=upper_threshold - 2, color='orange', linestyle=':', linewidth=1.5,
label='内側閾値(ヒステリシス)')
axes[2].axhline(y=lower_threshold + 2, color='orange', linestyle=':', linewidth=1.5)
axes[2].set_ylabel('信号値', fontsize=11)
axes[2].set_title(f'手法2: ヒステリシス(アラーム数: {hysteresis_alarms.sum()})',
fontsize=13, fontweight='bold')
axes[2].legend()
axes[2].grid(alpha=0.3)
# 手法3: フィルタリング
axes[3].plot(time, noisy_signal, 'b-', linewidth=0.8, alpha=0.3, label='元信号')
axes[3].plot(time, filtered_signal, 'g-', linewidth=1.5, label='フィルタ後の信号')
axes[3].scatter(time[filtered_alarms], filtered_signal[filtered_alarms],
color='purple', s=20, marker='o', zorder=5, label='アラーム')
axes[3].axhline(y=upper_threshold, color='red', linestyle='--', linewidth=2,
label='閾値')
axes[3].axhline(y=lower_threshold, color='red', linestyle='--', linewidth=2)
axes[3].set_xlabel('時間', fontsize=11)
axes[3].set_ylabel('信号値', fontsize=11)
axes[3].set_title(f'手法3: ローパスフィルタ(アラーム数: {filtered_alarms.sum()})',
fontsize=13, fontweight='bold')
axes[3].legend()
axes[3].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 性能比較
print("=== 誤報削減技法の比較 ===")
print(f"\nデータ:")
print(f" サンプル数: {n_samples}")
print(f" 信号: 正弦波 + ガウスノイズ")
print(f" 閾値: 上限 {upper_threshold}, 下限 {lower_threshold}")
print(f"\n【アラーム数の比較】")
print(f" 手法1(シンプル閾値判定): {simple_alarms.sum()}回")
print(f" 手法2(ヒステリシス): {hysteresis_alarms.sum()}回 "
f"({(1 - hysteresis_alarms.sum()/simple_alarms.sum())*100:.1f}% 削減)")
print(f" 手法3(ローパスフィルタ): {filtered_alarms.sum()}回 "
f"({(1 - filtered_alarms.sum()/simple_alarms.sum())*100:.1f}% 削減)")
# アラームの連続性(チャタリング)の評価
def count_alarm_transitions(alarms):
"""アラームのオン/オフ切り替え回数をカウント"""
transitions = np.sum(np.diff(alarms.astype(int)) != 0)
return transitions
simple_transitions = count_alarm_transitions(simple_alarms)
hysteresis_transitions = count_alarm_transitions(hysteresis_alarms)
filtered_transitions = count_alarm_transitions(filtered_alarms)
print(f"\n【アラームの切り替え回数(チャタリング指標)】")
print(f" 手法1: {simple_transitions}回")
print(f" 手法2: {hysteresis_transitions}回 "
f"({(1 - hysteresis_transitions/simple_transitions)*100:.1f}% 削減)")
print(f" 手法3: {filtered_transitions}回 "
f"({(1 - filtered_transitions/simple_transitions)*100:.1f}% 削減)")
print(f"\n【推奨】")
print(" - ヒステリシス: 閾値付近でのチャタリング防止に有効")
print(" - ローパスフィルタ: 高周波ノイズの除去に有効(位相遅れに注意)")
print(" - 組み合わせ: フィルタリング + ヒステリシスで最大の効果")
解説: 誤報削減は、実用的なアラームシステムの重要な要素です。ヒステリシスは、アラームのオンとオフに異なる閾値を使用することで、閾値付近でのチャタリング(頻繁なオン/オフ切り替え)を防ぎます。ローパスフィルタは、高周波ノイズを除去してから閾値判定を行うことで、ノイズによる誤報を削減します。実際のプロセス監視では、これらの技法を組み合わせて使用します。
コード例8: 根本原因分析システムの実装
目的: 異常検知後の根本原因分析(Root Cause Analysis)システムを実装する。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
np.random.seed(42)
# 多変量プロセスデータ生成
n_samples = 200
time_index = pd.date_range('2025-01-01', periods=n_samples, freq='1min')
# プロセス変数の生成
# 変数間に因果関係を持たせる
feed_flow = 50 + np.random.normal(0, 2, n_samples)
reactor_temp = 175 + 0.5 * (feed_flow - 50) + np.random.normal(0, 1.5, n_samples)
reactor_pressure = 1.5 + 0.02 * (reactor_temp - 175) + np.random.normal(0, 0.05, n_samples)
product_quality = 95 + 0.3 * (reactor_temp - 175) - 0.1 * (reactor_pressure - 1.5) + \
np.random.normal(0, 1, n_samples)
# 異常を挿入: 供給流量の急激な増加(サンプル100-120)
feed_flow[100:121] += np.linspace(0, 10, 21)
reactor_temp[100:121] += 0.5 * np.linspace(0, 10, 21) + np.random.normal(0, 1, 21)
reactor_pressure[100:121] += 0.02 * 0.5 * np.linspace(0, 10, 21) + \
np.random.normal(0, 0.05, 21)
product_quality[100:121] += 0.3 * 0.5 * np.linspace(0, 10, 21)
# DataFrameに格納
df = pd.DataFrame({
'timestamp': time_index,
'feed_flow': feed_flow,
'reactor_temp': reactor_temp,
'reactor_pressure': reactor_pressure,
'product_quality': product_quality
})
df.set_index('timestamp', inplace=True)
class RootCauseAnalyzer:
"""根本原因分析システム"""
def __init__(self, data, variable_names):
"""
Parameters:
-----------
data : DataFrame
プロセスデータ
variable_names : list
変数名のリスト
"""
self.data = data
self.variable_names = variable_names
def detect_anomaly_period(self, variable, threshold_factor=3):
"""異常期間を検出"""
mean = self.data[variable].mean()
std = self.data[variable].std()
z_scores = np.abs((self.data[variable] - mean) / std)
anomaly_mask = z_scores > threshold_factor
# 連続する異常期間を特定
anomaly_periods = []
in_anomaly = False
start_idx = None
for i, is_anomaly in enumerate(anomaly_mask):
if is_anomaly and not in_anomaly:
start_idx = i
in_anomaly = True
elif not is_anomaly and in_anomaly:
anomaly_periods.append((start_idx, i-1))
in_anomaly = False
if in_anomaly:
anomaly_periods.append((start_idx, len(anomaly_mask)-1))
return anomaly_periods
def correlation_analysis(self, anomaly_period, target_variable):
"""
異常期間における変数間の相関分析
Parameters:
-----------
anomaly_period : tuple
異常期間(開始インデックス, 終了インデックス)
target_variable : str
対象変数
"""
start, end = anomaly_period
# 異常期間前の正常データ
normal_period = (max(0, start-50), max(0, start-1))
normal_data = self.data.iloc[normal_period[0]:normal_period[1]+1]
# 異常期間のデータ
anomaly_data = self.data.iloc[start:end+1]
correlations = {}
for var in self.variable_names:
if var != target_variable:
# 正常期間の相関
normal_corr, _ = pearsonr(normal_data[target_variable],
normal_data[var])
# 異常期間の相関
anomaly_corr, _ = pearsonr(anomaly_data[target_variable],
anomaly_data[var])
# 相関の変化
corr_change = abs(anomaly_corr - normal_corr)
correlations[var] = {
'normal_corr': normal_corr,
'anomaly_corr': anomaly_corr,
'change': corr_change
}
# 相関変化の大きい順にソート
sorted_vars = sorted(correlations.items(),
key=lambda x: x[1]['change'],
reverse=True)
return sorted_vars
def time_sequence_analysis(self, anomaly_period):
"""
異常発生の時間順序を分析
Parameters:
-----------
anomaly_period : tuple
異常期間(開始インデックス, 終了インデックス)
"""
start, end = anomaly_period
# 各変数の異常開始時刻を特定
anomaly_start_times = {}
for var in self.variable_names:
mean = self.data[var].mean()
std = self.data[var].std()
for i in range(start, end+1):
z_score = abs((self.data[var].iloc[i] - mean) / std)
if z_score > 2: # 2σ以上の逸脱
anomaly_start_times[var] = i
break
# 時間順にソート
sorted_times = sorted(anomaly_start_times.items(),
key=lambda x: x[1])
return sorted_times
# 根本原因分析の実行
analyzer = RootCauseAnalyzer(df, df.columns.tolist())
# 製品品質の異常を検出
quality_anomalies = analyzer.detect_anomaly_period('product_quality',
threshold_factor=2)
print("=== 根本原因分析システム ===")
print(f"\n異常検出:")
print(f" 対象変数: 製品品質")
print(f" 検出された異常期間: {len(quality_anomalies)}個")
if quality_anomalies:
anomaly_period = quality_anomalies[0] # 最初の異常期間を分析
start, end = anomaly_period
print(f"\n【分析対象異常期間】")
print(f" 期間: サンプル {start+1} ~ {end+1}")
print(f" 時刻: {df.index[start]} ~ {df.index[end]}")
# 相関分析
print(f"\n【変数間相関分析】")
correlations = analyzer.correlation_analysis(anomaly_period, 'product_quality')
print(" 根本原因候補(相関変化の大きい順):")
for idx, (var, corr_info) in enumerate(correlations, 1):
print(f"\n {idx}. {var}")
print(f" 正常時相関: {corr_info['normal_corr']:+.3f}")
print(f" 異常時相関: {corr_info['anomaly_corr']:+.3f}")
print(f" 変化: {corr_info['change']:.3f}")
# 時間順序分析
print(f"\n【異常発生の時間順序】")
time_sequence = analyzer.time_sequence_analysis(anomaly_period)
print(" 異常が最初に現れた順序:")
for idx, (var, sample_idx) in enumerate(time_sequence, 1):
time_delay = sample_idx - time_sequence[0][1] if idx > 1 else 0
print(f" {idx}. {var}: サンプル {sample_idx+1} "
f"({time_delay}分後)" if time_delay > 0 else
f" {idx}. {var}: サンプル {sample_idx+1} (起点)")
# 可視化
fig, axes = plt.subplots(len(df.columns), 1, figsize=(16, 12), sharex=True)
for idx, var in enumerate(df.columns):
# 正常データ
normal_mask = np.ones(len(df), dtype=bool)
normal_mask[start:end+1] = False
axes[idx].plot(df.index[normal_mask], df[var][normal_mask],
'o', color='#11998e', markersize=3, alpha=0.6)
# 異常データ
axes[idx].plot(df.index[start:end+1], df[var].iloc[start:end+1],
'o', color='red', markersize=4, alpha=0.8, label='異常期間')
# 平均線
mean = df[var].mean()
axes[idx].axhline(y=mean, color='blue', linestyle='--',
linewidth=1, alpha=0.5)
axes[idx].set_ylabel(var, fontsize=10)
axes[idx].grid(alpha=0.3)
if idx == 0:
axes[idx].set_title('根本原因分析 - 時系列データ',
fontsize=13, fontweight='bold')
if idx == 0:
axes[idx].legend()
axes[-1].set_xlabel('時刻', fontsize=11)
plt.tight_layout()
plt.show()
# 結論
print(f"\n【根本原因の推定】")
if time_sequence:
root_cause = time_sequence[0][0]
print(f" 推定根本原因: {root_cause}")
print(f" 理由:")
print(f" 1. 異常発生の最も早い変数")
print(f" 2. 異常期間中に他変数との相関変化が大きい")
print(f"\n 推奨対応:")
print(f" - {root_cause}を正常範囲に戻す")
print(f" - {root_cause}の制御ループを確認")
print(f" - 関連する上流プロセスを調査")
解説: 根本原因分析(RCA)は、異常検知後に「なぜ異常が発生したか」を特定するプロセスです。このシステムは、(1) 変数間の相関分析により関連する変数を特定し、(2) 時間順序分析により異常が最初に現れた変数(根本原因の候補)を特定します。プロセス産業では、迅速なRCAにより、問題の再発防止と効率的な対策実施が可能になります。
3.3 本章のまとめ
学んだこと
- 異常検知の基礎
- 異常検知の目的と重要性
- ルールベース、統計的、機械学習、ディープラーニングの4つのアプローチ
- 各手法の利点と欠点の理解
- 統計的異常検知
- Z-score法: 正規分布を仮定した外れ値検出
- 修正Z-score法(MAD): 外れ値に頑健な手法
- 適用条件と使い分け
- 機械学習による異常検知
- Isolation Forest: 孤立性に基づく異常検知
- One-Class SVM: 正常領域の境界学習
- 教師なし学習による実装
- ディープラーニング
- LSTM Autoencoder: 時系列パターンの学習
- 再構成誤差による異常検知
- 複雑な時系列への対応
- アラーム管理
- 優先順位付けとデバウンシング
- ヒステリシスとフィルタリングによる誤報削減
- 根本原因分析システム
重要なポイント
- 手法の選択: データの特性と目的に応じて適切な異常検知手法を選択
- 誤報削減: 実用的なシステムでは、誤報削減技法が不可欠
- 根本原因分析: 異常検知だけでなく、原因特定まで行うことが重要
- 段階的アプローチ: シンプルな手法から始め、必要に応じて高度な手法を導入
- 統合システム: 複数の手法を組み合わせた統合的なアプローチが効果的
次の章へ
第4章では、フィードバック制御とPID制御を学びます:
- フィードバック制御の基礎理論
- 一次遅れ系とステップ応答
- PID制御器の実装(P, PI, PID)
- Ziegler-Nicholsチューニング手法
- アンチワインドアップとカスケード制御