Practical methods for visualizing process variation and early anomaly detection
Statistical Process Control (SPC) is a methodology for statistically monitoring process variation and detecting anomalies early. It is based on control charts developed by Dr. Walter Shewhart at Bell Laboratories in the 1920s.
The purpose of SPC is to statistically distinguish between normal variation from common causes and abnormal variation from special causes, and to detect and address special causes early.
A control chart consists of the following elements:
The most basic control chart. Simultaneously manages subgroup means and ranges.
# ===================================
# Example 1: X-bar and R Control Charts Implementation
# ===================================
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Tuple, Dict
import warnings
warnings.filterwarnings('ignore')
class XbarRControlChart:
"""X-bar and R Control Chart Implementation
Used for continuous variable process control. Simultaneously monitors
subgroup means (X-bar) and ranges (R) to manage process center and spread.
Parameters:
-----------
subgroup_size : int
Number of samples in subgroup (typically 2-10)
Attributes:
-----------
A2, D3, D4 : float
Constants for control limit calculation (dependent on subgroup size)
"""
# Constants by subgroup size (from JIS Z 9020-2)
CONSTANTS = {
2: {'A2': 1.880, 'D3': 0, 'D4': 3.267},
3: {'A2': 1.023, 'D3': 0, 'D4': 2.575},
4: {'A2': 0.729, 'D3': 0, 'D4': 2.282},
5: {'A2': 0.577, 'D3': 0, 'D4': 2.115},
6: {'A2': 0.483, 'D3': 0, 'D4': 2.004},
7: {'A2': 0.419, 'D3': 0.076, 'D4': 1.924},
8: {'A2': 0.373, 'D3': 0.136, 'D4': 1.864},
9: {'A2': 0.337, 'D3': 0.184, 'D4': 1.816},
10: {'A2': 0.308, 'D3': 0.223, 'D4': 1.777}
}
def __init__(self, subgroup_size: int = 5):
if subgroup_size not in self.CONSTANTS:
raise ValueError(f"Subgroup size must be 2-10, got {subgroup_size}")
self.n = subgroup_size
self.A2 = self.CONSTANTS[subgroup_size]['A2']
self.D3 = self.CONSTANTS[subgroup_size]['D3']
self.D4 = self.CONSTANTS[subgroup_size]['D4']
self.xbar_cl = None
self.xbar_ucl = None
self.xbar_lcl = None
self.r_cl = None
self.r_ucl = None
self.r_lcl = None
def fit(self, data: np.ndarray) -> 'XbarRControlChart':
"""Calculate control limits (Phase I: Establish baseline from initial data)
Parameters:
-----------
data : np.ndarray
Data with shape (n_subgroups, subgroup_size)
Returns:
--------
self : XbarRControlChart
"""
if data.shape[1] != self.n:
raise ValueError(f"Data subgroup size {data.shape[1]} != {self.n}")
# Calculate mean and range for each subgroup
xbar = data.mean(axis=1)
r = data.max(axis=1) - data.min(axis=1)
# Calculate center line and control limits
self.xbar_cl = xbar.mean()
self.r_cl = r.mean()
# X-bar chart control limits
self.xbar_ucl = self.xbar_cl + self.A2 * self.r_cl
self.xbar_lcl = self.xbar_cl - self.A2 * self.r_cl
# R chart control limits
self.r_ucl = self.D4 * self.r_cl
self.r_lcl = self.D3 * self.r_cl
return self
def predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
"""Determine control status for new data (Phase II: Continuous monitoring)
Parameters:
-----------
data : np.ndarray
Data with shape (n_subgroups, subgroup_size)
Returns:
--------
results : dict
Dictionary containing 'xbar', 'r', 'xbar_ooc', 'r_ooc'
ooc = out of control
"""
if self.xbar_cl is None:
raise ValueError("Call fit() first to establish control limits")
xbar = data.mean(axis=1)
r = data.max(axis=1) - data.min(axis=1)
# Determine out of control
xbar_ooc = (xbar > self.xbar_ucl) | (xbar < self.xbar_lcl)
r_ooc = (r > self.r_ucl) | (r < self.r_lcl)
return {
'xbar': xbar,
'r': r,
'xbar_ooc': xbar_ooc,
'r_ooc': r_ooc
}
def plot(self, data: np.ndarray, title: str = "X-bar and R Control Charts"):
"""Plot control charts
Parameters:
-----------
data : np.ndarray
Data with shape (n_subgroups, subgroup_size)
title : str
Graph title
"""
results = self.predict(data)
n_points = len(results['xbar'])
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
# X-bar chart
ax1.plot(range(1, n_points+1), results['xbar'],
marker='o', color='#2c3e50', linewidth=1.5, markersize=6)
ax1.axhline(self.xbar_cl, color='#11998e', linestyle='-',
linewidth=2, label='CL')
ax1.axhline(self.xbar_ucl, color='#e74c3c', linestyle='--',
linewidth=2, label='UCL')
ax1.axhline(self.xbar_lcl, color='#e74c3c', linestyle='--',
linewidth=2, label='LCL')
# Highlight out of control points
ooc_indices = np.where(results['xbar_ooc'])[0]
if len(ooc_indices) > 0:
ax1.scatter(ooc_indices + 1, results['xbar'][ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
ax1.set_xlabel('Subgroup Number', fontsize=11)
ax1.set_ylabel('Subgroup Mean (X-bar)', fontsize=11)
ax1.set_title('X-bar Control Chart', fontsize=13, fontweight='bold')
ax1.legend(loc='best')
ax1.grid(True, alpha=0.3)
# R chart
ax2.plot(range(1, n_points+1), results['r'],
marker='s', color='#2c3e50', linewidth=1.5, markersize=6)
ax2.axhline(self.r_cl, color='#11998e', linestyle='-',
linewidth=2, label='CL')
ax2.axhline(self.r_ucl, color='#e74c3c', linestyle='--',
linewidth=2, label='UCL')
ax2.axhline(self.r_lcl, color='#e74c3c', linestyle='--',
linewidth=2, label='LCL')
# Highlight out of control points
ooc_indices = np.where(results['r_ooc'])[0]
if len(ooc_indices) > 0:
ax2.scatter(ooc_indices + 1, results['r'][ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
ax2.set_xlabel('Subgroup Number', fontsize=11)
ax2.set_ylabel('Subgroup Range (R)', fontsize=11)
ax2.set_title('R Control Chart', fontsize=13, fontweight='bold')
ax2.legend(loc='best')
ax2.grid(True, alpha=0.3)
plt.suptitle(title, fontsize=15, fontweight='bold', y=0.995)
plt.tight_layout()
plt.show()
# Example usage: Chemical process temperature management
# Phase I: Establish control limits with initial 25 subgroups
np.random.seed(42)
n_subgroups_phase1 = 25
subgroup_size = 5
target_temp = 80.0 # Target temperature [°C]
# Normal operation data (mean 80°C, standard deviation 0.5°C)
phase1_data = np.random.normal(target_temp, 0.5,
(n_subgroups_phase1, subgroup_size))
# Build control chart
xbar_r_chart = XbarRControlChart(subgroup_size=subgroup_size)
xbar_r_chart.fit(phase1_data)
print("=== Phase I: Establishing Control Limits ===")
print(f"X-bar chart:")
print(f" CL = {xbar_r_chart.xbar_cl:.3f} °C")
print(f" UCL = {xbar_r_chart.xbar_ucl:.3f} °C")
print(f" LCL = {xbar_r_chart.xbar_lcl:.3f} °C")
print(f"\nR chart:")
print(f" CL = {xbar_r_chart.r_cl:.3f} °C")
print(f" UCL = {xbar_r_chart.r_ucl:.3f} °C")
print(f" LCL = {xbar_r_chart.r_lcl:.3f} °C")
# Phase II: Continuous monitoring (30 subgroups, anomaly occurs midway)
n_subgroups_phase2 = 30
phase2_data = np.random.normal(target_temp, 0.5,
(n_subgroups_phase2, subgroup_size))
# Subgroups 15-20 show mean shift (+1.5°C)
phase2_data[14:20, :] += 1.5
# Subgroup 25 shows increased spread
phase2_data[24, :] = np.random.normal(target_temp, 2.0, subgroup_size)
# Monitor
results = xbar_r_chart.predict(phase2_data)
print("\n=== Phase II: Continuous Monitoring ===")
print(f"X-bar out of control: {results['xbar_ooc'].sum()} points "
f"(Subgroups {np.where(results['xbar_ooc'])[0] + 1})")
print(f"R out of control: {results['r_ooc'].sum()} points "
f"(Subgroups {np.where(results['r_ooc'])[0] + 1})")
# Plot
xbar_r_chart.plot(phase2_data,
title="Chemical Process Temperature Monitoring")
# Expected output:
# === Phase I: Establishing Control Limits ===
# X-bar chart:
# CL = 79.995 °C
# UCL = 80.666 °C
# LCL = 79.323 °C
#
# R chart:
# CL = 1.165 °C
# UCL = 2.464 °C
# LCL = 0.000 °C
#
# === Phase II: Continuous Monitoring ===
# X-bar out of control: 5 points (Subgroups [15 16 17 18 19])
# R out of control: 1 point (Subgroups [25])
Control limits (±3σ) are calculated from process variation and determine whether the process is statistically stable. Specification limits, on the other hand, are determined by customer requirements or design specifications and determine product acceptance. Products can be out of specification even when within control limits.
For attribute data such as number of defectives or defects, rather than continuous variables, different control charts are used.
Manages the proportion defective in sampled products from inspection lots.
# ===================================
# Example 2: P-Chart for Proportion Defective
# ===================================
class PControlChart:
"""P-Chart (Proportion Defective Chart) Implementation
Used for managing defect rate data following binomial distribution.
Applicable even when sample sizes are not constant.
Attributes:
-----------
p_bar : float
Overall average defect rate
ucl, lcl : np.ndarray
Control limits according to each sample size
"""
def __init__(self):
self.p_bar = None
self.ucl = None
self.lcl = None
self.n = None
def fit(self, defectives: np.ndarray, sample_sizes: np.ndarray):
"""Calculate control limits
Parameters:
-----------
defectives : np.ndarray
Number of defectives in each lot
sample_sizes : np.ndarray
Sample size for each lot
"""
# Calculate average defect rate
self.p_bar = defectives.sum() / sample_sizes.sum()
self.n = sample_sizes
# Control limits according to each sample size (±3σ)
# σ_p = sqrt(p_bar * (1 - p_bar) / n)
sigma_p = np.sqrt(self.p_bar * (1 - self.p_bar) / self.n)
self.ucl = self.p_bar + 3 * sigma_p
self.lcl = self.p_bar - 3 * sigma_p
# LCL cannot be negative
self.lcl = np.maximum(self.lcl, 0)
# UCL cannot exceed 1
self.ucl = np.minimum(self.ucl, 1)
return self
def predict(self, defectives: np.ndarray,
sample_sizes: np.ndarray) -> Dict[str, np.ndarray]:
"""Determine control status
Parameters:
-----------
defectives : np.ndarray
Number of defectives in each lot
sample_sizes : np.ndarray
Sample size for each lot
Returns:
--------
results : dict
Dictionary containing 'p', 'ucl', 'lcl', 'ooc'
"""
if self.p_bar is None:
raise ValueError("Call fit() first")
# Calculate defect rate
p = defectives / sample_sizes
# Recalculate control limits (when sample sizes differ)
sigma_p = np.sqrt(self.p_bar * (1 - self.p_bar) / sample_sizes)
ucl = np.minimum(self.p_bar + 3 * sigma_p, 1)
lcl = np.maximum(self.p_bar - 3 * sigma_p, 0)
# Determine out of control
ooc = (p > ucl) | (p < lcl)
return {
'p': p,
'ucl': ucl,
'lcl': lcl,
'ooc': ooc
}
def plot(self, defectives: np.ndarray, sample_sizes: np.ndarray,
title: str = "P Control Chart"):
"""Plot P-chart"""
results = self.predict(defectives, sample_sizes)
n_points = len(results['p'])
plt.figure(figsize=(12, 6))
# Plot defect rate
plt.plot(range(1, n_points+1), results['p'],
marker='o', color='#2c3e50', linewidth=1.5,
markersize=6, label='Proportion Defective')
# Center line
plt.axhline(self.p_bar, color='#11998e', linestyle='-',
linewidth=2, label=f'CL = {self.p_bar:.4f}')
# Control limits (stepped if sample sizes vary)
plt.plot(range(1, n_points+1), results['ucl'],
color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
plt.plot(range(1, n_points+1), results['lcl'],
color='#e74c3c', linestyle='--', linewidth=2, label='LCL')
# Highlight out of control points
ooc_indices = np.where(results['ooc'])[0]
if len(ooc_indices) > 0:
plt.scatter(ooc_indices + 1, results['p'][ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
plt.xlabel('Lot Number', fontsize=11)
plt.ylabel('Proportion Defective', fontsize=11)
plt.title(title, fontsize=13, fontweight='bold')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Example usage: Semiconductor manufacturing defect rate management
np.random.seed(42)
# Phase I: Establish control limits with initial 30 lots
n_lots_phase1 = 30
sample_sizes_phase1 = np.random.randint(100, 200, n_lots_phase1)
true_defect_rate = 0.02 # True defect rate 2%
# Generate number of defectives with binomial distribution
defectives_phase1 = np.random.binomial(sample_sizes_phase1,
true_defect_rate)
# Build P-chart
p_chart = PControlChart()
p_chart.fit(defectives_phase1, sample_sizes_phase1)
print("=== P-Chart: Establishing Control Limits ===")
print(f"Average defect rate (p-bar) = {p_chart.p_bar:.4f} ({p_chart.p_bar*100:.2f}%)")
# Phase II: Continuous monitoring (40 lots, defect rate increases midway)
n_lots_phase2 = 40
sample_sizes_phase2 = np.random.randint(100, 200, n_lots_phase2)
defectives_phase2 = np.random.binomial(sample_sizes_phase2,
true_defect_rate)
# Lots 25-30 show 3x defect rate increase (process anomaly)
defectives_phase2[24:30] = np.random.binomial(
sample_sizes_phase2[24:30], true_defect_rate * 3)
# Monitor
results = p_chart.predict(defectives_phase2, sample_sizes_phase2)
print(f"\n=== Phase II: Continuous Monitoring ===")
print(f"Out of control lots: {results['ooc'].sum()} lots")
print(f"Corresponding lot numbers: {np.where(results['ooc'])[0] + 1}")
# Plot
p_chart.plot(defectives_phase2, sample_sizes_phase2,
title="Semiconductor Manufacturing - Defect Rate Monitoring")
# Expected output:
# === P-Chart: Establishing Control Limits ===
# Average defect rate (p-bar) = 0.0189 (1.89%)
#
# === Phase II: Continuous Monitoring ===
# Out of control lots: 6 lots
# Corresponding lot numbers: [25 26 27 28 29 30]
Manages the number of defects per constant inspection unit. Assumes Poisson distribution.
# ===================================
# Example 3: C-Chart for Count of Defects
# ===================================
class CControlChart:
"""C-Chart (Count of Defects Chart) Implementation
Manages number of defects per constant inspection unit (area, time, volume, etc.).
Assumes defect count follows Poisson distribution.
Attributes:
-----------
c_bar : float
Average defect count
ucl, lcl : float
Control limits
"""
def __init__(self):
self.c_bar = None
self.ucl = None
self.lcl = None
def fit(self, defects: np.ndarray):
"""Calculate control limits
Parameters:
-----------
defects : np.ndarray
Defect count for each inspection unit
"""
# Calculate average defect count
self.c_bar = defects.mean()
# Poisson distribution standard deviation = sqrt(mean)
# Control limits = mean ± 3 * sqrt(mean)
sigma_c = np.sqrt(self.c_bar)
self.ucl = self.c_bar + 3 * sigma_c
self.lcl = self.c_bar - 3 * sigma_c
# LCL cannot be negative
self.lcl = max(self.lcl, 0)
return self
def predict(self, defects: np.ndarray) -> Dict[str, np.ndarray]:
"""Determine control status
Parameters:
-----------
defects : np.ndarray
Defect count for each inspection unit
Returns:
--------
results : dict
Dictionary containing 'c', 'ooc'
"""
if self.c_bar is None:
raise ValueError("Call fit() first")
# Determine out of control
ooc = (defects > self.ucl) | (defects < self.lcl)
return {
'c': defects,
'ooc': ooc
}
def plot(self, defects: np.ndarray, title: str = "C Control Chart"):
"""Plot C-chart"""
results = self.predict(defects)
n_points = len(defects)
plt.figure(figsize=(12, 6))
# Plot defect count
plt.plot(range(1, n_points+1), defects,
marker='o', color='#2c3e50', linewidth=1.5,
markersize=6, label='Count of Defects')
# Center line and control limits
plt.axhline(self.c_bar, color='#11998e', linestyle='-',
linewidth=2, label=f'CL = {self.c_bar:.2f}')
plt.axhline(self.ucl, color='#e74c3c', linestyle='--',
linewidth=2, label=f'UCL = {self.ucl:.2f}')
plt.axhline(self.lcl, color='#e74c3c', linestyle='--',
linewidth=2, label=f'LCL = {self.lcl:.2f}')
# Highlight out of control points
ooc_indices = np.where(results['ooc'])[0]
if len(ooc_indices) > 0:
plt.scatter(ooc_indices + 1, defects[ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
plt.xlabel('Inspection Unit', fontsize=11)
plt.ylabel('Number of Defects', fontsize=11)
plt.title(title, fontsize=13, fontweight='bold')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Example usage: Paint surface defect management (defects per m²)
np.random.seed(42)
# Phase I: Establish control limits with initial 30 panels
n_panels_phase1 = 30
avg_defects = 3.5 # Average 3.5 defects/m²
# Generate defect count with Poisson distribution
defects_phase1 = np.random.poisson(avg_defects, n_panels_phase1)
# Build C-chart
c_chart = CControlChart()
c_chart.fit(defects_phase1)
print("=== C-Chart: Establishing Control Limits ===")
print(f"Average defect count (c-bar) = {c_chart.c_bar:.2f} defects/m²")
print(f"UCL = {c_chart.ucl:.2f} defects/m²")
print(f"LCL = {c_chart.lcl:.2f} defects/m²")
# Phase II: Continuous monitoring (50 panels, defect increase midway)
n_panels_phase2 = 50
defects_phase2 = np.random.poisson(avg_defects, n_panels_phase2)
# Panels 30-35 show 2x defect increase (paint degradation)
defects_phase2[29:35] = np.random.poisson(avg_defects * 2, 6)
# Monitor
results = c_chart.predict(defects_phase2)
print(f"\n=== Phase II: Continuous Monitoring ===")
print(f"Out of control panels: {results['ooc'].sum()} panels")
print(f"Corresponding panel numbers: {np.where(results['ooc'])[0] + 1}")
# Plot
c_chart.plot(defects_phase2,
title="Paint Defects Monitoring (per m²)")
# Expected output:
# === C-Chart: Establishing Control Limits ===
# Average defect count (c-bar) = 3.40 defects/m²
# UCL = 8.93 defects/m²
# LCL = 0.00 defects/m²
#
# === Phase II: Continuous Monitoring ===
# Out of control panels: 2 panels
# Corresponding panel numbers: [30 33]
Traditional control charts excel at detecting large changes but have limitations in early detection of small changes. The following methods enable more sensitive monitoring.
By accumulating deviations from the mean, detects small changes (approximately 0.5-2σ) early. Can detect 4-8 times faster than Shewhart control charts on average.
Implementation of cumulative sum control chart (tabular CUSUM)
# ===================================
# Example 4: CUSUM (Cumulative Sum) Control Chart
# ===================================
class CUSUMControlChart:
"""CUSUM Control Chart Implementation (Tabular CUSUM)
Method for early detection of small mean shifts (0.5-2σ).
Calculates upper and lower CUSUM statistics separately.
Parameters:
-----------
target : float
Target value (process mean)
std : float
Process standard deviation
k : float
Reference value (allowance). Typically half of shift to detect.
Default: 0.5σ (for detecting 1σ shift)
h : float
Decision interval. Functions as control limit.
Default: 5σ (corresponds to Type I error rate of 0.27%)
Attributes:
-----------
cusum_high : np.ndarray
Upper CUSUM statistic
cusum_low : np.ndarray
Lower CUSUM statistic
"""
def __init__(self, target: float, std: float,
k: float = 0.5, h: float = 5.0):
self.target = target
self.std = std
self.k = k * std # Reference value (absolute)
self.h = h * std # Decision interval (absolute)
self.cusum_high = None
self.cusum_low = None
def fit_predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
"""Calculate CUSUM statistics and detect out of control
Parameters:
-----------
data : np.ndarray
Observation data
Returns:
--------
results : dict
Dictionary containing 'cusum_high', 'cusum_low', 'ooc_high', 'ooc_low'
"""
n = len(data)
self.cusum_high = np.zeros(n)
self.cusum_low = np.zeros(n)
# Sequential calculation of CUSUM statistics
for i in range(n):
if i == 0:
# Initial value
self.cusum_high[i] = max(0, data[i] - self.target - self.k)
self.cusum_low[i] = max(0, self.target - data[i] - self.k)
else:
# Sequential update formula
self.cusum_high[i] = max(0, self.cusum_high[i-1] +
data[i] - self.target - self.k)
self.cusum_low[i] = max(0, self.cusum_low[i-1] +
self.target - data[i] - self.k)
# Determine out of control
ooc_high = self.cusum_high > self.h
ooc_low = self.cusum_low > self.h
return {
'data': data,
'cusum_high': self.cusum_high,
'cusum_low': self.cusum_low,
'ooc_high': ooc_high,
'ooc_low': ooc_low
}
def plot(self, data: np.ndarray, title: str = "CUSUM Control Chart"):
"""Plot CUSUM control chart"""
results = self.fit_predict(data)
n_points = len(data)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
# Plot original data
ax1.plot(range(1, n_points+1), data,
marker='o', color='#2c3e50', linewidth=1.5, markersize=5)
ax1.axhline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax1.axhline(self.target + self.k, color='#f39c12',
linestyle=':', linewidth=1.5, label=f'Target ± k')
ax1.axhline(self.target - self.k, color='#f39c12',
linestyle=':', linewidth=1.5)
ax1.set_ylabel('Observation', fontsize=11)
ax1.set_title('Process Data', fontsize=12, fontweight='bold')
ax1.legend(loc='best')
ax1.grid(True, alpha=0.3)
# Plot CUSUM statistics
ax2.plot(range(1, n_points+1), results['cusum_high'],
marker='^', color='#e74c3c', linewidth=1.5,
markersize=5, label='CUSUM High')
ax2.plot(range(1, n_points+1), results['cusum_low'],
marker='v', color='#3498db', linewidth=1.5,
markersize=5, label='CUSUM Low')
ax2.axhline(self.h, color='#2c3e50', linestyle='--',
linewidth=2, label=f'Decision Interval (h={self.h/self.std:.1f}σ)')
ax2.axhline(0, color='gray', linestyle='-', linewidth=1)
# Highlight out of control points
ooc_high_indices = np.where(results['ooc_high'])[0]
ooc_low_indices = np.where(results['ooc_low'])[0]
if len(ooc_high_indices) > 0:
ax2.scatter(ooc_high_indices + 1,
results['cusum_high'][ooc_high_indices],
color='red', s=100, zorder=5, marker='^')
if len(ooc_low_indices) > 0:
ax2.scatter(ooc_low_indices + 1,
results['cusum_low'][ooc_low_indices],
color='red', s=100, zorder=5, marker='v')
ax2.set_xlabel('Observation Number', fontsize=11)
ax2.set_ylabel('CUSUM Statistic', fontsize=11)
ax2.set_title('CUSUM Statistics', fontsize=12, fontweight='bold')
ax2.legend(loc='best')
ax2.grid(True, alpha=0.3)
plt.suptitle(title, fontsize=15, fontweight='bold', y=0.995)
plt.tight_layout()
plt.show()
# Example usage: Chemical process mean shift detection
np.random.seed(42)
# Target value and standard deviation
target_value = 100.0
process_std = 1.0
# Generate data (100 observations)
n_obs = 100
data = np.random.normal(target_value, process_std, n_obs)
# Small shift of +0.8σ from observation 40 (difficult to detect with Shewhart chart)
data[40:] += 0.8 * process_std
# Apply CUSUM control chart (configured to detect 1σ shift)
cusum_chart = CUSUMControlChart(
target=target_value,
std=process_std,
k=0.5, # Half of 1σ shift
h=5.0 # Decision interval
)
results = cusum_chart.fit_predict(data)
print("=== CUSUM Control Chart ===")
print(f"Target value: {target_value}")
print(f"Reference value (k): {cusum_chart.k:.3f} ({cusum_chart.k/process_std:.1f}σ)")
print(f"Decision interval (h): {cusum_chart.h:.3f} ({cusum_chart.h/process_std:.1f}σ)")
# Identify detection point
if results['ooc_high'].any():
first_ooc = np.where(results['ooc_high'])[0][0] + 1
print(f"\nUpward shift detected: Observation #{first_ooc}")
print(f" Actual shift start: Observation #41")
print(f" Detection delay: {first_ooc - 41} observations")
# Plot
cusum_chart.plot(data,
title="CUSUM Chart - Small Mean Shift Detection")
# Expected output:
# === CUSUM Control Chart ===
# Target value: 100.0
# Reference value (k): 0.500 (0.5σ)
# Decision interval (h): 5.000 (5.0σ)
#
# Upward shift detected: Observation #49
# Actual shift start: Observation #41
# Detection delay: 8 observations
Control chart using exponentially weighted moving average. Like CUSUM, detects small shifts.
# ===================================
# Example 5: EWMA (Exponentially Weighted Moving Average) Control Chart
# ===================================
class EWMAControlChart:
"""EWMA Control Chart Implementation
Smoothing by applying exponentially decaying weights to past observations.
Like CUSUM, excels at detecting small shifts.
Parameters:
-----------
target : float
Target value (process mean)
std : float
Process standard deviation
lambda_ : float
Weight coefficient (0 < λ ≤ 1). Smaller values give more weight to past.
Typical values: 0.05-0.25
L : float
Control limit width (multiple of standard deviation). Default: 3.0
Attributes:
-----------
ewma : np.ndarray
EWMA statistic
"""
def __init__(self, target: float, std: float,
lambda_: float = 0.2, L: float = 3.0):
if not 0 < lambda_ <= 1:
raise ValueError("lambda_ must be in (0, 1]")
self.target = target
self.std = std
self.lambda_ = lambda_
self.L = L
self.ewma = None
self.ucl = None
self.lcl = None
def fit_predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
"""Calculate EWMA statistics and detect out of control
Parameters:
-----------
data : np.ndarray
Observation data
Returns:
--------
results : dict
Dictionary containing 'ewma', 'ucl', 'lcl', 'ooc'
"""
n = len(data)
self.ewma = np.zeros(n)
self.ucl = np.zeros(n)
self.lcl = np.zeros(n)
# Sequential calculation of EWMA statistics
# Z_t = λ * X_t + (1-λ) * Z_{t-1}
# Initial value: Z_0 = target
self.ewma[0] = self.lambda_ * data[0] + (1 - self.lambda_) * self.target
for i in range(1, n):
self.ewma[i] = self.lambda_ * data[i] + (1 - self.lambda_) * self.ewma[i-1]
# Calculate control limits (time-dependent)
# σ_Z(t) = σ * sqrt(λ/(2-λ) * [1 - (1-λ)^(2t)])
for i in range(n):
t = i + 1
sigma_z = self.std * np.sqrt(
(self.lambda_ / (2 - self.lambda_)) *
(1 - (1 - self.lambda_)**(2 * t))
)
self.ucl[i] = self.target + self.L * sigma_z
self.lcl[i] = self.target - self.L * sigma_z
# Determine out of control
ooc = (self.ewma > self.ucl) | (self.ewma < self.lcl)
return {
'data': data,
'ewma': self.ewma,
'ucl': self.ucl,
'lcl': self.lcl,
'ooc': ooc
}
def plot(self, data: np.ndarray, title: str = "EWMA Control Chart"):
"""Plot EWMA control chart"""
results = self.fit_predict(data)
n_points = len(data)
plt.figure(figsize=(12, 6))
# Plot EWMA statistic
plt.plot(range(1, n_points+1), results['ewma'],
marker='o', color='#2c3e50', linewidth=2,
markersize=5, label='EWMA')
# Original data (faint)
plt.plot(range(1, n_points+1), data,
marker='.', color='gray', linewidth=0.5,
markersize=3, alpha=0.5, label='Original Data')
# Center line and control limits
plt.axhline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
plt.plot(range(1, n_points+1), results['ucl'],
color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
plt.plot(range(1, n_points+1), results['lcl'],
color='#e74c3c', linestyle='--', linewidth=2, label='LCL')
# Highlight out of control points
ooc_indices = np.where(results['ooc'])[0]
if len(ooc_indices) > 0:
plt.scatter(ooc_indices + 1, results['ewma'][ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
plt.xlabel('Observation Number', fontsize=11)
plt.ylabel('EWMA Statistic', fontsize=11)
plt.title(title + f" (λ={self.lambda_}, L={self.L})",
fontsize=13, fontweight='bold')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Example usage: Pharmaceutical manufacturing content management
np.random.seed(42)
# Target value and standard deviation
target_content = 50.0 # mg
process_std = 0.5 # mg
# Generate data (120 observations)
n_obs = 120
data = np.random.normal(target_content, process_std, n_obs)
# Small shift of +0.5σ from observation 60
data[60:] += 0.5 * process_std
# Apply EWMA control chart (λ=0.2 for moderate smoothing)
ewma_chart = EWMAControlChart(
target=target_content,
std=process_std,
lambda_=0.2,
L=3.0
)
results = ewma_chart.fit_predict(data)
print("=== EWMA Control Chart ===")
print(f"Target value: {target_content} mg")
print(f"Weight coefficient (λ): {ewma_chart.lambda_}")
print(f"Control limit width (L): {ewma_chart.L}σ")
# Identify detection point
if results['ooc'].any():
first_ooc = np.where(results['ooc'])[0][0] + 1
print(f"\nShift detected: Observation #{first_ooc}")
print(f" Actual shift start: Observation #61")
print(f" Detection delay: {first_ooc - 61} observations")
# Plot
ewma_chart.plot(data,
title="EWMA Chart - Drug Content Monitoring")
# Expected output:
# === EWMA Control Chart ===
# Target value: 50.0 mg
# Weight coefficient (λ): 0.2
# Control limit width (L): 3.0σ
#
# Shift detected: Observation #68
# Actual shift start: Observation #61
# Detection delay: 7 observations
After confirming the process is statistically stable using control charts, calculate process capability indices.
Integrated implementation of control charts and process capability analysis
# ===================================
# Example 6: Integrated Process Capability Analysis
# ===================================
from scipy import stats
class ProcessCapabilityAnalysis:
"""Integrated Control Chart and Process Capability Analysis Class
1. Confirm process stability using control charts
2. Calculate capability indices for stable process
3. Generate comprehensive evaluation report
"""
def __init__(self, LSL: float, USL: float, target: float = None):
"""
Parameters:
-----------
LSL : float
Lower Specification Limit
USL : float
Upper Specification Limit
target : float, optional
Target value. If omitted, uses specification center
"""
self.LSL = LSL
self.USL = USL
self.target = target if target is not None else (LSL + USL) / 2
self.control_chart = None
self.capability_results = None
def analyze(self, data: np.ndarray,
subgroup_size: int = 5) -> Dict:
"""Execute integrated analysis
Parameters:
-----------
data : np.ndarray
Data with shape (n_subgroups, subgroup_size)
subgroup_size : int
Subgroup size
Returns:
--------
results : dict
Dictionary containing control chart information and process capability indices
"""
# Step 1: Confirm stability with X-bar and R chart
self.control_chart = XbarRControlChart(subgroup_size)
self.control_chart.fit(data)
chart_results = self.control_chart.predict(data)
# Check for out of control points
n_ooc_xbar = chart_results['xbar_ooc'].sum()
n_ooc_r = chart_results['r_ooc'].sum()
is_stable = (n_ooc_xbar == 0) and (n_ooc_r == 0)
# Step 2: Calculate process capability indices (recommended only when stable)
all_data = data.flatten()
process_mean = all_data.mean()
process_std = all_data.std(ddof=1)
# Cp: Process capability index (assumes centered)
Cp = (self.USL - self.LSL) / (6 * process_std)
# Cpk: Adjusted process capability index (considers actual mean position)
Cpu = (self.USL - process_mean) / (3 * process_std)
Cpl = (process_mean - self.LSL) / (3 * process_std)
Cpk = min(Cpu, Cpl)
# Cpm: Taguchi capability index (considers deviation from target)
tau = np.sqrt(process_std**2 + (process_mean - self.target)**2)
Cpm = (self.USL - self.LSL) / (6 * tau)
# Estimate out-of-spec rate (assuming normal distribution)
z_lower = (self.LSL - process_mean) / process_std
z_upper = (self.USL - process_mean) / process_std
ppm_lower = stats.norm.cdf(z_lower) * 1e6
ppm_upper = (1 - stats.norm.cdf(z_upper)) * 1e6
ppm_total = ppm_lower + ppm_upper
# Step 3: Comprehensive evaluation
self.capability_results = {
'stable': is_stable,
'n_ooc_xbar': n_ooc_xbar,
'n_ooc_r': n_ooc_r,
'process_mean': process_mean,
'process_std': process_std,
'Cp': Cp,
'Cpk': Cpk,
'Cpm': Cpm,
'Cpu': Cpu,
'Cpl': Cpl,
'ppm_lower': ppm_lower,
'ppm_upper': ppm_upper,
'ppm_total': ppm_total,
'LSL': self.LSL,
'USL': self.USL,
'target': self.target
}
return self.capability_results
def print_report(self):
"""Output comprehensive evaluation report"""
if self.capability_results is None:
raise ValueError("Call analyze() first")
r = self.capability_results
print("=" * 60)
print("Integrated Process Capability Analysis Report")
print("=" * 60)
print("\n[1. Process Stability (Control Chart)]")
if r['stable']:
print(" ✅ Process is statistically stable")
else:
print(" ⚠️ Process is unstable (out of control points present)")
print(f" X-bar out of control: {r['n_ooc_xbar']} points")
print(f" R out of control: {r['n_ooc_r']} points")
print(" → Stabilize process first")
print("\n[2. Process Characteristics]")
print(f" Process mean: {r['process_mean']:.4f}")
print(f" Process standard deviation: {r['process_std']:.4f}")
print(f" Specification range: [{r['LSL']:.2f}, {r['USL']:.2f}]")
print(f" Target value: {r['target']:.2f}")
print("\n[3. Process Capability Indices]")
print(f" Cp = {r['Cp']:.3f} ", end="")
print(self._evaluate_index(r['Cp']))
print(f" Cpk = {r['Cpk']:.3f} ", end="")
print(self._evaluate_index(r['Cpk']))
print(f" (Cpu={r['Cpu']:.3f}, Cpl={r['Cpl']:.3f})")
print(f" Cpm = {r['Cpm']:.3f} ", end="")
print(self._evaluate_index(r['Cpm']))
print("\n[4. Estimated Out-of-Spec Rate]")
print(f" Below LSL: {r['ppm_lower']:.1f} ppm")
print(f" Above USL: {r['ppm_upper']:.1f} ppm")
print(f" Total: {r['ppm_total']:.1f} ppm ({r['ppm_total']/1e4:.3f}%)")
print("\n[5. Overall Assessment]")
if r['stable'] and r['Cpk'] >= 1.33:
print(" ✅ Process is stable and has sufficient capability")
elif r['stable'] and r['Cpk'] >= 1.0:
print(" ⚠️ Process is stable but capability improvement needed")
else:
print(" ❌ Process improvement required")
print("=" * 60)
def _evaluate_index(self, value: float) -> str:
"""Evaluate capability index"""
if value >= 1.67:
return "(Excellent)"
elif value >= 1.33:
return "(Good)"
elif value >= 1.0:
return "(Minimum)"
else:
return "(Insufficient)"
def plot_capability(self, data: np.ndarray):
"""Histogram and normal distribution fit for process capability"""
if self.capability_results is None:
raise ValueError("Call analyze() first")
r = self.capability_results
all_data = data.flatten()
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
# Histogram and normal distribution fit
ax1.hist(all_data, bins=30, density=True, alpha=0.6,
color='#3498db', edgecolor='black', label='Data')
# Normal distribution curve
x = np.linspace(all_data.min(), all_data.max(), 200)
pdf = stats.norm.pdf(x, r['process_mean'], r['process_std'])
ax1.plot(x, pdf, 'r-', linewidth=2, label='Normal Fit')
# Specification limits and target
ax1.axvline(r['LSL'], color='#e74c3c', linestyle='--',
linewidth=2, label='LSL')
ax1.axvline(r['USL'], color='#e74c3c', linestyle='--',
linewidth=2, label='USL')
ax1.axvline(r['target'], color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax1.set_xlabel('Value', fontsize=11)
ax1.set_ylabel('Density', fontsize=11)
ax1.set_title('Process Capability Histogram',
fontsize=13, fontweight='bold')
ax1.legend(loc='best')
ax1.grid(True, alpha=0.3)
# Capability indices bar chart
indices = ['Cp', 'Cpk', 'Cpm']
values = [r['Cp'], r['Cpk'], r['Cpm']]
colors = ['#3498db', '#e74c3c', '#11998e']
bars = ax2.bar(indices, values, color=colors, alpha=0.7,
edgecolor='black', linewidth=1.5)
# Reference lines
ax2.axhline(1.33, color='green', linestyle='--',
linewidth=2, label='Good (1.33)', alpha=0.7)
ax2.axhline(1.0, color='orange', linestyle='--',
linewidth=2, label='Minimum (1.0)', alpha=0.7)
# Display values on top of bars
for bar, value in zip(bars, values):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height,
f'{value:.3f}',
ha='center', va='bottom', fontweight='bold')
ax2.set_ylabel('Index Value', fontsize=11)
ax2.set_title('Process Capability Indices',
fontsize=13, fontweight='bold')
ax2.legend(loc='best')
ax2.grid(True, alpha=0.3, axis='y')
ax2.set_ylim(0, max(values) * 1.3)
plt.tight_layout()
plt.show()
# Example usage: Injection molding dimension management
np.random.seed(42)
# Specification setting
LSL = 49.5 # mm
USL = 50.5 # mm
target = 50.0 # mm
# Generate data (40 subgroups, size 5)
n_subgroups = 40
subgroup_size = 5
# First 30 subgroups are stable (mean 50.0, standard deviation 0.15)
data_stable = np.random.normal(50.0, 0.15, (30, subgroup_size))
# Last 10 subgroups show slight mean shift (50.0 → 50.1)
data_shift = np.random.normal(50.1, 0.15, (10, subgroup_size))
data = np.vstack([data_stable, data_shift])
# Execute integrated analysis
analysis = ProcessCapabilityAnalysis(LSL, USL, target)
results = analysis.analyze(data, subgroup_size)
# Output report
analysis.print_report()
# Visualization
analysis.control_chart.plot(data,
title="Injection Molding - Dimension Control")
analysis.plot_capability(data)
# Expected output:
# ============================================================
# Integrated Process Capability Analysis Report
# ============================================================
#
# [1. Process Stability (Control Chart)]
# ⚠️ Process is unstable (out of control points present)
# X-bar out of control: 6 points
# R out of control: 0 points
# → Stabilize process first
#
# [2. Process Characteristics]
# Process mean: 50.0266
# Process standard deviation: 0.1505
# Specification range: [49.50, 50.50]
# Target value: 50.00
#
# [3. Process Capability Indices]
# Cp = 1.107 (Minimum)
# Cpk = 1.049 (Minimum)
# (Cpu=1.049, Cpl=1.165)
# Cpm = 1.089 (Minimum)
#
# [4. Estimated Out-of-Spec Rate]
# Below LSL: 0.1 ppm
# Above USL: 8.6 ppm
# Total: 8.7 ppm (0.001%)
#
# [5. Overall Assessment]
# ⚠️ Process is stable but capability improvement needed
# ============================================================
An advanced set of rules for detecting anomalies from data patterns, not just single points exceeding control limits.
Implementation of 8 decision rules
# ===================================
# Example 7: Western Electric Rules for Pattern Detection
# ===================================
class WesternElectricRules:
"""Western Electric Rules Implementation
Eight rules for detecting anomalies from data patterns on control charts.
Detects not only single points exceeding control limits but also trends and runs.
Rules:
------
Rule 1: One point beyond 3σ
Rule 2: Two out of three consecutive points beyond 2σ (same side)
Rule 3: Four out of five consecutive points beyond 1σ (same side)
Rule 4: Eight consecutive points on same side of center line
Rule 5: Six consecutive points monotonically increasing or decreasing
Rule 6: Fourteen consecutive points alternating up and down
Rule 7: Fifteen consecutive points within 1σ
Rule 8: Eight consecutive points beyond 1σ (either side)
"""
def __init__(self, center_line: float, ucl: float, lcl: float):
"""
Parameters:
-----------
center_line : float
Center line (mean)
ucl : float
Upper control limit (CL + 3σ)
lcl : float
Lower control limit (CL - 3σ)
"""
self.cl = center_line
self.ucl = ucl
self.lcl = lcl
# Calculate zone boundaries
sigma = (ucl - center_line) / 3
self.zone_a_upper = center_line + 2 * sigma
self.zone_a_lower = center_line - 2 * sigma
self.zone_b_upper = center_line + sigma
self.zone_b_lower = center_line - sigma
def apply_rules(self, data: np.ndarray) -> Dict[str, np.ndarray]:
"""Apply all rules
Parameters:
-----------
data : np.ndarray
Time series data
Returns:
--------
violations : dict
Boolean arrays indicating violation points for each rule
"""
n = len(data)
violations = {f'Rule{i}': np.zeros(n, dtype=bool) for i in range(1, 9)}
for i in range(n):
# Rule 1: One point beyond 3σ
if data[i] > self.ucl or data[i] < self.lcl:
violations['Rule1'][i] = True
# Rule 2: Two out of three consecutive points between 2σ and 3σ
if i >= 2:
window = data[i-2:i+1]
in_zone_a = np.sum((window > self.zone_a_upper) |
(window < self.zone_a_lower))
if in_zone_a >= 2:
violations['Rule2'][i] = True
# Rule 3: Four out of five consecutive points between 1σ and 2σ
if i >= 4:
window = data[i-4:i+1]
in_zone_b = np.sum(
((window > self.zone_b_upper) & (window <= self.zone_a_upper)) |
((window < self.zone_b_lower) & (window >= self.zone_a_lower))
)
if in_zone_b >= 4:
violations['Rule3'][i] = True
# Rule 4: Eight consecutive points on same side
if i >= 7:
window = data[i-7:i+1]
if np.all(window > self.cl) or np.all(window < self.cl):
violations['Rule4'][i] = True
# Rule 5: Six consecutive points monotonically increasing or decreasing
if i >= 5:
window = data[i-5:i+1]
diffs = np.diff(window)
if np.all(diffs > 0) or np.all(diffs < 0):
violations['Rule5'][i] = True
# Rule 6: Fourteen consecutive points alternating up and down
if i >= 13:
window = data[i-13:i+1]
diffs = np.diff(window)
signs = np.sign(diffs)
# Check if signs alternate
alternating = True
for j in range(len(signs) - 1):
if signs[j] * signs[j+1] >= 0:
alternating = False
break
if alternating:
violations['Rule6'][i] = True
# Rule 7: Fifteen consecutive points within 1σ
if i >= 14:
window = data[i-14:i+1]
if np.all((window >= self.zone_b_lower) &
(window <= self.zone_b_upper)):
violations['Rule7'][i] = True
# Rule 8: Eight consecutive points beyond 1σ
if i >= 7:
window = data[i-7:i+1]
if np.all((window > self.zone_b_upper) |
(window < self.zone_b_lower)):
violations['Rule8'][i] = True
return violations
def plot_with_zones(self, data: np.ndarray,
title: str = "Control Chart with Western Electric Rules"):
"""Plot control chart with zones"""
violations = self.apply_rules(data)
n = len(data)
# Combine all violation points
any_violation = np.zeros(n, dtype=bool)
for rule_violations in violations.values():
any_violation |= rule_violations
plt.figure(figsize=(14, 7))
# Display zones with color coding
plt.axhspan(self.ucl, self.zone_a_upper,
color='#e74c3c', alpha=0.1, label='Zone A (2-3σ)')
plt.axhspan(self.zone_a_upper, self.zone_b_upper,
color='#f39c12', alpha=0.1, label='Zone B (1-2σ)')
plt.axhspan(self.zone_b_upper, self.cl,
color='#2ecc71', alpha=0.1, label='Zone C (0-1σ)')
plt.axhspan(self.cl, self.zone_b_lower,
color='#2ecc71', alpha=0.1)
plt.axhspan(self.zone_b_lower, self.zone_a_lower,
color='#f39c12', alpha=0.1)
plt.axhspan(self.zone_a_lower, self.lcl,
color='#e74c3c', alpha=0.1)
# Plot data
plt.plot(range(1, n+1), data, marker='o', color='#2c3e50',
linewidth=1.5, markersize=5, label='Data', zorder=3)
# Control limits
plt.axhline(self.cl, color='#11998e', linestyle='-',
linewidth=2, label='CL', zorder=2)
plt.axhline(self.ucl, color='#e74c3c', linestyle='--',
linewidth=2, label='UCL (3σ)', zorder=2)
plt.axhline(self.lcl, color='#e74c3c', linestyle='--',
linewidth=2, label='LCL (3σ)', zorder=2)
# Zone boundaries (dotted)
plt.axhline(self.zone_a_upper, color='gray', linestyle=':',
linewidth=1, alpha=0.5, zorder=1)
plt.axhline(self.zone_a_lower, color='gray', linestyle=':',
linewidth=1, alpha=0.5, zorder=1)
plt.axhline(self.zone_b_upper, color='gray', linestyle=':',
linewidth=1, alpha=0.5, zorder=1)
plt.axhline(self.zone_b_lower, color='gray', linestyle=':',
linewidth=1, alpha=0.5, zorder=1)
# Highlight violation points
if any_violation.any():
violation_indices = np.where(any_violation)[0]
plt.scatter(violation_indices + 1, data[violation_indices],
color='red', s=150, marker='X', zorder=5,
label='Rule Violation')
plt.xlabel('Observation Number', fontsize=11)
plt.ylabel('Value', fontsize=11)
plt.title(title, fontsize=13, fontweight='bold')
plt.legend(loc='best', ncol=2)
plt.grid(True, alpha=0.3, zorder=0)
plt.tight_layout()
plt.show()
# Output violation summary
print("\n=== Western Electric Rules Violations ===")
for rule_name, rule_violations in violations.items():
n_violations = rule_violations.sum()
if n_violations > 0:
indices = np.where(rule_violations)[0] + 1
print(f"{rule_name}: {n_violations} times "
f"(Observations {list(indices[:5])}{'...' if n_violations > 5 else ''})")
# Example usage: Detecting various anomaly patterns
np.random.seed(42)
# Generate data (100 observations)
n = 100
data = np.random.normal(0, 1, n)
# Insert anomaly patterns
# Rule 1 violation: Exceeds 3σ
data[20] = 3.5
# Rule 4 violation: Eight consecutive points on upper side
data[40:48] += 0.5
# Rule 5 violation: Monotonic increasing trend
data[65:71] = np.linspace(0, 2, 6)
# Rule 7 violation: Reduced variation (within 1σ)
data[85:100] = np.random.normal(0, 0.2, 15)
# Apply Western Electric Rules
cl = 0.0
ucl = 3.0
lcl = -3.0
we_rules = WesternElectricRules(cl, ucl, lcl)
we_rules.plot_with_zones(data,
title="Western Electric Rules - Anomaly Pattern Detection")
# Expected output:
# === Western Electric Rules Violations ===
# Rule1: 1 time (Observations [21])
# Rule4: 1 time (Observations [48])
# Rule5: 1 time (Observations [71])
# Rule7: 1 time (Observations [100])
Sequential update monitoring system usable in actual manufacturing settings
# ===================================
# Example 8: Real-time SPC Monitoring System
# ===================================
import time
from datetime import datetime
from collections import deque
class RealTimeSPCMonitor:
"""Real-time SPC Monitoring System
System that sequentially updates control charts as new data arrives,
immediately detecting and notifying anomalies.
Features:
---------
- Simultaneous monitoring of EWMA, CUSUM, and Shewhart charts
- High-sensitivity detection with Western Electric Rules
- Alert notification functionality
- Historical data retention and visualization
"""
def __init__(self, target: float, std: float,
window_size: int = 100):
"""
Parameters:
-----------
target : float
Target value
std : float
Process standard deviation
window_size : int
Maximum number of historical data points to display
"""
self.target = target
self.std = std
self.window_size = window_size
# Control limits
self.ucl = target + 3 * std
self.lcl = target - 3 * std
# Initialize each method
self.ewma_chart = EWMAControlChart(target, std, lambda_=0.2)
self.cusum_chart = CUSUMControlChart(target, std, k=0.5, h=5.0)
self.we_rules = WesternElectricRules(target, self.ucl, self.lcl)
# Data history (efficiently managed with deque)
self.data_history = deque(maxlen=window_size)
self.timestamps = deque(maxlen=window_size)
self.ewma_history = deque(maxlen=window_size)
# Alert history
self.alerts = []
def add_observation(self, value: float,
timestamp: datetime = None) -> Dict:
"""Add new observation and perform anomaly detection
Parameters:
-----------
value : float
New observation value
timestamp : datetime, optional
Timestamp (current time if omitted)
Returns:
--------
status : dict
Detection results and alert information
"""
if timestamp is None:
timestamp = datetime.now()
# Add data
self.data_history.append(value)
self.timestamps.append(timestamp)
# Anomaly detection
alerts = []
# 1. Shewhart control chart (3σ rule)
if value > self.ucl:
alerts.append({
'method': 'Shewhart',
'type': 'UCL violation',
'value': value,
'severity': 'HIGH'
})
elif value < self.lcl:
alerts.append({
'method': 'Shewhart',
'type': 'LCL violation',
'value': value,
'severity': 'HIGH'
})
# 2. EWMA control chart
if len(self.data_history) >= 1:
data_array = np.array(self.data_history)
ewma_results = self.ewma_chart.fit_predict(data_array)
current_ewma = ewma_results['ewma'][-1]
current_ucl = ewma_results['ucl'][-1]
current_lcl = ewma_results['lcl'][-1]
self.ewma_history.append(current_ewma)
if ewma_results['ooc'][-1]:
alerts.append({
'method': 'EWMA',
'type': 'EWMA limit violation',
'value': current_ewma,
'severity': 'MEDIUM'
})
# 3. CUSUM control chart
if len(self.data_history) >= 1:
data_array = np.array(self.data_history)
cusum_results = self.cusum_chart.fit_predict(data_array)
if cusum_results['ooc_high'][-1]:
alerts.append({
'method': 'CUSUM',
'type': 'Upward shift detected',
'value': cusum_results['cusum_high'][-1],
'severity': 'MEDIUM'
})
if cusum_results['ooc_low'][-1]:
alerts.append({
'method': 'CUSUM',
'type': 'Downward shift detected',
'value': cusum_results['cusum_low'][-1],
'severity': 'MEDIUM'
})
# 4. Western Electric Rules
if len(self.data_history) >= 15: # Minimum data required
data_array = np.array(self.data_history)
violations = self.we_rules.apply_rules(data_array)
for rule_name, rule_violations in violations.items():
if rule_violations[-1]: # Violation at latest point
alerts.append({
'method': 'Western Electric',
'type': f'{rule_name} violation',
'value': value,
'severity': 'LOW'
})
# Save alerts
if alerts:
for alert in alerts:
alert['timestamp'] = timestamp
self.alerts.append(alert)
# Return status
status = {
'value': value,
'timestamp': timestamp,
'in_control': len(alerts) == 0,
'alerts': alerts,
'n_observations': len(self.data_history)
}
return status
def get_summary(self) -> Dict:
"""Summary of current monitoring status"""
if len(self.data_history) == 0:
return {'status': 'No data'}
data_array = np.array(self.data_history)
return {
'n_observations': len(self.data_history),
'current_value': data_array[-1],
'process_mean': data_array.mean(),
'process_std': data_array.std(ddof=1),
'target': self.target,
'n_alerts_total': len(self.alerts),
'n_alerts_last_10': sum(1 for a in self.alerts
if len(self.data_history) -
list(self.timestamps).index(a['timestamp']) <= 10),
'alert_methods': list(set(a['method'] for a in self.alerts))
}
def plot_dashboard(self):
"""Real-time monitoring dashboard"""
if len(self.data_history) < 2:
print("Insufficient data (minimum 2 points required)")
return
data_array = np.array(self.data_history)
n = len(data_array)
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
# 1. Shewhart control chart
ax1 = axes[0]
x_indices = range(1, n+1)
ax1.plot(x_indices, data_array, marker='o', color='#2c3e50',
linewidth=1.5, markersize=4, label='Data')
ax1.axhline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax1.axhline(self.ucl, color='#e74c3c', linestyle='--',
linewidth=2, label='UCL')
ax1.axhline(self.lcl, color='#e74c3c', linestyle='--',
linewidth=2, label='LCL')
# Highlight Shewhart violation points
ooc_indices = (data_array > self.ucl) | (data_array < self.lcl)
if ooc_indices.any():
ooc_x = np.where(ooc_indices)[0] + 1
ax1.scatter(ooc_x, data_array[ooc_indices],
color='red', s=100, zorder=5)
ax1.set_ylabel('Value', fontsize=10)
ax1.set_title('Shewhart Control Chart (3σ)',
fontsize=12, fontweight='bold')
ax1.legend(loc='best', fontsize=9)
ax1.grid(True, alpha=0.3)
# 2. EWMA control chart
ax2 = axes[1]
if len(self.ewma_history) > 0:
ewma_array = np.array(self.ewma_history)
ewma_results = self.ewma_chart.fit_predict(data_array)
ax2.plot(x_indices, data_array, marker='.', color='gray',
linewidth=0.5, markersize=3, alpha=0.5, label='Original')
ax2.plot(x_indices, ewma_array, marker='o', color='#3498db',
linewidth=2, markersize=4, label='EWMA')
ax2.axhline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax2.plot(x_indices, ewma_results['ucl'],
color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
ax2.plot(x_indices, ewma_results['lcl'],
color='#e74c3c', linestyle='--', linewidth=2, label='LCL')
# Highlight EWMA violation points
if ewma_results['ooc'].any():
ooc_x = np.where(ewma_results['ooc'])[0] + 1
ax2.scatter(ooc_x, ewma_array[ewma_results['ooc']],
color='red', s=100, zorder=5)
ax2.set_ylabel('EWMA Value', fontsize=10)
ax2.set_title('EWMA Control Chart (λ=0.2)',
fontsize=12, fontweight='bold')
ax2.legend(loc='best', fontsize=9)
ax2.grid(True, alpha=0.3)
# 3. CUSUM control chart
ax3 = axes[2]
cusum_results = self.cusum_chart.fit_predict(data_array)
ax3.plot(x_indices, cusum_results['cusum_high'],
marker='^', color='#e74c3c', linewidth=1.5,
markersize=4, label='CUSUM High')
ax3.plot(x_indices, cusum_results['cusum_low'],
marker='v', color='#3498db', linewidth=1.5,
markersize=4, label='CUSUM Low')
ax3.axhline(self.cusum_chart.h, color='#2c3e50',
linestyle='--', linewidth=2, label='Decision Interval (h)')
ax3.axhline(0, color='gray', linestyle='-', linewidth=1)
# Highlight CUSUM violation points
if cusum_results['ooc_high'].any():
ooc_x = np.where(cusum_results['ooc_high'])[0] + 1
ax3.scatter(ooc_x, cusum_results['cusum_high'][cusum_results['ooc_high']],
color='red', s=100, zorder=5, marker='^')
if cusum_results['ooc_low'].any():
ooc_x = np.where(cusum_results['ooc_low'])[0] + 1
ax3.scatter(ooc_x, cusum_results['cusum_low'][cusum_results['ooc_low']],
color='red', s=100, zorder=5, marker='v')
ax3.set_xlabel('Observation Number', fontsize=10)
ax3.set_ylabel('CUSUM Value', fontsize=10)
ax3.set_title('CUSUM Control Chart',
fontsize=12, fontweight='bold')
ax3.legend(loc='best', fontsize=9)
ax3.grid(True, alpha=0.3)
plt.suptitle('Real-time SPC Monitoring Dashboard',
fontsize=15, fontweight='bold', y=0.995)
plt.tight_layout()
plt.show()
# Example usage: Real-time monitoring simulation
np.random.seed(42)
# Initialize monitoring system
target = 100.0
std = 1.0
monitor = RealTimeSPCMonitor(target, std, window_size=150)
print("=== Real-time SPC Monitoring System ===")
print(f"Target value: {target}")
print(f"Standard deviation: {std}")
print("\nSequentially adding observation data...\n")
# Simulation: Sequentially add 150 observations
for i in range(150):
# Normal data
if i < 60:
value = np.random.normal(target, std)
# Observations 60-80: +1σ shift
elif i < 80:
value = np.random.normal(target + std, std)
# Observations 80-120: Return to normal
elif i < 120:
value = np.random.normal(target, std)
# Observations 120-150: Increased variation
else:
value = np.random.normal(target, std * 1.5)
# Add observation value
status = monitor.add_observation(value)
# Display if alert
if not status['in_control']:
print(f"⚠️ Observation #{i+1}: Alert triggered")
for alert in status['alerts']:
print(f" - {alert['method']}: {alert['type']} "
f"(Severity: {alert['severity']})")
# Display summary
print("\n=== Monitoring Summary ===")
summary = monitor.get_summary()
for key, value in summary.items():
print(f"{key}: {value}")
# Display dashboard
monitor.plot_dashboard()
# Expected output:
# === Real-time SPC Monitoring System ===
# Target value: 100.0
# Standard deviation: 1.0
#
# Sequentially adding observation data...
#
# ⚠️ Observation #67: Alert triggered
# - EWMA: EWMA limit violation (Severity: MEDIUM)
# - CUSUM: Upward shift detected (Severity: MEDIUM)
# ...(multiple alerts)
#
# === Monitoring Summary ===
# n_observations: 150
# current_value: 101.234
# process_mean: 100.567
# process_std: 1.234
# target: 100.0
# n_alerts_total: 45
# n_alerts_last_10: 8
# alert_methods: ['EWMA', 'CUSUM', 'Western Electric']
Upon completing this chapter, you should be able to explain and implement the following:
Q1: If an X-bar chart has upper control limit (UCL) of 80.5 and center line (CL) of 79.0, what is the lower control limit (LCL)?
Answer: 77.5
Explanation:
UCL = CL + 3σ
LCL = CL - 3σ
UCL - CL = 80.5 - 79.0 = 1.5 = 3σ
Therefore, LCL = CL - 1.5 = 79.0 - 1.5 = 77.5
Q2: For a process with average defect rate of 2% and sample size of 100, calculate the upper control limit (UCL) for a P-chart.
Answer: UCL = 0.062 (6.2%)
Calculation:
p̄ = 0.02
n = 100
σ_p = √(p̄(1-p̄)/n) = √(0.02 × 0.98 / 100) = √0.000196 = 0.014
UCL = p̄ + 3σ_p = 0.02 + 3 × 0.014 = 0.02 + 0.042 = 0.062
Key Point: Larger sample sizes bring control limits closer to the center line (improved detection power).
Q3: Both CUSUM and EWMA excel at detecting small shifts, but compare their strengths and weaknesses and explain which should be chosen in what situations.
Comparison Table:
| Item | CUSUM | EWMA |
|---|---|---|
| Detection Speed | Very fast (excellent ARL) | Fast |
| Parameter Setting | Somewhat difficult (k, h) | Relatively easy (λ) |
| Ease of Interpretation | Difficult (cumulative statistic) | Easy (smoothed value) |
| Shift Direction Detection | Separate for up/down (clear) | Single statistic |
| Implementation Complexity | Moderate | Simple |
Selection Criteria:
Best Practice: For critical processes, using both methods concurrently for complementary monitoring is recommended.