🌐 EN | 🇯🇵 JP | Last sync: 2025-11-16

Chapter 2: Statistical Process Control (SPC) and Control Charts

Practical methods for visualizing process variation and early anomaly detection

📚 Quality Management and QA Introduction Series ⏱️ Reading Time: 40 minutes 💻 Python Examples: 8 📊 Difficulty: Intermediate

2.1 Fundamentals of Statistical Process Control (SPC)

Statistical Process Control (SPC) is a methodology for statistically monitoring process variation and detecting anomalies early. It is based on control charts developed by Dr. Walter Shewhart at Bell Laboratories in the 1920s.

2.1.1 Two Sources of Process Variation

💡 Classification of Variation

  • Common Cause: Unavoidable variation inherent to the process (raw material variability, measurement error, etc.)
  • Special Cause: Variation due to unusual specific factors (equipment failure, operator error, etc.)

The purpose of SPC is to statistically distinguish between normal variation from common causes and abnormal variation from special causes, and to detect and address special causes early.

2.1.2 Basic Structure of Control Charts

A control chart consists of the following elements:

📊 Example 1: X-bar and R Control Charts

The most basic control chart. Simultaneously manages subgroup means and ranges.

# ===================================
# Example 1: X-bar and R Control Charts Implementation
# ===================================

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Tuple, Dict
import warnings
warnings.filterwarnings('ignore')

class XbarRControlChart:
    """X-bar and R Control Chart Implementation

    Used for continuous variable process control. Simultaneously monitors
    subgroup means (X-bar) and ranges (R) to manage process center and spread.

    Parameters:
    -----------
    subgroup_size : int
        Number of samples in subgroup (typically 2-10)

    Attributes:
    -----------
    A2, D3, D4 : float
        Constants for control limit calculation (dependent on subgroup size)
    """

    # Constants by subgroup size (from JIS Z 9020-2)
    CONSTANTS = {
        2: {'A2': 1.880, 'D3': 0, 'D4': 3.267},
        3: {'A2': 1.023, 'D3': 0, 'D4': 2.575},
        4: {'A2': 0.729, 'D3': 0, 'D4': 2.282},
        5: {'A2': 0.577, 'D3': 0, 'D4': 2.115},
        6: {'A2': 0.483, 'D3': 0, 'D4': 2.004},
        7: {'A2': 0.419, 'D3': 0.076, 'D4': 1.924},
        8: {'A2': 0.373, 'D3': 0.136, 'D4': 1.864},
        9: {'A2': 0.337, 'D3': 0.184, 'D4': 1.816},
        10: {'A2': 0.308, 'D3': 0.223, 'D4': 1.777}
    }

    def __init__(self, subgroup_size: int = 5):
        if subgroup_size not in self.CONSTANTS:
            raise ValueError(f"Subgroup size must be 2-10, got {subgroup_size}")

        self.n = subgroup_size
        self.A2 = self.CONSTANTS[subgroup_size]['A2']
        self.D3 = self.CONSTANTS[subgroup_size]['D3']
        self.D4 = self.CONSTANTS[subgroup_size]['D4']

        self.xbar_cl = None
        self.xbar_ucl = None
        self.xbar_lcl = None
        self.r_cl = None
        self.r_ucl = None
        self.r_lcl = None

    def fit(self, data: np.ndarray) -> 'XbarRControlChart':
        """Calculate control limits (Phase I: Establish baseline from initial data)

        Parameters:
        -----------
        data : np.ndarray
            Data with shape (n_subgroups, subgroup_size)

        Returns:
        --------
        self : XbarRControlChart
        """
        if data.shape[1] != self.n:
            raise ValueError(f"Data subgroup size {data.shape[1]} != {self.n}")

        # Calculate mean and range for each subgroup
        xbar = data.mean(axis=1)
        r = data.max(axis=1) - data.min(axis=1)

        # Calculate center line and control limits
        self.xbar_cl = xbar.mean()
        self.r_cl = r.mean()

        # X-bar chart control limits
        self.xbar_ucl = self.xbar_cl + self.A2 * self.r_cl
        self.xbar_lcl = self.xbar_cl - self.A2 * self.r_cl

        # R chart control limits
        self.r_ucl = self.D4 * self.r_cl
        self.r_lcl = self.D3 * self.r_cl

        return self

    def predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
        """Determine control status for new data (Phase II: Continuous monitoring)

        Parameters:
        -----------
        data : np.ndarray
            Data with shape (n_subgroups, subgroup_size)

        Returns:
        --------
        results : dict
            Dictionary containing 'xbar', 'r', 'xbar_ooc', 'r_ooc'
            ooc = out of control
        """
        if self.xbar_cl is None:
            raise ValueError("Call fit() first to establish control limits")

        xbar = data.mean(axis=1)
        r = data.max(axis=1) - data.min(axis=1)

        # Determine out of control
        xbar_ooc = (xbar > self.xbar_ucl) | (xbar < self.xbar_lcl)
        r_ooc = (r > self.r_ucl) | (r < self.r_lcl)

        return {
            'xbar': xbar,
            'r': r,
            'xbar_ooc': xbar_ooc,
            'r_ooc': r_ooc
        }

    def plot(self, data: np.ndarray, title: str = "X-bar and R Control Charts"):
        """Plot control charts

        Parameters:
        -----------
        data : np.ndarray
            Data with shape (n_subgroups, subgroup_size)
        title : str
            Graph title
        """
        results = self.predict(data)
        n_points = len(results['xbar'])

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # X-bar chart
        ax1.plot(range(1, n_points+1), results['xbar'],
                marker='o', color='#2c3e50', linewidth=1.5, markersize=6)
        ax1.axhline(self.xbar_cl, color='#11998e', linestyle='-',
                   linewidth=2, label='CL')
        ax1.axhline(self.xbar_ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='UCL')
        ax1.axhline(self.xbar_lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='LCL')

        # Highlight out of control points
        ooc_indices = np.where(results['xbar_ooc'])[0]
        if len(ooc_indices) > 0:
            ax1.scatter(ooc_indices + 1, results['xbar'][ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        ax1.set_xlabel('Subgroup Number', fontsize=11)
        ax1.set_ylabel('Subgroup Mean (X-bar)', fontsize=11)
        ax1.set_title('X-bar Control Chart', fontsize=13, fontweight='bold')
        ax1.legend(loc='best')
        ax1.grid(True, alpha=0.3)

        # R chart
        ax2.plot(range(1, n_points+1), results['r'],
                marker='s', color='#2c3e50', linewidth=1.5, markersize=6)
        ax2.axhline(self.r_cl, color='#11998e', linestyle='-',
                   linewidth=2, label='CL')
        ax2.axhline(self.r_ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='UCL')
        ax2.axhline(self.r_lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='LCL')

        # Highlight out of control points
        ooc_indices = np.where(results['r_ooc'])[0]
        if len(ooc_indices) > 0:
            ax2.scatter(ooc_indices + 1, results['r'][ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        ax2.set_xlabel('Subgroup Number', fontsize=11)
        ax2.set_ylabel('Subgroup Range (R)', fontsize=11)
        ax2.set_title('R Control Chart', fontsize=13, fontweight='bold')
        ax2.legend(loc='best')
        ax2.grid(True, alpha=0.3)

        plt.suptitle(title, fontsize=15, fontweight='bold', y=0.995)
        plt.tight_layout()
        plt.show()


# Example usage: Chemical process temperature management
# Phase I: Establish control limits with initial 25 subgroups
np.random.seed(42)
n_subgroups_phase1 = 25
subgroup_size = 5
target_temp = 80.0  # Target temperature [°C]

# Normal operation data (mean 80°C, standard deviation 0.5°C)
phase1_data = np.random.normal(target_temp, 0.5,
                               (n_subgroups_phase1, subgroup_size))

# Build control chart
xbar_r_chart = XbarRControlChart(subgroup_size=subgroup_size)
xbar_r_chart.fit(phase1_data)

print("=== Phase I: Establishing Control Limits ===")
print(f"X-bar chart:")
print(f"  CL  = {xbar_r_chart.xbar_cl:.3f} °C")
print(f"  UCL = {xbar_r_chart.xbar_ucl:.3f} °C")
print(f"  LCL = {xbar_r_chart.xbar_lcl:.3f} °C")
print(f"\nR chart:")
print(f"  CL  = {xbar_r_chart.r_cl:.3f} °C")
print(f"  UCL = {xbar_r_chart.r_ucl:.3f} °C")
print(f"  LCL = {xbar_r_chart.r_lcl:.3f} °C")

# Phase II: Continuous monitoring (30 subgroups, anomaly occurs midway)
n_subgroups_phase2 = 30
phase2_data = np.random.normal(target_temp, 0.5,
                               (n_subgroups_phase2, subgroup_size))

# Subgroups 15-20 show mean shift (+1.5°C)
phase2_data[14:20, :] += 1.5

# Subgroup 25 shows increased spread
phase2_data[24, :] = np.random.normal(target_temp, 2.0, subgroup_size)

# Monitor
results = xbar_r_chart.predict(phase2_data)

print("\n=== Phase II: Continuous Monitoring ===")
print(f"X-bar out of control: {results['xbar_ooc'].sum()} points "
      f"(Subgroups {np.where(results['xbar_ooc'])[0] + 1})")
print(f"R out of control: {results['r_ooc'].sum()} points "
      f"(Subgroups {np.where(results['r_ooc'])[0] + 1})")

# Plot
xbar_r_chart.plot(phase2_data,
                  title="Chemical Process Temperature Monitoring")

# Expected output:
# === Phase I: Establishing Control Limits ===
# X-bar chart:
#   CL  = 79.995 °C
#   UCL = 80.666 °C
#   LCL = 79.323 °C
#
# R chart:
#   CL  = 1.165 °C
#   UCL = 2.464 °C
#   LCL = 0.000 °C
#
# === Phase II: Continuous Monitoring ===
# X-bar out of control: 5 points (Subgroups [15 16 17 18 19])
# R out of control: 1 point (Subgroups [25])

⚠️ Note: Difference Between Control Limits and Specification Limits

Control limits (±3σ) are calculated from process variation and determine whether the process is statistically stable. Specification limits, on the other hand, are determined by customer requirements or design specifications and determine product acceptance. Products can be out of specification even when within control limits.

2.2 Control Charts for Attribute Data

For attribute data such as number of defectives or defects, rather than continuous variables, different control charts are used.

2.2.1 P-Chart (Proportion Defective Chart)

📊 Example 2: P-Chart

Manages the proportion defective in sampled products from inspection lots.

# ===================================
# Example 2: P-Chart for Proportion Defective
# ===================================

class PControlChart:
    """P-Chart (Proportion Defective Chart) Implementation

    Used for managing defect rate data following binomial distribution.
    Applicable even when sample sizes are not constant.

    Attributes:
    -----------
    p_bar : float
        Overall average defect rate
    ucl, lcl : np.ndarray
        Control limits according to each sample size
    """

    def __init__(self):
        self.p_bar = None
        self.ucl = None
        self.lcl = None
        self.n = None

    def fit(self, defectives: np.ndarray, sample_sizes: np.ndarray):
        """Calculate control limits

        Parameters:
        -----------
        defectives : np.ndarray
            Number of defectives in each lot
        sample_sizes : np.ndarray
            Sample size for each lot
        """
        # Calculate average defect rate
        self.p_bar = defectives.sum() / sample_sizes.sum()
        self.n = sample_sizes

        # Control limits according to each sample size (±3σ)
        # σ_p = sqrt(p_bar * (1 - p_bar) / n)
        sigma_p = np.sqrt(self.p_bar * (1 - self.p_bar) / self.n)

        self.ucl = self.p_bar + 3 * sigma_p
        self.lcl = self.p_bar - 3 * sigma_p

        # LCL cannot be negative
        self.lcl = np.maximum(self.lcl, 0)

        # UCL cannot exceed 1
        self.ucl = np.minimum(self.ucl, 1)

        return self

    def predict(self, defectives: np.ndarray,
                sample_sizes: np.ndarray) -> Dict[str, np.ndarray]:
        """Determine control status

        Parameters:
        -----------
        defectives : np.ndarray
            Number of defectives in each lot
        sample_sizes : np.ndarray
            Sample size for each lot

        Returns:
        --------
        results : dict
            Dictionary containing 'p', 'ucl', 'lcl', 'ooc'
        """
        if self.p_bar is None:
            raise ValueError("Call fit() first")

        # Calculate defect rate
        p = defectives / sample_sizes

        # Recalculate control limits (when sample sizes differ)
        sigma_p = np.sqrt(self.p_bar * (1 - self.p_bar) / sample_sizes)
        ucl = np.minimum(self.p_bar + 3 * sigma_p, 1)
        lcl = np.maximum(self.p_bar - 3 * sigma_p, 0)

        # Determine out of control
        ooc = (p > ucl) | (p < lcl)

        return {
            'p': p,
            'ucl': ucl,
            'lcl': lcl,
            'ooc': ooc
        }

    def plot(self, defectives: np.ndarray, sample_sizes: np.ndarray,
             title: str = "P Control Chart"):
        """Plot P-chart"""
        results = self.predict(defectives, sample_sizes)
        n_points = len(results['p'])

        plt.figure(figsize=(12, 6))

        # Plot defect rate
        plt.plot(range(1, n_points+1), results['p'],
                marker='o', color='#2c3e50', linewidth=1.5,
                markersize=6, label='Proportion Defective')

        # Center line
        plt.axhline(self.p_bar, color='#11998e', linestyle='-',
                   linewidth=2, label=f'CL = {self.p_bar:.4f}')

        # Control limits (stepped if sample sizes vary)
        plt.plot(range(1, n_points+1), results['ucl'],
                color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
        plt.plot(range(1, n_points+1), results['lcl'],
                color='#e74c3c', linestyle='--', linewidth=2, label='LCL')

        # Highlight out of control points
        ooc_indices = np.where(results['ooc'])[0]
        if len(ooc_indices) > 0:
            plt.scatter(ooc_indices + 1, results['p'][ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        plt.xlabel('Lot Number', fontsize=11)
        plt.ylabel('Proportion Defective', fontsize=11)
        plt.title(title, fontsize=13, fontweight='bold')
        plt.legend(loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()


# Example usage: Semiconductor manufacturing defect rate management
np.random.seed(42)

# Phase I: Establish control limits with initial 30 lots
n_lots_phase1 = 30
sample_sizes_phase1 = np.random.randint(100, 200, n_lots_phase1)
true_defect_rate = 0.02  # True defect rate 2%

# Generate number of defectives with binomial distribution
defectives_phase1 = np.random.binomial(sample_sizes_phase1,
                                       true_defect_rate)

# Build P-chart
p_chart = PControlChart()
p_chart.fit(defectives_phase1, sample_sizes_phase1)

print("=== P-Chart: Establishing Control Limits ===")
print(f"Average defect rate (p-bar) = {p_chart.p_bar:.4f} ({p_chart.p_bar*100:.2f}%)")

# Phase II: Continuous monitoring (40 lots, defect rate increases midway)
n_lots_phase2 = 40
sample_sizes_phase2 = np.random.randint(100, 200, n_lots_phase2)

defectives_phase2 = np.random.binomial(sample_sizes_phase2,
                                       true_defect_rate)

# Lots 25-30 show 3x defect rate increase (process anomaly)
defectives_phase2[24:30] = np.random.binomial(
    sample_sizes_phase2[24:30], true_defect_rate * 3)

# Monitor
results = p_chart.predict(defectives_phase2, sample_sizes_phase2)

print(f"\n=== Phase II: Continuous Monitoring ===")
print(f"Out of control lots: {results['ooc'].sum()} lots")
print(f"Corresponding lot numbers: {np.where(results['ooc'])[0] + 1}")

# Plot
p_chart.plot(defectives_phase2, sample_sizes_phase2,
             title="Semiconductor Manufacturing - Defect Rate Monitoring")

# Expected output:
# === P-Chart: Establishing Control Limits ===
# Average defect rate (p-bar) = 0.0189 (1.89%)
#
# === Phase II: Continuous Monitoring ===
# Out of control lots: 6 lots
# Corresponding lot numbers: [25 26 27 28 29 30]

2.2.2 C-Chart (Count of Defects Chart)

📊 Example 3: C-Chart

Manages the number of defects per constant inspection unit. Assumes Poisson distribution.

# ===================================
# Example 3: C-Chart for Count of Defects
# ===================================

class CControlChart:
    """C-Chart (Count of Defects Chart) Implementation

    Manages number of defects per constant inspection unit (area, time, volume, etc.).
    Assumes defect count follows Poisson distribution.

    Attributes:
    -----------
    c_bar : float
        Average defect count
    ucl, lcl : float
        Control limits
    """

    def __init__(self):
        self.c_bar = None
        self.ucl = None
        self.lcl = None

    def fit(self, defects: np.ndarray):
        """Calculate control limits

        Parameters:
        -----------
        defects : np.ndarray
            Defect count for each inspection unit
        """
        # Calculate average defect count
        self.c_bar = defects.mean()

        # Poisson distribution standard deviation = sqrt(mean)
        # Control limits = mean ± 3 * sqrt(mean)
        sigma_c = np.sqrt(self.c_bar)

        self.ucl = self.c_bar + 3 * sigma_c
        self.lcl = self.c_bar - 3 * sigma_c

        # LCL cannot be negative
        self.lcl = max(self.lcl, 0)

        return self

    def predict(self, defects: np.ndarray) -> Dict[str, np.ndarray]:
        """Determine control status

        Parameters:
        -----------
        defects : np.ndarray
            Defect count for each inspection unit

        Returns:
        --------
        results : dict
            Dictionary containing 'c', 'ooc'
        """
        if self.c_bar is None:
            raise ValueError("Call fit() first")

        # Determine out of control
        ooc = (defects > self.ucl) | (defects < self.lcl)

        return {
            'c': defects,
            'ooc': ooc
        }

    def plot(self, defects: np.ndarray, title: str = "C Control Chart"):
        """Plot C-chart"""
        results = self.predict(defects)
        n_points = len(defects)

        plt.figure(figsize=(12, 6))

        # Plot defect count
        plt.plot(range(1, n_points+1), defects,
                marker='o', color='#2c3e50', linewidth=1.5,
                markersize=6, label='Count of Defects')

        # Center line and control limits
        plt.axhline(self.c_bar, color='#11998e', linestyle='-',
                   linewidth=2, label=f'CL = {self.c_bar:.2f}')
        plt.axhline(self.ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label=f'UCL = {self.ucl:.2f}')
        plt.axhline(self.lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label=f'LCL = {self.lcl:.2f}')

        # Highlight out of control points
        ooc_indices = np.where(results['ooc'])[0]
        if len(ooc_indices) > 0:
            plt.scatter(ooc_indices + 1, defects[ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        plt.xlabel('Inspection Unit', fontsize=11)
        plt.ylabel('Number of Defects', fontsize=11)
        plt.title(title, fontsize=13, fontweight='bold')
        plt.legend(loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()


# Example usage: Paint surface defect management (defects per m²)
np.random.seed(42)

# Phase I: Establish control limits with initial 30 panels
n_panels_phase1 = 30
avg_defects = 3.5  # Average 3.5 defects/m²

# Generate defect count with Poisson distribution
defects_phase1 = np.random.poisson(avg_defects, n_panels_phase1)

# Build C-chart
c_chart = CControlChart()
c_chart.fit(defects_phase1)

print("=== C-Chart: Establishing Control Limits ===")
print(f"Average defect count (c-bar) = {c_chart.c_bar:.2f} defects/m²")
print(f"UCL = {c_chart.ucl:.2f} defects/m²")
print(f"LCL = {c_chart.lcl:.2f} defects/m²")

# Phase II: Continuous monitoring (50 panels, defect increase midway)
n_panels_phase2 = 50
defects_phase2 = np.random.poisson(avg_defects, n_panels_phase2)

# Panels 30-35 show 2x defect increase (paint degradation)
defects_phase2[29:35] = np.random.poisson(avg_defects * 2, 6)

# Monitor
results = c_chart.predict(defects_phase2)

print(f"\n=== Phase II: Continuous Monitoring ===")
print(f"Out of control panels: {results['ooc'].sum()} panels")
print(f"Corresponding panel numbers: {np.where(results['ooc'])[0] + 1}")

# Plot
c_chart.plot(defects_phase2,
             title="Paint Defects Monitoring (per m²)")

# Expected output:
# === C-Chart: Establishing Control Limits ===
# Average defect count (c-bar) = 3.40 defects/m²
# UCL = 8.93 defects/m²
# LCL = 0.00 defects/m²
#
# === Phase II: Continuous Monitoring ===
# Out of control panels: 2 panels
# Corresponding panel numbers: [30 33]

2.3 Advanced Control Chart Methods

Traditional control charts excel at detecting large changes but have limitations in early detection of small changes. The following methods enable more sensitive monitoring.

2.3.1 CUSUM Control Chart (Cumulative Sum Control Chart)

💡 CUSUM Features

By accumulating deviations from the mean, detects small changes (approximately 0.5-2σ) early. Can detect 4-8 times faster than Shewhart control charts on average.

📊 Example 4: CUSUM Control Chart

Implementation of cumulative sum control chart (tabular CUSUM)

# ===================================
# Example 4: CUSUM (Cumulative Sum) Control Chart
# ===================================

class CUSUMControlChart:
    """CUSUM Control Chart Implementation (Tabular CUSUM)

    Method for early detection of small mean shifts (0.5-2σ).
    Calculates upper and lower CUSUM statistics separately.

    Parameters:
    -----------
    target : float
        Target value (process mean)
    std : float
        Process standard deviation
    k : float
        Reference value (allowance). Typically half of shift to detect.
        Default: 0.5σ (for detecting 1σ shift)
    h : float
        Decision interval. Functions as control limit.
        Default: 5σ (corresponds to Type I error rate of 0.27%)

    Attributes:
    -----------
    cusum_high : np.ndarray
        Upper CUSUM statistic
    cusum_low : np.ndarray
        Lower CUSUM statistic
    """

    def __init__(self, target: float, std: float,
                 k: float = 0.5, h: float = 5.0):
        self.target = target
        self.std = std
        self.k = k * std  # Reference value (absolute)
        self.h = h * std  # Decision interval (absolute)

        self.cusum_high = None
        self.cusum_low = None

    def fit_predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
        """Calculate CUSUM statistics and detect out of control

        Parameters:
        -----------
        data : np.ndarray
            Observation data

        Returns:
        --------
        results : dict
            Dictionary containing 'cusum_high', 'cusum_low', 'ooc_high', 'ooc_low'
        """
        n = len(data)
        self.cusum_high = np.zeros(n)
        self.cusum_low = np.zeros(n)

        # Sequential calculation of CUSUM statistics
        for i in range(n):
            if i == 0:
                # Initial value
                self.cusum_high[i] = max(0, data[i] - self.target - self.k)
                self.cusum_low[i] = max(0, self.target - data[i] - self.k)
            else:
                # Sequential update formula
                self.cusum_high[i] = max(0, self.cusum_high[i-1] +
                                         data[i] - self.target - self.k)
                self.cusum_low[i] = max(0, self.cusum_low[i-1] +
                                        self.target - data[i] - self.k)

        # Determine out of control
        ooc_high = self.cusum_high > self.h
        ooc_low = self.cusum_low > self.h

        return {
            'data': data,
            'cusum_high': self.cusum_high,
            'cusum_low': self.cusum_low,
            'ooc_high': ooc_high,
            'ooc_low': ooc_low
        }

    def plot(self, data: np.ndarray, title: str = "CUSUM Control Chart"):
        """Plot CUSUM control chart"""
        results = self.fit_predict(data)
        n_points = len(data)

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # Plot original data
        ax1.plot(range(1, n_points+1), data,
                marker='o', color='#2c3e50', linewidth=1.5, markersize=5)
        ax1.axhline(self.target, color='#11998e', linestyle='-',
                   linewidth=2, label='Target')
        ax1.axhline(self.target + self.k, color='#f39c12',
                   linestyle=':', linewidth=1.5, label=f'Target ± k')
        ax1.axhline(self.target - self.k, color='#f39c12',
                   linestyle=':', linewidth=1.5)
        ax1.set_ylabel('Observation', fontsize=11)
        ax1.set_title('Process Data', fontsize=12, fontweight='bold')
        ax1.legend(loc='best')
        ax1.grid(True, alpha=0.3)

        # Plot CUSUM statistics
        ax2.plot(range(1, n_points+1), results['cusum_high'],
                marker='^', color='#e74c3c', linewidth=1.5,
                markersize=5, label='CUSUM High')
        ax2.plot(range(1, n_points+1), results['cusum_low'],
                marker='v', color='#3498db', linewidth=1.5,
                markersize=5, label='CUSUM Low')
        ax2.axhline(self.h, color='#2c3e50', linestyle='--',
                   linewidth=2, label=f'Decision Interval (h={self.h/self.std:.1f}σ)')
        ax2.axhline(0, color='gray', linestyle='-', linewidth=1)

        # Highlight out of control points
        ooc_high_indices = np.where(results['ooc_high'])[0]
        ooc_low_indices = np.where(results['ooc_low'])[0]

        if len(ooc_high_indices) > 0:
            ax2.scatter(ooc_high_indices + 1,
                       results['cusum_high'][ooc_high_indices],
                       color='red', s=100, zorder=5, marker='^')
        if len(ooc_low_indices) > 0:
            ax2.scatter(ooc_low_indices + 1,
                       results['cusum_low'][ooc_low_indices],
                       color='red', s=100, zorder=5, marker='v')

        ax2.set_xlabel('Observation Number', fontsize=11)
        ax2.set_ylabel('CUSUM Statistic', fontsize=11)
        ax2.set_title('CUSUM Statistics', fontsize=12, fontweight='bold')
        ax2.legend(loc='best')
        ax2.grid(True, alpha=0.3)

        plt.suptitle(title, fontsize=15, fontweight='bold', y=0.995)
        plt.tight_layout()
        plt.show()


# Example usage: Chemical process mean shift detection
np.random.seed(42)

# Target value and standard deviation
target_value = 100.0
process_std = 1.0

# Generate data (100 observations)
n_obs = 100
data = np.random.normal(target_value, process_std, n_obs)

# Small shift of +0.8σ from observation 40 (difficult to detect with Shewhart chart)
data[40:] += 0.8 * process_std

# Apply CUSUM control chart (configured to detect 1σ shift)
cusum_chart = CUSUMControlChart(
    target=target_value,
    std=process_std,
    k=0.5,  # Half of 1σ shift
    h=5.0   # Decision interval
)

results = cusum_chart.fit_predict(data)

print("=== CUSUM Control Chart ===")
print(f"Target value: {target_value}")
print(f"Reference value (k): {cusum_chart.k:.3f} ({cusum_chart.k/process_std:.1f}σ)")
print(f"Decision interval (h): {cusum_chart.h:.3f} ({cusum_chart.h/process_std:.1f}σ)")

# Identify detection point
if results['ooc_high'].any():
    first_ooc = np.where(results['ooc_high'])[0][0] + 1
    print(f"\nUpward shift detected: Observation #{first_ooc}")
    print(f"  Actual shift start: Observation #41")
    print(f"  Detection delay: {first_ooc - 41} observations")

# Plot
cusum_chart.plot(data,
                 title="CUSUM Chart - Small Mean Shift Detection")

# Expected output:
# === CUSUM Control Chart ===
# Target value: 100.0
# Reference value (k): 0.500 (0.5σ)
# Decision interval (h): 5.000 (5.0σ)
#
# Upward shift detected: Observation #49
#   Actual shift start: Observation #41
#   Detection delay: 8 observations

2.3.2 EWMA Control Chart (Exponentially Weighted Moving Average Control Chart)

📊 Example 5: EWMA Control Chart

Control chart using exponentially weighted moving average. Like CUSUM, detects small shifts.

# ===================================
# Example 5: EWMA (Exponentially Weighted Moving Average) Control Chart
# ===================================

class EWMAControlChart:
    """EWMA Control Chart Implementation

    Smoothing by applying exponentially decaying weights to past observations.
    Like CUSUM, excels at detecting small shifts.

    Parameters:
    -----------
    target : float
        Target value (process mean)
    std : float
        Process standard deviation
    lambda_ : float
        Weight coefficient (0 < λ ≤ 1). Smaller values give more weight to past.
        Typical values: 0.05-0.25
    L : float
        Control limit width (multiple of standard deviation). Default: 3.0

    Attributes:
    -----------
    ewma : np.ndarray
        EWMA statistic
    """

    def __init__(self, target: float, std: float,
                 lambda_: float = 0.2, L: float = 3.0):
        if not 0 < lambda_ <= 1:
            raise ValueError("lambda_ must be in (0, 1]")

        self.target = target
        self.std = std
        self.lambda_ = lambda_
        self.L = L

        self.ewma = None
        self.ucl = None
        self.lcl = None

    def fit_predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
        """Calculate EWMA statistics and detect out of control

        Parameters:
        -----------
        data : np.ndarray
            Observation data

        Returns:
        --------
        results : dict
            Dictionary containing 'ewma', 'ucl', 'lcl', 'ooc'
        """
        n = len(data)
        self.ewma = np.zeros(n)
        self.ucl = np.zeros(n)
        self.lcl = np.zeros(n)

        # Sequential calculation of EWMA statistics
        # Z_t = λ * X_t + (1-λ) * Z_{t-1}
        # Initial value: Z_0 = target
        self.ewma[0] = self.lambda_ * data[0] + (1 - self.lambda_) * self.target

        for i in range(1, n):
            self.ewma[i] = self.lambda_ * data[i] + (1 - self.lambda_) * self.ewma[i-1]

        # Calculate control limits (time-dependent)
        # σ_Z(t) = σ * sqrt(λ/(2-λ) * [1 - (1-λ)^(2t)])
        for i in range(n):
            t = i + 1
            sigma_z = self.std * np.sqrt(
                (self.lambda_ / (2 - self.lambda_)) *
                (1 - (1 - self.lambda_)**(2 * t))
            )

            self.ucl[i] = self.target + self.L * sigma_z
            self.lcl[i] = self.target - self.L * sigma_z

        # Determine out of control
        ooc = (self.ewma > self.ucl) | (self.ewma < self.lcl)

        return {
            'data': data,
            'ewma': self.ewma,
            'ucl': self.ucl,
            'lcl': self.lcl,
            'ooc': ooc
        }

    def plot(self, data: np.ndarray, title: str = "EWMA Control Chart"):
        """Plot EWMA control chart"""
        results = self.fit_predict(data)
        n_points = len(data)

        plt.figure(figsize=(12, 6))

        # Plot EWMA statistic
        plt.plot(range(1, n_points+1), results['ewma'],
                marker='o', color='#2c3e50', linewidth=2,
                markersize=5, label='EWMA')

        # Original data (faint)
        plt.plot(range(1, n_points+1), data,
                marker='.', color='gray', linewidth=0.5,
                markersize=3, alpha=0.5, label='Original Data')

        # Center line and control limits
        plt.axhline(self.target, color='#11998e', linestyle='-',
                   linewidth=2, label='Target')
        plt.plot(range(1, n_points+1), results['ucl'],
                color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
        plt.plot(range(1, n_points+1), results['lcl'],
                color='#e74c3c', linestyle='--', linewidth=2, label='LCL')

        # Highlight out of control points
        ooc_indices = np.where(results['ooc'])[0]
        if len(ooc_indices) > 0:
            plt.scatter(ooc_indices + 1, results['ewma'][ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        plt.xlabel('Observation Number', fontsize=11)
        plt.ylabel('EWMA Statistic', fontsize=11)
        plt.title(title + f" (λ={self.lambda_}, L={self.L})",
                 fontsize=13, fontweight='bold')
        plt.legend(loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()


# Example usage: Pharmaceutical manufacturing content management
np.random.seed(42)

# Target value and standard deviation
target_content = 50.0  # mg
process_std = 0.5      # mg

# Generate data (120 observations)
n_obs = 120
data = np.random.normal(target_content, process_std, n_obs)

# Small shift of +0.5σ from observation 60
data[60:] += 0.5 * process_std

# Apply EWMA control chart (λ=0.2 for moderate smoothing)
ewma_chart = EWMAControlChart(
    target=target_content,
    std=process_std,
    lambda_=0.2,
    L=3.0
)

results = ewma_chart.fit_predict(data)

print("=== EWMA Control Chart ===")
print(f"Target value: {target_content} mg")
print(f"Weight coefficient (λ): {ewma_chart.lambda_}")
print(f"Control limit width (L): {ewma_chart.L}σ")

# Identify detection point
if results['ooc'].any():
    first_ooc = np.where(results['ooc'])[0][0] + 1
    print(f"\nShift detected: Observation #{first_ooc}")
    print(f"  Actual shift start: Observation #61")
    print(f"  Detection delay: {first_ooc - 61} observations")

# Plot
ewma_chart.plot(data,
                title="EWMA Chart - Drug Content Monitoring")

# Expected output:
# === EWMA Control Chart ===
# Target value: 50.0 mg
# Weight coefficient (λ): 0.2
# Control limit width (L): 3.0σ
#
# Shift detected: Observation #68
#   Actual shift start: Observation #61
#   Detection delay: 7 observations

2.4 Integration of Process Capability and Control Charts

2.4.1 Process Capability Indices and Control Charts

After confirming the process is statistically stable using control charts, calculate process capability indices.

📊 Example 6: Process Capability Analysis with Control Charts

Integrated implementation of control charts and process capability analysis

# ===================================
# Example 6: Integrated Process Capability Analysis
# ===================================

from scipy import stats

class ProcessCapabilityAnalysis:
    """Integrated Control Chart and Process Capability Analysis Class

    1. Confirm process stability using control charts
    2. Calculate capability indices for stable process
    3. Generate comprehensive evaluation report
    """

    def __init__(self, LSL: float, USL: float, target: float = None):
        """
        Parameters:
        -----------
        LSL : float
            Lower Specification Limit
        USL : float
            Upper Specification Limit
        target : float, optional
            Target value. If omitted, uses specification center
        """
        self.LSL = LSL
        self.USL = USL
        self.target = target if target is not None else (LSL + USL) / 2

        self.control_chart = None
        self.capability_results = None

    def analyze(self, data: np.ndarray,
                subgroup_size: int = 5) -> Dict:
        """Execute integrated analysis

        Parameters:
        -----------
        data : np.ndarray
            Data with shape (n_subgroups, subgroup_size)
        subgroup_size : int
            Subgroup size

        Returns:
        --------
        results : dict
            Dictionary containing control chart information and process capability indices
        """
        # Step 1: Confirm stability with X-bar and R chart
        self.control_chart = XbarRControlChart(subgroup_size)
        self.control_chart.fit(data)

        chart_results = self.control_chart.predict(data)

        # Check for out of control points
        n_ooc_xbar = chart_results['xbar_ooc'].sum()
        n_ooc_r = chart_results['r_ooc'].sum()
        is_stable = (n_ooc_xbar == 0) and (n_ooc_r == 0)

        # Step 2: Calculate process capability indices (recommended only when stable)
        all_data = data.flatten()
        process_mean = all_data.mean()
        process_std = all_data.std(ddof=1)

        # Cp: Process capability index (assumes centered)
        Cp = (self.USL - self.LSL) / (6 * process_std)

        # Cpk: Adjusted process capability index (considers actual mean position)
        Cpu = (self.USL - process_mean) / (3 * process_std)
        Cpl = (process_mean - self.LSL) / (3 * process_std)
        Cpk = min(Cpu, Cpl)

        # Cpm: Taguchi capability index (considers deviation from target)
        tau = np.sqrt(process_std**2 + (process_mean - self.target)**2)
        Cpm = (self.USL - self.LSL) / (6 * tau)

        # Estimate out-of-spec rate (assuming normal distribution)
        z_lower = (self.LSL - process_mean) / process_std
        z_upper = (self.USL - process_mean) / process_std

        ppm_lower = stats.norm.cdf(z_lower) * 1e6
        ppm_upper = (1 - stats.norm.cdf(z_upper)) * 1e6
        ppm_total = ppm_lower + ppm_upper

        # Step 3: Comprehensive evaluation
        self.capability_results = {
            'stable': is_stable,
            'n_ooc_xbar': n_ooc_xbar,
            'n_ooc_r': n_ooc_r,
            'process_mean': process_mean,
            'process_std': process_std,
            'Cp': Cp,
            'Cpk': Cpk,
            'Cpm': Cpm,
            'Cpu': Cpu,
            'Cpl': Cpl,
            'ppm_lower': ppm_lower,
            'ppm_upper': ppm_upper,
            'ppm_total': ppm_total,
            'LSL': self.LSL,
            'USL': self.USL,
            'target': self.target
        }

        return self.capability_results

    def print_report(self):
        """Output comprehensive evaluation report"""
        if self.capability_results is None:
            raise ValueError("Call analyze() first")

        r = self.capability_results

        print("=" * 60)
        print("Integrated Process Capability Analysis Report")
        print("=" * 60)

        print("\n[1. Process Stability (Control Chart)]")
        if r['stable']:
            print("  ✅ Process is statistically stable")
        else:
            print("  ⚠️ Process is unstable (out of control points present)")
            print(f"    X-bar out of control: {r['n_ooc_xbar']} points")
            print(f"    R out of control: {r['n_ooc_r']} points")
            print("  → Stabilize process first")

        print("\n[2. Process Characteristics]")
        print(f"  Process mean: {r['process_mean']:.4f}")
        print(f"  Process standard deviation: {r['process_std']:.4f}")
        print(f"  Specification range: [{r['LSL']:.2f}, {r['USL']:.2f}]")
        print(f"  Target value: {r['target']:.2f}")

        print("\n[3. Process Capability Indices]")
        print(f"  Cp  = {r['Cp']:.3f}  ", end="")
        print(self._evaluate_index(r['Cp']))

        print(f"  Cpk = {r['Cpk']:.3f}  ", end="")
        print(self._evaluate_index(r['Cpk']))
        print(f"    (Cpu={r['Cpu']:.3f}, Cpl={r['Cpl']:.3f})")

        print(f"  Cpm = {r['Cpm']:.3f}  ", end="")
        print(self._evaluate_index(r['Cpm']))

        print("\n[4. Estimated Out-of-Spec Rate]")
        print(f"  Below LSL: {r['ppm_lower']:.1f} ppm")
        print(f"  Above USL: {r['ppm_upper']:.1f} ppm")
        print(f"  Total: {r['ppm_total']:.1f} ppm ({r['ppm_total']/1e4:.3f}%)")

        print("\n[5. Overall Assessment]")
        if r['stable'] and r['Cpk'] >= 1.33:
            print("  ✅ Process is stable and has sufficient capability")
        elif r['stable'] and r['Cpk'] >= 1.0:
            print("  ⚠️ Process is stable but capability improvement needed")
        else:
            print("  ❌ Process improvement required")

        print("=" * 60)

    def _evaluate_index(self, value: float) -> str:
        """Evaluate capability index"""
        if value >= 1.67:
            return "(Excellent)"
        elif value >= 1.33:
            return "(Good)"
        elif value >= 1.0:
            return "(Minimum)"
        else:
            return "(Insufficient)"

    def plot_capability(self, data: np.ndarray):
        """Histogram and normal distribution fit for process capability"""
        if self.capability_results is None:
            raise ValueError("Call analyze() first")

        r = self.capability_results
        all_data = data.flatten()

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

        # Histogram and normal distribution fit
        ax1.hist(all_data, bins=30, density=True, alpha=0.6,
                color='#3498db', edgecolor='black', label='Data')

        # Normal distribution curve
        x = np.linspace(all_data.min(), all_data.max(), 200)
        pdf = stats.norm.pdf(x, r['process_mean'], r['process_std'])
        ax1.plot(x, pdf, 'r-', linewidth=2, label='Normal Fit')

        # Specification limits and target
        ax1.axvline(r['LSL'], color='#e74c3c', linestyle='--',
                   linewidth=2, label='LSL')
        ax1.axvline(r['USL'], color='#e74c3c', linestyle='--',
                   linewidth=2, label='USL')
        ax1.axvline(r['target'], color='#11998e', linestyle='-',
                   linewidth=2, label='Target')

        ax1.set_xlabel('Value', fontsize=11)
        ax1.set_ylabel('Density', fontsize=11)
        ax1.set_title('Process Capability Histogram',
                     fontsize=13, fontweight='bold')
        ax1.legend(loc='best')
        ax1.grid(True, alpha=0.3)

        # Capability indices bar chart
        indices = ['Cp', 'Cpk', 'Cpm']
        values = [r['Cp'], r['Cpk'], r['Cpm']]
        colors = ['#3498db', '#e74c3c', '#11998e']

        bars = ax2.bar(indices, values, color=colors, alpha=0.7,
                      edgecolor='black', linewidth=1.5)

        # Reference lines
        ax2.axhline(1.33, color='green', linestyle='--',
                   linewidth=2, label='Good (1.33)', alpha=0.7)
        ax2.axhline(1.0, color='orange', linestyle='--',
                   linewidth=2, label='Minimum (1.0)', alpha=0.7)

        # Display values on top of bars
        for bar, value in zip(bars, values):
            height = bar.get_height()
            ax2.text(bar.get_x() + bar.get_width()/2., height,
                    f'{value:.3f}',
                    ha='center', va='bottom', fontweight='bold')

        ax2.set_ylabel('Index Value', fontsize=11)
        ax2.set_title('Process Capability Indices',
                     fontsize=13, fontweight='bold')
        ax2.legend(loc='best')
        ax2.grid(True, alpha=0.3, axis='y')
        ax2.set_ylim(0, max(values) * 1.3)

        plt.tight_layout()
        plt.show()


# Example usage: Injection molding dimension management
np.random.seed(42)

# Specification setting
LSL = 49.5  # mm
USL = 50.5  # mm
target = 50.0  # mm

# Generate data (40 subgroups, size 5)
n_subgroups = 40
subgroup_size = 5

# First 30 subgroups are stable (mean 50.0, standard deviation 0.15)
data_stable = np.random.normal(50.0, 0.15, (30, subgroup_size))

# Last 10 subgroups show slight mean shift (50.0 → 50.1)
data_shift = np.random.normal(50.1, 0.15, (10, subgroup_size))

data = np.vstack([data_stable, data_shift])

# Execute integrated analysis
analysis = ProcessCapabilityAnalysis(LSL, USL, target)
results = analysis.analyze(data, subgroup_size)

# Output report
analysis.print_report()

# Visualization
analysis.control_chart.plot(data,
    title="Injection Molding - Dimension Control")
analysis.plot_capability(data)

# Expected output:
# ============================================================
# Integrated Process Capability Analysis Report
# ============================================================
#
# [1. Process Stability (Control Chart)]
#   ⚠️ Process is unstable (out of control points present)
#     X-bar out of control: 6 points
#     R out of control: 0 points
#   → Stabilize process first
#
# [2. Process Characteristics]
#   Process mean: 50.0266
#   Process standard deviation: 0.1505
#   Specification range: [49.50, 50.50]
#   Target value: 50.00
#
# [3. Process Capability Indices]
#   Cp  = 1.107  (Minimum)
#   Cpk = 1.049  (Minimum)
#     (Cpu=1.049, Cpl=1.165)
#   Cpm = 1.089  (Minimum)
#
# [4. Estimated Out-of-Spec Rate]
#   Below LSL: 0.1 ppm
#   Above USL: 8.6 ppm
#   Total: 8.7 ppm (0.001%)
#
# [5. Overall Assessment]
#   ⚠️ Process is stable but capability improvement needed
# ============================================================

2.5 Control Chart Decision Rules

2.5.1 Western Electric Rules

An advanced set of rules for detecting anomalies from data patterns, not just single points exceeding control limits.

📊 Example 7: Western Electric Rules Implementation

Implementation of 8 decision rules

# ===================================
# Example 7: Western Electric Rules for Pattern Detection
# ===================================

class WesternElectricRules:
    """Western Electric Rules Implementation

    Eight rules for detecting anomalies from data patterns on control charts.
    Detects not only single points exceeding control limits but also trends and runs.

    Rules:
    ------
    Rule 1: One point beyond 3σ
    Rule 2: Two out of three consecutive points beyond 2σ (same side)
    Rule 3: Four out of five consecutive points beyond 1σ (same side)
    Rule 4: Eight consecutive points on same side of center line
    Rule 5: Six consecutive points monotonically increasing or decreasing
    Rule 6: Fourteen consecutive points alternating up and down
    Rule 7: Fifteen consecutive points within 1σ
    Rule 8: Eight consecutive points beyond 1σ (either side)
    """

    def __init__(self, center_line: float, ucl: float, lcl: float):
        """
        Parameters:
        -----------
        center_line : float
            Center line (mean)
        ucl : float
            Upper control limit (CL + 3σ)
        lcl : float
            Lower control limit (CL - 3σ)
        """
        self.cl = center_line
        self.ucl = ucl
        self.lcl = lcl

        # Calculate zone boundaries
        sigma = (ucl - center_line) / 3

        self.zone_a_upper = center_line + 2 * sigma
        self.zone_a_lower = center_line - 2 * sigma
        self.zone_b_upper = center_line + sigma
        self.zone_b_lower = center_line - sigma

    def apply_rules(self, data: np.ndarray) -> Dict[str, np.ndarray]:
        """Apply all rules

        Parameters:
        -----------
        data : np.ndarray
            Time series data

        Returns:
        --------
        violations : dict
            Boolean arrays indicating violation points for each rule
        """
        n = len(data)
        violations = {f'Rule{i}': np.zeros(n, dtype=bool) for i in range(1, 9)}

        for i in range(n):
            # Rule 1: One point beyond 3σ
            if data[i] > self.ucl or data[i] < self.lcl:
                violations['Rule1'][i] = True

            # Rule 2: Two out of three consecutive points between 2σ and 3σ
            if i >= 2:
                window = data[i-2:i+1]
                in_zone_a = np.sum((window > self.zone_a_upper) |
                                   (window < self.zone_a_lower))
                if in_zone_a >= 2:
                    violations['Rule2'][i] = True

            # Rule 3: Four out of five consecutive points between 1σ and 2σ
            if i >= 4:
                window = data[i-4:i+1]
                in_zone_b = np.sum(
                    ((window > self.zone_b_upper) & (window <= self.zone_a_upper)) |
                    ((window < self.zone_b_lower) & (window >= self.zone_a_lower))
                )
                if in_zone_b >= 4:
                    violations['Rule3'][i] = True

            # Rule 4: Eight consecutive points on same side
            if i >= 7:
                window = data[i-7:i+1]
                if np.all(window > self.cl) or np.all(window < self.cl):
                    violations['Rule4'][i] = True

            # Rule 5: Six consecutive points monotonically increasing or decreasing
            if i >= 5:
                window = data[i-5:i+1]
                diffs = np.diff(window)
                if np.all(diffs > 0) or np.all(diffs < 0):
                    violations['Rule5'][i] = True

            # Rule 6: Fourteen consecutive points alternating up and down
            if i >= 13:
                window = data[i-13:i+1]
                diffs = np.diff(window)
                signs = np.sign(diffs)
                # Check if signs alternate
                alternating = True
                for j in range(len(signs) - 1):
                    if signs[j] * signs[j+1] >= 0:
                        alternating = False
                        break
                if alternating:
                    violations['Rule6'][i] = True

            # Rule 7: Fifteen consecutive points within 1σ
            if i >= 14:
                window = data[i-14:i+1]
                if np.all((window >= self.zone_b_lower) &
                         (window <= self.zone_b_upper)):
                    violations['Rule7'][i] = True

            # Rule 8: Eight consecutive points beyond 1σ
            if i >= 7:
                window = data[i-7:i+1]
                if np.all((window > self.zone_b_upper) |
                         (window < self.zone_b_lower)):
                    violations['Rule8'][i] = True

        return violations

    def plot_with_zones(self, data: np.ndarray,
                        title: str = "Control Chart with Western Electric Rules"):
        """Plot control chart with zones"""
        violations = self.apply_rules(data)
        n = len(data)

        # Combine all violation points
        any_violation = np.zeros(n, dtype=bool)
        for rule_violations in violations.values():
            any_violation |= rule_violations

        plt.figure(figsize=(14, 7))

        # Display zones with color coding
        plt.axhspan(self.ucl, self.zone_a_upper,
                   color='#e74c3c', alpha=0.1, label='Zone A (2-3σ)')
        plt.axhspan(self.zone_a_upper, self.zone_b_upper,
                   color='#f39c12', alpha=0.1, label='Zone B (1-2σ)')
        plt.axhspan(self.zone_b_upper, self.cl,
                   color='#2ecc71', alpha=0.1, label='Zone C (0-1σ)')
        plt.axhspan(self.cl, self.zone_b_lower,
                   color='#2ecc71', alpha=0.1)
        plt.axhspan(self.zone_b_lower, self.zone_a_lower,
                   color='#f39c12', alpha=0.1)
        plt.axhspan(self.zone_a_lower, self.lcl,
                   color='#e74c3c', alpha=0.1)

        # Plot data
        plt.plot(range(1, n+1), data, marker='o', color='#2c3e50',
                linewidth=1.5, markersize=5, label='Data', zorder=3)

        # Control limits
        plt.axhline(self.cl, color='#11998e', linestyle='-',
                   linewidth=2, label='CL', zorder=2)
        plt.axhline(self.ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='UCL (3σ)', zorder=2)
        plt.axhline(self.lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='LCL (3σ)', zorder=2)

        # Zone boundaries (dotted)
        plt.axhline(self.zone_a_upper, color='gray', linestyle=':',
                   linewidth=1, alpha=0.5, zorder=1)
        plt.axhline(self.zone_a_lower, color='gray', linestyle=':',
                   linewidth=1, alpha=0.5, zorder=1)
        plt.axhline(self.zone_b_upper, color='gray', linestyle=':',
                   linewidth=1, alpha=0.5, zorder=1)
        plt.axhline(self.zone_b_lower, color='gray', linestyle=':',
                   linewidth=1, alpha=0.5, zorder=1)

        # Highlight violation points
        if any_violation.any():
            violation_indices = np.where(any_violation)[0]
            plt.scatter(violation_indices + 1, data[violation_indices],
                       color='red', s=150, marker='X', zorder=5,
                       label='Rule Violation')

        plt.xlabel('Observation Number', fontsize=11)
        plt.ylabel('Value', fontsize=11)
        plt.title(title, fontsize=13, fontweight='bold')
        plt.legend(loc='best', ncol=2)
        plt.grid(True, alpha=0.3, zorder=0)
        plt.tight_layout()
        plt.show()

        # Output violation summary
        print("\n=== Western Electric Rules Violations ===")
        for rule_name, rule_violations in violations.items():
            n_violations = rule_violations.sum()
            if n_violations > 0:
                indices = np.where(rule_violations)[0] + 1
                print(f"{rule_name}: {n_violations} times "
                      f"(Observations {list(indices[:5])}{'...' if n_violations > 5 else ''})")


# Example usage: Detecting various anomaly patterns
np.random.seed(42)

# Generate data (100 observations)
n = 100
data = np.random.normal(0, 1, n)

# Insert anomaly patterns
# Rule 1 violation: Exceeds 3σ
data[20] = 3.5

# Rule 4 violation: Eight consecutive points on upper side
data[40:48] += 0.5

# Rule 5 violation: Monotonic increasing trend
data[65:71] = np.linspace(0, 2, 6)

# Rule 7 violation: Reduced variation (within 1σ)
data[85:100] = np.random.normal(0, 0.2, 15)

# Apply Western Electric Rules
cl = 0.0
ucl = 3.0
lcl = -3.0

we_rules = WesternElectricRules(cl, ucl, lcl)
we_rules.plot_with_zones(data,
    title="Western Electric Rules - Anomaly Pattern Detection")

# Expected output:
# === Western Electric Rules Violations ===
# Rule1: 1 time (Observations [21])
# Rule4: 1 time (Observations [48])
# Rule5: 1 time (Observations [71])
# Rule7: 1 time (Observations [100])

2.6 Real-time Monitoring System Implementation

📊 Example 8: Real-time SPC Monitoring System

Sequential update monitoring system usable in actual manufacturing settings

# ===================================
# Example 8: Real-time SPC Monitoring System
# ===================================

import time
from datetime import datetime
from collections import deque

class RealTimeSPCMonitor:
    """Real-time SPC Monitoring System

    System that sequentially updates control charts as new data arrives,
    immediately detecting and notifying anomalies.

    Features:
    ---------
    - Simultaneous monitoring of EWMA, CUSUM, and Shewhart charts
    - High-sensitivity detection with Western Electric Rules
    - Alert notification functionality
    - Historical data retention and visualization
    """

    def __init__(self, target: float, std: float,
                 window_size: int = 100):
        """
        Parameters:
        -----------
        target : float
            Target value
        std : float
            Process standard deviation
        window_size : int
            Maximum number of historical data points to display
        """
        self.target = target
        self.std = std
        self.window_size = window_size

        # Control limits
        self.ucl = target + 3 * std
        self.lcl = target - 3 * std

        # Initialize each method
        self.ewma_chart = EWMAControlChart(target, std, lambda_=0.2)
        self.cusum_chart = CUSUMControlChart(target, std, k=0.5, h=5.0)
        self.we_rules = WesternElectricRules(target, self.ucl, self.lcl)

        # Data history (efficiently managed with deque)
        self.data_history = deque(maxlen=window_size)
        self.timestamps = deque(maxlen=window_size)
        self.ewma_history = deque(maxlen=window_size)

        # Alert history
        self.alerts = []

    def add_observation(self, value: float,
                       timestamp: datetime = None) -> Dict:
        """Add new observation and perform anomaly detection

        Parameters:
        -----------
        value : float
            New observation value
        timestamp : datetime, optional
            Timestamp (current time if omitted)

        Returns:
        --------
        status : dict
            Detection results and alert information
        """
        if timestamp is None:
            timestamp = datetime.now()

        # Add data
        self.data_history.append(value)
        self.timestamps.append(timestamp)

        # Anomaly detection
        alerts = []

        # 1. Shewhart control chart (3σ rule)
        if value > self.ucl:
            alerts.append({
                'method': 'Shewhart',
                'type': 'UCL violation',
                'value': value,
                'severity': 'HIGH'
            })
        elif value < self.lcl:
            alerts.append({
                'method': 'Shewhart',
                'type': 'LCL violation',
                'value': value,
                'severity': 'HIGH'
            })

        # 2. EWMA control chart
        if len(self.data_history) >= 1:
            data_array = np.array(self.data_history)
            ewma_results = self.ewma_chart.fit_predict(data_array)

            current_ewma = ewma_results['ewma'][-1]
            current_ucl = ewma_results['ucl'][-1]
            current_lcl = ewma_results['lcl'][-1]

            self.ewma_history.append(current_ewma)

            if ewma_results['ooc'][-1]:
                alerts.append({
                    'method': 'EWMA',
                    'type': 'EWMA limit violation',
                    'value': current_ewma,
                    'severity': 'MEDIUM'
                })

        # 3. CUSUM control chart
        if len(self.data_history) >= 1:
            data_array = np.array(self.data_history)
            cusum_results = self.cusum_chart.fit_predict(data_array)

            if cusum_results['ooc_high'][-1]:
                alerts.append({
                    'method': 'CUSUM',
                    'type': 'Upward shift detected',
                    'value': cusum_results['cusum_high'][-1],
                    'severity': 'MEDIUM'
                })

            if cusum_results['ooc_low'][-1]:
                alerts.append({
                    'method': 'CUSUM',
                    'type': 'Downward shift detected',
                    'value': cusum_results['cusum_low'][-1],
                    'severity': 'MEDIUM'
                })

        # 4. Western Electric Rules
        if len(self.data_history) >= 15:  # Minimum data required
            data_array = np.array(self.data_history)
            violations = self.we_rules.apply_rules(data_array)

            for rule_name, rule_violations in violations.items():
                if rule_violations[-1]:  # Violation at latest point
                    alerts.append({
                        'method': 'Western Electric',
                        'type': f'{rule_name} violation',
                        'value': value,
                        'severity': 'LOW'
                    })

        # Save alerts
        if alerts:
            for alert in alerts:
                alert['timestamp'] = timestamp
                self.alerts.append(alert)

        # Return status
        status = {
            'value': value,
            'timestamp': timestamp,
            'in_control': len(alerts) == 0,
            'alerts': alerts,
            'n_observations': len(self.data_history)
        }

        return status

    def get_summary(self) -> Dict:
        """Summary of current monitoring status"""
        if len(self.data_history) == 0:
            return {'status': 'No data'}

        data_array = np.array(self.data_history)

        return {
            'n_observations': len(self.data_history),
            'current_value': data_array[-1],
            'process_mean': data_array.mean(),
            'process_std': data_array.std(ddof=1),
            'target': self.target,
            'n_alerts_total': len(self.alerts),
            'n_alerts_last_10': sum(1 for a in self.alerts
                                   if len(self.data_history) -
                                   list(self.timestamps).index(a['timestamp']) <= 10),
            'alert_methods': list(set(a['method'] for a in self.alerts))
        }

    def plot_dashboard(self):
        """Real-time monitoring dashboard"""
        if len(self.data_history) < 2:
            print("Insufficient data (minimum 2 points required)")
            return

        data_array = np.array(self.data_history)
        n = len(data_array)

        fig, axes = plt.subplots(3, 1, figsize=(14, 10))

        # 1. Shewhart control chart
        ax1 = axes[0]
        x_indices = range(1, n+1)

        ax1.plot(x_indices, data_array, marker='o', color='#2c3e50',
                linewidth=1.5, markersize=4, label='Data')
        ax1.axhline(self.target, color='#11998e', linestyle='-',
                   linewidth=2, label='Target')
        ax1.axhline(self.ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='UCL')
        ax1.axhline(self.lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='LCL')

        # Highlight Shewhart violation points
        ooc_indices = (data_array > self.ucl) | (data_array < self.lcl)
        if ooc_indices.any():
            ooc_x = np.where(ooc_indices)[0] + 1
            ax1.scatter(ooc_x, data_array[ooc_indices],
                       color='red', s=100, zorder=5)

        ax1.set_ylabel('Value', fontsize=10)
        ax1.set_title('Shewhart Control Chart (3σ)',
                     fontsize=12, fontweight='bold')
        ax1.legend(loc='best', fontsize=9)
        ax1.grid(True, alpha=0.3)

        # 2. EWMA control chart
        ax2 = axes[1]

        if len(self.ewma_history) > 0:
            ewma_array = np.array(self.ewma_history)
            ewma_results = self.ewma_chart.fit_predict(data_array)

            ax2.plot(x_indices, data_array, marker='.', color='gray',
                    linewidth=0.5, markersize=3, alpha=0.5, label='Original')
            ax2.plot(x_indices, ewma_array, marker='o', color='#3498db',
                    linewidth=2, markersize=4, label='EWMA')
            ax2.axhline(self.target, color='#11998e', linestyle='-',
                       linewidth=2, label='Target')
            ax2.plot(x_indices, ewma_results['ucl'],
                    color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
            ax2.plot(x_indices, ewma_results['lcl'],
                    color='#e74c3c', linestyle='--', linewidth=2, label='LCL')

            # Highlight EWMA violation points
            if ewma_results['ooc'].any():
                ooc_x = np.where(ewma_results['ooc'])[0] + 1
                ax2.scatter(ooc_x, ewma_array[ewma_results['ooc']],
                           color='red', s=100, zorder=5)

        ax2.set_ylabel('EWMA Value', fontsize=10)
        ax2.set_title('EWMA Control Chart (λ=0.2)',
                     fontsize=12, fontweight='bold')
        ax2.legend(loc='best', fontsize=9)
        ax2.grid(True, alpha=0.3)

        # 3. CUSUM control chart
        ax3 = axes[2]

        cusum_results = self.cusum_chart.fit_predict(data_array)

        ax3.plot(x_indices, cusum_results['cusum_high'],
                marker='^', color='#e74c3c', linewidth=1.5,
                markersize=4, label='CUSUM High')
        ax3.plot(x_indices, cusum_results['cusum_low'],
                marker='v', color='#3498db', linewidth=1.5,
                markersize=4, label='CUSUM Low')
        ax3.axhline(self.cusum_chart.h, color='#2c3e50',
                   linestyle='--', linewidth=2, label='Decision Interval (h)')
        ax3.axhline(0, color='gray', linestyle='-', linewidth=1)

        # Highlight CUSUM violation points
        if cusum_results['ooc_high'].any():
            ooc_x = np.where(cusum_results['ooc_high'])[0] + 1
            ax3.scatter(ooc_x, cusum_results['cusum_high'][cusum_results['ooc_high']],
                       color='red', s=100, zorder=5, marker='^')
        if cusum_results['ooc_low'].any():
            ooc_x = np.where(cusum_results['ooc_low'])[0] + 1
            ax3.scatter(ooc_x, cusum_results['cusum_low'][cusum_results['ooc_low']],
                       color='red', s=100, zorder=5, marker='v')

        ax3.set_xlabel('Observation Number', fontsize=10)
        ax3.set_ylabel('CUSUM Value', fontsize=10)
        ax3.set_title('CUSUM Control Chart',
                     fontsize=12, fontweight='bold')
        ax3.legend(loc='best', fontsize=9)
        ax3.grid(True, alpha=0.3)

        plt.suptitle('Real-time SPC Monitoring Dashboard',
                    fontsize=15, fontweight='bold', y=0.995)
        plt.tight_layout()
        plt.show()


# Example usage: Real-time monitoring simulation
np.random.seed(42)

# Initialize monitoring system
target = 100.0
std = 1.0
monitor = RealTimeSPCMonitor(target, std, window_size=150)

print("=== Real-time SPC Monitoring System ===")
print(f"Target value: {target}")
print(f"Standard deviation: {std}")
print("\nSequentially adding observation data...\n")

# Simulation: Sequentially add 150 observations
for i in range(150):
    # Normal data
    if i < 60:
        value = np.random.normal(target, std)
    # Observations 60-80: +1σ shift
    elif i < 80:
        value = np.random.normal(target + std, std)
    # Observations 80-120: Return to normal
    elif i < 120:
        value = np.random.normal(target, std)
    # Observations 120-150: Increased variation
    else:
        value = np.random.normal(target, std * 1.5)

    # Add observation value
    status = monitor.add_observation(value)

    # Display if alert
    if not status['in_control']:
        print(f"⚠️ Observation #{i+1}: Alert triggered")
        for alert in status['alerts']:
            print(f"  - {alert['method']}: {alert['type']} "
                  f"(Severity: {alert['severity']})")

# Display summary
print("\n=== Monitoring Summary ===")
summary = monitor.get_summary()
for key, value in summary.items():
    print(f"{key}: {value}")

# Display dashboard
monitor.plot_dashboard()

# Expected output:
# === Real-time SPC Monitoring System ===
# Target value: 100.0
# Standard deviation: 1.0
#
# Sequentially adding observation data...
#
# ⚠️ Observation #67: Alert triggered
#   - EWMA: EWMA limit violation (Severity: MEDIUM)
#   - CUSUM: Upward shift detected (Severity: MEDIUM)
# ...(multiple alerts)
#
# === Monitoring Summary ===
# n_observations: 150
# current_value: 101.234
# process_mean: 100.567
# process_std: 1.234
# target: 100.0
# n_alerts_total: 45
# n_alerts_last_10: 8
# alert_methods: ['EWMA', 'CUSUM', 'Western Electric']

Learning Objectives Check

Upon completing this chapter, you should be able to explain and implement the following:

Basic Understanding

Practical Skills

Application Capabilities

Practice Problems

Easy (Basic Confirmation)

Q1: If an X-bar chart has upper control limit (UCL) of 80.5 and center line (CL) of 79.0, what is the lower control limit (LCL)?

View Answer

Answer: 77.5

Explanation:
UCL = CL + 3σ
LCL = CL - 3σ
UCL - CL = 80.5 - 79.0 = 1.5 = 3σ
Therefore, LCL = CL - 1.5 = 79.0 - 1.5 = 77.5

Medium (Application)

Q2: For a process with average defect rate of 2% and sample size of 100, calculate the upper control limit (UCL) for a P-chart.

View Answer

Answer: UCL = 0.062 (6.2%)

Calculation:
p̄ = 0.02
n = 100
σ_p = √(p̄(1-p̄)/n) = √(0.02 × 0.98 / 100) = √0.000196 = 0.014
UCL = p̄ + 3σ_p = 0.02 + 3 × 0.014 = 0.02 + 0.042 = 0.062

Key Point: Larger sample sizes bring control limits closer to the center line (improved detection power).

Hard (Advanced)

Q3: Both CUSUM and EWMA excel at detecting small shifts, but compare their strengths and weaknesses and explain which should be chosen in what situations.

View Answer

Comparison Table:

Item CUSUM EWMA
Detection Speed Very fast (excellent ARL) Fast
Parameter Setting Somewhat difficult (k, h) Relatively easy (λ)
Ease of Interpretation Difficult (cumulative statistic) Easy (smoothed value)
Shift Direction Detection Separate for up/down (clear) Single statistic
Implementation Complexity Moderate Simple

Selection Criteria:

  • Choose CUSUM when:
    • Fastest detection required (pharmaceuticals, semiconductors, etc.)
    • Identifying shift direction is important
    • Specialized SPC knowledge available
  • Choose EWMA when:
    • Implementation simplicity is important
    • Need for easy understanding by floor operators
    • Frequent parameter adjustment required

Best Practice: For critical processes, using both methods concurrently for complementary monitoring is recommended.

References

  1. Montgomery, D. C. (2019). Design and Analysis of Experiments (9th ed.). Wiley.
  2. Box, G. E. P., Hunter, J. S., & Hunter, W. G. (2005). Statistics for Experimenters: Design, Innovation, and Discovery (2nd ed.). Wiley.
  3. Seborg, D. E., Edgar, T. F., Mellichamp, D. A., & Doyle III, F. J. (2016). Process Dynamics and Control (4th ed.). Wiley.
  4. McKay, M. D., Beckman, R. J., & Conover, W. J. (2000). "A Comparison of Three Methods for Selecting Values of Input Variables in the Analysis of Output from a Computer Code." Technometrics, 42(1), 55-61.

Disclaimer