5.1 Probabilistic Modeling of Time Series Data
📐 Definition: Time Series Data and Autocorrelation
Time series data \(\{X_t\}_{t=1,2,\ldots,n}\) is a sequence of observations ordered by time. Autocorrelation Function (ACF): \[\rho(k) = \frac{\mathrm{Cov}(X_t, X_{t-k})}{\sqrt{\mathrm{Var}(X_t)\,\mathrm{Var}(X_{t-k})}}\] Partial Autocorrelation Function (PACF): the correlation between \(X_t\) and \(X_{t-k}\) after removing the linear effects of the intermediate lags \(1, \ldots, k-1\). Both are used to identify the orders of time series models (e.g., ARMA).
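To connect the formula to data, here is a minimal sketch of the sample ACF computed directly from the definition (the helper name sample_acf is illustrative; the statsmodels acf function used below is the practical choice):
import numpy as np
def sample_acf(x, max_lag):
    """Sample autocorrelation from the definition (biased estimator)."""
    x = np.asarray(x, dtype=float)
    xc = x - x.mean()
    denom = np.sum(xc**2)
    return np.array([np.sum(xc[k:] * xc[:len(x) - k]) / denom
                     for k in range(max_lag + 1)])
# White noise should give ACF values near 0 for all lags k >= 1
rng = np.random.default_rng(0)
print(sample_acf(rng.normal(size=500), 5).round(3))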
💻 Code Example 1: Probabilistic Modeling of Time Series Data
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import acf, pacf
# Synthetic data: Manufacturing process temperature data
np.random.seed(42)
n_obs = 500
# Trend + Seasonality + Noise
time = np.arange(n_obs)
trend = 100 + 0.02 * time # Linear trend
seasonality = 5 * np.sin(2 * np.pi * time / 50) # Period 50 seasonality
noise = np.random.normal(0, 2, n_obs) # White noise
temperature = trend + seasonality + noise
fig, axes = plt.subplots(3, 2, figsize=(14, 12))
# (1) Time series plot
axes[0, 0].plot(time, temperature, linewidth=1.5, color='#667eea')
axes[0, 0].plot(time, trend, '--', linewidth=2, color='red', label='Trend')
axes[0, 0].plot(time, trend + seasonality, '--', linewidth=2, color='green',
label='Trend+Seasonality')
axes[0, 0].set_xlabel('Time', fontsize=11)
axes[0, 0].set_ylabel('Temperature (°C)', fontsize=11)
axes[0, 0].set_title('Manufacturing Process Temperature Time Series', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) Histogram
axes[0, 1].hist(temperature, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
axes[0, 1].set_xlabel('Temperature (°C)', fontsize=11)
axes[0, 1].set_ylabel('Density', fontsize=11)
axes[0, 1].set_title('Temperature Distribution', fontsize=12, fontweight='bold')
axes[0, 1].grid(alpha=0.3)
# (3) Autocorrelation Function (ACF)
lags = 50
acf_values = acf(temperature, nlags=lags)
axes[1, 0].stem(range(lags+1), acf_values, linefmt='#667eea', markerfmt='o',
basefmt=' ')
axes[1, 0].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[1, 0].axhline(y=1.96/np.sqrt(n_obs), color='red', linestyle='--',
linewidth=1.5, label='95% Confidence Interval')
axes[1, 0].axhline(y=-1.96/np.sqrt(n_obs), color='red', linestyle='--', linewidth=1.5)
axes[1, 0].set_xlabel('Lag', fontsize=11)
axes[1, 0].set_ylabel('ACF', fontsize=11)
axes[1, 0].set_title('Autocorrelation Function (ACF)', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) Partial Autocorrelation Function (PACF)
pacf_values = pacf(temperature, nlags=lags)
axes[1, 1].stem(range(lags+1), pacf_values, linefmt='#764ba2', markerfmt='o',
basefmt=' ')
axes[1, 1].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[1, 1].axhline(y=1.96/np.sqrt(n_obs), color='red', linestyle='--', linewidth=1.5)
axes[1, 1].axhline(y=-1.96/np.sqrt(n_obs), color='red', linestyle='--', linewidth=1.5)
axes[1, 1].set_xlabel('Lag', fontsize=11)
axes[1, 1].set_ylabel('PACF', fontsize=11)
axes[1, 1].set_title('Partial Autocorrelation Function (PACF)', fontsize=12, fontweight='bold')
axes[1, 1].grid(alpha=0.3)
# (5) Differenced series (detrended)
diff_temp = np.diff(temperature)
axes[2, 0].plot(diff_temp, linewidth=1.5, color='#667eea')
axes[2, 0].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[2, 0].set_xlabel('Time', fontsize=11)
axes[2, 0].set_ylabel('Difference ΔT', fontsize=11)
axes[2, 0].set_title('First Difference Series (After Detrending)', fontsize=12, fontweight='bold')
axes[2, 0].grid(alpha=0.3)
# (6) ACF of differenced series
diff_acf = acf(diff_temp, nlags=lags)
axes[2, 1].stem(range(lags+1), diff_acf, linefmt='#667eea', markerfmt='o',
basefmt=' ')
axes[2, 1].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[2, 1].axhline(y=1.96/np.sqrt(len(diff_temp)), color='red', linestyle='--', linewidth=1.5)
axes[2, 1].axhline(y=-1.96/np.sqrt(len(diff_temp)), color='red', linestyle='--', linewidth=1.5)
axes[2, 1].set_xlabel('Lag', fontsize=11)
axes[2, 1].set_ylabel('ACF', fontsize=11)
axes[2, 1].set_title('ACF of Differenced Series', fontsize=12, fontweight='bold')
axes[2, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# Statistical summary
print("Time Series Data Statistics:")
print(f"Mean: {temperature.mean():.2f} °C")
print(f"Standard Deviation: {temperature.std():.2f} °C")
print(f"Minimum: {temperature.min():.2f} °C")
print(f"Maximum: {temperature.max():.2f} °C")
print(f"\nDifferenced Series Statistics:")
print(f"Mean: {diff_temp.mean():.4f}")
print(f"Standard Deviation: {diff_temp.std():.2f}")
5.2 ARMA/ARIMA Models
📊 Theorem: ARMA/ARIMA Models
AR(p) Model (Autoregressive): \[X_t = c + \sum_{i=1}^p \phi_i X_{t-i} + \epsilon_t\] MA(q) Model (Moving Average): \[X_t = \mu + \epsilon_t + \sum_{i=1}^q \theta_i \epsilon_{t-i}\] ARMA(p,q) Model: \[X_t = c + \sum_{i=1}^p \phi_i X_{t-i} + \epsilon_t + \sum_{i=1}^q \theta_i \epsilon_{t-i}\] where \(\epsilon_t\) is white noise. ARIMA(p,d,q) Model: ARMA(p,q) applied to a series differenced d times
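To make the recursion concrete, the sketch below simulates an ARMA(1,1) series directly from its defining equation (the parameter values c, φ, θ are illustrative assumptions):
import numpy as np
rng = np.random.default_rng(42)
n, c, phi, theta = 500, 0.5, 0.7, 0.3  # assumed ARMA(1,1) parameters
eps = rng.normal(0, 1, n)  # white noise ε_t
x = np.zeros(n)
for t in range(1, n):
    # X_t = c + φ·X_{t-1} + ε_t + θ·ε_{t-1}
    x[t] = c + phi * x[t-1] + eps[t] + theta * eps[t-1]
print(f"Sample mean: {x.mean():.2f} (theoretical: c/(1-φ) = {c/(1-phi):.2f})")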
💻 Code Example 2: ARMA/ARIMA Models
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
# ARIMA model fitting
# First test for stationarity (Augmented Dickey-Fuller test)
adf_result = adfuller(temperature)
print("Augmented Dickey-Fuller Test:")
print(f"ADF Statistic: {adf_result[0]:.4f}")
print(f"p-value: {adf_result[1]:.4f}")
print(f"Conclusion: {'Stationary' if adf_result[1] < 0.05 else 'Non-stationary'}")
# Fitting ARIMA(1,1,1) model
model = ARIMA(temperature, order=(1, 1, 1))
model_fit = model.fit()
print("\n" + "="*60)
print(model_fit.summary())
# Forecasting
n_forecast = 50
forecast = model_fit.forecast(steps=n_forecast)
forecast_index = np.arange(n_obs, n_obs + n_forecast)
# Confidence intervals
forecast_result = model_fit.get_forecast(steps=n_forecast)
forecast_ci = forecast_result.conf_int()
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) Fitting and forecasting
axes[0, 0].plot(time, temperature, linewidth=1.5, color='#667eea',
label='Observed')
axes[0, 0].plot(time, model_fit.fittedvalues, '--', linewidth=2, color='red',
label='Fitted', alpha=0.7)
axes[0, 0].plot(forecast_index, forecast, linewidth=2, color='green',
label='Forecast')
axes[0, 0].fill_between(forecast_index,
forecast_ci.iloc[:, 0],
forecast_ci.iloc[:, 1],
alpha=0.2, color='green', label='95% Confidence Interval')
axes[0, 0].set_xlabel('Time', fontsize=11)
axes[0, 0].set_ylabel('Temperature (°C)', fontsize=11)
axes[0, 0].set_title('ARIMA Model Fitting and Forecasting', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) Residual plot
residuals = model_fit.resid
axes[0, 1].plot(residuals, linewidth=1.5, color='#667eea')
axes[0, 1].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[0, 1].set_xlabel('Time', fontsize=11)
axes[0, 1].set_ylabel('Residuals', fontsize=11)
axes[0, 1].set_title('Residual Plot', fontsize=12, fontweight='bold')
axes[0, 1].grid(alpha=0.3)
# (3) Histogram of residuals
axes[1, 0].hist(residuals, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
# Normal distribution fit
x = np.linspace(residuals.min(), residuals.max(), 1000)
mu, sigma = residuals.mean(), residuals.std()
axes[1, 0].plot(x, stats.norm.pdf(x, mu, sigma), 'r-', linewidth=2.5,
label=f'N({mu:.2f}, {sigma:.2f}²)')
axes[1, 0].set_xlabel('Residuals', fontsize=11)
axes[1, 0].set_ylabel('Density', fontsize=11)
axes[1, 0].set_title('Residual Distribution', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) ACF of residuals
residual_acf = acf(residuals, nlags=40)
axes[1, 1].stem(range(len(residual_acf)), residual_acf, linefmt='#667eea',
markerfmt='o', basefmt=' ')
axes[1, 1].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[1, 1].axhline(y=1.96/np.sqrt(len(residuals)), color='red',
linestyle='--', linewidth=1.5, label='95% Confidence Interval')
axes[1, 1].axhline(y=-1.96/np.sqrt(len(residuals)), color='red',
linestyle='--', linewidth=1.5)
axes[1, 1].set_xlabel('Lag', fontsize=11)
axes[1, 1].set_ylabel('ACF', fontsize=11)
axes[1, 1].set_title('ACF of Residuals (White Noise Test)', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# Model diagnostics
from statsmodels.stats.diagnostic import acorr_ljungbox
lb_test = acorr_ljungbox(residuals, lags=20, return_df=True)
print("\n\nLjung-Box Test (Residual Independence):")
print(lb_test.head(10))
5.3 State Estimation with Kalman Filter
📐 Definition: Kalman Filter
A recursive algorithm that optimally estimates the state of a linear dynamic system from noisy observations. State-Space Model: \[\mathbf{x}_{t} = \mathbf{F}\mathbf{x}_{t-1} + \mathbf{w}_t \quad (\text{State equation})\] \[\mathbf{z}_t = \mathbf{H}\mathbf{x}_t + \mathbf{v}_t \quad (\text{Observation equation})\] where \(\mathbf{w}_t \sim N(0, \mathbf{Q})\) (process noise), \(\mathbf{v}_t \sim N(0, \mathbf{R})\) (observation noise) Kalman Filter Update Equations:
- Prediction: \(\hat{\mathbf{x}}_{t|t-1} = \mathbf{F}\hat{\mathbf{x}}_{t-1|t-1}\), \(\mathbf{P}_{t|t-1} = \mathbf{F}\mathbf{P}_{t-1|t-1}\mathbf{F}^\top + \mathbf{Q}\)
- Kalman gain: \(\mathbf{K}_t = \mathbf{P}_{t|t-1}\mathbf{H}^\top(\mathbf{H}\mathbf{P}_{t|t-1}\mathbf{H}^\top + \mathbf{R})^{-1}\)
- Update: \(\hat{\mathbf{x}}_{t|t} = \hat{\mathbf{x}}_{t|t-1} + \mathbf{K}_t(\mathbf{z}_t - \mathbf{H}\hat{\mathbf{x}}_{t|t-1})\), \(\mathbf{P}_{t|t} = (\mathbf{I} - \mathbf{K}_t\mathbf{H})\mathbf{P}_{t|t-1}\)
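These equations translate almost line for line into NumPy. The following is a minimal sketch of one predict/update cycle under the linear-Gaussian model above (F, H, Q, R as in the state-space definitions; pykalman, used in the code example below, performs this bookkeeping internally):
import numpy as np
def kalman_step(x, P, z, F, H, Q, R):
    """One predict/update cycle of the linear Kalman filter."""
    # Prediction
    x_pred = F @ x
    P_pred = F @ P @ F.T + Q
    # Kalman gain
    S = H @ P_pred @ H.T + R  # innovation covariance
    K = P_pred @ H.T @ np.linalg.inv(S)
    # Update with observation z
    x_new = x_pred + K @ (z - H @ x_pred)
    P_new = (np.eye(len(x)) - K @ H) @ P_pred
    return x_new, P_new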
💻 Code Example 3: State Estimation with Kalman Filter
from pykalman import KalmanFilter
# Simulation of true state (position and velocity)
np.random.seed(42)
n_timesteps = 100
dt = 0.1
# True state
true_position = np.zeros(n_timesteps)
true_velocity = np.zeros(n_timesteps)
true_position[0] = 0
true_velocity[0] = 1 # Initial velocity
process_noise_std = 0.1
for t in range(1, n_timesteps):
# True equation of motion: x = x + v*dt, v = v + w
true_velocity[t] = true_velocity[t-1] + np.random.normal(0, process_noise_std)
true_position[t] = true_position[t-1] + true_velocity[t-1] * dt
# Observation data (with noise)
observation_noise_std = 0.5
observations = true_position + np.random.normal(0, observation_noise_std, n_timesteps)
# Kalman filter setup
# State: [position, velocity]
# Observation: [position]
# State transition matrix F
F = np.array([[1, dt],
[0, 1]])
# Observation matrix H
H = np.array([[1, 0]])
# Process noise covariance Q
Q = np.array([[0.01, 0],
[0, process_noise_std**2]])
# Observation noise covariance R
R = np.array([[observation_noise_std**2]])
# Initial state
initial_state_mean = np.array([0, 0])
initial_state_covariance = np.eye(2)
# Kalman filter
kf = KalmanFilter(
transition_matrices=F,
observation_matrices=H,
transition_covariance=Q,
observation_covariance=R,
initial_state_mean=initial_state_mean,
initial_state_covariance=initial_state_covariance
)
# Filtering
state_means, state_covariances = kf.filter(observations)
# Smoothing (posterior estimation using all data)
smoothed_state_means, smoothed_state_covariances = kf.smooth(observations)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) Position estimation
time = np.arange(n_timesteps) * dt
axes[0, 0].plot(time, true_position, 'g-', linewidth=2.5, label='True Position')
axes[0, 0].plot(time, observations, 'r.', markersize=6, alpha=0.5,
label='Observations (Noisy)')
axes[0, 0].plot(time, state_means[:, 0], 'b-', linewidth=2,
label='Kalman Filter')
axes[0, 0].plot(time, smoothed_state_means[:, 0], 'c--', linewidth=2,
label='Kalman Smoother')
axes[0, 0].set_xlabel('Time', fontsize=11)
axes[0, 0].set_ylabel('Position', fontsize=11)
axes[0, 0].set_title('Position Estimation with Kalman Filter', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) Velocity estimation
axes[0, 1].plot(time, true_velocity, 'g-', linewidth=2.5, label='True Velocity')
axes[0, 1].plot(time, state_means[:, 1], 'b-', linewidth=2,
label='Kalman Filter')
axes[0, 1].plot(time, smoothed_state_means[:, 1], 'c--', linewidth=2,
label='Kalman Smoother')
axes[0, 1].set_xlabel('Time', fontsize=11)
axes[0, 1].set_ylabel('Velocity', fontsize=11)
axes[0, 1].set_title('Velocity Estimation (No Direct Observation)', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) Estimation error
position_error = state_means[:, 0] - true_position
velocity_error = state_means[:, 1] - true_velocity
axes[1, 0].plot(time, position_error, linewidth=2, color='#667eea',
label='Position Estimation Error')
axes[1, 0].plot(time, velocity_error, linewidth=2, color='#764ba2',
label='Velocity Estimation Error')
axes[1, 0].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[1, 0].set_xlabel('Time', fontsize=11)
axes[1, 0].set_ylabel('Estimation Error', fontsize=11)
axes[1, 0].set_title('Time Evolution of Estimation Error', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (4) Estimation error covariance (uncertainty)
position_std = np.sqrt([cov[0, 0] for cov in state_covariances])
axes[1, 1].plot(time, state_means[:, 0], 'b-', linewidth=2, label='Estimated Position')
axes[1, 1].fill_between(time,
state_means[:, 0] - 2*position_std,
state_means[:, 0] + 2*position_std,
alpha=0.3, color='blue', label='95% Confidence Interval')
axes[1, 1].plot(time, true_position, 'g--', linewidth=2, label='True Position')
axes[1, 1].set_xlabel('Time', fontsize=11)
axes[1, 1].set_ylabel('Position', fontsize=11)
axes[1, 1].set_title('Estimation Uncertainty', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("Kalman Filter Performance:")
print(f"Position Estimation RMSE: {np.sqrt(np.mean(position_error**2)):.4f}")
print(f"Velocity Estimation RMSE: {np.sqrt(np.mean(velocity_error**2)):.4f}")
print(f"\nObservation Noise RMSE: {observation_noise_std:.4f}")
print(f"Filter Improvement Rate: {(1 - np.sqrt(np.mean(position_error**2))/observation_noise_std)*100:.1f}%")
5.4 Quality Control and Control Charts
📐 Definition: Control Chart
A tool for monitoring whether a process is in a state of statistical control. Xbar-R Control Chart:
- Center Line (CL): \(\bar{\bar{X}} = \frac{1}{k}\sum_{i=1}^k \bar{X}_i\)
- Upper Control Limit (UCL): \(\bar{\bar{X}} + A_2 \bar{R}\)
- Lower Control Limit (LCL): \(\bar{\bar{X}} - A_2 \bar{R}\)
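A minimal sketch of these limit calculations (A2 = 0.577 applies to subgroups of size 5, from standard SPC constant tables; xbar_r_limits is an illustrative helper):
import numpy as np
def xbar_r_limits(data, A2=0.577):
    """Xbar chart limits from subgroup data of shape (k groups, n per group)."""
    xbar = data.mean(axis=1)  # subgroup means
    R = data.max(axis=1) - data.min(axis=1)  # subgroup ranges
    CL = xbar.mean()
    return CL, CL + A2 * R.mean(), CL - A2 * R.mean()
# Example: in-control data from N(100, 2²), 30 subgroups of size 5
rng = np.random.default_rng(0)
CL, UCL, LCL = xbar_r_limits(rng.normal(100, 2, (30, 5)))
print(f"CL={CL:.2f}, UCL={UCL:.2f}, LCL={LCL:.2f}")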
💻 Code Example 4: Control Charts (Xbar-R chart, CUSUM)
# Manufacturing process quality control
np.random.seed(42)
# Data generation: Normal and shift occurrence
n_groups = 50
group_size = 5
# Normal period (first 30 groups)
normal_data = np.random.normal(100, 2, (30, group_size))
# Shift occurrence (latter 20 groups, mean shifts to 101.5)
shifted_data = np.random.normal(101.5, 2, (20, group_size))
data = np.vstack([normal_data, shifted_data])
# Group mean and range
group_means = data.mean(axis=1)
group_ranges = data.max(axis=1) - data.min(axis=1)
# Control limit calculation
CL_mean = group_means[:30].mean() # Calculated from normal period data
R_bar = group_ranges[:30].mean()
# Xbar control chart coefficients (for sample size 5)
A2 = 0.577
D3 = 0
D4 = 2.114
UCL_mean = CL_mean + A2 * R_bar
LCL_mean = CL_mean - A2 * R_bar
UCL_range = D4 * R_bar
LCL_range = D3 * R_bar
fig, axes = plt.subplots(3, 2, figsize=(14, 12))
# (1) Xbar control chart
axes[0, 0].plot(group_means, 'o-', linewidth=2, color='#667eea', markersize=6)
axes[0, 0].axhline(y=CL_mean, color='green', linestyle='-', linewidth=2,
label=f'CL={CL_mean:.2f}')
axes[0, 0].axhline(y=UCL_mean, color='red', linestyle='--', linewidth=2,
label=f'UCL={UCL_mean:.2f}')
axes[0, 0].axhline(y=LCL_mean, color='red', linestyle='--', linewidth=2,
label=f'LCL={LCL_mean:.2f}')
axes[0, 0].axvline(x=29.5, color='orange', linestyle=':', linewidth=2,
alpha=0.7, label='Shift Point')
# Highlight out-of-control points
out_of_control = (group_means > UCL_mean) | (group_means < LCL_mean)
axes[0, 0].plot(np.where(out_of_control)[0], group_means[out_of_control],
'rx', markersize=12, markeredgewidth=3, label='Out of Control')
axes[0, 0].set_xlabel('Group Number', fontsize=11)
axes[0, 0].set_ylabel('Group Mean', fontsize=11)
axes[0, 0].set_title('Xbar Control Chart', fontsize=12, fontweight='bold')
axes[0, 0].legend(fontsize=9)
axes[0, 0].grid(alpha=0.3)
# (2) R control chart
axes[0, 1].plot(group_ranges, 'o-', linewidth=2, color='#764ba2', markersize=6)
axes[0, 1].axhline(y=R_bar, color='green', linestyle='-', linewidth=2,
label=f'CL={R_bar:.2f}')
axes[0, 1].axhline(y=UCL_range, color='red', linestyle='--', linewidth=2,
label=f'UCL={UCL_range:.2f}')
axes[0, 1].axhline(y=LCL_range, color='red', linestyle='--', linewidth=2,
label=f'LCL={LCL_range:.2f}')
axes[0, 1].axvline(x=29.5, color='orange', linestyle=':', linewidth=2, alpha=0.7)
axes[0, 1].set_xlabel('Group Number', fontsize=11)
axes[0, 1].set_ylabel('Group Range', fontsize=11)
axes[0, 1].set_title('R Control Chart (Range)', fontsize=12, fontweight='bold')
axes[0, 1].legend(fontsize=9)
axes[0, 1].grid(alpha=0.3)
# (3) CUSUM control chart
mu_0 = CL_mean # Target value
sigma = 2 # Process standard deviation (assumed known; strictly, subgroup means vary with σ/√n)
k = 0.5 * sigma # Allowance value (k = δ/2; this choice targets quick detection of a 1σ shift)
h = 5 * sigma # Decision interval (5σ)
# Upper and lower CUSUM
C_plus = np.zeros(n_groups + 1)
C_minus = np.zeros(n_groups + 1)
for i in range(n_groups):
C_plus[i+1] = max(0, C_plus[i] + (group_means[i] - mu_0) - k)
C_minus[i+1] = max(0, C_minus[i] - (group_means[i] - mu_0) - k)
axes[1, 0].plot(C_plus[1:], 'o-', linewidth=2, color='red', markersize=6,
label='Upper CUSUM')
axes[1, 0].plot(C_minus[1:], 'o-', linewidth=2, color='blue', markersize=6,
label='Lower CUSUM')
axes[1, 0].axhline(y=h, color='red', linestyle='--', linewidth=2,
label=f'Decision Limit h={h:.2f}')
axes[1, 0].axvline(x=29.5, color='orange', linestyle=':', linewidth=2, alpha=0.7)
# Alarm detection
alarm_plus = np.where(C_plus[1:] > h)[0]
if len(alarm_plus) > 0:
axes[1, 0].plot(alarm_plus[0], C_plus[alarm_plus[0]+1], 'r*',
markersize=20, label=f'Alarm (t={alarm_plus[0]})')
axes[1, 0].set_xlabel('Group Number', fontsize=11)
axes[1, 0].set_ylabel('CUSUM', fontsize=11)
axes[1, 0].set_title('CUSUM Control Chart', fontsize=12, fontweight='bold')
axes[1, 0].legend(fontsize=9)
axes[1, 0].grid(alpha=0.3)
# (4) Individual value histogram (Normal vs. Shifted)
axes[1, 1].hist(normal_data.flatten(), bins=30, alpha=0.6, density=True,
color='green', edgecolor='black', label='Normal Period')
axes[1, 1].hist(shifted_data.flatten(), bins=30, alpha=0.6, density=True,
color='red', edgecolor='black', label='After Shift')
# Theoretical distribution
x = np.linspace(94, 108, 1000)
axes[1, 1].plot(x, stats.norm.pdf(x, 100, 2), 'g-', linewidth=2.5,
label='N(100, 2²)')
axes[1, 1].plot(x, stats.norm.pdf(x, 101.5, 2), 'r-', linewidth=2.5,
label='N(101.5, 2²)')
axes[1, 1].set_xlabel('Measurement', fontsize=11)
axes[1, 1].set_ylabel('Density', fontsize=11)
axes[1, 1].set_title('Process Shift Detection', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
# (5) Process capability index (Cpk)
# Cpk = min((USL - μ)/(3σ), (μ - LSL)/(3σ))
USL = 106 # Upper specification limit
LSL = 94 # Lower specification limit
# Normal period
Cp_normal = (USL - LSL) / (6 * sigma)
Cpk_normal = min((USL - CL_mean) / (3 * sigma),
(CL_mean - LSL) / (3 * sigma))
# After shift
shifted_mean = shifted_data.mean()
Cpk_shifted = min((USL - shifted_mean) / (3 * sigma),
(shifted_mean - LSL) / (3 * sigma))
axes[2, 0].bar(['Cp\n(Normal)', 'Cpk\n(Normal)', 'Cpk\n(Shifted)'],
[Cp_normal, Cpk_normal, Cpk_shifted],
color=['green', 'blue', 'red'], alpha=0.7, edgecolor='black')
axes[2, 0].axhline(y=1.33, color='orange', linestyle='--', linewidth=2,
label='Recommended (1.33)')
axes[2, 0].axhline(y=1.00, color='red', linestyle='--', linewidth=2,
label='Minimum (1.00)')
axes[2, 0].set_ylabel('Process Capability Index', fontsize=11)
axes[2, 0].set_title('Process Capability Index (Cp, Cpk)', fontsize=12, fontweight='bold')
axes[2, 0].legend()
axes[2, 0].grid(axis='y', alpha=0.3)
# (6) Control chart performance comparison (detection time)
axes[2, 1].text(0.1, 0.8, f'Shift Occurrence: Group 30', fontsize=12, transform=axes[2, 1].transAxes)
axes[2, 1].text(0.1, 0.7, f'\nXbar Control Chart:', fontsize=12, fontweight='bold',
transform=axes[2, 1].transAxes)
first_out = np.where(out_of_control)[0]
if len(first_out) > 0:
axes[2, 1].text(0.1, 0.6, f' First Detection: Group {first_out[0]+1}',
fontsize=11, transform=axes[2, 1].transAxes)
axes[2, 1].text(0.1, 0.5, f' Detection Delay: {first_out[0]-29} Groups',
fontsize=11, transform=axes[2, 1].transAxes)
axes[2, 1].text(0.1, 0.4, f'\nCUSUM Control Chart:', fontsize=12, fontweight='bold',
transform=axes[2, 1].transAxes)
if len(alarm_plus) > 0:
axes[2, 1].text(0.1, 0.3, f' First Detection: Group {alarm_plus[0]+1}',
fontsize=11, transform=axes[2, 1].transAxes)
axes[2, 1].text(0.1, 0.2, f' Detection Delay: {alarm_plus[0]-29} Groups',
fontsize=11, transform=axes[2, 1].transAxes)
axes[2, 1].axis('off')
axes[2, 1].set_title('Detection Performance Comparison', fontsize=12, fontweight='bold')
plt.tight_layout()
plt.show()
print("Quality Control Metrics:")
print(f"Process Capability Index Cp (Normal): {Cp_normal:.4f}")
print(f"Process Capability Index Cpk (Normal): {Cpk_normal:.4f}")
print(f"Process Capability Index Cpk (Shifted): {Cpk_shifted:.4f}")
print(f"\nOut-of-Control Points (Xbar Chart): {out_of_control.sum()} / {n_groups}")
5.5 Process Optimization with Monte Carlo Simulation
💻 Code Example 5: Process Optimization with Monte Carlo Simulation
# Process optimization problem: Yield maximization
np.random.seed(42)
# Process parameters (with uncertainty)
def process_yield(temperature, pressure, time):
"""
Yield calculation model (simplified)
temperature: Temperature (°C)
pressure: Pressure (MPa)
time: Reaction time (hours)
"""
# Ideal yield model
T_opt = 350
P_opt = 2.0
t_opt = 4.0
# Yield model with quadratic function
yield_rate = 0.9 - 0.001*(temperature - T_opt)**2 \
- 0.05*(pressure - P_opt)**2 \
- 0.01*(time - t_opt)**2
# Random noise (process variation)
noise = np.random.normal(0, 0.02)
return max(0, min(1, yield_rate + noise))
# Monte Carlo simulation
n_simulations = 10000
# Parameter settings (considering uncertainty)
temperature_mean = 350
temperature_std = 5
pressure_mean = 2.0
pressure_std = 0.1
time_mean = 4.0
time_std = 0.2
# Execute simulation
yields = []
temperatures = []
pressures = []
times = []
for _ in range(n_simulations):
T = np.random.normal(temperature_mean, temperature_std)
P = np.random.normal(pressure_mean, pressure_std)
t = np.random.normal(time_mean, time_std)
y = process_yield(T, P, t)
yields.append(y)
temperatures.append(T)
pressures.append(P)
times.append(t)
yields = np.array(yields)
temperatures = np.array(temperatures)
pressures = np.array(pressures)
times = np.array(times)
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
# (1) Yield distribution
axes[0, 0].hist(yields, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
axes[0, 0].axvline(yields.mean(), color='red', linestyle='--', linewidth=2,
label=f'Mean={yields.mean():.4f}')
axes[0, 0].axvline(np.percentile(yields, 5), color='orange', linestyle=':',
linewidth=2, label=f'5th Percentile={np.percentile(yields, 5):.4f}')
axes[0, 0].set_xlabel('Yield', fontsize=11)
axes[0, 0].set_ylabel('Density', fontsize=11)
axes[0, 0].set_title('Yield Distribution', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) Temperature vs. Yield
axes[0, 1].scatter(temperatures, yields, alpha=0.3, s=10, color='#667eea')
axes[0, 1].set_xlabel('Temperature (°C)', fontsize=11)
axes[0, 1].set_ylabel('Yield', fontsize=11)
axes[0, 1].set_title('Temperature vs. Yield Relationship', fontsize=12, fontweight='bold')
axes[0, 1].grid(alpha=0.3)
# Sensitivity analysis (linear regression)
from scipy.stats import pearsonr
corr_T, _ = pearsonr(temperatures, yields)
axes[0, 1].text(0.05, 0.95, f'Correlation: {corr_T:.4f}',
transform=axes[0, 1].transAxes, fontsize=10,
verticalalignment='top')
# (3) Pressure vs. Yield
axes[0, 2].scatter(pressures, yields, alpha=0.3, s=10, color='#764ba2')
axes[0, 2].set_xlabel('Pressure (MPa)', fontsize=11)
axes[0, 2].set_ylabel('Yield', fontsize=11)
axes[0, 2].set_title('Pressure vs. Yield Relationship', fontsize=12, fontweight='bold')
axes[0, 2].grid(alpha=0.3)
corr_P, _ = pearsonr(pressures, yields)
axes[0, 2].text(0.05, 0.95, f'Correlation: {corr_P:.4f}',
transform=axes[0, 2].transAxes, fontsize=10,
verticalalignment='top')
# (4) Risk analysis (Probability of yield < 0.8)
threshold = 0.8
risk_probability = (yields < threshold).sum() / n_simulations
axes[1, 0].hist(yields, bins=50, density=True, alpha=0.6,
color='#667eea', edgecolor='black')
axes[1, 0].axvline(threshold, color='red', linestyle='--', linewidth=2.5,
label=f'Threshold={threshold}')
# Fill risk region
# Fill risk region (yield below threshold), spanning the full y-axis range
axes[1, 0].fill_betweenx([0, 20], yields.min(), threshold,
alpha=0.3, color='red')
axes[1, 0].set_xlabel('Yield', fontsize=11)
axes[1, 0].set_ylabel('Density', fontsize=11)
axes[1, 0].set_title(f'Risk Analysis (Probability of Yield<{threshold}: {risk_probability:.2%})',
fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
axes[1, 0].set_ylim([0, 20])
# (5) Parameter sensitivity analysis
sensitivities = {
'Temperature': corr_T,
'Pressure': corr_P,
'Time': pearsonr(times, yields)[0]
}
axes[1, 1].barh(list(sensitivities.keys()), list(sensitivities.values()),
color='#667eea', alpha=0.7, edgecolor='black')
axes[1, 1].axvline(x=0, color='black', linestyle='-', linewidth=0.8)
axes[1, 1].set_xlabel('Correlation Coefficient', fontsize=11)
axes[1, 1].set_title('Parameter Sensitivity Analysis', fontsize=12, fontweight='bold')
axes[1, 1].grid(alpha=0.3)
# (6) Optimization scenario comparison
scenarios = {
'Current': {'T': (350, 5), 'P': (2.0, 0.1), 't': (4.0, 0.2)},
'Temp Optimized': {'T': (350, 2), 'P': (2.0, 0.1), 't': (4.0, 0.2)},
'Press Optimized': {'T': (350, 5), 'P': (2.0, 0.05), 't': (4.0, 0.2)},
'Total Optimized': {'T': (350, 2), 'P': (2.0, 0.05), 't': (4.0, 0.1)}
}
scenario_yields = {}
for name, params in scenarios.items():
yields_scenario = []
for _ in range(5000):
T = np.random.normal(*params['T'])
P = np.random.normal(*params['P'])
t = np.random.normal(*params['t'])
yields_scenario.append(process_yield(T, P, t))
scenario_yields[name] = np.array(yields_scenario)
axes[1, 2].boxplot([scenario_yields[name] for name in scenarios.keys()],
labels=list(scenarios.keys()), patch_artist=True)
axes[1, 2].set_ylabel('Yield', fontsize=11)
axes[1, 2].set_title('Optimization Scenario Comparison', fontsize=12, fontweight='bold')
axes[1, 2].grid(axis='y', alpha=0.3)
plt.setp(axes[1, 2].xaxis.get_majorticklabels(), rotation=45, ha='right')
plt.tight_layout()
plt.show()
print("Monte Carlo Simulation Results:")
print(f"Number of Simulations: {n_simulations}")
print(f"\nYield Statistics:")
print(f" Mean: {yields.mean():.4f}")
print(f" Standard Deviation: {yields.std():.4f}")
print(f" Minimum: {yields.min():.4f}")
print(f" Maximum: {yields.max():.4f}")
print(f" 5th Percentile: {np.percentile(yields, 5):.4f}")
print(f" 95th Percentile: {np.percentile(yields, 95):.4f}")
print(f"\nRisk Assessment:")
print(f" Probability of Yield<{threshold}: {risk_probability:.2%}")
print("\nOptimization Scenario Comparison:")
for name in scenarios.keys():
mean_yield = scenario_yields[name].mean()
std_yield = scenario_yields[name].std()
print(f" {name}: Mean={mean_yield:.4f}, σ={std_yield:.4f}")
5.6 Reliability Engineering (Weibull Distribution)
💻 Code Example 6: Reliability Engineering (Weibull Distribution)
# Reliability analysis with Weibull distribution
from scipy.stats import weibull_min
np.random.seed(42)
# Simulation of failure data (Weibull distribution)
shape_param = 2.5 # β (shape parameter)
scale_param = 1000 # η (scale parameter, characteristic life)
weibull_dist = weibull_min(c=shape_param, scale=scale_param)
# Failure time data
n_samples = 100
failure_times = weibull_dist.rvs(n_samples)
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
# (1) Failure time distribution
axes[0, 0].hist(failure_times, bins=30, density=True, alpha=0.6,
color='#667eea', edgecolor='black', label='Observed Data')
t = np.linspace(0, failure_times.max(), 1000)
pdf_theory = weibull_dist.pdf(t)
axes[0, 0].plot(t, pdf_theory, 'r-', linewidth=2.5,
label=f'Weibull(β={shape_param}, η={scale_param})')
axes[0, 0].set_xlabel('Failure Time (hours)', fontsize=11)
axes[0, 0].set_ylabel('Probability Density', fontsize=11)
axes[0, 0].set_title('Failure Time Distribution', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) Reliability function R(t) = 1 - F(t)
cdf_theory = weibull_dist.cdf(t)
reliability = 1 - cdf_theory
axes[0, 1].plot(t, reliability, linewidth=2.5, color='#667eea')
axes[0, 1].axhline(y=0.5, color='red', linestyle='--', linewidth=2,
label='R(t)=0.5')
# Mean time to failure (MTTF)
mttf = weibull_dist.mean()
axes[0, 1].axvline(x=mttf, color='green', linestyle='--', linewidth=2,
label=f'MTTF={mttf:.0f}h')
axes[0, 1].set_xlabel('Time (hours)', fontsize=11)
axes[0, 1].set_ylabel('Reliability R(t)', fontsize=11)
axes[0, 1].set_title('Reliability Function', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) Hazard function (failure rate) h(t) = f(t) / R(t)
hazard_rate = pdf_theory / (reliability + 1e-10)
axes[0, 2].plot(t, hazard_rate, linewidth=2.5, color='#764ba2')
axes[0, 2].set_xlabel('Time (hours)', fontsize=11)
axes[0, 2].set_ylabel('Hazard Rate h(t)', fontsize=11)
axes[0, 2].set_title('Hazard Function (Failure Rate)', fontsize=12, fontweight='bold')
axes[0, 2].grid(alpha=0.3)
# β>1: Wear-out failure (increasing failure rate)
axes[0, 2].text(0.5, 0.9, f'β={shape_param} > 1: Wear-out Failure',
transform=axes[0, 2].transAxes, fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
# (4) Weibull plot (probability paper)
# ln(-ln(1-F(t))) vs ln(t) becomes linear
sorted_times = np.sort(failure_times)
n = len(sorted_times)
empirical_cdf = np.arange(1, n+1) / (n+1)
# Weibull transformation
y_weibull = np.log(-np.log(1 - empirical_cdf))
x_weibull = np.log(sorted_times)
# Linear regression
from scipy.stats import linregress
slope, intercept, r_value, _, _ = linregress(x_weibull, y_weibull)
# Parameter estimation
beta_estimated = slope
eta_estimated = np.exp(-intercept / slope)
axes[1, 0].plot(x_weibull, y_weibull, 'o', color='#667eea', markersize=5,
label='Observed Data')
axes[1, 0].plot(x_weibull, slope*x_weibull + intercept, 'r-', linewidth=2,
label=f'Fit: β={beta_estimated:.2f}, η={eta_estimated:.0f}')
axes[1, 0].set_xlabel('ln(Time)', fontsize=11)
axes[1, 0].set_ylabel('ln(-ln(1-F(t)))', fontsize=11)
axes[1, 0].set_title(f'Weibull Plot (R²={r_value**2:.4f})',
fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)
# (5) Maintenance planning (preventive vs. corrective)
# Cost model
C_preventive = 100 # Preventive maintenance cost
C_corrective = 500 # Corrective maintenance cost (failure repair)
# Vary preventive maintenance interval
T_preventive = np.linspace(100, 2000, 100)
expected_costs = []
for T in T_preventive:
# Failure probability until preventive maintenance
F_T = weibull_dist.cdf(T)
# Approximate expected cost rate (per hour):
# E[Cost] ≈ (C_preventive + C_corrective * F(T)) / T
# (simplified: treats every cycle as having length T)
expected_cost = (C_preventive + C_corrective * F_T) / T
expected_costs.append(expected_cost)
expected_costs = np.array(expected_costs)
optimal_T = T_preventive[np.argmin(expected_costs)]
axes[1, 1].plot(T_preventive, expected_costs, linewidth=2.5, color='#667eea')
axes[1, 1].axvline(x=optimal_T, color='red', linestyle='--', linewidth=2,
label=f'Optimal Interval={optimal_T:.0f}h')
axes[1, 1].scatter([optimal_T], [expected_costs.min()], color='red',
s=100, zorder=5)
axes[1, 1].set_xlabel('Preventive Maintenance Interval (hours)', fontsize=11)
axes[1, 1].set_ylabel('Expected Cost (/hour)', fontsize=11)
axes[1, 1].set_title('Maintenance Planning Optimization', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
# (6) Comparison with different shape parameters
betas = [0.5, 1.0, 2.0, 3.0]
colors = ['blue', 'green', 'orange', 'red']
for beta, color in zip(betas, colors):
dist = weibull_min(c=beta, scale=1000)
t = np.linspace(0, 3000, 1000)
pdf = dist.pdf(t)
axes[1, 2].plot(t, pdf, linewidth=2.5, color=color, label=f'β={beta}')
axes[1, 2].set_xlabel('Time', fontsize=11)
axes[1, 2].set_ylabel('Probability Density', fontsize=11)
axes[1, 2].set_title('Effect of Shape Parameter β', fontsize=12, fontweight='bold')
axes[1, 2].legend()
axes[1, 2].grid(alpha=0.3)
# Failure mode explanation
axes[1, 2].text(0.55, 0.9, 'β<1: Early Failure',
transform=axes[1, 2].transAxes, fontsize=9)
axes[1, 2].text(0.55, 0.8, 'β=1: Random Failure',
transform=axes[1, 2].transAxes, fontsize=9)
axes[1, 2].text(0.55, 0.7, 'β>1: Wear-out Failure',
transform=axes[1, 2].transAxes, fontsize=9)
plt.tight_layout()
plt.show()
print("Reliability Analysis with Weibull Distribution:")
print(f"True Parameters: β={shape_param}, η={scale_param}")
print(f"Estimated Parameters: β={beta_estimated:.2f}, η={eta_estimated:.0f}")
print(f"\nReliability Metrics:")
print(f" Mean Time to Failure (MTTF): {mttf:.2f} hours")
print(f" B10 Life (10% Failure Time): {weibull_dist.ppf(0.1):.2f} hours")
print(f" Median Life: {weibull_dist.median():.2f} hours")
print(f"\nOptimal Maintenance Plan:")
print(f" Optimal Preventive Maintenance Interval: {optimal_T:.0f} hours")
print(f" Minimum Expected Cost: {expected_costs.min():.4f} /hour")
5.7 Failure Prediction Model for Predictive Maintenance
💻 Code Example 7: Failure Prediction Model for Predictive Maintenance
# Predictive maintenance: Failure prediction from degradation data
np.random.seed(42)
# Degradation data simulation (increasing with time)
n_units = 20 # Number of equipment units
max_time = 200 # Observation period
# Degradation model: X(t) = a*t + b*t^2 + noise
def degradation_model(t, a=0.1, b=0.001, sigma=0.5):
"""Degradation model (quadratic function + noise)"""
return a * t + b * t**2 + np.random.normal(0, sigma)
# Simulate degradation trajectories for multiple units
time_points = np.linspace(0, max_time, 50)
degradation_data = []
for i in range(n_units):
# Add variability to parameters
a = np.random.normal(0.1, 0.02)
b = np.random.normal(0.001, 0.0002)
degradation = [degradation_model(t, a, b) for t in time_points]
degradation_data.append(degradation)
degradation_data = np.array(degradation_data)
# Failure threshold
failure_threshold = 30
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) Degradation trajectories
for i in range(n_units):
axes[0, 0].plot(time_points, degradation_data[i], alpha=0.5, linewidth=1.5)
axes[0, 0].axhline(y=failure_threshold, color='red', linestyle='--',
linewidth=2.5, label=f'Failure Threshold={failure_threshold}')
# Mean degradation trajectory
mean_degradation = degradation_data.mean(axis=0)
axes[0, 0].plot(time_points, mean_degradation, 'b-', linewidth=3,
label='Mean Degradation Trajectory')
axes[0, 0].set_xlabel('Time', fontsize=11)
axes[0, 0].set_ylabel('Degradation Level', fontsize=11)
axes[0, 0].set_title('Equipment Degradation Trajectories', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)
# (2) Remaining Useful Life (RUL) prediction
current_time = 100
current_degradation = degradation_data[:, np.argmin(np.abs(time_points - current_time))]
# Estimate degradation rate using linear regression
predicted_ruls = []
for i in range(n_units):
# Estimate degradation rate from past data
past_times = time_points[time_points <= current_time]
past_degradation = degradation_data[i, :len(past_times)]
# Linear regression
slope, intercept, _, _, _ = linregress(past_times, past_degradation)
# Predict failure time: failure_threshold = slope * t_failure + intercept
if slope > 0:
t_failure = (failure_threshold - intercept) / slope
rul = max(0, t_failure - current_time)
else:
rul = np.inf
predicted_ruls.append(rul)
predicted_ruls = np.array(predicted_ruls)
predicted_ruls = predicted_ruls[predicted_ruls < 500] # Remove outliers
axes[0, 1].hist(predicted_ruls, bins=20, alpha=0.6, color='#667eea',
edgecolor='black')
axes[0, 1].axvline(predicted_ruls.mean(), color='red', linestyle='--',
linewidth=2, label=f'Mean RUL={predicted_ruls.mean():.1f}')
axes[0, 1].set_xlabel('Remaining Useful Life (RUL)', fontsize=11)
axes[0, 1].set_ylabel('Frequency', fontsize=11)
axes[0, 1].set_title(f'RUL Distribution at t={current_time}',
fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(alpha=0.3)
# (3) Decision-making for predictive maintenance
# Cost calculation
maintenance_cost = 50
failure_cost = 200
inspection_cost = 10
# Compare maintenance strategies
strategies = {
'Corrective': {'action': 'corrective', 'threshold': failure_threshold},
'Time-based': {'action': 'time-based', 'interval': 80},
'Condition-based': {'action': 'condition-based', 'threshold': 25},
'Predictive': {'action': 'predictive', 'rul_threshold': 20}
}
# Cost evaluation by simulation
def simulate_maintenance_cost(degradation_data, time_points, strategy, n_sims=100):
    """Simplified cost simulation for a maintenance strategy."""
    total_cost = 0
    for sim in range(n_sims):
        trajectory = degradation_data[sim % len(degradation_data)]
        deg_offset = 0.0        # subtracted after each repair ("as good as new")
        last_maintenance = 0.0  # time of the last time-based service
        cost = 0
        for t_idx, t in enumerate(time_points):
            current_deg = trajectory[t_idx] - deg_offset
            if strategy['action'] == 'corrective':
                # Run to failure, then repair at high cost
                if current_deg >= failure_threshold:
                    cost += failure_cost
                    deg_offset = trajectory[t_idx]
            elif strategy['action'] == 'time-based':
                # Service at fixed intervals, regardless of condition
                if t - last_maintenance >= strategy['interval']:
                    cost += maintenance_cost
                    deg_offset = trajectory[t_idx]
                    last_maintenance = t
            elif strategy['action'] == 'condition-based':
                # Inspect every step; maintain when degradation exceeds threshold
                cost += inspection_cost
                if current_deg >= strategy['threshold']:
                    cost += maintenance_cost
                    deg_offset = trajectory[t_idx]
            elif strategy['action'] == 'predictive':
                # Inspect once degradation is noticeable, then estimate RUL
                # assuming a mean degradation rate of 0.2 per unit time
                if current_deg >= 20:
                    cost += inspection_cost
                    estimated_rul = (failure_threshold - current_deg) / 0.2
                    if estimated_rul < strategy['rul_threshold']:
                        cost += maintenance_cost
                        deg_offset = trajectory[t_idx]
        total_cost += cost
    return total_cost / n_sims
strategy_costs = {}
for name, strategy in strategies.items():
avg_cost = simulate_maintenance_cost(degradation_data, time_points, strategy)
strategy_costs[name] = avg_cost
bars = axes[1, 0].bar(strategy_costs.keys(), strategy_costs.values(),
color=['red', 'orange', 'yellow', 'green'], alpha=0.7,
edgecolor='black')
axes[1, 0].set_ylabel('Expected Total Cost', fontsize=11)
axes[1, 0].set_title('Maintenance Strategy Cost Comparison', fontsize=12, fontweight='bold')
axes[1, 0].grid(axis='y', alpha=0.3)
plt.setp(axes[1, 0].xaxis.get_majorticklabels(), rotation=45, ha='right')
# Highlight optimal strategy
min_cost_strategy = min(strategy_costs, key=strategy_costs.get)
min_cost_idx = list(strategy_costs.keys()).index(min_cost_strategy)
bars[min_cost_idx].set_edgecolor('blue')
bars[min_cost_idx].set_linewidth(3)
# (4) ROI (Return on Investment) of predictive maintenance
preventive_savings = strategy_costs['Corrective'] - strategy_costs['Predictive']
inspection_cost_total = inspection_cost * len(time_points)
roi = (preventive_savings / inspection_cost_total) * 100
axes[1, 1].text(0.5, 0.7, 'Predictive Maintenance ROI',
ha='center', fontsize=14, fontweight='bold',
transform=axes[1, 1].transAxes)
axes[1, 1].text(0.5, 0.5, f'Corrective Maintenance Cost: {strategy_costs["Corrective"]:.2f}',
ha='center', fontsize=12, transform=axes[1, 1].transAxes)
axes[1, 1].text(0.5, 0.4, f'Predictive Maintenance Cost: {strategy_costs["Predictive"]:.2f}',
ha='center', fontsize=12, transform=axes[1, 1].transAxes)
axes[1, 1].text(0.5, 0.3, f'Cost Reduction: {preventive_savings:.2f}',
ha='center', fontsize=12, color='green',
fontweight='bold', transform=axes[1, 1].transAxes)
axes[1, 1].text(0.5, 0.2, f'ROI: {roi:.1f}%',
ha='center', fontsize=14, color='blue',
fontweight='bold', transform=axes[1, 1].transAxes)
axes[1, 1].axis('off')
plt.tight_layout()
plt.show()
print("Predictive Maintenance Effects:")
print("="*60)
for name, cost in strategy_costs.items():
print(f"{name:15s}: {cost:8.2f}")
print(f"\nCost Reduction by Predictive Maintenance: {preventive_savings:.2f}")
print(f"ROI: {roi:.1f}%")
💡 Note: Predictive maintenance combines IoT sensors and machine learning to anticipate equipment failures and schedule maintenance at the optimal time. Compared with corrective maintenance (repair after breakdown) or time-based maintenance (fixed-interval servicing), it can substantially reduce costs and improve equipment availability.
Exercises
📝 Exercise 1: Forecasting with ARIMA Model
Generate time series data (trend + seasonality + noise) and:
- Determine the ARIMA model order (p,d,q) from ACF/PACF plots
- Fit the ARIMA model and forecast 50 steps ahead
- Verify that residuals are white noise using the Ljung-Box test
📝 Exercise 2: Implementing Kalman Filter
Estimate velocity and position from noisy accelerometer data:
- Define a state-space model (position, velocity, acceleration)
- Implement the Kalman filter and estimate position from noisy acceleration
- Investigate the impact of observation noise magnitude on estimation accuracy
📝 Exercise 3: Designing a Predictive Maintenance System
Design a system to predict bearing failures from vibration sensor data:
- Define a degradation model (time increase in vibration amplitude) and generate data
- Implement an algorithm to estimate Remaining Useful Life (RUL)
- Compare costs of different maintenance strategies (corrective, time-based, condition-based, predictive)