This chapter covers Statistical Process Control (SPC): the theory behind control charts, Shewhart charts (X̄-R, I-MR), process capability indices, advanced SPC methods (CUSUM, EWMA, Hotelling's T²), and control chart out-of-control detection rules.
Learning Objectives
After reading this chapter, you will be able to:
- ✅ Understand the fundamental theory of Statistical Process Control (SPC)
- ✅ Create and interpret Shewhart control charts (X̄-R, I-MR)
- ✅ Calculate and evaluate process capability indices (Cp, Cpk)
- ✅ Implement advanced SPC methods (CUSUM, EWMA, Hotelling's T²)
- ✅ Apply control chart out-of-control detection rules
2.1 Fundamentals of Statistical Process Control (SPC)
What is SPC?
Statistical Process Control (SPC) is a methodology that uses statistical techniques to monitor process variation, detect anomalies early, and maintain processes in a state of statistical control.
Main Objectives of SPC:
- Verify Process Stability: Confirm the process is in a state of statistical control
- Early Anomaly Detection: Rapidly detect variation caused by special causes
- Evaluate Process Capability: Quantitatively assess the ability to meet specifications
- Continuous Improvement: Promote data-driven improvement activities
- Preventive Maintenance: Address issues before they occur
Two Types of Variation
| Variation Type | Alternative Name | Characteristics | Example Causes | Response |
|---|---|---|---|---|
| Common Cause | Within-system variation | Predictable, always present | Measurement error, raw material variation, environmental fluctuation | System improvement |
| Special Cause | Outside-system variation | Unpredictable, sporadic | Equipment failure, operator error, abnormal raw material | Immediate action |
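The distinction can be made concrete with a short simulation (illustrative numbers only, not data from this chapter): common-cause variation stays within 3σ limits, while a special cause such as a sudden shift pushes points outside them.

```python
import numpy as np

rng = np.random.default_rng(42)

# Common-cause variation: stable random scatter around the process mean
common = rng.normal(loc=100, scale=2, size=200)

# Special cause: the same process with an abrupt shift (e.g. equipment failure)
special = common.copy()
special[150:] += 8  # sudden +8 (4-sigma) shift from sample 150 onward

# A simple 3-sigma check separates the two behaviors
cl, s = common.mean(), common.std(ddof=1)
n_common = int(np.sum(np.abs(common - cl) > 3 * s))
n_special = int(np.sum(np.abs(special - cl) > 3 * s))
print(f"stable series, points outside 3-sigma limits:  {n_common}")
print(f"shifted series, points outside 3-sigma limits: {n_special}")
```

The stable series produces at most a handful of false signals, while the shifted series is flagged repeatedly — the statistical signature of a special cause.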
Basic Structure of Control Charts
Basic Elements of Control Charts:
- Center Line (CL): Process average value
- Upper Control Limit (UCL): CL + 3σ
- Lower Control Limit (LCL): CL - 3σ
- Data Points: Statistics plotted in time series
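A minimal sketch of these three elements (using the sample standard deviation as the σ estimate for illustration; production charts typically estimate σ from R̄ or m̄R, as the code examples below do):

```python
import numpy as np

rng = np.random.default_rng(0)
values = rng.normal(loc=100, scale=2, size=50)  # illustrative measurements

cl = values.mean()            # Center Line: process average
sigma = values.std(ddof=1)    # estimated process standard deviation
ucl = cl + 3 * sigma          # Upper Control Limit: CL + 3 sigma
lcl = cl - 3 * sigma          # Lower Control Limit: CL - 3 sigma

print(f"CL = {cl:.2f}, UCL = {ucl:.2f}, LCL = {lcl:.2f}")
```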
2.2 Code Examples: Control Charts and Process Capability Analysis
<p><strong>Requirements</strong>: Python 3.9+, numpy (>=1.24.0, below 2.0.0), matplotlib (>=3.7.0), and scipy (>=1.11.0; used in Code Examples 3 and 6).</p>
<h4>Code Example 1: Implementation of X̄-R Control Chart (Mean and Range Chart)</h4>
<p><strong>Purpose</strong>: Create X̄-R control chart from subgroup data and evaluate process stability.</p>
<pre><code class="language-python">import numpy as np
import matplotlib.pyplot as plt
# Font settings for Japanese characters (if needed)
plt.rcParams['font.sans-serif'] = ['Hiragino Sans', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
np.random.seed(42)
# Generate simulation data
# 25 subgroups, 5 samples per subgroup
n_subgroups = 25
subgroup_size = 5
# Process data (product weight, target: 100g)
data = []
for i in range(n_subgroups):
    if i < 20:
        # Normal process (first 20 subgroups)
        subgroup = np.random.normal(100, 2, subgroup_size)
    else:
        # Process mean shifts by 2g (last 5 subgroups)
        subgroup = np.random.normal(102, 2, subgroup_size)
    data.append(subgroup)
data = np.array(data)
# Calculate statistics
xbar = np.mean(data, axis=1) # Subgroup mean
R = np.max(data, axis=1) - np.min(data, axis=1) # Subgroup range
# Control chart constants (for n=5)
# These constants are statistically derived values
A2 = 0.577 # For X̄ chart
D3 = 0.0 # For R chart lower limit
D4 = 2.115 # For R chart upper limit
# Control limits for X̄ chart
xbar_center = np.mean(xbar)
R_bar = np.mean(R)
xbar_UCL = xbar_center + A2 * R_bar
xbar_LCL = xbar_center - A2 * R_bar
# Control limits for R chart
R_center = R_bar
R_UCL = D4 * R_bar
R_LCL = D3 * R_bar
# Visualization
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
# X̄ chart
axes[0].plot(range(1, n_subgroups + 1), xbar, 'o-', color='#11998e',
markersize=6, linewidth=1.5, label='Subgroup Mean')
axes[0].axhline(y=xbar_center, color='blue', linestyle='-', linewidth=2, label=f'Center Line (CL={xbar_center:.2f})')
axes[0].axhline(y=xbar_UCL, color='red', linestyle='--', linewidth=2, label=f'UCL={xbar_UCL:.2f}')
axes[0].axhline(y=xbar_LCL, color='red', linestyle='--', linewidth=2, label=f'LCL={xbar_LCL:.2f}')
# Highlight out-of-control points
out_of_control = (xbar > xbar_UCL) | (xbar < xbar_LCL)
if out_of_control.any():
    axes[0].scatter(np.where(out_of_control)[0] + 1, xbar[out_of_control],
                    color='red', s=100, marker='o', zorder=5, label='Out of Control')
axes[0].set_xlabel('Subgroup Number', fontsize=12)
axes[0].set_ylabel('Mean X̄ (g)', fontsize=12)
axes[0].set_title('X̄ Control Chart (Mean Chart)', fontsize=14, fontweight='bold')
axes[0].legend(loc='upper left')
axes[0].grid(alpha=0.3)
# R chart
axes[1].plot(range(1, n_subgroups + 1), R, 'o-', color='#f59e0b',
markersize=6, linewidth=1.5, label='Subgroup Range')
axes[1].axhline(y=R_center, color='blue', linestyle='-', linewidth=2, label=f'Center Line (CL={R_center:.2f})')
axes[1].axhline(y=R_UCL, color='red', linestyle='--', linewidth=2, label=f'UCL={R_UCL:.2f}')
if R_LCL > 0:
    axes[1].axhline(y=R_LCL, color='red', linestyle='--', linewidth=2, label=f'LCL={R_LCL:.2f}')
# Highlight out-of-control points
out_of_control_R = (R > R_UCL) | (R < R_LCL)
if out_of_control_R.any():
    axes[1].scatter(np.where(out_of_control_R)[0] + 1, R[out_of_control_R],
                    color='red', s=100, marker='o', zorder=5, label='Out of Control')
axes[1].set_xlabel('Subgroup Number', fontsize=12)
axes[1].set_ylabel('Range R (g)', fontsize=12)
axes[1].set_title('R Control Chart (Range Chart)', fontsize=14, fontweight='bold')
axes[1].legend(loc='upper left')
axes[1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
# Statistical summary
print("=== X̄-R Control Chart Statistical Summary ===")
print(f"\n【X̄ Chart】")
print(f" Center Line (CL): {xbar_center:.3f} g")
print(f" Upper Control Limit (UCL): {xbar_UCL:.3f} g")
print(f" Lower Control Limit (LCL): {xbar_LCL:.3f} g")
print(f" Out-of-control points: {out_of_control.sum()}/{n_subgroups}")
print(f"\n【R Chart】")
print(f" Center Line (CL): {R_center:.3f} g")
print(f" Upper Control Limit (UCL): {R_UCL:.3f} g")
print(f" Lower Control Limit (LCL): {R_LCL:.3f} g")
print(f" Out-of-control points: {out_of_control_R.sum()}/{n_subgroups}")
# Judgment
if out_of_control.any():
    print(f"\n⚠️ Warning: Process mean anomaly detected at subgroup {np.where(out_of_control)[0] + 1}")
if out_of_control_R.any():
    print(f"⚠️ Warning: Process variation anomaly detected at subgroup {np.where(out_of_control_R)[0] + 1}")
if not (out_of_control.any() or out_of_control_R.any()):
    print("\n✅ Process is in statistical control")
</code></pre>
<p><strong>Expected Output:</strong></p>
<pre>=== X̄-R Control Chart Statistical Summary ===

【X̄ Chart】
  Center Line (CL): 100.405 g
  Upper Control Limit (UCL): 103.088 g
  Lower Control Limit (LCL): 97.722 g
  Out-of-control points: 3/25

【R Chart】
  Center Line (CL): 4.651 g
  Upper Control Limit (UCL): 9.836 g
  Lower Control Limit (LCL): 0.000 g
  Out-of-control points: 0/25

⚠️ Warning: Process mean anomaly detected at subgroup [22 23 25]</pre>
<p><strong>Explanation</strong>: The X̄-R control chart simultaneously monitors the process mean (X̄ chart) and the process variation (R chart) of subgroup data. In this example, the process mean shifts by 2 g from subgroup 21 onward, which the X̄ chart detects. X̄ and R charts are always used as a pair, and the process is judged "stable" only when both are in control.</p>
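The chapter's objectives include out-of-control detection rules, and the 3σ check in the code above is only the first of them. A sketch of a small subset of the Western Electric run rules follows; the rule numbering and window logic here are one common formulation, not taken from a specific standard text, and the usage data are illustrative.

```python
import numpy as np

def western_electric_rules(x, cl, sigma):
    """Flag out-of-control points using a subset of the Western Electric rules.

    Returns a dict of boolean arrays, one per rule. This is a sketch of the
    classic run rules; rule numbering follows common usage.
    """
    z = (np.asarray(x, dtype=float) - cl) / sigma  # distance from CL in sigma units
    n = len(z)
    r1 = np.abs(z) > 3                   # Rule 1: one point beyond 3 sigma
    r2 = np.zeros(n, dtype=bool)         # Rule 2: 2 of 3 beyond 2 sigma, same side
    r3 = np.zeros(n, dtype=bool)         # Rule 3: 4 of 5 beyond 1 sigma, same side
    r4 = np.zeros(n, dtype=bool)         # Rule 4: 8 in a row on one side of the CL
    for i in range(n):
        w = z[max(0, i - 2):i + 1]
        if len(w) == 3 and (np.sum(w > 2) >= 2 or np.sum(w < -2) >= 2):
            r2[i] = True
        w = z[max(0, i - 4):i + 1]
        if len(w) == 5 and (np.sum(w > 1) >= 4 or np.sum(w < -1) >= 4):
            r3[i] = True
        w = z[max(0, i - 7):i + 1]
        if len(w) == 8 and (np.all(w > 0) or np.all(w < 0)):
            r4[i] = True
    return {"rule1": r1, "rule2": r2, "rule3": r3, "rule4": r4}

# Usage on a synthetic xbar series: a sustained 1.5-sigma run that Rule 1 misses
xbar = np.concatenate([np.full(10, 100.0), np.full(10, 101.5)])
flags = western_electric_rules(xbar, cl=100.0, sigma=1.0)
print("Rule 4 (8 on one side) fires at indices:", np.where(flags["rule4"])[0])
```

Run rules like these trade a higher false-alarm rate for faster detection of sustained small shifts — exactly the gap the CUSUM and EWMA charts later in this section also address.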
<h4>Code Example 2: I-MR Control Chart (Individual and Moving Range Chart)</h4>
<p><strong>Purpose</strong>: Create I-MR control chart from individual measurements (when subgrouping is not possible).</p>
<pre><code class="language-python">import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# Generate individual measurement data (1 measurement per day, 30 days)
n_observations = 30
individual_values = np.random.normal(50, 3, n_observations)
# Process variation increases after day 15
individual_values[15:] += np.random.normal(0, 5, n_observations - 15)
# Calculate Moving Range
# mR = |X_i - X_{i-1}|
moving_range = np.abs(np.diff(individual_values))
# Calculate statistics
I_bar = np.mean(individual_values)
mR_bar = np.mean(moving_range)
# Control chart constants (moving range n=2)
d2 = 1.128 # Expected value constant for moving range
E2 = 2.66 # For I chart
D3 = 0.0 # For mR chart lower limit
D4 = 3.267 # For mR chart upper limit
# Control limits for I chart
I_UCL = I_bar + E2 * mR_bar
I_LCL = I_bar - E2 * mR_bar
# Control limits for mR chart
mR_center = mR_bar
mR_UCL = D4 * mR_bar
mR_LCL = D3 * mR_bar
# Visualization
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
# I chart (Individual chart)
axes[0].plot(range(1, n_observations + 1), individual_values, 'o-',
color='#11998e', markersize=6, linewidth=1.5, label='Individual Values')
axes[0].axhline(y=I_bar, color='blue', linestyle='-', linewidth=2,
label=f'Center Line (CL={I_bar:.2f})')
axes[0].axhline(y=I_UCL, color='red', linestyle='--', linewidth=2,
label=f'UCL={I_UCL:.2f}')
axes[0].axhline(y=I_LCL, color='red', linestyle='--', linewidth=2,
label=f'LCL={I_LCL:.2f}')
# Detect out-of-control points
out_of_control_I = (individual_values > I_UCL) | (individual_values < I_LCL)
if out_of_control_I.any():
    axes[0].scatter(np.where(out_of_control_I)[0] + 1,
                    individual_values[out_of_control_I],
                    color='red', s=100, marker='o', zorder=5, label='Out of Control')
axes[0].set_xlabel('Observation Number', fontsize=12)
axes[0].set_ylabel('Measurement (units)', fontsize=12)
axes[0].set_title('I Chart (Individual Chart)', fontsize=14, fontweight='bold')
axes[0].legend(loc='upper left')
axes[0].grid(alpha=0.3)
# mR chart (Moving Range chart)
axes[1].plot(range(2, n_observations + 1), moving_range, 'o-',
color='#f59e0b', markersize=6, linewidth=1.5, label='Moving Range')
axes[1].axhline(y=mR_center, color='blue', linestyle='-', linewidth=2,
label=f'Center Line (CL={mR_center:.2f})')
axes[1].axhline(y=mR_UCL, color='red', linestyle='--', linewidth=2,
label=f'UCL={mR_UCL:.2f}')
if mR_LCL > 0:
    axes[1].axhline(y=mR_LCL, color='red', linestyle='--', linewidth=2,
                    label=f'LCL={mR_LCL:.2f}')
# Detect out-of-control points
out_of_control_mR = (moving_range > mR_UCL) | (moving_range < mR_LCL)
if out_of_control_mR.any():
    axes[1].scatter(np.where(out_of_control_mR)[0] + 2,
                    moving_range[out_of_control_mR],
                    color='red', s=100, marker='o', zorder=5, label='Out of Control')
axes[1].set_xlabel('Observation Number', fontsize=12)
axes[1].set_ylabel('Moving Range mR (units)', fontsize=12)
axes[1].set_title('mR Chart (Moving Range Chart)', fontsize=14, fontweight='bold')
axes[1].legend(loc='upper left')
axes[1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("=== I-MR Control Chart Statistical Summary ===")
print(f"\n【I Chart】")
print(f" Center Line (CL): {I_bar:.3f}")
print(f" Upper Control Limit (UCL): {I_UCL:.3f}")
print(f" Lower Control Limit (LCL): {I_LCL:.3f}")
print(f" Out-of-control points: {out_of_control_I.sum()}/{n_observations}")
print(f"\n【mR Chart】")
print(f" Center Line (CL): {mR_center:.3f}")
print(f" Upper Control Limit (UCL): {mR_UCL:.3f}")
print(f" Lower Control Limit (LCL): {mR_LCL:.3f}")
print(f" Out-of-control points: {out_of_control_mR.sum()}/{len(moving_range)}")
if out_of_control_mR.any():
    print(f"\n⚠️ Warning: Increased process variation detected (observation {np.where(out_of_control_mR)[0] + 2})")
</code></pre>
<p><strong>Explanation</strong>: The I-MR control chart is used for individual measurements that cannot be subgrouped (e.g., once-daily inspection, batch processes, expensive destructive testing). The moving range — the absolute difference between consecutive measurements — is used to estimate process variation. In this example, process variation increases after day 15, which the mR chart detects.</p>
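The moving range is also the source of the σ estimate behind the I-chart limits: σ̂ = m̄R/d₂ with d₂ = 1.128 for n = 2, so E2 = 3/d₂ ≈ 2.66 and I̅ ± E2·m̄R is simply I̅ ± 3σ̂. A quick numerical check on illustrative data (not the chapter's series):

```python
import numpy as np

rng = np.random.default_rng(1)
x = rng.normal(50, 3, 500)  # stable process, true sigma = 3

mR_bar = np.mean(np.abs(np.diff(x)))
d2 = 1.128                  # expected-range constant for moving ranges of n = 2
sigma_hat = mR_bar / d2     # moving-range estimate of process sigma

print(f"sigma_hat = {sigma_hat:.3f} (true sigma = 3)")
print(f"E2 * mR_bar = {2.66 * mR_bar:.3f} vs 3 * sigma_hat = {3 * sigma_hat:.3f}")
```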
<h4>Code Example 3: Calculation and Interpretation of Process Capability Indices (Cp, Cpk)</h4>
<p><strong>Purpose</strong>: Calculate process capability indices and evaluate the process's ability to meet specifications.</p>
<pre><code class="language-python">import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
np.random.seed(42)
# Generate process data (product dimension, unit: mm)
n_samples = 200
process_data = np.random.normal(10.0, 0.15, n_samples)
# Specification limits
USL = 10.5 # Upper Specification Limit
LSL = 9.5 # Lower Specification Limit
target = 10.0 # Target value
# Calculate process statistics
process_mean = np.mean(process_data)
process_std = np.std(process_data, ddof=1) # Unbiased standard deviation
# Calculate process capability indices
# Cp: Process Capability
Cp = (USL - LSL) / (6 * process_std)
# Cpk: Process Capability (considering process mean offset)
Cpk_upper = (USL - process_mean) / (3 * process_std)
Cpk_lower = (process_mean - LSL) / (3 * process_std)
Cpk = min(Cpk_upper, Cpk_lower)
# Cpm: Process capability considering offset from target
Cpm = (USL - LSL) / (6 * np.sqrt(process_std**2 + (process_mean - target)**2))
# Calculate out-of-spec rate (theoretical)
# Assuming normal distribution
ppm_below_LSL = stats.norm.cdf(LSL, loc=process_mean, scale=process_std) * 1e6
ppm_above_USL = (1 - stats.norm.cdf(USL, loc=process_mean, scale=process_std)) * 1e6
total_ppm = ppm_below_LSL + ppm_above_USL
# Actual out-of-spec count (sample data)
actual_out_of_spec = np.sum((process_data < LSL) | (process_data > USL))
# Visualization
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# Histogram and specification limits
axes[0].hist(process_data, bins=30, density=True, alpha=0.6, color='#11998e',
edgecolor='black', label='Process Data')
# Theoretical normal distribution curve
x = np.linspace(process_data.min(), process_data.max(), 100)
axes[0].plot(x, stats.norm.pdf(x, loc=process_mean, scale=process_std),
'r-', linewidth=2, label='Normal Distribution (Theoretical)')
# Specification limits and target
axes[0].axvline(x=LSL, color='red', linestyle='--', linewidth=2, label=f'LSL = {LSL}')
axes[0].axvline(x=USL, color='red', linestyle='--', linewidth=2, label=f'USL = {USL}')
axes[0].axvline(x=target, color='green', linestyle='--', linewidth=2, label=f'Target = {target}')
axes[0].axvline(x=process_mean, color='blue', linestyle='-', linewidth=2,
label=f'Process Mean = {process_mean:.3f}')
axes[0].set_xlabel('Measurement (mm)', fontsize=12)
axes[0].set_ylabel('Probability Density', fontsize=12)
axes[0].set_title('Process Distribution and Specification Limits', fontsize=14, fontweight='bold')
axes[0].legend(fontsize=9)
axes[0].grid(alpha=0.3)
# Visualization of process capability indices
capability_indices = {
'Cp': Cp,
'Cpk': Cpk,
'Cpm': Cpm
}
colors = ['#11998e', '#f59e0b', '#7b2cbf']
bars = axes[1].bar(capability_indices.keys(), capability_indices.values(),
color=colors, alpha=0.7, edgecolor='black')
# Capability criterion lines
axes[1].axhline(y=1.33, color='orange', linestyle='--', linewidth=2,
label='Good Criterion (1.33)')
axes[1].axhline(y=1.67, color='green', linestyle='--', linewidth=2,
label='Excellent Criterion (1.67)')
# Display values on each bar
for bar in bars:
    height = bar.get_height()
    axes[1].text(bar.get_x() + bar.get_width()/2., height,
                 f'{height:.3f}',
                 ha='center', va='bottom', fontsize=12, fontweight='bold')
axes[1].set_ylabel('Capability Index', fontsize=12)
axes[1].set_title('Comparison of Process Capability Indices', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3, axis='y')
axes[1].set_ylim(0, max(capability_indices.values()) * 1.2)
plt.tight_layout()
plt.show()
# Results summary
print("=== Process Capability Analysis ===")
print(f"\n【Process Statistics】")
print(f" Process Mean: {process_mean:.4f} mm")
print(f" Process Standard Deviation: {process_std:.4f} mm")
print(f" Sample Size: {n_samples}")
print(f"\n【Specification Information】")
print(f" Upper Specification Limit (USL): {USL} mm")
print(f" Lower Specification Limit (LSL): {LSL} mm")
print(f" Target: {target} mm")
print(f" Specification Width: {USL - LSL} mm")
print(f"\n【Process Capability Indices】")
print(f" Cp = {Cp:.3f} (Ratio of spec width to process width)")
print(f" Cpk = {Cpk:.3f} (Considering process mean offset)")
print(f" Cpm = {Cpm:.3f} (Considering offset from target)")
print(f"\n【Out-of-Spec Rate (Theoretical)】")
print(f" Below LSL: {ppm_below_LSL:.2f} ppm")
print(f" Above USL: {ppm_above_USL:.2f} ppm")
print(f" Total Out-of-Spec: {total_ppm:.2f} ppm ({total_ppm/1e4:.4f}%)")
print(f"\n【Actual Out-of-Spec Count】")
print(f" Out-of-spec samples: {actual_out_of_spec}/{n_samples} ({actual_out_of_spec/n_samples*100:.2f}%)")
# Capability judgment
print(f"\n【Process Capability Judgment】")
if Cpk >= 1.67:
    print(" ✅ Excellent: Process has sufficient capability (Cpk ≥ 1.67)")
elif Cpk >= 1.33:
    print(" ✅ Good: Process has adequate capability (Cpk ≥ 1.33)")
elif Cpk >= 1.0:
    print(" ⚠️ Minimum: Process capability is minimal (Cpk ≥ 1.0), improvement recommended")
else:
    print(" ❌ Insufficient: Process capability is inadequate (Cpk < 1.0), urgent improvement needed")
</code></pre>
<p><strong>Explanation</strong>: Process capability indices quantitatively evaluate the process's ability to meet specifications. Cp considers only process variation, while Cpk also accounts for the offset of the process mean. Generally, Cpk ≥ 1.33 is considered "good" and Cpk ≥ 1.67 "excellent". When Cpk is below 1.0, there is a high probability of producing out-of-spec product, and process improvement is required.</p>
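The relationship between the two indices can be stated compactly: Cpk = Cp·(1 − k), where k = 2|μ − m|/(USL − LSL) measures how far the mean sits from the specification midpoint m. A small sketch with hypothetical numbers matching the example's specification limits:

```python
def cp_cpk(mean, std, lsl, usl):
    """Cp and Cpk for a normally distributed characteristic."""
    cp = (usl - lsl) / (6 * std)
    cpk = min((usl - mean) / (3 * std), (mean - lsl) / (3 * std))
    return cp, cpk

# Centered process: Cpk equals Cp
cp1, cpk1 = cp_cpk(mean=10.0, std=0.15, lsl=9.5, usl=10.5)
# Same spread, mean shifted +0.2 mm: Cp unchanged, Cpk penalized
cp2, cpk2 = cp_cpk(mean=10.2, std=0.15, lsl=9.5, usl=10.5)
print(f"centered:   Cp={cp1:.3f}, Cpk={cpk1:.3f}")
print(f"off-center: Cp={cp2:.3f}, Cpk={cpk2:.3f}")
```

Because Cp ignores centering, a badly off-center process can look capable on Cp alone; always report Cpk alongside it.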
<h4>Code Example 4: Implementation of CUSUM Control Chart (Cumulative Sum Chart)</h4>
<p><strong>Purpose</strong>: Detect small process mean shifts using CUSUM control chart.</p>
<pre><code class="language-python">import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# Generate process data
n_samples = 100
target_mean = 50.0
process_std = 2.0
# Generate data (process mean shifts by 0.5σ from sample 40)
data = np.zeros(n_samples)
data[:40] = np.random.normal(target_mean, process_std, 40)
data[40:] = np.random.normal(target_mean + 0.5 * process_std, process_std, 60)
# CUSUM parameters
# K: Reference value (half of shift to detect)
# H: Decision interval (threshold for alarm)
shift_to_detect = 0.5 * process_std # Detect 0.5σ shift
K = shift_to_detect / 2
H = 5 * process_std # Typically 4σ~5σ
# CUSUM calculation
# Upper CUSUM (detects an increase in the process mean)
Cp = np.zeros(n_samples)
# Lower CUSUM (detects a decrease in the process mean)
Cn = np.zeros(n_samples)
Cp[0] = max(0, (data[0] - target_mean) - K)
Cn[0] = max(0, -(data[0] - target_mean) - K)
for i in range(1, n_samples):
    Cp[i] = max(0, Cp[i-1] + (data[i] - target_mean) - K)
    Cn[i] = max(0, Cn[i-1] - (data[i] - target_mean) - K)
# Alarm detection
alarm_high = Cp > H
alarm_low = Cn > H
# Visualization
fig, axes = plt.subplots(3, 1, figsize=(14, 12))
# Plot original data
axes[0].plot(range(n_samples), data, 'o-', color='#11998e',
markersize=4, linewidth=1, alpha=0.7, label='Measurements')
axes[0].axhline(y=target_mean, color='blue', linestyle='-', linewidth=2,
label=f'Target = {target_mean}')
axes[0].axvline(x=40, color='red', linestyle='--', alpha=0.5,
label='Process Change Point')
axes[0].set_ylabel('Measurement', fontsize=11)
axes[0].set_title('Process Data (0.5σ shift from sample 40)',
fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# Upper CUSUM
axes[1].plot(range(n_samples), Cp, 'o-', color='#f59e0b',
markersize=4, linewidth=1.5, label='Upper CUSUM (C+)')
axes[1].axhline(y=H, color='red', linestyle='--', linewidth=2,
label=f'Decision Interval H = {H:.1f}')
axes[1].axhline(y=0, color='gray', linestyle='-', linewidth=1)
if alarm_high.any():
    first_alarm = np.where(alarm_high)[0][0]
    axes[1].scatter(np.where(alarm_high)[0], Cp[alarm_high],
                    color='red', s=80, marker='o', zorder=5, label='Alarm')
    axes[1].axvline(x=first_alarm, color='red', linestyle=':', alpha=0.7)
axes[1].set_ylabel('CUSUM C+', fontsize=11)
axes[1].set_title('Upper CUSUM Chart (Detect Process Mean Increase)',
fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
# Lower CUSUM
axes[2].plot(range(n_samples), Cn, 'o-', color='#7b2cbf',
markersize=4, linewidth=1.5, label='Lower CUSUM (C-)')
axes[2].axhline(y=H, color='red', linestyle='--', linewidth=2,
label=f'Decision Interval H = {H:.1f}')
axes[2].axhline(y=0, color='gray', linestyle='-', linewidth=1)
if alarm_low.any():
    first_alarm_low = np.where(alarm_low)[0][0]
    axes[2].scatter(np.where(alarm_low)[0], Cn[alarm_low],
                    color='red', s=80, marker='o', zorder=5, label='Alarm')
    axes[2].axvline(x=first_alarm_low, color='red', linestyle=':', alpha=0.7)
axes[2].set_xlabel('Sample Number', fontsize=11)
axes[2].set_ylabel('CUSUM C-', fontsize=11)
axes[2].set_title('Lower CUSUM Chart (Detect Process Mean Decrease)',
fontsize=13, fontweight='bold')
axes[2].legend()
axes[2].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("=== CUSUM Chart Analysis Results ===")
print(f"\nParameter Settings:")
print(f" Target (μ₀): {target_mean}")
print(f" Standard Deviation (σ): {process_std}")
print(f" Reference Value (K): {K:.2f} (half of detected shift)")
print(f" Decision Interval (H): {H:.2f}")
print(f"\nProcess Statistics:")
print(f" First half (1-40) mean: {np.mean(data[:40]):.2f}")
print(f" Second half (41-100) mean: {np.mean(data[40:]):.2f}")
print(f" Actual shift: {np.mean(data[40:]) - np.mean(data[:40]):.2f}")
if alarm_high.any():
    first_alarm = np.where(alarm_high)[0][0]
    detection_delay = first_alarm - 40
    print(f"\n✅ Upper alarm detected:")
    print(f"  First alarm: Sample {first_alarm + 1}")
    print(f"  Detection delay: {detection_delay} samples")
else:
    print("\n❌ Upper alarm: Not detected")
if alarm_low.any():
    first_alarm_low = np.where(alarm_low)[0][0]
    print(f"\n✅ Lower alarm detected: Sample {first_alarm_low + 1}")
else:
    print("\n❌ Lower alarm: Not detected")
</code></pre>
<p><strong>Explanation</strong>: The CUSUM (Cumulative Sum) control chart efficiently detects small process mean shifts (0.5σ–2σ) that are difficult to detect with Shewhart charts. By accumulating deviations from the target, it amplifies persistent small changes until they cross the decision interval. In this example, the 0.5σ shift beginning at sample 40 is detected earlier than it would be with a conventional X̄ chart.</p>
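The example above works in raw measurement units; in practice the CUSUM is often run on standardized data, so that the design constants k and h are in σ units and transfer across processes. A reusable sketch along those lines (the k = 0.5, h = 5 defaults follow common textbook practice; the data below are illustrative):

```python
import numpy as np

def tabular_cusum(x, target, sigma, k=0.5, h=5.0):
    """Two-sided tabular CUSUM on standardized observations.

    k (reference value) and h (decision interval) are in sigma units;
    k = 0.5, h = 5 is a common textbook default aimed at ~1-sigma shifts.
    Returns (C_plus, C_minus, index of the first alarm or None).
    """
    z = (np.asarray(x, dtype=float) - target) / sigma
    c_plus = np.zeros(len(z))
    c_minus = np.zeros(len(z))
    c_plus[0] = max(0.0, z[0] - k)
    c_minus[0] = max(0.0, -z[0] - k)
    for i in range(1, len(z)):
        c_plus[i] = max(0.0, c_plus[i - 1] + z[i] - k)
        c_minus[i] = max(0.0, c_minus[i - 1] - z[i] - k)
    alarms = np.where((c_plus > h) | (c_minus > h))[0]
    return c_plus, c_minus, (int(alarms[0]) if len(alarms) else None)

# An exactly on-target series never alarms...
_, _, quiet = tabular_cusum(np.full(30, 50.0), target=50, sigma=2)
# ...while a sustained 2-sigma upward shift does
rng = np.random.default_rng(0)
x = np.concatenate([rng.normal(50, 2, 40), rng.normal(54, 2, 60)])
_, _, first = tabular_cusum(x, target=50, sigma=2)
print("on-target alarm index:", quiet, "| shifted-series first alarm index:", first)
```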
<h4>Code Example 5: Implementation of EWMA Control Chart (Exponentially Weighted Moving Average Chart)</h4>
<p><strong>Purpose</strong>: Detect anomalies using time-series weighting with EWMA control chart.</p>
<pre><code class="language-python">import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# Generate process data
n_samples = 100
target_mean = 100.0
process_std = 3.0
# Generate data (process mean shifts by 1σ from sample 50)
data = np.zeros(n_samples)
data[:50] = np.random.normal(target_mean, process_std, 50)
data[50:] = np.random.normal(target_mean + 1.0 * process_std, process_std, 50)
# EWMA parameters
# λ (lambda): Weighting constant (0 < λ ≤ 1)
# Small λ: Emphasize past data (sensitive to small changes)
# Large λ: Emphasize current data (sensitive to large changes)
lambda_param = 0.2
L = 3 # Control limit coefficient (typically 2.7~3)
# EWMA calculation
z = np.zeros(n_samples)
z[0] = target_mean # Initial value is target
for i in range(1, n_samples):
    z[i] = lambda_param * data[i] + (1 - lambda_param) * z[i-1]
# Calculate control limits
# Control limits at each time point (converges over time)
ucl = np.zeros(n_samples)
lcl = np.zeros(n_samples)
for i in range(n_samples):
    # Standard deviation of the EWMA statistic at step i+1
    std_z = process_std * np.sqrt(lambda_param / (2 - lambda_param) *
                                  (1 - (1 - lambda_param)**(2 * (i + 1))))
    ucl[i] = target_mean + L * std_z
    lcl[i] = target_mean - L * std_z
# Alarm detection
alarm = (z > ucl) | (z < lcl)
# Visualization
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
# Original data
axes[0].plot(range(n_samples), data, 'o', color='lightgray',
markersize=4, alpha=0.5, label='Individual Measurements')
axes[0].axhline(y=target_mean, color='blue', linestyle='-', linewidth=2,
label=f'Target = {target_mean}')
axes[0].axvline(x=50, color='red', linestyle='--', alpha=0.5,
label='Process Change Point')
axes[0].set_ylabel('Measurement', fontsize=11)
axes[0].set_title('Process Data (1σ shift from sample 50)',
fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)
# EWMA chart
axes[1].plot(range(n_samples), z, 'o-', color='#11998e',
markersize=5, linewidth=1.5, label=f'EWMA (λ={lambda_param})')
axes[1].plot(range(n_samples), ucl, 'r--', linewidth=2, label=f'UCL (L={L})')
axes[1].plot(range(n_samples), lcl, 'r--', linewidth=2, label=f'LCL (L={L})')
axes[1].axhline(y=target_mean, color='blue', linestyle='-', linewidth=2,
label=f'Center Line = {target_mean}')
# Highlight alarm points
if alarm.any():
    first_alarm = np.where(alarm)[0][0]
    axes[1].scatter(np.where(alarm)[0], z[alarm],
                    color='red', s=100, marker='o', zorder=5, label='Alarm')
    axes[1].axvline(x=first_alarm, color='red', linestyle=':', alpha=0.7)
axes[1].set_xlabel('Sample Number', fontsize=11)
axes[1].set_ylabel('EWMA Statistic', fontsize=11)
axes[1].set_title('EWMA Control Chart', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("=== EWMA Chart Analysis Results ===")
print(f"\nParameter Settings:")
print(f" Target (μ₀): {target_mean}")
print(f" Standard Deviation (σ): {process_std}")
print(f" Weighting Constant (λ): {lambda_param}")
print(f" Control Limit Coefficient (L): {L}")
print(f"\nProcess Statistics:")
print(f" First half (1-50) mean: {np.mean(data[:50]):.2f}")
print(f" Second half (51-100) mean: {np.mean(data[50:]):.2f}")
print(f" Actual shift: {np.mean(data[50:]) - target_mean:.2f}")
print(f"\nEWMA Statistics:")
print(f" First half EWMA mean: {np.mean(z[:50]):.2f}")
print(f" Second half EWMA mean: {np.mean(z[50:]):.2f}")
if alarm.any():
    first_alarm = np.where(alarm)[0][0]
    detection_delay = first_alarm - 50
    print(f"\n✅ Alarm detected:")
    print(f"  First alarm: Sample {first_alarm + 1}")
    print(f"  Detection delay: {detection_delay} samples")
    print(f"  Total alarms: {alarm.sum()}")
else:
    print("\n❌ Alarm: Not detected")
# λ parameter influence comparison
print(f"\n【λ Parameter Influence】")
print(f" λ = 0.1 : Emphasize past 20 samples (sensitive to small shifts)")
print(f" λ = 0.2 : Emphasize past 10 samples (balanced) ← Current")
print(f" λ = 0.5 : Emphasize past 4 samples (sensitive to large shifts)")
print(f" λ = 1.0 : Current sample only (equivalent to Shewhart chart)")
</code></pre>
<p><strong>Explanation</strong>: The EWMA (Exponentially Weighted Moving Average) control chart averages the data with exponentially decaying weights on past observations. The λ parameter adjusts the sensitivity: smaller λ values excel at detecting small shifts, while larger λ values respond faster to large shifts. Generally, λ = 0.2 is recommended as a balanced default.</p>
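The time-varying control limits computed in the loop above converge quickly: the (1 − λ)^(2i) term dies out, leaving the asymptotic EWMA standard deviation σ·√(λ/(2 − λ)). A quick check with the example's parameters (σ = 3, λ = 0.2):

```python
import numpy as np

sigma, lam = 3.0, 0.2

def ewma_std(i, sigma=sigma, lam=lam):
    """Exact standard deviation of the EWMA statistic at step i (1-based)."""
    return sigma * np.sqrt(lam / (2 - lam) * (1 - (1 - lam) ** (2 * i)))

asymptotic = sigma * np.sqrt(lam / (2 - lam))  # limit as i -> infinity

for step in (1, 5, 10, 30):
    print(f"step {step:2d}: std = {ewma_std(step):.4f}")
print(f"asymptotic std = {asymptotic:.4f}")
```

By about step 30 the exact and asymptotic values agree to several decimal places, which is why some implementations simply use constant limits after a short start-up period.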
<h4>Code Example 6: Hotelling's T² Control Chart (Multivariate Control Chart)</h4>
<p><strong>Purpose</strong>: Implement multivariate control chart for simultaneous monitoring of multiple variables.</p>
<pre><code class="language-python">import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
np.random.seed(42)
# Generate 2-variable multivariate process data
n_samples = 100
# Two correlated variables (temperature and pressure)
mean_vector = np.array([100, 50]) # Temperature 100°C, Pressure 50kPa
# Covariance matrix (positive correlation)
cov_matrix = np.array([[9, 4],
[4, 4]])
# Normal data (first 70 samples)
data_normal = np.random.multivariate_normal(mean_vector, cov_matrix, 70)
# Abnormal data (last 30 samples: temperature increases)
mean_abnormal = np.array([105, 51])
data_abnormal = np.random.multivariate_normal(mean_abnormal, cov_matrix, 30)
# Combine data
data = np.vstack([data_normal, data_abnormal])
# Calculate Hotelling's T² statistic
# T² = (x - μ)ᵀ Σ⁻¹ (x - μ)
# where μ is mean vector, Σ is covariance matrix
# Estimate mean and covariance from normal data (first 70 samples)
mean_est = np.mean(data_normal, axis=0)
cov_est = np.cov(data_normal, rowvar=False)
cov_inv = np.linalg.inv(cov_est)
# Calculate T² statistic for each sample
T2_values = np.zeros(n_samples)
for i in range(n_samples):
    diff = data[i] - mean_est
    T2_values[i] = diff @ cov_inv @ diff
# Calculate control limit
# Phase II limit for monitoring new individual observations,
# based on the F distribution (parameters estimated from the reference data)
p = 2        # Number of variables
m = 70       # Number of in-control reference samples
alpha = 0.01 # Significance level
UCL = (p * (m + 1) * (m - 1) / (m * (m - p))) * \
      stats.f.ppf(1 - alpha, p, m - p)
# Alarm detection
alarm = T2_values > UCL
# Visualization
fig = plt.figure(figsize=(16, 10))
# 2-variable scatter plot
ax1 = plt.subplot(2, 2, 1)
ax1.scatter(data_normal[:, 0], data_normal[:, 1], c='#11998e', s=40,
alpha=0.6, label='Normal Data (1-70)')
ax1.scatter(data_abnormal[:, 0], data_abnormal[:, 1], c='orange', s=40,
alpha=0.6, label='Abnormal Data (71-100)')
# Draw control limit ellipse
from matplotlib.patches import Ellipse
# Ellipse for T²=UCL
eigenvalues, eigenvectors = np.linalg.eig(cov_est)
angle = np.arctan2(eigenvectors[1, 0], eigenvectors[0, 0])
width, height = 2 * np.sqrt(UCL * eigenvalues)
ellipse = Ellipse(mean_est, width, height, angle=np.degrees(angle),
facecolor='none', edgecolor='red', linewidth=2,
linestyle='--', label='Control Limit (T²=UCL)')
ax1.add_patch(ellipse)
ax1.scatter(mean_est[0], mean_est[1], c='blue', s=100, marker='x',
linewidths=3, label='Process Center')
ax1.set_xlabel('Temperature (°C)', fontsize=11)
ax1.set_ylabel('Pressure (kPa)', fontsize=11)
ax1.set_title('2-Variable Scatter Plot and Control Limit Ellipse', fontsize=13, fontweight='bold')
ax1.legend()
ax1.grid(alpha=0.3)
# T² control chart
ax2 = plt.subplot(2, 2, 2)
ax2.plot(range(1, n_samples + 1), T2_values, 'o-', color='#11998e',
markersize=5, linewidth=1.5, label='T² Statistic')
ax2.axhline(y=UCL, color='red', linestyle='--', linewidth=2,
label=f'UCL = {UCL:.2f}')
ax2.axvline(x=70, color='gray', linestyle='--', alpha=0.5,
label='Process Change Point')
# Highlight alarm points
if alarm.any():
    ax2.scatter(np.where(alarm)[0] + 1, T2_values[alarm],
                color='red', s=100, marker='o', zorder=5, label='Alarm')
ax2.set_xlabel('Sample Number', fontsize=11)
ax2.set_ylabel('T² Statistic', fontsize=11)
ax2.set_title('Hotelling\'s T² Control Chart', fontsize=13, fontweight='bold')
ax2.legend()
ax2.grid(alpha=0.3)
# Temperature time series
ax3 = plt.subplot(2, 2, 3)
ax3.plot(range(1, n_samples + 1), data[:, 0], 'o-', color='#f59e0b',
markersize=4, linewidth=1, label='Temperature')
ax3.axhline(y=mean_est[0], color='blue', linestyle='-', linewidth=2,
label=f'Mean = {mean_est[0]:.1f}°C')
ax3.axvline(x=70, color='gray', linestyle='--', alpha=0.5)
ax3.set_xlabel('Sample Number', fontsize=11)
ax3.set_ylabel('Temperature (°C)', fontsize=11)
ax3.set_title('Temperature Time Series', fontsize=13, fontweight='bold')
ax3.legend()
ax3.grid(alpha=0.3)
# Pressure time series
ax4 = plt.subplot(2, 2, 4)
ax4.plot(range(1, n_samples + 1), data[:, 1], 'o-', color='#7b2cbf',
markersize=4, linewidth=1, label='Pressure')
ax4.axhline(y=mean_est[1], color='blue', linestyle='-', linewidth=2,
label=f'Mean = {mean_est[1]:.1f}kPa')
ax4.axvline(x=70, color='gray', linestyle='--', alpha=0.5)
ax4.set_xlabel('Sample Number', fontsize=11)
ax4.set_ylabel('Pressure (kPa)', fontsize=11)
ax4.set_title('Pressure Time Series', fontsize=13, fontweight='bold')
ax4.legend()
ax4.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("=== Hotelling's T² Chart Analysis Results ===")
print(f"\nProcess Settings:")
print(f" Number of variables (p): {p}")
print(f" Sample size: {n_samples}")
print(f" Normal data samples: {m}")
print(f"\nProcess Statistics (estimated from normal data):")
print(f" Mean vector:")
print(f" Temperature: {mean_est[0]:.2f} °C")
print(f" Pressure: {mean_est[1]:.2f} kPa")
print(f" Covariance matrix:")
print(f"{cov_est}")
print(f" Correlation coefficient: {cov_est[0,1] / (np.sqrt(cov_est[0,0]) * np.sqrt(cov_est[1,1])):.3f}")
print(f"\nControl Limit:")
print(f" UCL (α={alpha}): {UCL:.2f}")
print(f"\nT² Statistics:")
print(f" Normal data mean T²: {np.mean(T2_values[:70]):.2f}")
print(f" Abnormal data mean T²: {np.mean(T2_values[70:]):.2f}")
if alarm.any():
first_alarm = np.where(alarm)[0][0]
detection_delay = first_alarm - 70 if first_alarm >= 70 else 0
print(f"\n✅ Alarm detected:")
print(f" First alarm: Sample {first_alarm + 1}")
if first_alarm >= 70:
print(f" Detection delay: {detection_delay} samples")
print(f" Total alarms: {alarm.sum()}/{n_samples}")
else:
    print("\n❌ No alarms detected")
</code></pre>
<p><strong>Explanation</strong>: Hotelling's T² control chart is a multivariate control chart that monitors several variables simultaneously. Because it works in multidimensional space and accounts for the correlations between variables, it can detect anomalous combinations of values that univariate control charts would miss. In the process industries it is useful for jointly monitoring correlated variables such as temperature, pressure, and flow rate.</p>
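The point that T² can flag combinations that univariate charts miss is easy to demonstrate directly. Below is a minimal, self-contained sketch (hypothetical in-control parameters, assuming scipy is available): the test point lies within each variable's own 3σ band, yet its T² statistic far exceeds the chi-square control limit because it breaks the correlation pattern.

```python
import numpy as np
from scipy import stats

# Hypothetical in-control parameters for two strongly correlated variables
mean_vec = np.array([100.0, 200.0])
cov = np.array([[4.0, 5.4],    # sigma1 = 2, sigma2 = 3, correlation = 0.9
                [5.4, 9.0]])
cov_inv = np.linalg.inv(cov)

def t2(x):
    """Hotelling's T² for one observation: (x - mu)^T S^{-1} (x - mu)."""
    d = x - mean_vec
    return float(d @ cov_inv @ d)

# UCL from the chi-square approximation (known parameters, alpha = 0.0027)
UCL = stats.chi2.ppf(1 - 0.0027, df=2)

# A point that breaks the correlation pattern: x1 high while x2 is low.
x = np.array([104.0, 194.0])            # +2 sigma on x1, -2 sigma on x2
print(abs(x[0] - 100) / 2 <= 3)          # True: inside univariate 3-sigma band
print(abs(x[1] - 200) / 3 <= 3)          # True: inside univariate 3-sigma band
print(t2(x) > UCL)                       # True: the combination is implausible
```

Each coordinate alone looks unremarkable, but the pair moves against the positive correlation, so the Mahalanobis-type distance is large.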
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
"""
Example: Western Electric Rules (abnormal pattern detection)
Purpose: Demonstrate out-of-control pattern detection on a control chart
Target: Intermediate
Execution time: 2-5 seconds
Dependencies: numpy, matplotlib
"""
<h4>Code Example 7: Implementation of Western Electric Rules (Abnormal Pattern Detection)</h4>
<p><strong>Purpose</strong>: Implement eight rules to detect abnormal patterns on control charts.</p>
<pre><code class="language-python">import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# Generate process data (including various abnormal patterns)
n_samples = 100
target_mean = 100
sigma = 3
data = np.random.normal(target_mean, sigma, n_samples)
# Insert intentional abnormal patterns
# Pattern 1: Out of control limits (sample 15)
data[15] = target_mean + 3.5 * sigma
# Pattern 2: 9 consecutive points on same side of center line (samples 30-38)
data[30:39] = np.random.normal(target_mean + 0.8 * sigma, sigma * 0.5, 9)
# Pattern 3: 6 consecutive points trending up (samples 50-55)
data[50:56] = target_mean + np.linspace(-sigma, 2*sigma, 6)
# Pattern 4: 14 consecutive points alternating up and down (samples 70-83)
for i in range(70, 84):
if i % 2 == 0:
data[i] = target_mean + sigma * 0.8
else:
data[i] = target_mean - sigma * 0.8
# Calculate control limits
UCL = target_mean + 3 * sigma
LCL = target_mean - 3 * sigma
UCL_2sigma = target_mean + 2 * sigma
LCL_2sigma = target_mean - 2 * sigma
UCL_1sigma = target_mean + 1 * sigma
LCL_1sigma = target_mean - 1 * sigma
# Implementation of Western Electric rules
def apply_western_electric_rules(data, mean, sigma):
"""
Apply Western Electric rules to detect abnormal patterns
Eight rules:
1. Point beyond control limits (beyond 3σ)
2. 9 consecutive points on same side of center line
3. 6 consecutive points trending up or down
4. 14 consecutive points alternating up and down
5. 2 out of 3 consecutive points beyond 2σ (same side of center line)
6. 4 out of 5 consecutive points beyond 1σ (same side of center line)
7. 15 consecutive points within 1σ (both sides of center line)
8. 8 consecutive points beyond 1σ (both sides of center line)
"""
n = len(data)
violations = {f'Rule{i}': [] for i in range(1, 9)}
UCL = mean + 3 * sigma
LCL = mean - 3 * sigma
for i in range(n):
# Rule 1: Beyond control limits
if data[i] > UCL or data[i] < LCL:
violations['Rule1'].append(i)
# Rule 2: 9 consecutive points on same side
if i >= 8:
if all(data[i-j] > mean for j in range(9)) or \
all(data[i-j] < mean for j in range(9)):
violations['Rule2'].append(i)
# Rule 3: 6 consecutive points trending
if i >= 5:
if all(data[i-j] < data[i-j-1] for j in range(5)) or \
all(data[i-j] > data[i-j-1] for j in range(5)):
violations['Rule3'].append(i)
# Rule 4: 14 consecutive points alternating
if i >= 13:
alternating = True
            for j in range(12):  # 14 points -> 13 differences -> 12 sign comparisons
if (data[i-j] - data[i-j-1]) * (data[i-j-1] - data[i-j-2]) >= 0:
alternating = False
break
if alternating:
violations['Rule4'].append(i)
# Rule 5: 2 out of 3 consecutive points beyond 2σ (same side)
if i >= 2:
above_2sigma = sum(1 for j in range(3) if data[i-j] > mean + 2*sigma)
below_2sigma = sum(1 for j in range(3) if data[i-j] < mean - 2*sigma)
if above_2sigma >= 2 or below_2sigma >= 2:
violations['Rule5'].append(i)
# Rule 6: 4 out of 5 consecutive points beyond 1σ (same side)
if i >= 4:
above_1sigma = sum(1 for j in range(5) if data[i-j] > mean + 1*sigma)
below_1sigma = sum(1 for j in range(5) if data[i-j] < mean - 1*sigma)
if above_1sigma >= 4 or below_1sigma >= 4:
violations['Rule6'].append(i)
# Rule 7: 15 consecutive points within 1σ
if i >= 14:
if all(abs(data[i-j] - mean) < sigma for j in range(15)):
violations['Rule7'].append(i)
# Rule 8: 8 consecutive points beyond 1σ (both sides)
if i >= 7:
if all(abs(data[i-j] - mean) > sigma for j in range(8)):
violations['Rule8'].append(i)
return violations
# Apply rules
violations = apply_western_electric_rules(data, target_mean, sigma)
# Visualization
fig, ax = plt.subplots(figsize=(16, 8))
# Plot data
ax.plot(range(n_samples), data, 'o-', color='#11998e',
markersize=4, linewidth=1, alpha=0.6, label='Measurements')
# Control limits
ax.axhline(y=target_mean, color='blue', linestyle='-', linewidth=2,
label=f'Center Line = {target_mean}')
ax.axhline(y=UCL, color='red', linestyle='--', linewidth=2, label='±3σ (UCL/LCL)')
ax.axhline(y=LCL, color='red', linestyle='--', linewidth=2)
ax.axhline(y=UCL_2sigma, color='orange', linestyle='--', linewidth=1,
alpha=0.5, label='±2σ')
ax.axhline(y=LCL_2sigma, color='orange', linestyle='--', linewidth=1, alpha=0.5)
ax.axhline(y=UCL_1sigma, color='green', linestyle='--', linewidth=1,
alpha=0.5, label='±1σ')
ax.axhline(y=LCL_1sigma, color='green', linestyle='--', linewidth=1, alpha=0.5)
# Highlight abnormal points with color coding
colors = ['red', 'orange', 'purple', 'brown', 'pink', 'cyan', 'magenta', 'yellow']
for idx, (rule, indices) in enumerate(violations.items()):
if indices:
ax.scatter(indices, data[indices], color=colors[idx], s=80,
marker='o', zorder=5, label=f'{rule} Detected')
ax.set_xlabel('Sample Number', fontsize=12)
ax.set_ylabel('Measurement', fontsize=12)
ax.set_title('Abnormal Pattern Detection by Western Electric Rules',
fontsize=14, fontweight='bold')
ax.legend(loc='upper right', fontsize=9)
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# Results summary
print("=== Western Electric Rules Detection Results ===\n")
total_violations = 0
for rule, indices in violations.items():
if indices:
print(f"{rule}: {len(indices)} violations detected")
print(f" Positions: Sample {[i+1 for i in indices]}")
total_violations += len(indices)
else:
print(f"{rule}: No violations")
print(f"\nTotal violations: {total_violations}")
print(f"Violation rate: {total_violations/n_samples*100:.1f}%")
# Rule explanations
print("\n【Western Electric Rules Explanation】")
print("Rule 1: Point beyond control limits (beyond 3σ) → Extreme anomaly")
print("Rule 2: 9 consecutive points on same side → Process mean shift")
print("Rule 3: 6 consecutive points trending up/down → Trend")
print("Rule 4: 14 consecutive points alternating → Systematic variation")
print("Rule 5: 2 out of 3 points beyond 2σ → Moderate anomaly")
print("Rule 6: 4 out of 5 points beyond 1σ → Small shift")
print("Rule 7: 15 consecutive points within 1σ → Insufficient variation (possible data manipulation)")
print("Rule 8: 8 consecutive points beyond 1σ → Excessive variation or mixture of two distributions")
</code></pre>
<p><strong>Explanation</strong>: The Western Electric Rules are judgment rules that detect abnormal patterns even when every point lies within the control limits; the eight-rule set implemented here corresponds closely to the Nelson Rules, a later extension of the original four Western Electric zone tests. By detecting continuity, trends, and periodicity rather than only control-limit violations, they enable earlier detection of process anomalies, and they are widely used in production process monitoring systems.</p>
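The thresholds in these rules (9 points on one side, 15 points within 1σ, and so on) were chosen so that each rule's false-alarm probability under an in-control normal process is roughly comparable to that of the 3σ rule. A minimal sketch of the arithmetic, assuming scipy is available (these are per-window probabilities, not exact alarm rates, since successive windows overlap):

```python
from scipy.stats import norm

# False-alarm probability of selected rules for an in-control normal process
p_rule1 = 2 * (1 - norm.cdf(3))               # point beyond 3 sigma
p_rule2 = 2 * 0.5 ** 9                         # 9 consecutive points on one side
p_rule7 = (norm.cdf(1) - norm.cdf(-1)) ** 15   # 15 consecutive points within 1 sigma

print(f"Rule 1: {p_rule1:.5f}")   # ~0.00270
print(f"Rule 2: {p_rule2:.5f}")   # ~0.00391
print(f"Rule 7: {p_rule7:.5f}")   # ~0.00326
```

All three probabilities land in the same few-per-thousand range, which is why the rules can be run together without flooding the operator with false alarms.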
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
<h4>Code Example 8: SPC Integrated System (Alarm Management System)</h4>
<p><strong>Purpose</strong>: Build a practical system that integrates multiple SPC methods and performs alarm generation and prioritization.</p>
<pre><code class="language-python">import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
np.random.seed(42)
class SPCMonitoringSystem:
"""Integrated SPC Monitoring System"""
def __init__(self, target_mean, target_std, spec_limits=None):
"""
Parameters:
-----------
target_mean : float
Target process mean
target_std : float
Target process standard deviation
spec_limits : tuple
Specification limits (LSL, USL)
"""
self.target_mean = target_mean
self.target_std = target_std
self.spec_limits = spec_limits
# Control limits
self.UCL = target_mean + 3 * target_std
self.LCL = target_mean - 3 * target_std
# Alarm log
self.alarm_log = []
def check_shewhart(self, value, sample_id):
"""Check using Shewhart control chart"""
if value > self.UCL:
self.alarm_log.append({
'sample_id': sample_id,
'timestamp': datetime.now(),
'method': 'Shewhart',
'severity': 'HIGH',
'message': f'Value {value:.2f} exceeds UCL {self.UCL:.2f}'
})
return False
elif value < self.LCL:
self.alarm_log.append({
'sample_id': sample_id,
'timestamp': datetime.now(),
'method': 'Shewhart',
'severity': 'HIGH',
'message': f'Value {value:.2f} below LCL {self.LCL:.2f}'
})
return False
return True
def check_specification(self, value, sample_id):
"""Specification limit check"""
if self.spec_limits is None:
return True
LSL, USL = self.spec_limits
if value > USL:
self.alarm_log.append({
'sample_id': sample_id,
'timestamp': datetime.now(),
'method': 'Specification',
'severity': 'CRITICAL',
'message': f'Out-of-spec: Value {value:.2f} exceeds USL {USL}'
})
return False
elif value < LSL:
self.alarm_log.append({
'sample_id': sample_id,
'timestamp': datetime.now(),
'method': 'Specification',
'severity': 'CRITICAL',
'message': f'Out-of-spec: Value {value:.2f} below LSL {LSL}'
})
return False
return True
def check_trend(self, recent_data, sample_id, window=6):
"""Trend detection (6 consecutive increasing/decreasing points)"""
if len(recent_data) < window:
return True
last_window = recent_data[-window:]
increasing = all(last_window[i] < last_window[i+1]
for i in range(len(last_window)-1))
decreasing = all(last_window[i] > last_window[i+1]
for i in range(len(last_window)-1))
if increasing or decreasing:
trend_type = 'increasing' if increasing else 'decreasing'
self.alarm_log.append({
'sample_id': sample_id,
'timestamp': datetime.now(),
'method': 'Trend',
'severity': 'MEDIUM',
'message': f'{window} consecutive {trend_type} trend detected'
})
return False
return True
def monitor_process(self, data):
"""Monitor entire process"""
results = []
recent_data = []
for i, value in enumerate(data):
sample_id = i + 1
recent_data.append(value)
# Execute each check
shewhart_ok = self.check_shewhart(value, sample_id)
spec_ok = self.check_specification(value, sample_id)
trend_ok = self.check_trend(recent_data, sample_id)
status = 'OK' if (shewhart_ok and spec_ok and trend_ok) else 'ALARM'
results.append(status)
return results
def get_alarm_summary(self):
"""Get alarm summary"""
if not self.alarm_log:
return "No alarms"
df_alarms = pd.DataFrame(self.alarm_log)
summary = df_alarms.groupby(['severity', 'method']).size().reset_index(name='count')
return summary
def generate_report(self):
"""Generate monitoring report"""
if not self.alarm_log:
print("✅ Process is operating normally. No alarms.")
return
print("=" * 70)
print("SPC Monitoring System - Alarm Report")
print("=" * 70)
# Count by severity
df_alarms = pd.DataFrame(self.alarm_log)
severity_counts = df_alarms['severity'].value_counts()
print(f"\n【Alarm Statistics】")
print(f"Total alarms: {len(self.alarm_log)}")
for severity, count in severity_counts.items():
print(f" {severity}: {count} cases")
# Count by method
print(f"\n【By Detection Method】")
method_counts = df_alarms['method'].value_counts()
for method, count in method_counts.items():
print(f" {method}: {count} cases")
# Recent alarms (up to 5)
print(f"\n【Latest Alarms (up to 5)】")
for alarm in self.alarm_log[-5:]:
print(f"\n Sample {alarm['sample_id']} | "
f"{alarm['severity']} | {alarm['method']}")
print(f" → {alarm['message']}")
print("\n" + "=" * 70)
# System demonstration
print("=== SPC Integrated Monitoring System Demonstration ===\n")
# Process settings
target_mean = 50.0
target_std = 2.0
LSL, USL = 44, 56 # Specification limits (±3σ)
# Initialize system
spc_system = SPCMonitoringSystem(target_mean, target_std,
spec_limits=(LSL, USL))
# Generate process data (100 samples)
n_samples = 100
data = np.random.normal(target_mean, target_std, n_samples)
# Insert intentional anomalies
data[30] = 58 # Out-of-spec (exceeds UCL)
data[60:66] = target_mean + np.linspace(0, 4, 6) # Trend
data[80] = 42 # Out-of-spec (below LCL)
# Execute process monitoring
print("Starting process monitoring...\n")
statuses = spc_system.monitor_process(data)
# Generate report
spc_system.generate_report()
# Visualization
fig, axes = plt.subplots(2, 1, figsize=(16, 10))
# Process trend
alarm_samples = [i for i, s in enumerate(statuses) if s == 'ALARM']
ok_samples = [i for i, s in enumerate(statuses) if s == 'OK']
axes[0].plot(ok_samples, data[ok_samples], 'o', color='#11998e',
markersize=5, label='Normal', alpha=0.6)
axes[0].plot(alarm_samples, data[alarm_samples], 'o', color='red',
markersize=8, label='Alarm', zorder=5)
# Control limits and specification limits
axes[0].axhline(y=target_mean, color='blue', linestyle='-', linewidth=2,
label='Target')
axes[0].axhline(y=spc_system.UCL, color='orange', linestyle='--', linewidth=2,
label='Control Limits (UCL/LCL)')
axes[0].axhline(y=spc_system.LCL, color='orange', linestyle='--', linewidth=2)
axes[0].axhline(y=USL, color='red', linestyle='--', linewidth=2,
label='Specification Limits (USL/LSL)')
axes[0].axhline(y=LSL, color='red', linestyle='--', linewidth=2)
axes[0].set_xlabel('Sample Number', fontsize=12)
axes[0].set_ylabel('Measurement', fontsize=12)
axes[0].set_title('SPC Integrated Monitoring - Process Trend', fontsize=14, fontweight='bold')
axes[0].legend(loc='upper left')
axes[0].grid(alpha=0.3)
# Alarm distribution
if spc_system.alarm_log:
df_alarms = pd.DataFrame(spc_system.alarm_log)
# Alarm count by severity
severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
severity_counts = df_alarms['severity'].value_counts()
severity_counts = severity_counts.reindex(severity_order, fill_value=0)
colors_severity = {'CRITICAL': 'red', 'HIGH': 'orange',
'MEDIUM': 'yellow', 'LOW': 'green'}
bar_colors = [colors_severity[s] for s in severity_counts.index]
bars = axes[1].bar(severity_counts.index, severity_counts.values,
color=bar_colors, alpha=0.7, edgecolor='black')
for bar in bars:
height = bar.get_height()
axes[1].text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}',
ha='center', va='bottom', fontsize=12, fontweight='bold')
axes[1].set_ylabel('Number of Alarms', fontsize=12)
axes[1].set_title('Alarm Distribution by Severity', fontsize=14, fontweight='bold')
axes[1].grid(alpha=0.3, axis='y')
plt.tight_layout()
plt.show()
# Process capability evaluation
process_mean = np.mean(data)
process_std = np.std(data, ddof=1)
Cp = (USL - LSL) / (6 * process_std)
Cpk = min((USL - process_mean) / (3 * process_std),
(process_mean - LSL) / (3 * process_std))
print(f"\n【Process Capability Evaluation】")
print(f" Cp = {Cp:.3f}")
print(f" Cpk = {Cpk:.3f}")
if Cpk >= 1.33:
print(" ✅ Process capability: Good")
elif Cpk >= 1.0:
print(" ⚠️ Process capability: Minimum (improvement recommended)")
else:
print(" ❌ Process capability: Insufficient (urgent improvement needed)")
</code></pre>
<p><strong>Explanation</strong>: This integrated SPC system combines multiple detection methods (Shewhart control chart, specification limit check, trend detection) and classifies and manages alarms by severity. In real process monitoring, this integrated approach enables early anomaly detection and an appropriate response, and alarm prioritization (CRITICAL > HIGH > MEDIUM > LOW) lets operators focus on the most critical anomalies first.</p>
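The severity ordering described above can be sketched as a simple priority sort over alarm records. This is a minimal illustration, not part of the chapter's example; the record shape mirrors the `alarm_log` entries from the code, trimmed to the fields needed here:

```python
# Rank map: lower rank = handled first
SEVERITY_RANK = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}

# Hypothetical alarm records (same keys as the alarm_log entries above)
alarms = [
    {"sample_id": 31, "severity": "HIGH",     "method": "Shewhart"},
    {"sample_id": 65, "severity": "MEDIUM",   "method": "Trend"},
    {"sample_id": 31, "severity": "CRITICAL", "method": "Specification"},
]

# Sort by severity first, then by sample order, so operators see the
# most critical alarms at the top of the queue.
queue = sorted(alarms, key=lambda a: (SEVERITY_RANK[a["severity"]], a["sample_id"]))
for a in queue:
    print(f"{a['severity']:<8} sample {a['sample_id']:>3} ({a['method']})")
```

In a production system the same ranking would typically drive notification routing (e.g. CRITICAL pages an operator, MEDIUM only logs).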
2.3 Chapter Summary
What We Learned
- SPC Fundamental Theory
  - Difference between common cause and special cause
  - Basic structure of control charts (CL, UCL, LCL)
  - Statistical basis of 3-sigma control limits
- Shewhart Control Charts
  - X̄-R chart: Monitoring subgroup data
  - I-MR chart: Monitoring individual measurements
  - Control chart creation procedure and interpretation
- Process Capability Evaluation
  - Cp: Evaluation of process variation
  - Cpk: Evaluation considering process mean offset
  - Cpm: Evaluation considering offset from target
  - Capability index judgment criteria (1.33, 1.67)
- Advanced SPC Methods
  - CUSUM: Early detection of small shifts
  - EWMA: Monitoring with time-series weighting
  - Hotelling's T²: Multivariate process monitoring
- Abnormal Pattern Detection
  - Western Electric Rules (8 judgment rules)
  - Anomaly detection in trends, periodicity, and variation
  - Integrated alarm management system
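Of the three capability indices recalled above, Cpm is the only one not computed anywhere in this chapter's code. A minimal sketch with hypothetical specification limits and target, showing how Cpm penalizes deviation from the target rather than just the spec midpoint:

```python
import numpy as np

# Hypothetical process: the mean has drifted 1.0 above the target T
USL, LSL, T = 56.0, 44.0, 50.0
rng = np.random.default_rng(0)
x = rng.normal(51.0, 2.0, 500)

mu, s = x.mean(), x.std(ddof=1)
Cp  = (USL - LSL) / (6 * s)
Cpk = min(USL - mu, mu - LSL) / (3 * s)
# Cpm replaces sigma with sqrt(sigma^2 + (mu - T)^2), so any offset
# from the target inflates the denominator and lowers the index.
tau = np.sqrt(s**2 + (mu - T)**2)
Cpm = (USL - LSL) / (6 * tau)

print(f"Cp={Cp:.3f}  Cpk={Cpk:.3f}  Cpm={Cpm:.3f}")
```

Because the mean sits off target, both Cpk and Cpm come out below Cp; Cpm also keeps falling as the mean drifts even while the data stay inside the specs.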
Important Points
- Control Chart Selection: choose the appropriate chart based on data characteristics such as subgrouping possibility and the number of variables
- Process Capability: understand the difference between Cp and Cpk; evaluation that accounts for process mean offset is particularly important
- Sensitivity vs. False Alarms: CUSUM/EWMA are sensitive to small shifts, but false alarm rates also change with parameter settings, so the two must be balanced
- Multivariate Monitoring: Hotelling's T² improves detection capability for correlated variables
- Integrated Approach: combining multiple methods and managing alarms by severity is the most practical strategy in real applications
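The sensitivity/false-alarm trade-off can be made concrete with the in-control average run length, ARL₀: if each in-control point alarms with probability p, run lengths are geometric and ARL₀ = 1/p. A minimal sketch, assuming scipy is available:

```python
from scipy.stats import norm

# In-control ARL of a Shewhart chart: ARL0 = 1 / (per-point alarm probability)
p3 = 2 * (1 - norm.cdf(3))     # standard 3-sigma limits
p2 = 2 * (1 - norm.cdf(2))     # tighter 2-sigma limits (more sensitive)
print(f"ARL0 at 3 sigma: {1/p3:.0f} samples")   # ~370
print(f"ARL0 at 2 sigma: {1/p2:.0f} samples")   # ~22
```

Tightening the limits from 3σ to 2σ detects shifts sooner but cuts the time between false alarms from roughly 370 samples to about 22, which is why narrower limits (or supplementary run rules) must be adopted deliberately rather than by default.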
To the Next Chapter
In Chapter 3, we will learn Anomaly Detection and Process Monitoring, covering rule-based anomaly detection using thresholds, statistical anomaly detection methods including Z-score and modified Z-score, and machine learning-based approaches such as Isolation Forest and One-Class SVM. The chapter also explores time-series anomaly detection using LSTM Autoencoders and addresses alarm management system design with strategies for false alarm reduction.