5.1 時系列データの確率モデリング
📐 定義: 時系列データと自己相関
時系列データ \(\{X_t\}_{t=1,2,\ldots,n}\) は、時間順序を持つデータ列です。 自己相関関数(ACF: Autocorrelation Function): \[\rho(k) = \frac{Cov(X_t, X_{t-k})}{\sqrt{Var(X_t) Var(X_{t-k})}}\] 偏自己相関関数(PACF: Partial Autocorrelation Function): ラグ k での直接的な相関(中間ラグの影響を除去) これらは時系列モデルの選択に使われます。
時系列データ \(\{X_t\}_{t=1,2,\ldots,n}\) は、時間順序を持つデータ列です。 自己相関関数(ACF: Autocorrelation Function): \[\rho(k) = \frac{Cov(X_t, X_{t-k})}{\sqrt{Var(X_t) Var(X_{t-k})}}\] 偏自己相関関数(PACF: Partial Autocorrelation Function): ラグ k での直接的な相関(中間ラグの影響を除去) これらは時系列モデルの選択に使われます。
💻 コード例1: 時系列データの確率モデリング
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import acf, pacf
# 仮想データ:製造プロセスの温度データ
np.random.seed(42)
n_obs = 500
# トレンド + 季節性 + ノイズ
time = np.arange(n_obs)
trend = 100 + 0.02 * time # 線形トレンド
seasonality = 5 * np.sin(2 * np.pi * time / 50) # 周期50の季節性
noise = np.random.normal(0, 2, n_obs) # ホワイトノイズ
temperature = trend + seasonality + noise
fig, axes = plt.subplots(3, 2, figsize=(14, 12))
# (1) 時系列プロット
axes[0, 0].plot(time, temperature, linewidth=1.5, color='#667eea')
axes[0, 0].plot(time, trend, '--', linewidth=2, color='red', label='トレンド')
axes[0, 0].plot(time, trend + seasonality, '--', linewidth=2, color='green',
label='トレンド+季節性')
axes[0, 0].set_xlabel('時間', fontsize=11)
axes[0, 0].set_ylabel('温度 (°C)', fontsize=11)
axes[0, 0].set_title('製造プロセス温度の時系列データ', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) ヒストグラム
axes[0, 1].hist(temperature, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
axes[0, 1].set_xlabel('温度 (°C)', fontsize=11)
axes[0, 1].set_ylabel('密度', fontsize=11)
axes[0, 1].set_title('温度の分布', fontsize=12, fontweight='bold')
axes[0, 1].grid(alpha=0.3)
# (3) 自己相関関数(ACF)
lags = 50
acf_values = acf(temperature, nlags=lags)
axes[1, 0].stem(range(lags+1), acf_values, linefmt='#667eea', markerfmt='o',
basefmt=' ')
axes[1, 0].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[1, 0].axhline(y=1.96/np.sqrt(n_obs), color='red', linestyle='--',
linewidth=1.5, label='95%信頼区間')
axes[1, 0].axhline(y=-1.96/np.sqrt(n_obs), color='red', linestyle='--', linewidth=1.5)
axes[1, 0].set_xlabel('ラグ', fontsize=11)
axes[1, 0].set_ylabel('ACF', fontsize=11)
axes[1, 0].set_title('自己相関関数(ACF)', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) 偏自己相関関数(PACF)
pacf_values = pacf(temperature, nlags=lags)
axes[1, 1].stem(range(lags+1), pacf_values, linefmt='#764ba2', markerfmt='o',
basefmt=' ')
axes[1, 1].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[1, 1].axhline(y=1.96/np.sqrt(n_obs), color='red', linestyle='--', linewidth=1.5)
axes[1, 1].axhline(y=-1.96/np.sqrt(n_obs), color='red', linestyle='--', linewidth=1.5)
axes[1, 1].set_xlabel('ラグ', fontsize=11)
axes[1, 1].set_ylabel('PACF', fontsize=11)
axes[1, 1].set_title('偏自己相関関数(PACF)', fontsize=12, fontweight='bold')
axes[1, 1].grid(alpha=0.3)
# (5) 差分系列(トレンド除去)
diff_temp = np.diff(temperature)
axes[2, 0].plot(diff_temp, linewidth=1.5, color='#667eea')
axes[2, 0].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[2, 0].set_xlabel('時間', fontsize=11)
axes[2, 0].set_ylabel('差分 ΔT', fontsize=11)
axes[2, 0].set_title('1次差分系列(トレンド除去後)', fontsize=12, fontweight='bold')
axes[2, 0].grid(alpha=0.3)
# (6) 差分系列のACF
diff_acf = acf(diff_temp, nlags=lags)
axes[2, 1].stem(range(lags+1), diff_acf, linefmt='#667eea', markerfmt='o',
basefmt=' ')
axes[2, 1].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[2, 1].axhline(y=1.96/np.sqrt(len(diff_temp)), color='red', linestyle='--', linewidth=1.5)
axes[2, 1].axhline(y=-1.96/np.sqrt(len(diff_temp)), color='red', linestyle='--', linewidth=1.5)
axes[2, 1].set_xlabel('ラグ', fontsize=11)
axes[2, 1].set_ylabel('ACF', fontsize=11)
axes[2, 1].set_title('差分系列のACF', fontsize=12, fontweight='bold')
axes[2, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 統計サマリー
print("時系列データの統計:")
print(f"平均: {temperature.mean():.2f} °C")
print(f"標準偏差: {temperature.std():.2f} °C")
print(f"最小値: {temperature.min():.2f} °C")
print(f"最大値: {temperature.max():.2f} °C")
print(f"\n差分系列の統計:")
print(f"平均: {diff_temp.mean():.4f}")
print(f"標準偏差: {diff_temp.std():.2f}")
5.2 ARMA/ARIMAモデル
📊 定理: ARMA/ARIMAモデル
AR(p)モデル(自己回帰): \[X_t = c + \sum_{i=1}^p \phi_i X_{t-i} + \epsilon_t\] MA(q)モデル(移動平均): \[X_t = \mu + \epsilon_t + \sum_{i=1}^q \theta_i \epsilon_{t-i}\] ARMA(p,q)モデル: \[X_t = c + \sum_{i=1}^p \phi_i X_{t-i} + \epsilon_t + \sum_{i=1}^q \theta_i \epsilon_{t-i}\] ARIMA(p,d,q)モデル: ARMA(p,q)を d 回差分した系列に適用
AR(p)モデル(自己回帰): \[X_t = c + \sum_{i=1}^p \phi_i X_{t-i} + \epsilon_t\] MA(q)モデル(移動平均): \[X_t = \mu + \epsilon_t + \sum_{i=1}^q \theta_i \epsilon_{t-i}\] ARMA(p,q)モデル: \[X_t = c + \sum_{i=1}^p \phi_i X_{t-i} + \epsilon_t + \sum_{i=1}^q \theta_i \epsilon_{t-i}\] ARIMA(p,d,q)モデル: ARMA(p,q)を d 回差分した系列に適用
💻 コード例2: ARMA/ARIMAモデル
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
# ARIMAモデルのフィッティング
# まず定常性を検定(Augmented Dickey-Fuller test)
adf_result = adfuller(temperature)
print("Augmented Dickey-Fuller検定:")
print(f"ADF統計量: {adf_result[0]:.4f}")
print(f"p値: {adf_result[1]:.4f}")
print(f"結論: {'定常' if adf_result[1] < 0.05 else '非定常'}")
# ARIMA(1,1,1)モデルのフィッティング
model = ARIMA(temperature, order=(1, 1, 1))
model_fit = model.fit()
print("\n" + "="*60)
print(model_fit.summary())
# 予測
n_forecast = 50
forecast = model_fit.forecast(steps=n_forecast)
forecast_index = np.arange(n_obs, n_obs + n_forecast)
# 信頼区間
forecast_result = model_fit.get_forecast(steps=n_forecast)
forecast_ci = forecast_result.conf_int()
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) フィッティングと予測
axes[0, 0].plot(time, temperature, linewidth=1.5, color='#667eea',
label='実測値')
axes[0, 0].plot(time, model_fit.fittedvalues, '--', linewidth=2, color='red',
label='フィッティング', alpha=0.7)
axes[0, 0].plot(forecast_index, forecast, linewidth=2, color='green',
label='予測')
axes[0, 0].fill_between(forecast_index,
forecast_ci.iloc[:, 0],
forecast_ci.iloc[:, 1],
alpha=0.2, color='green', label='95%信頼区間')
axes[0, 0].set_xlabel('時間', fontsize=11)
axes[0, 0].set_ylabel('温度 (°C)', fontsize=11)
axes[0, 0].set_title('ARIMAモデルによるフィッティングと予測', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) 残差プロット
residuals = model_fit.resid
axes[0, 1].plot(residuals, linewidth=1.5, color='#667eea')
axes[0, 1].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[0, 1].set_xlabel('時間', fontsize=11)
axes[0, 1].set_ylabel('残差', fontsize=11)
axes[0, 1].set_title('残差プロット', fontsize=12, fontweight='bold')
axes[0, 1].grid(alpha=0.3)
# (3) 残差のヒストグラム
axes[1, 0].hist(residuals, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
# 正規分布フィット
x = np.linspace(residuals.min(), residuals.max(), 1000)
mu, sigma = residuals.mean(), residuals.std()
axes[1, 0].plot(x, stats.norm.pdf(x, mu, sigma), 'r-', linewidth=2.5,
label=f'N({mu:.2f}, {sigma:.2f}²)')
axes[1, 0].set_xlabel('残差', fontsize=11)
axes[1, 0].set_ylabel('密度', fontsize=11)
axes[1, 0].set_title('残差の分布', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) 残差のACF
residual_acf = acf(residuals, nlags=40)
axes[1, 1].stem(range(len(residual_acf)), residual_acf, linefmt='#667eea',
markerfmt='o', basefmt=' ')
axes[1, 1].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[1, 1].axhline(y=1.96/np.sqrt(len(residuals)), color='red',
linestyle='--', linewidth=1.5, label='95%信頼区間')
axes[1, 1].axhline(y=-1.96/np.sqrt(len(residuals)), color='red',
linestyle='--', linewidth=1.5)
axes[1, 1].set_xlabel('ラグ', fontsize=11)
axes[1, 1].set_ylabel('ACF', fontsize=11)
axes[1, 1].set_title('残差のACF(ホワイトノイズ検定)', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# モデル診断
from statsmodels.stats.diagnostic import acorr_ljungbox
lb_test = acorr_ljungbox(residuals, lags=20, return_df=True)
print("\n\nLjung-Box検定(残差の独立性):")
print(lb_test.head(10))
5.3 カルマンフィルタによる状態推定
📐 定義: カルマンフィルタ
線形動的システムの状態を、ノイズのある観測から最適に推定する再帰的アルゴリズム。 状態空間モデル: \[\mathbf{x}_{t} = \mathbf{F}\mathbf{x}_{t-1} + \mathbf{w}_t \quad (\text{状態方程式})\] \[\mathbf{z}_t = \mathbf{H}\mathbf{x}_t + \mathbf{v}_t \quad (\text{観測方程式})\] ここで \(\mathbf{w}_t \sim N(0, \mathbf{Q})\)(プロセスノイズ)、\(\mathbf{v}_t \sim N(0, \mathbf{R})\)(観測ノイズ) カルマンフィルタの更新式:
線形動的システムの状態を、ノイズのある観測から最適に推定する再帰的アルゴリズム。 状態空間モデル: \[\mathbf{x}_{t} = \mathbf{F}\mathbf{x}_{t-1} + \mathbf{w}_t \quad (\text{状態方程式})\] \[\mathbf{z}_t = \mathbf{H}\mathbf{x}_t + \mathbf{v}_t \quad (\text{観測方程式})\] ここで \(\mathbf{w}_t \sim N(0, \mathbf{Q})\)(プロセスノイズ)、\(\mathbf{v}_t \sim N(0, \mathbf{R})\)(観測ノイズ) カルマンフィルタの更新式:
- 予測: \(\hat{\mathbf{x}}_{t|t-1} = \mathbf{F}\hat{\mathbf{x}}_{t-1|t-1}\)
- 更新: \(\hat{\mathbf{x}}_{t|t} = \hat{\mathbf{x}}_{t|t-1} + \mathbf{K}_t(\mathbf{z}_t - \mathbf{H}\hat{\mathbf{x}}_{t|t-1})\)
💻 コード例3: カルマンフィルタによる状態推定
from pykalman import KalmanFilter
# 真の状態(位置と速度)のシミュレーション
np.random.seed(42)
n_timesteps = 100
dt = 0.1
# 真の状態
true_position = np.zeros(n_timesteps)
true_velocity = np.zeros(n_timesteps)
true_position[0] = 0
true_velocity[0] = 1 # 初期速度
process_noise_std = 0.1
for t in range(1, n_timesteps):
# 真の運動方程式: x = x + v*dt, v = v + w
true_velocity[t] = true_velocity[t-1] + np.random.normal(0, process_noise_std)
true_position[t] = true_position[t-1] + true_velocity[t-1] * dt
# 観測データ(ノイズあり)
observation_noise_std = 0.5
observations = true_position + np.random.normal(0, observation_noise_std, n_timesteps)
# カルマンフィルタの設定
# 状態: [position, velocity]
# 観測: [position]
# 状態遷移行列 F
F = np.array([[1, dt],
[0, 1]])
# 観測行列 H
H = np.array([[1, 0]])
# プロセスノイズ共分散 Q
Q = np.array([[0.01, 0],
[0, process_noise_std**2]])
# 観測ノイズ共分散 R
R = np.array([[observation_noise_std**2]])
# 初期状態
initial_state_mean = np.array([0, 0])
initial_state_covariance = np.eye(2)
# カルマンフィルタ
kf = KalmanFilter(
transition_matrices=F,
observation_matrices=H,
transition_covariance=Q,
observation_covariance=R,
initial_state_mean=initial_state_mean,
initial_state_covariance=initial_state_covariance
)
# フィルタリング
state_means, state_covariances = kf.filter(observations)
# スムージング(全データを使った事後推定)
smoothed_state_means, smoothed_state_covariances = kf.smooth(observations)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) 位置の推定
time = np.arange(n_timesteps) * dt
axes[0, 0].plot(time, true_position, 'g-', linewidth=2.5, label='真の位置')
axes[0, 0].plot(time, observations, 'r.', markersize=6, alpha=0.5,
label='観測(ノイズあり)')
axes[0, 0].plot(time, state_means[:, 0], 'b-', linewidth=2,
label='カルマンフィルタ')
axes[0, 0].plot(time, smoothed_state_means[:, 0], 'c--', linewidth=2,
label='カルマンスムーザ')
axes[0, 0].set_xlabel('時間', fontsize=11)
axes[0, 0].set_ylabel('位置', fontsize=11)
axes[0, 0].set_title('カルマンフィルタによる位置推定', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) 速度の推定
axes[0, 1].plot(time, true_velocity, 'g-', linewidth=2.5, label='真の速度')
axes[0, 1].plot(time, state_means[:, 1], 'b-', linewidth=2,
label='カルマンフィルタ')
axes[0, 1].plot(time, smoothed_state_means[:, 1], 'c--', linewidth=2,
label='カルマンスムーザ')
axes[0, 1].set_xlabel('時間', fontsize=11)
axes[0, 1].set_ylabel('速度', fontsize=11)
axes[0, 1].set_title('速度の推定(観測なし)', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) 推定誤差
position_error = state_means[:, 0] - true_position
velocity_error = state_means[:, 1] - true_velocity
axes[1, 0].plot(time, position_error, linewidth=2, color='#667eea',
label='位置推定誤差')
axes[1, 0].plot(time, velocity_error, linewidth=2, color='#764ba2',
label='速度推定誤差')
axes[1, 0].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[1, 0].set_xlabel('時間', fontsize=11)
axes[1, 0].set_ylabel('推定誤差', fontsize=11)
axes[1, 0].set_title('推定誤差の時間推移', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) 推定誤差の共分散(不確実性)
position_std = np.sqrt([cov[0, 0] for cov in state_covariances])
axes[1, 1].plot(time, state_means[:, 0], 'b-', linewidth=2, label='推定位置')
axes[1, 1].fill_between(time,
state_means[:, 0] - 2*position_std,
state_means[:, 0] + 2*position_std,
alpha=0.3, color='blue', label='95%信頼区間')
axes[1, 1].plot(time, true_position, 'g--', linewidth=2, label='真の位置')
axes[1, 1].set_xlabel('時間', fontsize=11)
axes[1, 1].set_ylabel('位置', fontsize=11)
axes[1, 1].set_title('推定の不確実性', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("カルマンフィルタの性能:")
print(f"位置推定のRMSE: {np.sqrt(np.mean(position_error**2)):.4f}")
print(f"速度推定のRMSE: {np.sqrt(np.mean(velocity_error**2)):.4f}")
print(f"\n観測ノイズのRMSE: {observation_noise_std:.4f}")
print(f"フィルタによる改善率: {(1 - np.sqrt(np.mean(position_error**2))/observation_noise_std)*100:.1f}%")
5.4 品質管理と管理図
📐 定義: 管理図(Control Chart)
プロセスが統計的管理状態にあるかを監視するツール。 Xbar-R管理図:
プロセスが統計的管理状態にあるかを監視するツール。 Xbar-R管理図:
- 中心線(CL): \(\bar{\bar{X}} = \frac{1}{k}\sum_{i=1}^k \bar{X}_i\)
- 上方管理限界(UCL): \(\bar{\bar{X}} + A_2 \bar{R}\)
- 下方管理限界(LCL): \(\bar{\bar{X}} - A_2 \bar{R}\)
💻 コード例4: 管理図(Xbar-R chart, CUSUM)
# 製造プロセスの品質管理
np.random.seed(42)
# データ生成:正常時とシフト発生時
n_groups = 50
group_size = 5
# 正常時(最初の30グループ)
normal_data = np.random.normal(100, 2, (30, group_size))
# シフト発生(後半20グループ、平均が101.5にシフト)
shifted_data = np.random.normal(101.5, 2, (20, group_size))
data = np.vstack([normal_data, shifted_data])
# グループごとの平均と範囲
group_means = data.mean(axis=1)
group_ranges = data.max(axis=1) - data.min(axis=1)
# 管理限界の計算
CL_mean = group_means[:30].mean() # 正常時データから計算
R_bar = group_ranges[:30].mean()
# Xbar管理図の係数(サンプルサイズ5の場合)
A2 = 0.577
D3 = 0
D4 = 2.114
UCL_mean = CL_mean + A2 * R_bar
LCL_mean = CL_mean - A2 * R_bar
UCL_range = D4 * R_bar
LCL_range = D3 * R_bar
fig, axes = plt.subplots(3, 2, figsize=(14, 12))
# (1) Xbar管理図
axes[0, 0].plot(group_means, 'o-', linewidth=2, color='#667eea', markersize=6)
axes[0, 0].axhline(y=CL_mean, color='green', linestyle='-', linewidth=2,
label=f'CL={CL_mean:.2f}')
axes[0, 0].axhline(y=UCL_mean, color='red', linestyle='--', linewidth=2,
label=f'UCL={UCL_mean:.2f}')
axes[0, 0].axhline(y=LCL_mean, color='red', linestyle='--', linewidth=2,
label=f'LCL={LCL_mean:.2f}')
axes[0, 0].axvline(x=29.5, color='orange', linestyle=':', linewidth=2,
alpha=0.7, label='シフト発生点')
# 管理外れ点を強調
out_of_control = (group_means > UCL_mean) | (group_means < LCL_mean)
axes[0, 0].plot(np.where(out_of_control)[0], group_means[out_of_control],
'rx', markersize=12, markeredgewidth=3, label='管理外れ')
axes[0, 0].set_xlabel('グループ番号', fontsize=11)
axes[0, 0].set_ylabel('グループ平均', fontsize=11)
axes[0, 0].set_title('Xbar管理図', fontsize=12, fontweight='bold')
axes[0, 0].legend(fontsize=9)
axes[0, 0].grid(alpha=0.3)
# (2) R管理図
axes[0, 1].plot(group_ranges, 'o-', linewidth=2, color='#764ba2', markersize=6)
axes[0, 1].axhline(y=R_bar, color='green', linestyle='-', linewidth=2,
label=f'CL={R_bar:.2f}')
axes[0, 1].axhline(y=UCL_range, color='red', linestyle='--', linewidth=2,
label=f'UCL={UCL_range:.2f}')
axes[0, 1].axhline(y=LCL_range, color='red', linestyle='--', linewidth=2,
label=f'LCL={LCL_range:.2f}')
axes[0, 1].axvline(x=29.5, color='orange', linestyle=':', linewidth=2, alpha=0.7)
axes[0, 1].set_xlabel('グループ番号', fontsize=11)
axes[0, 1].set_ylabel('グループ範囲', fontsize=11)
axes[0, 1].set_title('R管理図(範囲)', fontsize=12, fontweight='bold')
axes[0, 1].legend(fontsize=9)
axes[0, 1].grid(alpha=0.3)
# (3) CUSUM管理図
mu_0 = CL_mean # 目標値
sigma = 2 # 標準偏差(既知と仮定)
k = 0.5 * sigma # 許容値(0.5σのシフトを検出)
h = 5 * sigma # 決定間隔(5σ)
# 上側CUSUMと下側CUSUM
C_plus = np.zeros(n_groups + 1)
C_minus = np.zeros(n_groups + 1)
for i in range(n_groups):
C_plus[i+1] = max(0, C_plus[i] + (group_means[i] - mu_0) - k)
C_minus[i+1] = max(0, C_minus[i] - (group_means[i] - mu_0) - k)
axes[1, 0].plot(C_plus[1:], 'o-', linewidth=2, color='red', markersize=6,
label='上側CUSUM')
axes[1, 0].plot(C_minus[1:], 'o-', linewidth=2, color='blue', markersize=6,
label='下側CUSUM')
axes[1, 0].axhline(y=h, color='red', linestyle='--', linewidth=2,
label=f'決定限界 h={h:.2f}')
axes[1, 0].axvline(x=29.5, color='orange', linestyle=':', linewidth=2, alpha=0.7)
# アラーム検出
alarm_plus = np.where(C_plus[1:] > h)[0]
if len(alarm_plus) > 0:
axes[1, 0].plot(alarm_plus[0], C_plus[alarm_plus[0]+1], 'r*',
markersize=20, label=f'アラーム(t={alarm_plus[0]})')
axes[1, 0].set_xlabel('グループ番号', fontsize=11)
axes[1, 0].set_ylabel('CUSUM', fontsize=11)
axes[1, 0].set_title('CUSUM管理図', fontsize=12, fontweight='bold')
axes[1, 0].legend(fontsize=9)
axes[1, 0].grid(alpha=0.3)
# (4) 個別値のヒストグラム(正常時 vs シフト時)
axes[1, 1].hist(normal_data.flatten(), bins=30, alpha=0.6, density=True,
color='green', edgecolor='black', label='正常時')
axes[1, 1].hist(shifted_data.flatten(), bins=30, alpha=0.6, density=True,
color='red', edgecolor='black', label='シフト後')
# 理論分布
x = np.linspace(94, 108, 1000)
axes[1, 1].plot(x, stats.norm.pdf(x, 100, 2), 'g-', linewidth=2.5,
label='N(100, 2²)')
axes[1, 1].plot(x, stats.norm.pdf(x, 101.5, 2), 'r-', linewidth=2.5,
label='N(101.5, 2²)')
axes[1, 1].set_xlabel('測定値', fontsize=11)
axes[1, 1].set_ylabel('密度', fontsize=11)
axes[1, 1].set_title('プロセスシフトの検出', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
# (5) 工程能力指数(Cpk)
# Cpk = min((USL - μ)/(3σ), (μ - LSL)/(3σ))
USL = 106 # 上限規格
LSL = 94 # 下限規格
# 正常時
Cp_normal = (USL - LSL) / (6 * sigma)
Cpk_normal = min((USL - CL_mean) / (3 * sigma),
(CL_mean - LSL) / (3 * sigma))
# シフト後
shifted_mean = shifted_data.mean()
Cpk_shifted = min((USL - shifted_mean) / (3 * sigma),
(shifted_mean - LSL) / (3 * sigma))
axes[2, 0].bar(['Cp\n(正常)', 'Cpk\n(正常)', 'Cpk\n(シフト後)'],
[Cp_normal, Cpk_normal, Cpk_shifted],
color=['green', 'blue', 'red'], alpha=0.7, edgecolor='black')
axes[2, 0].axhline(y=1.33, color='orange', linestyle='--', linewidth=2,
label='推奨値(1.33)')
axes[2, 0].axhline(y=1.00, color='red', linestyle='--', linewidth=2,
label='最低値(1.00)')
axes[2, 0].set_ylabel('工程能力指数', fontsize=11)
axes[2, 0].set_title('工程能力指数(Cp, Cpk)', fontsize=12, fontweight='bold')
axes[2, 0].legend()
axes[2, 0].grid(axis='y', alpha=0.3)
# (6) 管理図の性能比較(検出時間)
axes[2, 1].text(0.1, 0.8, f'シフト発生: グループ30', fontsize=12, transform=axes[2, 1].transAxes)
axes[2, 1].text(0.1, 0.7, f'\nXbar管理図:', fontsize=12, fontweight='bold',
transform=axes[2, 1].transAxes)
first_out = np.where(out_of_control)[0]
if len(first_out) > 0:
axes[2, 1].text(0.1, 0.6, f' 初回検出: グループ{first_out[0]+1}',
fontsize=11, transform=axes[2, 1].transAxes)
axes[2, 1].text(0.1, 0.5, f' 検出遅れ: {first_out[0]-29}グループ',
fontsize=11, transform=axes[2, 1].transAxes)
axes[2, 1].text(0.1, 0.4, f'\nCUSUM管理図:', fontsize=12, fontweight='bold',
transform=axes[2, 1].transAxes)
if len(alarm_plus) > 0:
axes[2, 1].text(0.1, 0.3, f' 初回検出: グループ{alarm_plus[0]+1}',
fontsize=11, transform=axes[2, 1].transAxes)
axes[2, 1].text(0.1, 0.2, f' 検出遅れ: {alarm_plus[0]-29}グループ',
fontsize=11, transform=axes[2, 1].transAxes)
axes[2, 1].axis('off')
axes[2, 1].set_title('検出性能の比較', fontsize=12, fontweight='bold')
plt.tight_layout()
plt.show()
print("品質管理指標:")
print(f"工程能力指数 Cp (正常時): {Cp_normal:.4f}")
print(f"工程能力指数 Cpk (正常時): {Cpk_normal:.4f}")
print(f"工程能力指数 Cpk (シフト後): {Cpk_shifted:.4f}")
print(f"\n管理外れ点数(Xbar図): {out_of_control.sum()} / {n_groups}")
5.5 モンテカルロシミュレーションによるプロセス最適化
💻 コード例5: モンテカルロシミュレーションによるプロセス最適化
# プロセス最適化問題:収率最大化
np.random.seed(42)
# プロセスパラメータ(不確実性あり)
def process_yield(temperature, pressure, time):
"""
収率計算モデル(簡略化)
temperature: 温度(°C)
pressure: 圧力(MPa)
time: 反応時間(時間)
"""
# 理想的な収率モデル
T_opt = 350
P_opt = 2.0
t_opt = 4.0
# 2次関数による収率モデル
yield_rate = 0.9 - 0.001*(temperature - T_opt)**2 \
- 0.05*(pressure - P_opt)**2 \
- 0.01*(time - t_opt)**2
# ランダムノイズ(プロセス変動)
noise = np.random.normal(0, 0.02)
return max(0, min(1, yield_rate + noise))
# モンテカルロシミュレーション
n_simulations = 10000
# パラメータ設定(不確実性を考慮)
temperature_mean = 350
temperature_std = 5
pressure_mean = 2.0
pressure_std = 0.1
time_mean = 4.0
time_std = 0.2
# シミュレーション実行
yields = []
temperatures = []
pressures = []
times = []
for _ in range(n_simulations):
T = np.random.normal(temperature_mean, temperature_std)
P = np.random.normal(pressure_mean, pressure_std)
t = np.random.normal(time_mean, time_std)
y = process_yield(T, P, t)
yields.append(y)
temperatures.append(T)
pressures.append(P)
times.append(t)
yields = np.array(yields)
temperatures = np.array(temperatures)
pressures = np.array(pressures)
times = np.array(times)
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
# (1) 収率の分布
axes[0, 0].hist(yields, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
axes[0, 0].axvline(yields.mean(), color='red', linestyle='--', linewidth=2,
label=f'平均={yields.mean():.4f}')
axes[0, 0].axvline(np.percentile(yields, 5), color='orange', linestyle=':',
linewidth=2, label=f'5%点={np.percentile(yields, 5):.4f}')
axes[0, 0].set_xlabel('収率', fontsize=11)
axes[0, 0].set_ylabel('密度', fontsize=11)
axes[0, 0].set_title('収率の分布', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) 温度 vs 収率
axes[0, 1].scatter(temperatures, yields, alpha=0.3, s=10, color='#667eea')
axes[0, 1].set_xlabel('温度 (°C)', fontsize=11)
axes[0, 1].set_ylabel('収率', fontsize=11)
axes[0, 1].set_title('温度と収率の関係', fontsize=12, fontweight='bold')
axes[0, 1].grid(alpha=0.3)
# 感度分析(線形回帰)
from scipy.stats import pearsonr
corr_T, _ = pearsonr(temperatures, yields)
axes[0, 1].text(0.05, 0.95, f'相関係数: {corr_T:.4f}',
transform=axes[0, 1].transAxes, fontsize=10,
verticalalignment='top')
# (3) 圧力 vs 収率
axes[0, 2].scatter(pressures, yields, alpha=0.3, s=10, color='#764ba2')
axes[0, 2].set_xlabel('圧力 (MPa)', fontsize=11)
axes[0, 2].set_ylabel('収率', fontsize=11)
axes[0, 2].set_title('圧力と収率の関係', fontsize=12, fontweight='bold')
axes[0, 2].grid(alpha=0.3)
corr_P, _ = pearsonr(pressures, yields)
axes[0, 2].text(0.05, 0.95, f'相関係数: {corr_P:.4f}',
transform=axes[0, 2].transAxes, fontsize=10,
verticalalignment='top')
# (4) リスク分析(収率 < 0.8 の確率)
threshold = 0.8
risk_probability = (yields < threshold).sum() / n_simulations
axes[1, 0].hist(yields, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
axes[1, 0].axvline(threshold, color='red', linestyle='--', linewidth=2.5,
label=f'閾値={threshold}')
# リスク領域を塗りつぶし
x_fill = np.linspace(yields.min(), threshold, 100)
axes[1, 0].fill_betweenx([0, 10], threshold, yields.min(),
alpha=0.3, color='red')
axes[1, 0].set_xlabel('収率', fontsize=11)
axes[1, 0].set_ylabel('密度', fontsize=11)
axes[1, 0].set_title(f'リスク分析(収率<{threshold}の確率: {risk_probability:.2%})',
fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
axes[1, 0].set_ylim([0, 20])
# (5) パラメータ感度分析
sensitivities = {
'温度': corr_T,
'圧力': corr_P,
'時間': pearsonr(times, yields)[0]
}
axes[1, 1].barh(list(sensitivities.keys()), list(sensitivities.values()),
color='#667eea', alpha=0.7, edgecolor='black')
axes[1, 1].axvline(x=0, color='black', linestyle='-', linewidth=0.8)
axes[1, 1].set_xlabel('相関係数', fontsize=11)
axes[1, 1].set_title('パラメータ感度分析', fontsize=12, fontweight='bold')
axes[1, 1].grid(alpha=0.3)
# (6) 最適化シナリオの比較
scenarios = {
'現状': {'T': (350, 5), 'P': (2.0, 0.1), 't': (4.0, 0.2)},
'温度最適化': {'T': (350, 2), 'P': (2.0, 0.1), 't': (4.0, 0.2)},
'圧力最適化': {'T': (350, 5), 'P': (2.0, 0.05), 't': (4.0, 0.2)},
'総合最適化': {'T': (350, 2), 'P': (2.0, 0.05), 't': (4.0, 0.1)}
}
scenario_yields = {}
for name, params in scenarios.items():
yields_scenario = []
for _ in range(5000):
T = np.random.normal(*params['T'])
P = np.random.normal(*params['P'])
t = np.random.normal(*params['t'])
yields_scenario.append(process_yield(T, P, t))
scenario_yields[name] = np.array(yields_scenario)
axes[1, 2].boxplot([scenario_yields[name] for name in scenarios.keys()],
labels=list(scenarios.keys()), patch_artist=True)
axes[1, 2].set_ylabel('収率', fontsize=11)
axes[1, 2].set_title('最適化シナリオの比較', fontsize=12, fontweight='bold')
axes[1, 2].grid(axis='y', alpha=0.3)
plt.setp(axes[1, 2].xaxis.get_majorticklabels(), rotation=45, ha='right')
plt.tight_layout()
plt.show()
print("モンテカルロシミュレーション結果:")
print(f"シミュレーション回数: {n_simulations}")
print(f"\n収率統計:")
print(f" 平均: {yields.mean():.4f}")
print(f" 標準偏差: {yields.std():.4f}")
print(f" 最小値: {yields.min():.4f}")
print(f" 最大値: {yields.max():.4f}")
print(f" 5%点: {np.percentile(yields, 5):.4f}")
print(f" 95%点: {np.percentile(yields, 95):.4f}")
print(f"\nリスク評価:")
print(f" 収率<{threshold}の確率: {risk_probability:.2%}")
print("\n最適化シナリオ比較:")
for name in scenarios.keys():
mean_yield = scenario_yields[name].mean()
std_yield = scenario_yields[name].std()
print(f" {name}: 平均={mean_yield:.4f}, σ={std_yield:.4f}")
5.6 信頼性工学(Weibull分布)
💻 コード例6: 信頼性工学(Weibull分布)
# Weibull分布による信頼性解析
from scipy.stats import weibull_min
np.random.seed(42)
# 故障データのシミュレーション(Weibull分布)
shape_param = 2.5 # β(形状パラメータ)
scale_param = 1000 # η(尺度パラメータ、特性寿命)
weibull_dist = weibull_min(c=shape_param, scale=scale_param)
# 故障時間データ
n_samples = 100
failure_times = weibull_dist.rvs(n_samples)
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
# (1) 故障時間の分布
axes[0, 0].hist(failure_times, bins=30, density=True, alpha=0.6,
color='#667eea', edgecolor='black', label='実測データ')
t = np.linspace(0, failure_times.max(), 1000)
pdf_theory = weibull_dist.pdf(t)
axes[0, 0].plot(t, pdf_theory, 'r-', linewidth=2.5,
label=f'Weibull(β={shape_param}, η={scale_param})')
axes[0, 0].set_xlabel('故障時間(時間)', fontsize=11)
axes[0, 0].set_ylabel('確率密度', fontsize=11)
axes[0, 0].set_title('故障時間の分布', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) 信頼度関数 R(t) = 1 - F(t)
cdf_theory = weibull_dist.cdf(t)
reliability = 1 - cdf_theory
axes[0, 1].plot(t, reliability, linewidth=2.5, color='#667eea')
axes[0, 1].axhline(y=0.5, color='red', linestyle='--', linewidth=2,
label='R(t)=0.5')
# 平均寿命(MTTF)
mttf = weibull_dist.mean()
axes[0, 1].axvline(x=mttf, color='green', linestyle='--', linewidth=2,
label=f'MTTF={mttf:.0f}h')
axes[0, 1].set_xlabel('時間(時間)', fontsize=11)
axes[0, 1].set_ylabel('信頼度 R(t)', fontsize=11)
axes[0, 1].set_title('信頼度関数', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) ハザード関数(故障率) h(t) = f(t) / R(t)
hazard_rate = pdf_theory / (reliability + 1e-10)
axes[0, 2].plot(t, hazard_rate, linewidth=2.5, color='#764ba2')
axes[0, 2].set_xlabel('時間(時間)', fontsize=11)
axes[0, 2].set_ylabel('ハザード率 h(t)', fontsize=11)
axes[0, 2].set_title('ハザード関数(故障率)', fontsize=12, fontweight='bold')
axes[0, 2].grid(alpha=0.3)
# β>1: 摩耗故障(故障率増加)
axes[0, 2].text(0.5, 0.9, f'β={shape_param} > 1: 摩耗故障型',
transform=axes[0, 2].transAxes, fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
# (4) Weibullプロット(確率紙)
# ln(-ln(1-F(t))) vs ln(t) が直線になる
sorted_times = np.sort(failure_times)
n = len(sorted_times)
empirical_cdf = np.arange(1, n+1) / (n+1)
# Weibull変換
y_weibull = np.log(-np.log(1 - empirical_cdf))
x_weibull = np.log(sorted_times)
# 線形回帰
from scipy.stats import linregress
slope, intercept, r_value, _, _ = linregress(x_weibull, y_weibull)
# パラメータ推定
beta_estimated = slope
eta_estimated = np.exp(-intercept / slope)
axes[1, 0].plot(x_weibull, y_weibull, 'o', color='#667eea', markersize=5,
label='実測データ')
axes[1, 0].plot(x_weibull, slope*x_weibull + intercept, 'r-', linewidth=2,
label=f'フィット: β={beta_estimated:.2f}, η={eta_estimated:.0f}')
axes[1, 0].set_xlabel('ln(時間)', fontsize=11)
axes[1, 0].set_ylabel('ln(-ln(1-F(t)))', fontsize=11)
axes[1, 0].set_title(f'Weibullプロット (R²={r_value**2:.4f})',
fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (5) 保全計画(予防保全 vs 事後保全)
# コストモデル
C_preventive = 100 # 予防保全コスト
C_corrective = 500 # 事後保全コスト(故障修理)
# 予防保全間隔を変化させる
T_preventive = np.linspace(100, 2000, 100)
expected_costs = []
for T in T_preventive:
# 予防保全までの故障確率
F_T = weibull_dist.cdf(T)
# 期待コスト(1サイクルあたり)
# E[コスト] = C_preventive + C_corrective * F(T)
expected_cost = (C_preventive + C_corrective * F_T) / T
expected_costs.append(expected_cost)
expected_costs = np.array(expected_costs)
optimal_T = T_preventive[np.argmin(expected_costs)]
axes[1, 1].plot(T_preventive, expected_costs, linewidth=2.5, color='#667eea')
axes[1, 1].axvline(x=optimal_T, color='red', linestyle='--', linewidth=2,
label=f'最適間隔={optimal_T:.0f}h')
axes[1, 1].scatter([optimal_T], [expected_costs.min()], color='red',
s=100, zorder=5)
axes[1, 1].set_xlabel('予防保全間隔(時間)', fontsize=11)
axes[1, 1].set_ylabel('期待コスト(/時間)', fontsize=11)
axes[1, 1].set_title('保全計画の最適化', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
# (6) 異なる形状パラメータでの比較
betas = [0.5, 1.0, 2.0, 3.0]
colors = ['blue', 'green', 'orange', 'red']
for beta, color in zip(betas, colors):
dist = weibull_min(c=beta, scale=1000)
t = np.linspace(0, 3000, 1000)
pdf = dist.pdf(t)
axes[1, 2].plot(t, pdf, linewidth=2.5, color=color, label=f'β={beta}')
axes[1, 2].set_xlabel('時間', fontsize=11)
axes[1, 2].set_ylabel('確率密度', fontsize=11)
axes[1, 2].set_title('形状パラメータβの影響', fontsize=12, fontweight='bold')
axes[1, 2].legend()
axes[1, 2].grid(alpha=0.3)
# 故障モードの解説
axes[1, 2].text(0.55, 0.9, 'β<1: 初期故障型',
transform=axes[1, 2].transAxes, fontsize=9)
axes[1, 2].text(0.55, 0.8, 'β=1: ランダム故障型',
transform=axes[1, 2].transAxes, fontsize=9)
axes[1, 2].text(0.55, 0.7, 'β>1: 摩耗故障型',
transform=axes[1, 2].transAxes, fontsize=9)
plt.tight_layout()
plt.show()
print("Weibull分布による信頼性解析:")
print(f"真のパラメータ: β={shape_param}, η={scale_param}")
print(f"推定パラメータ: β={beta_estimated:.2f}, η={eta_estimated:.0f}")
print(f"\n信頼性指標:")
print(f" 平均寿命(MTTF): {mttf:.2f} 時間")
print(f" B10寿命(10%故障時間): {weibull_dist.ppf(0.1):.2f} 時間")
print(f" 中央値寿命: {weibull_dist.median():.2f} 時間")
print(f"\n最適保全計画:")
print(f" 最適予防保全間隔: {optimal_T:.0f} 時間")
print(f" 最小期待コスト: {expected_costs.min():.4f} /時間")
5.7 予知保全のための故障予測モデル
💻 コード例7: 予知保全のための故障予測モデル
# 予知保全:劣化データからの故障予測
np.random.seed(42)
# 劣化データのシミュレーション(時間とともに増加)
n_units = 20 # 装置数
max_time = 200 # 観測期間
# 劣化モデル: X(t) = a*t + b*t^2 + noise
def degradation_model(t, a=0.1, b=0.001, sigma=0.5):
"""劣化モデル(2次関数 + ノイズ)"""
return a * t + b * t**2 + np.random.normal(0, sigma)
# 複数装置の劣化軌跡をシミュレーション
time_points = np.linspace(0, max_time, 50)
degradation_data = []
for i in range(n_units):
# パラメータにばらつきを持たせる
a = np.random.normal(0.1, 0.02)
b = np.random.normal(0.001, 0.0002)
degradation = [degradation_model(t, a, b) for t in time_points]
degradation_data.append(degradation)
degradation_data = np.array(degradation_data)
# 故障閾値
failure_threshold = 30
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) 劣化軌跡
for i in range(n_units):
axes[0, 0].plot(time_points, degradation_data[i], alpha=0.5, linewidth=1.5)
axes[0, 0].axhline(y=failure_threshold, color='red', linestyle='--',
linewidth=2.5, label=f'故障閾値={failure_threshold}')
# 平均劣化軌跡
mean_degradation = degradation_data.mean(axis=0)
axes[0, 0].plot(time_points, mean_degradation, 'b-', linewidth=3,
label='平均劣化軌跡')
axes[0, 0].set_xlabel('時間', fontsize=11)
axes[0, 0].set_ylabel('劣化レベル', fontsize=11)
axes[0, 0].set_title('装置の劣化軌跡', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) 残存寿命(RUL: Remaining Useful Life)の予測
current_time = 100
current_degradation = degradation_data[:, np.argmin(np.abs(time_points - current_time))]
# 線形回帰による劣化速度の推定
predicted_ruls = []
for i in range(n_units):
# 過去データから劣化速度を推定
past_times = time_points[time_points <= current_time]
past_degradation = degradation_data[i, :len(past_times)]
# 線形回帰
slope, intercept, _, _, _ = linregress(past_times, past_degradation)
# 故障時間の予測: failure_threshold = slope * t_failure + intercept
if slope > 0:
t_failure = (failure_threshold - intercept) / slope
rul = max(0, t_failure - current_time)
else:
rul = np.inf
predicted_ruls.append(rul)
predicted_ruls = np.array(predicted_ruls)
predicted_ruls = predicted_ruls[predicted_ruls < 500] # 外れ値除去
axes[0, 1].hist(predicted_ruls, bins=20, alpha=0.6, color='#667eea',
edgecolor='black')
axes[0, 1].axvline(predicted_ruls.mean(), color='red', linestyle='--',
linewidth=2, label=f'平均RUL={predicted_ruls.mean():.1f}')
axes[0, 1].set_xlabel('残存寿命(RUL)', fontsize=11)
axes[0, 1].set_ylabel('頻度', fontsize=11)
axes[0, 1].set_title(f't={current_time}での残存寿命分布',
fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) 予知保全の意思決定
# コスト計算
maintenance_cost = 50
failure_cost = 200
inspection_cost = 10
# 保全戦略の比較
strategies = {
'事後保全': {'action': 'corrective', 'threshold': failure_threshold},
'時間基準保全': {'action': 'time-based', 'interval': 80},
'状態基準保全': {'action': 'condition-based', 'threshold': 25},
'予知保全': {'action': 'predictive', 'rul_threshold': 20}
}
# シミュレーションによるコスト評価
def simulate_maintenance_cost(degradation_data, time_points, strategy, n_sims=100):
"""保全戦略のコストシミュレーション"""
total_cost = 0
for sim in range(n_sims):
current_deg = 0
current_time = 0
cost = 0
for t_idx, t in enumerate(time_points):
current_deg = degradation_data[sim % len(degradation_data), t_idx]
if strategy['action'] == 'corrective':
if current_deg >= failure_threshold:
cost += failure_cost
current_deg = 0
elif strategy['action'] == 'time-based':
if t % strategy['interval'] == 0 and t > 0:
cost += maintenance_cost
elif strategy['action'] == 'condition-based':
cost += inspection_cost
if current_deg >= strategy['threshold']:
cost += maintenance_cost
current_deg = 0
elif strategy['action'] == 'predictive':
# RUL推定と保全判断(簡略化)
if current_deg >= 20: # 劣化が進んだら予測
cost += inspection_cost
estimated_rul = (failure_threshold - current_deg) / 0.2
if estimated_rul < strategy['rul_threshold']:
cost += maintenance_cost
current_deg = 0
total_cost += cost
return total_cost / n_sims
strategy_costs = {}
for name, strategy in strategies.items():
avg_cost = simulate_maintenance_cost(degradation_data, time_points, strategy)
strategy_costs[name] = avg_cost
axes[1, 0].bar(strategy_costs.keys(), strategy_costs.values(),
color=['red', 'orange', 'yellow', 'green'], alpha=0.7,
edgecolor='black')
axes[1, 0].set_ylabel('期待総コスト', fontsize=11)
axes[1, 0].set_title('保全戦略のコスト比較', fontsize=12, fontweight='bold')
axes[1, 0].grid(axis='y', alpha=0.3)
plt.setp(axes[1, 0].xaxis.get_majorticklabels(), rotation=45, ha='right')
# 最適戦略を強調
min_cost_strategy = min(strategy_costs, key=strategy_costs.get)
min_cost_idx = list(strategy_costs.keys()).index(min_cost_strategy)
axes[1, 0].get_children()[min_cost_idx].set_edgecolor('blue')
axes[1, 0].get_children()[min_cost_idx].set_linewidth(3)
# (4) 予知保全のROI(投資対効果)
preventive_savings = strategy_costs['事後保全'] - strategy_costs['予知保全']
inspection_cost_total = inspection_cost * len(time_points)
roi = (preventive_savings / inspection_cost_total) * 100
axes[1, 1].text(0.5, 0.7, '予知保全の投資対効果',
ha='center', fontsize=14, fontweight='bold',
transform=axes[1, 1].transAxes)
axes[1, 1].text(0.5, 0.5, f'事後保全コスト: {strategy_costs["事後保全"]:.2f}',
ha='center', fontsize=12, transform=axes[1, 1].transAxes)
axes[1, 1].text(0.5, 0.4, f'予知保全コスト: {strategy_costs["予知保全"]:.2f}',
ha='center', fontsize=12, transform=axes[1, 1].transAxes)
axes[1, 1].text(0.5, 0.3, f'コスト削減: {preventive_savings:.2f}',
ha='center', fontsize=12, color='green',
fontweight='bold', transform=axes[1, 1].transAxes)
axes[1, 1].text(0.5, 0.2, f'ROI: {roi:.1f}%',
ha='center', fontsize=14, color='blue',
fontweight='bold', transform=axes[1, 1].transAxes)
axes[1, 1].axis('off')
plt.tight_layout()
plt.show()
print("予知保全の効果:")
print("="*60)
for name, cost in strategy_costs.items():
print(f"{name:15s}: {cost:8.2f}")
print(f"\n予知保全によるコスト削減: {preventive_savings:.2f}")
print(f"ROI: {roi:.1f}%")
💡 Note: 予知保全(Predictive Maintenance)は、IoTセンサーと機械学習を組み合わせて、装置の故障を事前に予測し、最適なタイミングで保全を実施する手法です。事後保全(壊れてから修理)や時間基準保全(定期保全)と比較して、大幅なコスト削減と稼働率向上が可能です。
演習問題
📝 演習1: ARIMA モデルによる予測
時系列データ(トレンド + 季節性 + ノイズ)を生成し:
時系列データ(トレンド + 季節性 + ノイズ)を生成し:
- ACF/PACFプロットからARIMAモデルの次数(p,d,q)を決定せよ
- ARIMAモデルをフィッティングし、50ステップ先まで予測せよ
- 残差がホワイトノイズであることをLjung-Box検定で確認せよ
📝 演習2: カルマンフィルタの実装
加速度計のノイズデータから速度と位置を推定する:
加速度計のノイズデータから速度と位置を推定する:
- 状態空間モデル(位置、速度、加速度)を定義せよ
- カルマンフィルタを実装し、ノイズのある加速度から位置を推定せよ
- 観測ノイズの大きさを変化させ、推定精度への影響を調べよ
📝 演習3: 予知保全システムの設計
振動センサーデータから軸受の故障を予測するシステムを設計せよ:
振動センサーデータから軸受の故障を予測するシステムを設計せよ:
- 劣化モデル(振動振幅の時間増加)を定義し、データを生成せよ
- 残存寿命(RUL)を推定するアルゴリズムを実装せよ
- 異なる保全戦略(事後、時間基準、状態基準、予知)のコストを比較せよ