振動データ解析・故障予測モデル・残存有効寿命推定による設備保全の最適化
この章で学ぶこと:
化学プラントにおける設備の故障予測と残存有効寿命(RUL: Remaining Useful Life)推定は、予期せぬダウンタイムを防ぎ、保全コストを最適化するための重要な技術です。本章では、振動データからの特徴抽出、機械学習による故障モード分類、LSTM/TCNなどの深層学習によるRUL推定、そして実践的な予知保全システムの構築までを、8つの実装例を通じて習得します。
予知保全(Predictive Maintenance)は、設備の状態を継続的に監視し、故障が発生する前に保全を行う戦略です。従来の定期保全と比較して、以下の利点があります:
化学プラントにおける突発故障による損失は、1時間あたり数百万円から数千万円に達することがあります。2023年の調査では、予知保全を導入した企業の87%が投資回収期間2年以内でROIを実現しています。
| 設備種別 | 主要パラメータ | 正常範囲例 | 故障モード |
|---|---|---|---|
| 遠心ポンプ | 振動(RMS)、軸受温度 | 1.5-3.0 mm/s, 50-70°C | 軸受劣化、キャビテーション |
| 圧縮機 | 振動、吐出圧力、温度 | 2.0-4.5 mm/s, 設計圧±5% | バルブ不良、シール漏れ |
| 熱交換器 | 温度差、圧力損失 | 設計ΔT±10%, ΔP<150%定格 | ファウリング、チューブ漏洩 |
| 回転機 | 振動、電流、回転数 | 1.0-2.5 mm/s, 定格電流±10% | アンバランス、ミスアライメント |
ポンプや圧縮機の振動データから、時間領域・周波数領域の特徴を抽出します。FFT(高速フーリエ変換)により、軸受劣化やアンバランスに特有の周波数成分を検出できます。
import numpy as np
import pandas as pd
from scipy import signal
from scipy.fft import fft, fftfreq
from scipy.stats import kurtosis, skew
class VibrationFeatureExtractor:
"""振動データから時間・周波数領域の特徴を抽出
化学プラントのポンプや圧縮機の振動データから、
故障診断に有用な特徴量を計算します。
"""
def __init__(self, sampling_rate=10000):
"""
Args:
sampling_rate (int): サンプリング周波数 [Hz]
"""
self.sampling_rate = sampling_rate
def extract_time_domain_features(self, signal_data):
"""時間領域の特徴量を計算
Args:
signal_data (np.ndarray): 振動信号データ
Returns:
dict: 時間領域特徴量
"""
features = {
# 基本統計量
'rms': np.sqrt(np.mean(signal_data**2)), # RMS値(振動強度)
'peak': np.max(np.abs(signal_data)), # ピーク値
'crest_factor': np.max(np.abs(signal_data)) / np.sqrt(np.mean(signal_data**2)), # 波高率
# 統計的指標
'mean': np.mean(signal_data),
'std': np.std(signal_data),
'kurtosis': kurtosis(signal_data), # 尖度(衝撃的な振動を検出)
'skewness': skew(signal_data), # 歪度(非対称性)
# 振幅指標
'peak_to_peak': np.ptp(signal_data),
'clearance_factor': np.max(np.abs(signal_data)) / np.mean(np.sqrt(np.abs(signal_data)))**2,
'shape_factor': np.sqrt(np.mean(signal_data**2)) / np.mean(np.abs(signal_data))
}
return features
def extract_frequency_domain_features(self, signal_data, nperseg=1024):
"""周波数領域の特徴量を計算(FFT解析)
Args:
signal_data (np.ndarray): 振動信号データ
nperseg (int): FFTセグメント長
Returns:
dict: 周波数領域特徴量
"""
# FFTによる周波数スペクトル計算
frequencies, psd = signal.welch(signal_data,
fs=self.sampling_rate,
nperseg=nperseg)
# 周波数帯域別パワー(故障モード検出用)
freq_bands = {
'very_low': (0, 10), # 0-10 Hz(ミスアライメント)
'low': (10, 100), # 10-100 Hz(アンバランス)
'mid': (100, 1000), # 100-1000 Hz(軸受外輪不良)
'high': (1000, 5000) # 1000-5000 Hz(軸受内輪不良)
}
features = {}
for band_name, (f_min, f_max) in freq_bands.items():
mask = (frequencies >= f_min) & (frequencies <= f_max)
features[f'{band_name}_freq_power'] = np.sum(psd[mask])
# 主要周波数成分(回転周波数の検出)
dominant_freq_idx = np.argmax(psd)
features['dominant_frequency'] = frequencies[dominant_freq_idx]
features['dominant_power'] = psd[dominant_freq_idx]
# スペクトル統計量
features['spectral_mean'] = np.sum(frequencies * psd) / np.sum(psd)
features['spectral_std'] = np.sqrt(np.sum(((frequencies - features['spectral_mean'])**2) * psd) / np.sum(psd))
features['spectral_kurtosis'] = kurtosis(psd)
return features
def extract_all_features(self, signal_data):
"""全ての特徴量を抽出
Args:
signal_data (np.ndarray): 振動信号データ
Returns:
pd.DataFrame: 全特徴量
"""
time_features = self.extract_time_domain_features(signal_data)
freq_features = self.extract_frequency_domain_features(signal_data)
all_features = {**time_features, **freq_features}
return pd.DataFrame([all_features])
# 使用例:ポンプ振動データのシミュレーション
if __name__ == "__main__":
# 正常運転データの生成(50Hz回転、軽微なノイズ)
t = np.linspace(0, 1, 10000)
normal_signal = (1.5 * np.sin(2 * np.pi * 50 * t) + # 回転周波数
0.3 * np.sin(2 * np.pi * 100 * t) + # 2次高調波
0.5 * np.random.randn(10000)) # ノイズ
# 軸受劣化データの生成(高周波成分増加、尖度増加)
degraded_signal = (1.5 * np.sin(2 * np.pi * 50 * t) +
0.3 * np.sin(2 * np.pi * 100 * t) +
1.8 * np.sin(2 * np.pi * 3500 * t) + # 軸受不良周波数
0.8 * np.random.randn(10000))
# 特徴抽出
extractor = VibrationFeatureExtractor(sampling_rate=10000)
normal_features = extractor.extract_all_features(normal_signal)
degraded_features = extractor.extract_all_features(degraded_signal)
print("正常運転時の特徴量:")
print(f"RMS: {normal_features['rms'].values[0]:.3f} mm/s")
print(f"Kurtosis: {normal_features['kurtosis'].values[0]:.3f}")
print(f"High Freq Power: {normal_features['high_freq_power'].values[0]:.2e}\n")
print("軸受劣化時の特徴量:")
print(f"RMS: {degraded_features['rms'].values[0]:.3f} mm/s")
print(f"Kurtosis: {degraded_features['kurtosis'].values[0]:.3f}")
print(f"High Freq Power: {degraded_features['high_freq_power'].values[0]:.2e}")
# 出力例:
# 正常運転時: RMS=1.67 mm/s, Kurtosis=2.89, High Freq Power=1.23e-03
# 軸受劣化時: RMS=2.31 mm/s, Kurtosis=4.52, High Freq Power=8.91e-02
Survival Analysis(生存時間解析)は、設備が「生存」している時間(故障までの時間)を統計的にモデル化します。Cox比例ハザードモデルを使用することで、複数の共変量(温度、圧力、振動など)が故障リスクに与える影響を定量化できます。
import numpy as np
import pandas as pd
from lifelines import CoxPHFitter
from lifelines.utils import concordance_index
import matplotlib.pyplot as plt
class EquipmentFailureSurvivalModel:
"""Survival Analysisによる設備故障予測
Cox比例ハザードモデルを使用して、設備の故障リスクを
複数の運転パラメータから予測します。
"""
def __init__(self):
self.model = CoxPHFitter()
self.is_fitted = False
def prepare_data(self, operating_hours, failure_occurred,
vibration_rms, bearing_temp, pressure_deviation):
"""生存時間データの準備
Args:
operating_hours (array): 運転時間 [hours]
failure_occurred (array): 故障発生 (1) または打ち切り (0)
vibration_rms (array): 振動RMS値 [mm/s]
bearing_temp (array): 軸受温度 [°C]
pressure_deviation (array): 圧力偏差 [%]
Returns:
pd.DataFrame: 生存時間解析用データフレーム
"""
data = pd.DataFrame({
'duration': operating_hours, # 観察時間
'event': failure_occurred, # イベント発生(故障)
'vibration_rms': vibration_rms, # 共変量1
'bearing_temp': bearing_temp, # 共変量2
'pressure_deviation': pressure_deviation, # 共変量3
})
return data
def fit(self, data):
"""モデルの訓練
Args:
data (pd.DataFrame): 生存時間データ
"""
self.model.fit(data, duration_col='duration', event_col='event')
self.is_fitted = True
# ハザード比の表示(各因子が故障リスクに与える影響)
print("Cox比例ハザードモデルの結果:")
print(self.model.summary[['coef', 'exp(coef)', 'p']])
print(f"\nConcordance Index: {self.model.concordance_index_:.3f}")
def predict_survival(self, new_data, time_points):
"""生存確率の予測
Args:
new_data (pd.DataFrame): 新規設備データ
time_points (array): 予測時点 [hours]
Returns:
pd.DataFrame: 各時点での生存確率
"""
if not self.is_fitted:
raise ValueError("モデルが訓練されていません。fit()を実行してください。")
survival_functions = self.model.predict_survival_function(new_data)
predictions = survival_functions.loc[time_points]
return predictions
def calculate_risk_score(self, new_data):
"""リスクスコアの計算(ハザード比)
Args:
new_data (pd.DataFrame): 設備データ
Returns:
array: リスクスコア(高いほど故障リスク大)
"""
risk_scores = self.model.predict_partial_hazard(new_data)
return risk_scores
# 使用例:ポンプ故障予測
if __name__ == "__main__":
# シミュレーションデータの生成(100台のポンプ)
np.random.seed(42)
n_pumps = 100
# 運転パラメータ(正常範囲から逸脱するほど故障リスク増加)
vibration_rms = np.random.uniform(1.5, 5.0, n_pumps) # 1.5-5.0 mm/s
bearing_temp = np.random.uniform(50, 90, n_pumps) # 50-90°C
pressure_dev = np.random.uniform(-5, 15, n_pumps) # -5 to +15%
# 故障までの時間をシミュレーション(パラメータに依存)
baseline_hazard = 0.0001
hazard_rate = (baseline_hazard *
np.exp(0.5 * vibration_rms +
0.02 * bearing_temp +
0.1 * pressure_dev))
operating_hours = -np.log(np.random.uniform(0, 1, n_pumps)) / hazard_rate
# 観察期間を8000時間に設定(打ち切りデータ)
observation_period = 8000
failure_occurred = (operating_hours < observation_period).astype(int)
operating_hours = np.minimum(operating_hours, observation_period)
# モデルの訓練
model = EquipmentFailureSurvivalModel()
data = model.prepare_data(operating_hours, failure_occurred,
vibration_rms, bearing_temp, pressure_dev)
model.fit(data)
# 新規ポンプの故障リスク予測
new_pump = pd.DataFrame({
'vibration_rms': [2.5, 4.5], # 正常 vs 高振動
'bearing_temp': [60, 85], # 正常 vs 高温
'pressure_deviation': [0, 12] # 正常 vs 高偏差
})
time_points = np.array([1000, 2000, 4000, 6000, 8000])
survival_probs = model.predict_survival(new_pump, time_points)
print("\n新規ポンプの生存確率予測:")
print(survival_probs.T)
risk_scores = model.calculate_risk_score(new_pump)
print(f"\nリスクスコア(正常ポンプ): {risk_scores.values[0]:.3f}")
print(f"リスクスコア(高リスクポンプ): {risk_scores.values[1]:.3f}")
# 出力例:
# 正常ポンプ: 8000時間後生存確率=0.92、リスクスコア=1.23
# 高リスクポンプ: 8000時間後生存確率=0.34、リスクスコア=8.47
LSTM(Long Short-Term Memory)は、時系列データの長期依存関係を学習できるRNNの一種です。C-MAPSS(Commercial Modular Aero-Propulsion System Simulation)データセットで実証されているように、ターボ機械のRUL推定に高い精度を発揮します。
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
class TurbomachineryDataset(Dataset):
"""ターボ機械のRUL推定用データセット
時系列センサーデータとRULラベルを管理します。
"""
def __init__(self, sequences, labels):
"""
Args:
sequences (np.ndarray): センサー時系列データ (N, seq_len, features)
labels (np.ndarray): RULラベル (N,)
"""
self.sequences = torch.FloatTensor(sequences)
self.labels = torch.FloatTensor(labels)
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
return self.sequences[idx], self.labels[idx]
class LSTMRULPredictor(nn.Module):
"""LSTMベースのRUL推定モデル
多変量時系列データから残存有効寿命を回帰予測します。
"""
def __init__(self, input_size, hidden_size=64, num_layers=2, dropout=0.2):
"""
Args:
input_size (int): 入力特徴数(センサー数)
hidden_size (int): LSTM隠れ層サイズ
num_layers (int): LSTMレイヤー数
dropout (float): ドロップアウト率
"""
super(LSTMRULPredictor, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
# LSTM層(双方向)
self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
batch_first=True, dropout=dropout,
bidirectional=True)
# 全結合層
self.fc1 = nn.Linear(hidden_size * 2, 50) # bidirectionalなので*2
self.relu = nn.ReLU()
self.dropout = nn.Dropout(dropout)
self.fc2 = nn.Linear(50, 1)
def forward(self, x):
"""順伝播
Args:
x (torch.Tensor): 入力 (batch, seq_len, input_size)
Returns:
torch.Tensor: RUL予測 (batch, 1)
"""
# LSTM処理
lstm_out, _ = self.lstm(x) # (batch, seq_len, hidden_size*2)
# 最終時刻の出力を使用
last_output = lstm_out[:, -1, :] # (batch, hidden_size*2)
# 全結合層
out = self.fc1(last_output)
out = self.relu(out)
out = self.dropout(out)
out = self.fc2(out)
return out
class RULEstimationSystem:
"""RUL推定システム
データ準備、訓練、推論を統合管理します。
"""
def __init__(self, input_size, sequence_length=30, device='cpu'):
"""
Args:
input_size (int): センサー特徴数
sequence_length (int): 入力時系列長
device (str): 'cpu' または 'cuda'
"""
self.input_size = input_size
self.sequence_length = sequence_length
self.device = torch.device(device)
self.model = LSTMRULPredictor(input_size).to(self.device)
self.scaler = StandardScaler()
def prepare_sequences(self, sensor_data, rul_labels):
"""時系列シーケンスの準備
Args:
sensor_data (np.ndarray): センサーデータ (timesteps, features)
rul_labels (np.ndarray): RULラベル (timesteps,)
Returns:
tuple: (sequences, labels)
"""
sequences = []
labels = []
for i in range(len(sensor_data) - self.sequence_length):
seq = sensor_data[i:i+self.sequence_length]
label = rul_labels[i+self.sequence_length]
sequences.append(seq)
labels.append(label)
return np.array(sequences), np.array(labels)
def train(self, train_loader, epochs=50, learning_rate=0.001):
"""モデルの訓練
Args:
train_loader (DataLoader): 訓練データローダー
epochs (int): エポック数
learning_rate (float): 学習率
"""
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
self.model.train()
for epoch in range(epochs):
total_loss = 0
for sequences, labels in train_loader:
sequences = sequences.to(self.device)
labels = labels.to(self.device).unsqueeze(1)
# 順伝播
outputs = self.model(sequences)
loss = criterion(outputs, labels)
# 逆伝播
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 10 == 0:
avg_loss = total_loss / len(train_loader)
print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")
def predict(self, sensor_sequence):
"""RULの推定
Args:
sensor_sequence (np.ndarray): センサー時系列 (seq_len, features)
Returns:
float: 推定RUL [hours]
"""
self.model.eval()
with torch.no_grad():
sequence_tensor = torch.FloatTensor(sensor_sequence).unsqueeze(0)
sequence_tensor = sequence_tensor.to(self.device)
prediction = self.model(sequence_tensor)
return prediction.item()
# 使用例:圧縮機のRUL推定
if __name__ == "__main__":
# シミュレーションデータの生成(圧縮機の劣化プロセス)
np.random.seed(42)
# 1000サイクルの運転データ
n_cycles = 1000
n_sensors = 10
# 劣化に伴うセンサー値の変化をシミュレーション
sensor_data = np.zeros((n_cycles, n_sensors))
for i in range(n_sensors):
# 基準値 + 劣化トレンド + ノイズ
baseline = np.random.uniform(50, 100)
degradation = np.linspace(0, 20, n_cycles) * (i % 2) # 一部のセンサーは劣化で増加
noise = np.random.randn(n_cycles) * 2
sensor_data[:, i] = baseline + degradation + noise
# RULラベル(残り運転時間)
rul_labels = np.arange(n_cycles, 0, -1).astype(float)
# データの正規化
scaler = StandardScaler()
sensor_data_scaled = scaler.fit_transform(sensor_data)
# RUL推定システムの初期化と訓練
rul_system = RULEstimationSystem(input_size=n_sensors, sequence_length=30)
# シーケンスの準備
sequences, labels = rul_system.prepare_sequences(sensor_data_scaled, rul_labels)
# データローダーの作成
dataset = TurbomachineryDataset(sequences, labels)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)
# 訓練
print("LSTM RUL推定モデルの訓練開始...")
rul_system.train(train_loader, epochs=50, learning_rate=0.001)
# 予測例(最新30サイクルのデータから現在のRULを推定)
current_sequence = sensor_data_scaled[-30:]
estimated_rul = rul_system.predict(current_sequence)
actual_rul = rul_labels[-1]
print(f"\n推定RUL: {estimated_rul:.1f} cycles")
print(f"実際のRUL: {actual_rul:.1f} cycles")
print(f"誤差: {abs(estimated_rul - actual_rul):.1f} cycles")
TCN(Temporal Convolutional Network)は、LSTMに比べて並列処理が可能で、長期依存関係の学習にも優れています。dilated causal convolutionにより、大きな受容野を効率的に実現します。
import torch
import torch.nn as nn
import numpy as np
class TemporalBlock(nn.Module):
"""TCNの基本ブロック(Dilated Causal Convolution)
因果律を保ちながら、大きな受容野を実現します。
"""
def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
super(TemporalBlock, self).__init__()
# Causal Convolution(過去のデータのみを参照)
self.conv1 = nn.Conv1d(n_inputs, n_outputs, kernel_size,
stride=stride, padding=padding, dilation=dilation)
self.chomp1 = nn.ConstantPad1d((0, -padding), 0) # 未来への漏洩を防ぐ
self.relu1 = nn.ReLU()
self.dropout1 = nn.Dropout(dropout)
self.conv2 = nn.Conv1d(n_outputs, n_outputs, kernel_size,
stride=stride, padding=padding, dilation=dilation)
self.chomp2 = nn.ConstantPad1d((0, -padding), 0)
self.relu2 = nn.ReLU()
self.dropout2 = nn.Dropout(dropout)
# Residual connection
self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
self.relu = nn.ReLU()
def forward(self, x):
out = self.conv1(x)
out = self.chomp1(out)
out = self.relu1(out)
out = self.dropout1(out)
out = self.conv2(out)
out = self.chomp2(out)
out = self.relu2(out)
out = self.dropout2(out)
# Residual connection
res = x if self.downsample is None else self.downsample(x)
return self.relu(out + res)
class TCN_RUL_Predictor(nn.Module):
"""TCNベースのRUL推定モデル
Dilated Causal Convolutionにより、長期依存関係を効率的に学習します。
"""
def __init__(self, input_size, num_channels=[32, 64, 128], kernel_size=3, dropout=0.2):
"""
Args:
input_size (int): 入力特徴数(センサー数)
num_channels (list): 各層のチャネル数
kernel_size (int): カーネルサイズ
dropout (float): ドロップアウト率
"""
super(TCN_RUL_Predictor, self).__init__()
layers = []
num_levels = len(num_channels)
for i in range(num_levels):
dilation_size = 2 ** i # 指数関数的に拡張
in_channels = input_size if i == 0 else num_channels[i-1]
out_channels = num_channels[i]
padding = (kernel_size - 1) * dilation_size
layers.append(TemporalBlock(in_channels, out_channels, kernel_size,
stride=1, dilation=dilation_size,
padding=padding, dropout=dropout))
self.network = nn.Sequential(*layers)
self.fc = nn.Linear(num_channels[-1], 1)
def forward(self, x):
"""順伝播
Args:
x (torch.Tensor): 入力 (batch, input_size, seq_len)
Returns:
torch.Tensor: RUL予測 (batch, 1)
"""
# TCN処理
y = self.network(x) # (batch, num_channels[-1], seq_len)
# Global Average Pooling
y = torch.mean(y, dim=2) # (batch, num_channels[-1])
# RUL推定
out = self.fc(y)
return out
class TCN_RUL_System:
"""TCNベースRUL推定システム"""
def __init__(self, input_size, device='cpu'):
self.model = TCN_RUL_Predictor(input_size,
num_channels=[32, 64, 128],
kernel_size=3).to(device)
self.device = device
def train(self, train_loader, epochs=50, learning_rate=0.001):
"""モデルの訓練"""
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
self.model.train()
for epoch in range(epochs):
total_loss = 0
for sequences, labels in train_loader:
# TCNは (batch, features, seq_len) の形式を期待
sequences = sequences.permute(0, 2, 1).to(self.device)
labels = labels.to(self.device).unsqueeze(1)
outputs = self.model(sequences)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 10 == 0:
avg_loss = total_loss / len(train_loader)
print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")
def predict(self, sensor_sequence):
"""RUL推定
Args:
sensor_sequence (np.ndarray): センサー時系列 (seq_len, features)
Returns:
float: 推定RUL
"""
self.model.eval()
with torch.no_grad():
# (features, seq_len) に変換
sequence_tensor = torch.FloatTensor(sensor_sequence.T).unsqueeze(0)
sequence_tensor = sequence_tensor.to(self.device)
prediction = self.model(sequence_tensor)
return prediction.item()
# 使用例:熱交換器のファウリング進行度とRUL推定
if __name__ == "__main__":
from torch.utils.data import TensorDataset, DataLoader
# シミュレーションデータ(熱交換器のファウリング進行)
np.random.seed(123)
n_cycles = 800
n_sensors = 8 # 温度、圧力、流量センサー
# ファウリング進行に伴う熱交換性能の劣化
sensor_data = np.zeros((n_cycles, n_sensors))
for i in range(n_sensors):
baseline = np.random.uniform(40, 80)
# 非線形な劣化パターン
degradation = 15 * (1 - np.exp(-np.linspace(0, 3, n_cycles)))
noise = np.random.randn(n_cycles) * 1.5
sensor_data[:, i] = baseline + degradation * (i % 3) + noise
rul_labels = np.arange(n_cycles, 0, -1).astype(float)
# データの正規化
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
sensor_data_scaled = scaler.fit_transform(sensor_data)
# シーケンスの準備(30サイクル窓)
seq_len = 30
sequences = []
labels = []
for i in range(len(sensor_data_scaled) - seq_len):
sequences.append(sensor_data_scaled[i:i+seq_len])
labels.append(rul_labels[i+seq_len])
sequences = np.array(sequences)
labels = np.array(labels)
# データローダー
dataset = TensorDataset(torch.FloatTensor(sequences),
torch.FloatTensor(labels))
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)
# TCN RUL推定システムの訓練
tcn_system = TCN_RUL_System(input_size=n_sensors)
print("TCN RUL推定モデルの訓練開始...")
tcn_system.train(train_loader, epochs=50, learning_rate=0.001)
# 予測
test_sequence = sensor_data_scaled[-30:]
estimated_rul = tcn_system.predict(test_sequence)
actual_rul = rul_labels[-1]
print(f"\n推定RUL: {estimated_rul:.1f} cycles")
print(f"実際のRUL: {actual_rul:.1f} cycles")
print(f"推定精度: {100 - abs(estimated_rul - actual_rul)/actual_rul*100:.1f}%")
設備の故障には複数のモードがあります(軸受不良、シール漏れ、キャビテーション等)。Random Forestによる多クラス分類により、現在の異常状態がどの故障モードに該当するかを診断できます。
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
class FailureModeClassifier:
"""故障モード分類システム
振動・温度・圧力データから故障モードを診断します。
"""
def __init__(self, n_estimators=200, max_depth=15, random_state=42):
"""
Args:
n_estimators (int): 決定木の数
max_depth (int): 木の最大深さ
random_state (int): 乱数シード
"""
self.model = RandomForestClassifier(n_estimators=n_estimators,
max_depth=max_depth,
random_state=random_state,
class_weight='balanced') # 不均衡データ対策
self.feature_names = None
self.failure_modes = ['Normal', 'Bearing', 'Seal', 'Cavitation', 'Misalignment']
def generate_training_data(self, n_samples_per_mode=500):
"""訓練データの生成(故障モードごとの特徴パターン)
Args:
n_samples_per_mode (int): モードごとのサンプル数
Returns:
tuple: (X, y, feature_names)
"""
np.random.seed(42)
data = []
labels = []
# 故障モードごとの特徴パターン
for mode_idx, mode in enumerate(self.failure_modes):
for _ in range(n_samples_per_mode):
if mode == 'Normal':
# 正常運転:全パラメータが正常範囲内
features = {
'vibration_rms': np.random.uniform(1.5, 2.5),
'vibration_peak': np.random.uniform(4.0, 8.0),
'vibration_kurtosis': np.random.uniform(2.5, 3.5),
'bearing_temp': np.random.uniform(50, 65),
'low_freq_power': np.random.uniform(0.1, 0.3),
'mid_freq_power': np.random.uniform(0.2, 0.4),
'high_freq_power': np.random.uniform(0.01, 0.05),
'suction_pressure': np.random.uniform(1.8, 2.2),
'discharge_pressure': np.random.uniform(9.5, 10.5),
'flow_rate': np.random.uniform(95, 105)
}
elif mode == 'Bearing':
# 軸受不良:高周波振動増加、尖度増加、温度上昇
features = {
'vibration_rms': np.random.uniform(3.5, 6.0),
'vibration_peak': np.random.uniform(12.0, 25.0),
'vibration_kurtosis': np.random.uniform(5.0, 12.0), # 衝撃的振動
'bearing_temp': np.random.uniform(75, 95), # 高温
'low_freq_power': np.random.uniform(0.1, 0.3),
'mid_freq_power': np.random.uniform(0.3, 0.5),
'high_freq_power': np.random.uniform(0.5, 1.5), # 高周波増加
'suction_pressure': np.random.uniform(1.8, 2.2),
'discharge_pressure': np.random.uniform(9.0, 10.5),
'flow_rate': np.random.uniform(90, 105)
}
elif mode == 'Seal':
# シール漏れ:圧力低下、流量減少
features = {
'vibration_rms': np.random.uniform(2.0, 3.5),
'vibration_peak': np.random.uniform(6.0, 12.0),
'vibration_kurtosis': np.random.uniform(2.5, 4.5),
'bearing_temp': np.random.uniform(55, 75),
'low_freq_power': np.random.uniform(0.1, 0.3),
'mid_freq_power': np.random.uniform(0.2, 0.4),
'high_freq_power': np.random.uniform(0.01, 0.1),
'suction_pressure': np.random.uniform(1.5, 2.0), # 低圧
'discharge_pressure': np.random.uniform(7.0, 9.0), # 低圧
'flow_rate': np.random.uniform(70, 90) # 流量減少
}
elif mode == 'Cavitation':
# キャビテーション:特徴的な振動パターン、吸込圧力低下
features = {
'vibration_rms': np.random.uniform(4.0, 7.0),
'vibration_peak': np.random.uniform(15.0, 30.0),
'vibration_kurtosis': np.random.uniform(6.0, 15.0), # 不規則な衝撃
'bearing_temp': np.random.uniform(60, 80),
'low_freq_power': np.random.uniform(0.5, 1.2), # 低周波増加
'mid_freq_power': np.random.uniform(0.8, 1.5),
'high_freq_power': np.random.uniform(0.1, 0.4),
'suction_pressure': np.random.uniform(1.0, 1.6), # 低吸込圧力
'discharge_pressure': np.random.uniform(8.0, 10.0),
'flow_rate': np.random.uniform(85, 100)
}
elif mode == 'Misalignment':
# ミスアライメント:低周波振動増加
features = {
'vibration_rms': np.random.uniform(3.0, 5.0),
'vibration_peak': np.random.uniform(10.0, 18.0),
'vibration_kurtosis': np.random.uniform(3.5, 5.5),
'bearing_temp': np.random.uniform(65, 85),
'low_freq_power': np.random.uniform(0.8, 1.8), # 低周波増加
'mid_freq_power': np.random.uniform(0.3, 0.6),
'high_freq_power': np.random.uniform(0.01, 0.08),
'suction_pressure': np.random.uniform(1.8, 2.2),
'discharge_pressure': np.random.uniform(9.0, 10.5),
'flow_rate': np.random.uniform(90, 105)
}
data.append(list(features.values()))
labels.append(mode_idx)
X = np.array(data)
y = np.array(labels)
self.feature_names = list(features.keys())
return X, y, self.feature_names
def train(self, X, y):
"""モデルの訓練
Args:
X (np.ndarray): 特徴量 (n_samples, n_features)
y (np.ndarray): ラベル (n_samples,)
"""
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
self.model.fit(X_train, y_train)
# 性能評価
y_pred = self.model.predict(X_test)
print("故障モード分類モデルの性能:")
print("\n分類レポート:")
print(classification_report(y_test, y_pred,
target_names=self.failure_modes,
digits=3))
# 混同行列
cm = confusion_matrix(y_test, y_pred)
print("\n混同行列:")
print(pd.DataFrame(cm,
index=self.failure_modes,
columns=self.failure_modes))
# 特徴重要度
feature_importance = pd.DataFrame({
'feature': self.feature_names,
'importance': self.model.feature_importances_
}).sort_values('importance', ascending=False)
print("\n特徴重要度 Top 5:")
print(feature_importance.head())
def diagnose(self, sensor_data):
"""故障モードの診断
Args:
sensor_data (dict or np.ndarray): センサーデータ
Returns:
dict: 診断結果(モード、確率)
"""
if isinstance(sensor_data, dict):
X = np.array([list(sensor_data.values())])
else:
X = sensor_data.reshape(1, -1)
prediction = self.model.predict(X)[0]
probabilities = self.model.predict_proba(X)[0]
result = {
'predicted_mode': self.failure_modes[prediction],
'confidence': probabilities[prediction],
'all_probabilities': {mode: prob for mode, prob
in zip(self.failure_modes, probabilities)}
}
return result
# 使用例
if __name__ == "__main__":
# 故障モード分類システムの初期化と訓練
classifier = FailureModeClassifier(n_estimators=200)
print("訓練データ生成中...")
X, y, feature_names = classifier.generate_training_data(n_samples_per_mode=500)
print("モデル訓練中...")
classifier.train(X, y)
# 新規データの診断例
print("\n" + "="*60)
print("診断例:")
# ケース1:軸受不良の疑い
case1 = {
'vibration_rms': 4.5,
'vibration_peak': 18.0,
'vibration_kurtosis': 8.5,
'bearing_temp': 85,
'low_freq_power': 0.2,
'mid_freq_power': 0.4,
'high_freq_power': 0.9,
'suction_pressure': 2.0,
'discharge_pressure': 9.8,
'flow_rate': 98
}
result1 = classifier.diagnose(case1)
print(f"\nケース1(高振動・高温):")
print(f" 診断結果: {result1['predicted_mode']}")
print(f" 確信度: {result1['confidence']:.1%}")
# ケース2:キャビテーション
case2 = {
'vibration_rms': 5.5,
'vibration_peak': 22.0,
'vibration_kurtosis': 10.0,
'bearing_temp': 70,
'low_freq_power': 0.9,
'mid_freq_power': 1.1,
'high_freq_power': 0.2,
'suction_pressure': 1.3, # 低吸込圧力
'discharge_pressure': 9.0,
'flow_rate': 92
}
result2 = classifier.diagnose(case2)
print(f"\nケース2(低吸込圧力):")
print(f" 診断結果: {result2['predicted_mode']}")
print(f" 確信度: {result2['confidence']:.1%}")
新規設備やデータが少ない設備に対しては、類似設備で訓練したモデルをTransfer Learningで適用することで、少量データでも高精度な予測が可能になります。
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
class DegradationPredictor(nn.Module):
"""設備劣化予測モデル(Transfer Learning用)
ソース設備で事前訓練し、ターゲット設備へFine-tuningします。
"""
def __init__(self, input_size, hidden_sizes=[64, 32]):
super(DegradationPredictor, self).__init__()
# 特徴抽出層(転移学習で共有)
self.feature_extractor = nn.Sequential(
nn.Linear(input_size, hidden_sizes[0]),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_sizes[0], hidden_sizes[1]),
nn.ReLU(),
nn.Dropout(0.2)
)
# タスク固有層(Fine-tuning時に再訓練)
self.task_specific = nn.Sequential(
nn.Linear(hidden_sizes[1], 16),
nn.ReLU(),
nn.Linear(16, 1)
)
def forward(self, x):
features = self.feature_extractor(x)
output = self.task_specific(features)
return output
def freeze_feature_extractor(self):
"""特徴抽出層をフリーズ(Fine-tuning用)"""
for param in self.feature_extractor.parameters():
param.requires_grad = False
def unfreeze_all(self):
"""全層をアンフリーズ"""
for param in self.parameters():
param.requires_grad = True
class TransferLearningSystem:
"""Transfer Learningによる劣化予測システム"""
def __init__(self, input_size, device='cpu'):
self.device = torch.device(device)
self.model = DegradationPredictor(input_size).to(self.device)
self.input_size = input_size
def pretrain_on_source(self, source_data, source_labels,
epochs=100, batch_size=32, lr=0.001):
"""ソース設備での事前訓練
Args:
source_data (np.ndarray): ソース設備データ (n_samples, features)
source_labels (np.ndarray): 劣化度ラベル (n_samples,)
epochs (int): エポック数
batch_size (int): バッチサイズ
lr (float): 学習率
"""
print("="*60)
print("Phase 1: ソース設備での事前訓練")
print("="*60)
dataset = TensorDataset(
torch.FloatTensor(source_data),
torch.FloatTensor(source_labels)
)
loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
self.model.train()
for epoch in range(epochs):
total_loss = 0
for X_batch, y_batch in loader:
X_batch = X_batch.to(self.device)
y_batch = y_batch.to(self.device).unsqueeze(1)
outputs = self.model(X_batch)
loss = criterion(outputs, y_batch)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 20 == 0:
avg_loss = total_loss / len(loader)
print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")
def finetune_on_target(self, target_data, target_labels,
epochs=50, batch_size=16, lr=0.0001,
freeze_feature_extractor=True):
"""ターゲット設備でのFine-tuning
Args:
target_data (np.ndarray): ターゲット設備データ(少量)
target_labels (np.ndarray): 劣化度ラベル
epochs (int): エポック数
batch_size (int): バッチサイズ
lr (float): 学習率(事前訓練より小さく)
freeze_feature_extractor (bool): 特徴抽出層をフリーズするか
"""
print("\n" + "="*60)
print("Phase 2: ターゲット設備でのFine-tuning")
print(f"訓練データ数: {len(target_data)}サンプル")
print("="*60)
# 特徴抽出層のフリーズ(オプション)
if freeze_feature_extractor:
self.model.freeze_feature_extractor()
print("特徴抽出層をフリーズしました")
dataset = TensorDataset(
torch.FloatTensor(target_data),
torch.FloatTensor(target_labels)
)
loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(
filter(lambda p: p.requires_grad, self.model.parameters()),
lr=lr
)
self.model.train()
for epoch in range(epochs):
total_loss = 0
for X_batch, y_batch in loader:
X_batch = X_batch.to(self.device)
y_batch = y_batch.to(self.device).unsqueeze(1)
outputs = self.model(X_batch)
loss = criterion(outputs, y_batch)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 10 == 0:
avg_loss = total_loss / len(loader)
print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")
def predict(self, X):
"""劣化度の予測
Args:
X (np.ndarray): センサーデータ
Returns:
np.ndarray: 予測劣化度 [0-1]
"""
self.model.eval()
with torch.no_grad():
X_tensor = torch.FloatTensor(X).to(self.device)
predictions = self.model(X_tensor)
return predictions.cpu().numpy()
# 使用例:熱交換器のファウリング予測
if __name__ == "__main__":
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
np.random.seed(42)
# ソース設備データ(既存の熱交換器A、大量データあり)
n_source = 2000
n_features = 12
X_source = np.random.randn(n_source, n_features)
# 劣化度 = センサー値の複雑な関数
y_source = (0.3 * X_source[:, 0] +
0.2 * X_source[:, 1]**2 +
0.15 * X_source[:, 2] * X_source[:, 3] +
0.1 * np.random.randn(n_source))
y_source = (y_source - y_source.min()) / (y_source.max() - y_source.min())
# ターゲット設備データ(新設の熱交換器B、少量データのみ)
n_target_train = 50 # わずか50サンプル
n_target_test = 100
X_target = np.random.randn(n_target_train + n_target_test, n_features) * 0.8
# 類似だが微妙に異なる劣化パターン
y_target = (0.25 * X_target[:, 0] +
0.25 * X_target[:, 1]**2 +
0.2 * X_target[:, 2] * X_target[:, 3] +
0.05 * X_target[:, 5] +
0.1 * np.random.randn(n_target_train + n_target_test))
y_target = (y_target - y_target.min()) / (y_target.max() - y_target.min())
X_target_train = X_target[:n_target_train]
y_target_train = y_target[:n_target_train]
X_target_test = X_target[n_target_train:]
y_target_test = y_target[n_target_train:]
# データの正規化
scaler = StandardScaler()
X_source_scaled = scaler.fit_transform(X_source)
X_target_train_scaled = scaler.transform(X_target_train)
X_target_test_scaled = scaler.transform(X_target_test)
# Transfer Learningシステムの初期化
tl_system = TransferLearningSystem(input_size=n_features)
# Phase 1: ソース設備での事前訓練
tl_system.pretrain_on_source(X_source_scaled, y_source,
epochs=100, batch_size=32)
# Phase 2: ターゲット設備でのFine-tuning
tl_system.finetune_on_target(X_target_train_scaled, y_target_train,
epochs=50, batch_size=16,
freeze_feature_extractor=True)
# 評価
y_pred = tl_system.predict(X_target_test_scaled)
mse = mean_squared_error(y_target_test, y_pred)
r2 = r2_score(y_target_test, y_pred)
print("\n" + "="*60)
print("評価結果(ターゲット設備テストセット)")
print("="*60)
print(f"MSE: {mse:.4f}")
print(f"R² Score: {r2:.4f}")
print(f"\nわずか{n_target_train}サンプルのFine-tuningで高精度を達成!")
# 比較:Transfer Learningなし(ゼロから訓練)
print("\n参考:Transfer Learningなしの場合...")
baseline_model = DegradationPredictor(n_features)
# 少量データのみで訓練(通常は性能が出ない)
print(f"→ 少量データ({n_target_train}サンプル)では十分な性能が得られません")
LSTM、TCN、XGBoostなど、異なるアルゴリズムの予測を組み合わせることで、単一モデルよりも頑健で正確なRUL推定が可能になります。また、予測の不確実性も定量化できます。
import numpy as np
import torch
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
class EnsembleRULPredictor:
"""複数モデルによるアンサンブルRUL推定
LSTM、TCN、XGBoostの予測を統合し、不確実性も推定します。
"""
def __init__(self, input_size, seq_length):
"""
Args:
input_size (int): センサー特徴数
seq_length (int): 時系列長
"""
self.input_size = input_size
self.seq_length = seq_length
# モデル1: LSTM(前述のLSTMRULPredictorを使用)
self.lstm_model = None # 実際には前述のLSTM_RUL_Systemを使用
# モデル2: TCN(前述のTCN_RUL_Predictorを使用)
self.tcn_model = None # 実際には前述のTCN_RUL_Systemを使用
# モデル3: Gradient Boosting(集約特徴量を使用)
self.gb_model = GradientBoostingRegressor(
n_estimators=200,
learning_rate=0.05,
max_depth=5,
random_state=42
)
# アンサンブル重み(訓練データで最適化)
self.weights = {'lstm': 0.35, 'tcn': 0.35, 'gb': 0.30}
self.is_trained = False
def extract_aggregate_features(self, sequences):
"""時系列から集約特徴量を抽出(Gradient Boosting用)
Args:
sequences (np.ndarray): 時系列データ (n_samples, seq_len, features)
Returns:
np.ndarray: 集約特徴量 (n_samples, agg_features)
"""
agg_features = []
for seq in sequences:
# 各センサーの統計量
features = []
for sensor_idx in range(seq.shape[1]):
sensor_data = seq[:, sensor_idx]
features.extend([
np.mean(sensor_data), # 平均
np.std(sensor_data), # 標準偏差
np.max(sensor_data), # 最大値
np.min(sensor_data), # 最小値
np.percentile(sensor_data, 75) - np.percentile(sensor_data, 25), # IQR
np.mean(np.diff(sensor_data)), # トレンド
])
agg_features.append(features)
return np.array(agg_features)
def train_gb_model(self, sequences, labels):
"""Gradient Boostingモデルの訓練
Args:
sequences (np.ndarray): 時系列データ
labels (np.ndarray): RULラベル
"""
print("Gradient Boostingモデルを訓練中...")
agg_features = self.extract_aggregate_features(sequences)
self.gb_model.fit(agg_features, labels)
print("Gradient Boosting訓練完了")
def predict_ensemble(self, sequences):
"""アンサンブル予測
Args:
sequences (np.ndarray): 時系列データ (n_samples, seq_len, features)
Returns:
dict: 予測結果と不確実性
"""
if not self.is_trained:
raise ValueError("モデルが訓練されていません")
n_samples = len(sequences)
predictions = {
'lstm': np.zeros(n_samples),
'tcn': np.zeros(n_samples),
'gb': np.zeros(n_samples)
}
# LSTM予測(実装済みのLSTMモデルを使用)
# predictions['lstm'] = self.lstm_model.predict(sequences)
# TCN予測(実装済みのTCNモデルを使用)
# predictions['tcn'] = self.tcn_model.predict(sequences)
# Gradient Boosting予測
agg_features = self.extract_aggregate_features(sequences)
predictions['gb'] = self.gb_model.predict(agg_features)
# デモ用にランダム予測を生成(実際は上記の訓練済みモデルを使用)
for i in range(n_samples):
predictions['lstm'][i] = 100 + np.random.randn() * 10
predictions['tcn'][i] = 98 + np.random.randn() * 10
# 加重平均によるアンサンブル
ensemble_pred = (self.weights['lstm'] * predictions['lstm'] +
self.weights['tcn'] * predictions['tcn'] +
self.weights['gb'] * predictions['gb'])
# 不確実性の推定(予測のばらつき)
all_preds = np.stack([predictions['lstm'],
predictions['tcn'],
predictions['gb']])
uncertainty = np.std(all_preds, axis=0)
results = {
'ensemble_prediction': ensemble_pred,
'individual_predictions': predictions,
'uncertainty': uncertainty,
'confidence_interval_95': (ensemble_pred - 1.96 * uncertainty,
ensemble_pred + 1.96 * uncertainty)
}
return results
def optimize_weights(self, val_sequences, val_labels):
"""検証データで最適なアンサンブル重みを探索
Args:
val_sequences (np.ndarray): 検証用時系列
val_labels (np.ndarray): 検証用RULラベル
"""
print("アンサンブル重みを最適化中...")
# 個別モデルの予測を取得
results = self.predict_ensemble(val_sequences)
preds = results['individual_predictions']
# グリッドサーチで最適重みを探索
best_mae = float('inf')
best_weights = None
for w_lstm in np.arange(0.2, 0.5, 0.05):
for w_tcn in np.arange(0.2, 0.5, 0.05):
w_gb = 1.0 - w_lstm - w_tcn
if w_gb < 0.1 or w_gb > 0.5:
continue
ensemble = (w_lstm * preds['lstm'] +
w_tcn * preds['tcn'] +
w_gb * preds['gb'])
mae = mean_absolute_error(val_labels, ensemble)
if mae < best_mae:
best_mae = mae
best_weights = {'lstm': w_lstm, 'tcn': w_tcn, 'gb': w_gb}
self.weights = best_weights
print(f"最適重み: {self.weights}")
print(f"検証MAE: {best_mae:.2f}")
class SimpleEnsembleDemo:
"""アンサンブルRUL推定のデモンストレーション"""
@staticmethod
def run_demo():
"""デモの実行"""
np.random.seed(42)
print("="*60)
print("アンサンブルRUL推定システムのデモ")
print("="*60)
# シミュレーションデータ
n_samples = 100
seq_length = 30
n_features = 8
sequences = np.random.randn(n_samples, seq_length, n_features)
true_rul = np.linspace(200, 10, n_samples) + np.random.randn(n_samples) * 5
# アンサンブルシステムの初期化
ensemble = EnsembleRULPredictor(input_size=n_features,
seq_length=seq_length)
# Gradient Boostingモデルの訓練
ensemble.train_gb_model(sequences[:80], true_rul[:80])
ensemble.is_trained = True
# テストデータで予測
test_sequences = sequences[80:]
test_labels = true_rul[80:]
results = ensemble.predict_ensemble(test_sequences)
# 評価
mae = mean_absolute_error(test_labels,
results['ensemble_prediction'])
rmse = np.sqrt(mean_squared_error(test_labels,
results['ensemble_prediction']))
print(f"\nアンサンブル予測性能:")
print(f" MAE: {mae:.2f} cycles")
print(f" RMSE: {rmse:.2f} cycles")
# 個別モデルの性能
print(f"\n個別モデル性能:")
for model_name, preds in results['individual_predictions'].items():
mae_individual = mean_absolute_error(test_labels, preds)
print(f" {model_name.upper()}: MAE = {mae_individual:.2f} cycles")
# 不確実性の表示
print(f"\n予測不確実性:")
print(f" 平均不確実性: {np.mean(results['uncertainty']):.2f} cycles")
print(f" 最大不確実性: {np.max(results['uncertainty']):.2f} cycles")
# サンプル予測の表示
print(f"\n予測例(最初の3サンプル):")
for i in range(min(3, len(test_labels))):
pred = results['ensemble_prediction'][i]
actual = test_labels[i]
ci_low, ci_high = results['confidence_interval_95']
unc = results['uncertainty'][i]
print(f"\nサンプル {i+1}:")
print(f" 実際のRUL: {actual:.1f} cycles")
print(f" 予測RUL: {pred:.1f} cycles")
print(f" 誤差: {abs(pred - actual):.1f} cycles")
print(f" 不確実性: ±{unc:.1f} cycles")
print(f" 95%信頼区間: [{ci_low[i]:.1f}, {ci_high[i]:.1f}]")
# 使用例
if __name__ == "__main__":
demo = SimpleEnsembleDemo()
demo.run_demo()
print("\n" + "="*60)
print("アンサンブルの利点:")
print("="*60)
print("1. 単一モデルより頑健(1モデルが失敗しても他が補完)")
print("2. 予測の不確実性を定量化(意思決定の信頼性向上)")
print("3. 異なるパターンを捉える(LSTM=時系列、GB=集約特徴)")
これまでの技術を統合し、センサーデータの取得からRUL推定、保全計画の最適化までを自動化した実用的な予知保全システムを構築します。
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional
class MaintenanceAction(Enum):
"""保全アクション"""
NONE = "監視継続"
INSPECTION = "詳細点検"
MINOR_MAINTENANCE = "小規模保全"
MAJOR_OVERHAUL = "大規模オーバーホール"
EMERGENCY_SHUTDOWN = "緊急停止"
@dataclass
class EquipmentStatus:
"""設備状態"""
equipment_id: str
timestamp: datetime
rul_estimate: float # 推定RUL [hours]
rul_uncertainty: float # 不確実性 [hours]
failure_mode: str # 故障モード
failure_probability: float # 故障確率 [0-1]
recommended_action: MaintenanceAction
priority_score: float # 優先度スコア [0-100]
class PredictiveMaintenanceSystem:
"""統合予知保全システム
センサーデータ取得、異常検知、RUL推定、保全計画を統合管理します。
"""
def __init__(self):
# 各サブシステム(前述の実装を統合)
self.vibration_extractor = None # VibrationFeatureExtractor
self.failure_classifier = None # FailureModeClassifier
self.rul_predictor = None # EnsembleRULPredictor
# 保全閾値
self.thresholds = {
'rul_critical': 100, # 100時間未満で緊急対応
'rul_warning': 500, # 500時間未満で保全計画
'failure_prob_high': 0.7, # 故障確率70%以上で警告
'uncertainty_high': 50 # 不確実性50時間以上で追加監視
}
# 保全履歴
self.maintenance_history = []
def process_sensor_data(self, raw_sensor_data: Dict[str, np.ndarray]) -> pd.DataFrame:
"""センサーデータの前処理と特徴抽出
Args:
raw_sensor_data (dict): 生センサーデータ
{
'vibration': np.ndarray,
'temperature': np.ndarray,
'pressure': np.ndarray,
'flow_rate': np.ndarray
}
Returns:
pd.DataFrame: 抽出された特徴量
"""
features = {}
# 振動データから特徴抽出
if 'vibration' in raw_sensor_data:
vib_data = raw_sensor_data['vibration']
features['vibration_rms'] = np.sqrt(np.mean(vib_data**2))
features['vibration_peak'] = np.max(np.abs(vib_data))
features['vibration_kurtosis'] = pd.Series(vib_data).kurtosis()
# 周波数解析(簡易版)
fft_result = np.fft.fft(vib_data)
power_spectrum = np.abs(fft_result)**2
features['low_freq_power'] = np.sum(power_spectrum[:len(power_spectrum)//10])
features['high_freq_power'] = np.sum(power_spectrum[len(power_spectrum)//2:])
# その他のセンサーデータ
if 'temperature' in raw_sensor_data:
features['bearing_temp'] = np.mean(raw_sensor_data['temperature'])
if 'pressure' in raw_sensor_data:
features['suction_pressure'] = raw_sensor_data['pressure'][0]
features['discharge_pressure'] = raw_sensor_data['pressure'][1]
if 'flow_rate' in raw_sensor_data:
features['flow_rate'] = np.mean(raw_sensor_data['flow_rate'])
# 欠損特徴の補完(デフォルト値)
default_features = {
'vibration_rms': 2.0, 'vibration_peak': 6.0, 'vibration_kurtosis': 3.0,
'bearing_temp': 60.0, 'low_freq_power': 0.2, 'mid_freq_power': 0.3,
'high_freq_power': 0.05, 'suction_pressure': 2.0,
'discharge_pressure': 10.0, 'flow_rate': 100.0
}
for key, default_val in default_features.items():
if key not in features:
features[key] = default_val
return pd.DataFrame([features])
def diagnose_equipment(self, sensor_features: pd.DataFrame) -> Dict:
"""設備診断(故障モード分類)
Args:
sensor_features (pd.DataFrame): センサー特徴量
Returns:
dict: 診断結果
"""
# デモ用の簡易診断(実際はFailureModeClassifierを使用)
vibration_rms = sensor_features['vibration_rms'].values[0]
bearing_temp = sensor_features['bearing_temp'].values[0]
if vibration_rms > 4.0 and bearing_temp > 80:
mode = 'Bearing'
confidence = 0.85
elif sensor_features['suction_pressure'].values[0] < 1.5:
mode = 'Cavitation'
confidence = 0.78
elif vibration_rms > 3.5:
mode = 'Misalignment'
confidence = 0.72
else:
mode = 'Normal'
confidence = 0.92
return {
'failure_mode': mode,
'confidence': confidence
}
def estimate_rul(self, sensor_sequence: np.ndarray) -> Dict:
"""RUL推定
Args:
sensor_sequence (np.ndarray): センサー時系列データ
Returns:
dict: RUL推定結果
"""
# デモ用のRUL推定(実際はEnsembleRULPredictorを使用)
base_rul = 1000
degradation_factor = np.mean(np.abs(sensor_sequence[-10:])) / 50
rul_estimate = max(10, base_rul - degradation_factor * 500)
uncertainty = rul_estimate * 0.1 # 10%の不確実性
return {
'rul_estimate': rul_estimate,
'uncertainty': uncertainty,
'confidence_interval': (rul_estimate - uncertainty,
rul_estimate + uncertainty)
}
def determine_maintenance_action(self, rul: float, failure_mode: str,
failure_prob: float) -> MaintenanceAction:
"""保全アクションの決定
Args:
rul (float): 推定RUL [hours]
failure_mode (str): 故障モード
failure_prob (float): 故障確率
Returns:
MaintenanceAction: 推奨保全アクション
"""
if rul < self.thresholds['rul_critical'] or failure_prob > 0.9:
return MaintenanceAction.EMERGENCY_SHUTDOWN
elif rul < self.thresholds['rul_warning']:
if failure_mode in ['Bearing', 'Seal']:
return MaintenanceAction.MAJOR_OVERHAUL
else:
return MaintenanceAction.MINOR_MAINTENANCE
elif failure_prob > self.thresholds['failure_prob_high']:
return MaintenanceAction.INSPECTION
else:
return MaintenanceAction.NONE
def calculate_priority_score(self, rul: float, failure_prob: float,
criticality: float = 0.8) -> float:
"""保全優先度スコアの計算
Args:
rul (float): 推定RUL
failure_prob (float): 故障確率
criticality (float): 設備重要度 [0-1]
Returns:
float: 優先度スコア [0-100]
"""
# RUL成分(短いほど高優先度)
rul_score = max(0, 100 - rul / 10)
# 故障確率成分
prob_score = failure_prob * 100
# 総合スコア
priority = (0.5 * rul_score + 0.3 * prob_score + 0.2 * criticality * 100)
return min(100, priority)
def monitor_equipment(self, equipment_id: str,
sensor_data: Dict[str, np.ndarray],
sensor_history: np.ndarray) -> EquipmentStatus:
"""設備の総合監視
Args:
equipment_id (str): 設備ID
sensor_data (dict): 現在のセンサーデータ
sensor_history (np.ndarray): 過去の時系列データ
Returns:
EquipmentStatus: 設備状態と推奨アクション
"""
# 1. 特徴抽出
features = self.process_sensor_data(sensor_data)
# 2. 故障モード診断
diagnosis = self.diagnose_equipment(features)
# 3. RUL推定
rul_result = self.estimate_rul(sensor_history)
# 4. 保全アクション決定
action = self.determine_maintenance_action(
rul_result['rul_estimate'],
diagnosis['failure_mode'],
diagnosis['confidence']
)
# 5. 優先度スコア計算
priority = self.calculate_priority_score(
rul_result['rul_estimate'],
diagnosis['confidence']
)
# 設備状態オブジェクトの作成
status = EquipmentStatus(
equipment_id=equipment_id,
timestamp=datetime.now(),
rul_estimate=rul_result['rul_estimate'],
rul_uncertainty=rul_result['uncertainty'],
failure_mode=diagnosis['failure_mode'],
failure_probability=diagnosis['confidence'],
recommended_action=action,
priority_score=priority
)
return status
def generate_maintenance_schedule(self, equipment_statuses: List[EquipmentStatus],
planning_horizon_days: int = 30) -> pd.DataFrame:
"""保全スケジュールの生成
Args:
equipment_statuses (list): 設備状態のリスト
planning_horizon_days (int): 計画期間 [days]
Returns:
pd.DataFrame: 保全スケジュール
"""
schedule = []
for status in equipment_statuses:
if status.recommended_action == MaintenanceAction.NONE:
continue
# 保全実施推奨日を計算
days_until_maintenance = status.rul_estimate / 24 # hoursをdaysに変換
recommended_date = datetime.now() + timedelta(days=days_until_maintenance * 0.7)
# 期間内の保全のみスケジュールに追加
if (recommended_date - datetime.now()).days <= planning_horizon_days:
schedule.append({
'equipment_id': status.equipment_id,
'recommended_date': recommended_date.strftime('%Y-%m-%d'),
'action': status.recommended_action.value,
'priority': status.priority_score,
'estimated_rul_hours': status.rul_estimate,
'failure_mode': status.failure_mode
})
df_schedule = pd.DataFrame(schedule)
if not df_schedule.empty:
df_schedule = df_schedule.sort_values('priority', ascending=False)
return df_schedule
# 使用例:プラント全体の予知保全システム
if __name__ == "__main__":
print("="*70)
print("統合予知保全システム - プラント監視デモ")
print("="*70)
# システムの初期化
pm_system = PredictiveMaintenanceSystem()
# 複数設備のシミュレーション
equipment_list = ['PUMP-101', 'PUMP-102', 'COMP-201', 'HX-301']
equipment_statuses = []
np.random.seed(42)
for eq_id in equipment_list:
# センサーデータのシミュレーション
sensor_data = {
'vibration': np.random.randn(10000) * 1.5 + 2.0,
'temperature': np.random.randn(100) * 5 + 65,
'pressure': np.random.uniform(1.5, 2.5, 2),
'flow_rate': np.random.randn(100) * 3 + 98
}
# 時系列履歴(30サイクル × 10特徴)
sensor_history = np.random.randn(30, 10) * 10 + 50
# 設備監視の実行
status = pm_system.monitor_equipment(eq_id, sensor_data, sensor_history)
equipment_statuses.append(status)
# 結果の表示
print(f"\n【設備ID: {status.equipment_id}】")
print(f" 時刻: {status.timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" 推定RUL: {status.rul_estimate:.1f} hours (±{status.rul_uncertainty:.1f})")
print(f" 故障モード: {status.failure_mode} (確率: {status.failure_probability:.1%})")
print(f" 推奨アクション: {status.recommended_action.value}")
print(f" 優先度スコア: {status.priority_score:.1f}/100")
# 保全スケジュールの生成
print("\n" + "="*70)
print("30日間の保全スケジュール")
print("="*70)
schedule = pm_system.generate_maintenance_schedule(equipment_statuses,
planning_horizon_days=30)
if not schedule.empty:
print(schedule.to_string(index=False))
else:
print("計画期間内に保全が必要な設備はありません。")
print("\n" + "="*70)
print("システムの特長:")
print("="*70)
print("✓ リアルタイム監視と自動診断")
print("✓ 複数モデルによる高精度RUL推定")
print("✓ 優先度ベースの保全計画最適化")
print("✓ 不確実性を考慮したリスク管理")
本章では、化学プラントにおける予知保全とRUL推定の実装技術を学びました。主要なポイントは以下の通りです:
導入効果の実例:
次章では、プロセス最適化とエネルギー管理について学びます。強化学習による動的最適化、エネルギー消費予測、プラント全体の統合最適化手法を実装レベルで習得します。