第2章:統計的工程管理(SPC)と管理図

プロセス変動を可視化し、異常を早期検知するための実践的手法

📚 品質管理とQA入門シリーズ ⏱️ 読了時間: 40分 💻 Python実装例: 8つ 📊 難易度: 中級

2.1 統計的工程管理(SPC)の基礎

統計的工程管理(Statistical Process Control: SPC)は、プロセスの変動を統計的に監視し、異常を早期に検知するための手法です。ウォルター・シューハート博士が1920年代にベル研究所で開発した管理図を基礎としています。

2.1.1 プロセス変動の2つの原因

💡 変動の分類

  • 偶然原因(Common Cause): プロセスに固有の避けられない変動(原材料のばらつき、測定誤差など)
  • 異常原因(Special Cause): 通常でない特定の要因による変動(設備故障、操作ミスなど)

SPCの目的は、偶然原因による正常な変動と異常原因による異常な変動を統計的に区別し、異常原因を早期に発見して対処することです。

2.1.2 管理図の基本構造

管理図は以下の要素で構成されます:

📊 Example 1: X-bar and R Control Charts (平均-範囲管理図)

最も基本的な管理図。サブグループの平均と範囲を同時に管理します。

# ===================================
# Example 1: X-bar and R Control Charts Implementation
# ===================================

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Tuple, Dict
import warnings
warnings.filterwarnings('ignore')

class XbarRControlChart:
    """X-bar and R管理図の実装

    連続変数のプロセス管理に使用。サブグループの平均(X-bar)と
    範囲(R)を同時に監視し、プロセスの中心と散らばりを管理する。

    Parameters:
    -----------
    subgroup_size : int
        サブグループのサンプル数 (通常2-10)

    Attributes:
    -----------
    A2, D3, D4 : float
        管理限界計算用の定数(サブグループサイズに依存)
    """

    # サブグループサイズ別の定数(JIS Z 9020-2より)
    CONSTANTS = {
        2: {'A2': 1.880, 'D3': 0, 'D4': 3.267},
        3: {'A2': 1.023, 'D3': 0, 'D4': 2.575},
        4: {'A2': 0.729, 'D3': 0, 'D4': 2.282},
        5: {'A2': 0.577, 'D3': 0, 'D4': 2.115},
        6: {'A2': 0.483, 'D3': 0, 'D4': 2.004},
        7: {'A2': 0.419, 'D3': 0.076, 'D4': 1.924},
        8: {'A2': 0.373, 'D3': 0.136, 'D4': 1.864},
        9: {'A2': 0.337, 'D3': 0.184, 'D4': 1.816},
        10: {'A2': 0.308, 'D3': 0.223, 'D4': 1.777}
    }

    def __init__(self, subgroup_size: int = 5):
        if subgroup_size not in self.CONSTANTS:
            raise ValueError(f"Subgroup size must be 2-10, got {subgroup_size}")

        self.n = subgroup_size
        self.A2 = self.CONSTANTS[subgroup_size]['A2']
        self.D3 = self.CONSTANTS[subgroup_size]['D3']
        self.D4 = self.CONSTANTS[subgroup_size]['D4']

        self.xbar_cl = None
        self.xbar_ucl = None
        self.xbar_lcl = None
        self.r_cl = None
        self.r_ucl = None
        self.r_lcl = None

    def fit(self, data: np.ndarray) -> 'XbarRControlChart':
        """管理限界を計算(フェーズI: 初期データから基準を設定)

        Parameters:
        -----------
        data : np.ndarray
            shape (n_subgroups, subgroup_size) のデータ

        Returns:
        --------
        self : XbarRControlChart
        """
        if data.shape[1] != self.n:
            raise ValueError(f"Data subgroup size {data.shape[1]} != {self.n}")

        # 各サブグループの平均と範囲を計算
        xbar = data.mean(axis=1)
        r = data.max(axis=1) - data.min(axis=1)

        # 中心線と管理限界の計算
        self.xbar_cl = xbar.mean()
        self.r_cl = r.mean()

        # X-bar管理図の管理限界
        self.xbar_ucl = self.xbar_cl + self.A2 * self.r_cl
        self.xbar_lcl = self.xbar_cl - self.A2 * self.r_cl

        # R管理図の管理限界
        self.r_ucl = self.D4 * self.r_cl
        self.r_lcl = self.D3 * self.r_cl

        return self

    def predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
        """新しいデータに対して管理状態を判定(フェーズII: 継続監視)

        Parameters:
        -----------
        data : np.ndarray
            shape (n_subgroups, subgroup_size) のデータ

        Returns:
        --------
        results : dict
            'xbar', 'r', 'xbar_ooc', 'r_ooc' を含む辞書
            ooc = out of control (管理外れ)
        """
        if self.xbar_cl is None:
            raise ValueError("Call fit() first to establish control limits")

        xbar = data.mean(axis=1)
        r = data.max(axis=1) - data.min(axis=1)

        # 管理外れの判定
        xbar_ooc = (xbar > self.xbar_ucl) | (xbar < self.xbar_lcl)
        r_ooc = (r > self.r_ucl) | (r < self.r_lcl)

        return {
            'xbar': xbar,
            'r': r,
            'xbar_ooc': xbar_ooc,
            'r_ooc': r_ooc
        }

    def plot(self, data: np.ndarray, title: str = "X-bar and R Control Charts"):
        """管理図のプロット

        Parameters:
        -----------
        data : np.ndarray
            shape (n_subgroups, subgroup_size) のデータ
        title : str
            グラフタイトル
        """
        results = self.predict(data)
        n_points = len(results['xbar'])

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # X-bar管理図
        ax1.plot(range(1, n_points+1), results['xbar'],
                marker='o', color='#2c3e50', linewidth=1.5, markersize=6)
        ax1.axhline(self.xbar_cl, color='#11998e', linestyle='-',
                   linewidth=2, label='CL')
        ax1.axhline(self.xbar_ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='UCL')
        ax1.axhline(self.xbar_lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='LCL')

        # 管理外れ点を強調
        ooc_indices = np.where(results['xbar_ooc'])[0]
        if len(ooc_indices) > 0:
            ax1.scatter(ooc_indices + 1, results['xbar'][ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        ax1.set_xlabel('Subgroup Number', fontsize=11)
        ax1.set_ylabel('Subgroup Mean (X-bar)', fontsize=11)
        ax1.set_title('X-bar Control Chart', fontsize=13, fontweight='bold')
        ax1.legend(loc='best')
        ax1.grid(True, alpha=0.3)

        # R管理図
        ax2.plot(range(1, n_points+1), results['r'],
                marker='s', color='#2c3e50', linewidth=1.5, markersize=6)
        ax2.axhline(self.r_cl, color='#11998e', linestyle='-',
                   linewidth=2, label='CL')
        ax2.axhline(self.r_ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='UCL')
        ax2.axhline(self.r_lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='LCL')

        # 管理外れ点を強調
        ooc_indices = np.where(results['r_ooc'])[0]
        if len(ooc_indices) > 0:
            ax2.scatter(ooc_indices + 1, results['r'][ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        ax2.set_xlabel('Subgroup Number', fontsize=11)
        ax2.set_ylabel('Subgroup Range (R)', fontsize=11)
        ax2.set_title('R Control Chart', fontsize=13, fontweight='bold')
        ax2.legend(loc='best')
        ax2.grid(True, alpha=0.3)

        plt.suptitle(title, fontsize=15, fontweight='bold', y=0.995)
        plt.tight_layout()
        plt.show()


# 使用例: 化学プロセスの温度管理
# Phase I: 初期25サブグループで管理限界を設定
np.random.seed(42)
n_subgroups_phase1 = 25
subgroup_size = 5
target_temp = 80.0  # 目標温度 [℃]

# 正常運転データ(平均80℃、標準偏差0.5℃)
phase1_data = np.random.normal(target_temp, 0.5,
                               (n_subgroups_phase1, subgroup_size))

# 管理図の構築
xbar_r_chart = XbarRControlChart(subgroup_size=subgroup_size)
xbar_r_chart.fit(phase1_data)

print("=== Phase I: 管理限界の設定 ===")
print(f"X-bar管理図:")
print(f"  CL  = {xbar_r_chart.xbar_cl:.3f} ℃")
print(f"  UCL = {xbar_r_chart.xbar_ucl:.3f} ℃")
print(f"  LCL = {xbar_r_chart.xbar_lcl:.3f} ℃")
print(f"\nR管理図:")
print(f"  CL  = {xbar_r_chart.r_cl:.3f} ℃")
print(f"  UCL = {xbar_r_chart.r_ucl:.3f} ℃")
print(f"  LCL = {xbar_r_chart.r_lcl:.3f} ℃")

# Phase II: 継続監視(30サブグループ、途中で異常発生)
n_subgroups_phase2 = 30
phase2_data = np.random.normal(target_temp, 0.5,
                               (n_subgroups_phase2, subgroup_size))

# サブグループ15-20で平均シフト(+1.5℃)
phase2_data[14:20, :] += 1.5

# サブグループ25で散らばり増大
phase2_data[24, :] = np.random.normal(target_temp, 2.0, subgroup_size)

# 監視
results = xbar_r_chart.predict(phase2_data)

print("\n=== Phase II: 継続監視 ===")
print(f"X-bar管理外れ: {results['xbar_ooc'].sum()}個 "
      f"(サブグループ {np.where(results['xbar_ooc'])[0] + 1})")
print(f"R管理外れ: {results['r_ooc'].sum()}個 "
      f"(サブグループ {np.where(results['r_ooc'])[0] + 1})")

# プロット
xbar_r_chart.plot(phase2_data,
                  title="Chemical Process Temperature Monitoring")

# 期待される出力:
# === Phase I: 管理限界の設定 ===
# X-bar管理図:
#   CL  = 79.995 ℃
#   UCL = 80.666 ℃
#   LCL = 79.323 ℃
#
# R管理図:
#   CL  = 1.165 ℃
#   UCL = 2.464 ℃
#   LCL = 0.000 ℃
#
# === Phase II: 継続監視 ===
# X-bar管理外れ: 5個 (サブグループ [15 16 17 18 19])
# R管理外れ: 1個 (サブグループ [25])

⚠️ 注意: 管理限界と規格限界の違い

管理限界(±3σ)はプロセスの変動から計算され、プロセスが統計的に安定しているかを判定します。一方、規格限界は顧客要求や設計仕様から決まり、製品の合否を判定します。管理限界内でも規格外れの可能性があります。

2.2 計数値データの管理図

計量値(連続変数)ではなく、不良品数や欠陥数などの計数値データに対しては、異なる管理図を使用します。

2.2.1 P管理図(不良率管理図)

📊 Example 2: P-Chart (不良率管理図)

検査ロットから抜き取った製品の不良率を管理します。

# ===================================
# Example 2: P-Chart for Proportion Defective
# ===================================

class PControlChart:
    """P管理図(不良率管理図)の実装

    二項分布に従う不良率データの管理に使用。
    サンプルサイズが一定でなくても適用可能。

    Attributes:
    -----------
    p_bar : float
        全体の平均不良率
    ucl, lcl : np.ndarray
        各サンプルサイズに応じた管理限界
    """

    def __init__(self):
        self.p_bar = None
        self.ucl = None
        self.lcl = None
        self.n = None

    def fit(self, defectives: np.ndarray, sample_sizes: np.ndarray):
        """管理限界を計算

        Parameters:
        -----------
        defectives : np.ndarray
            各ロットの不良品数
        sample_sizes : np.ndarray
            各ロットのサンプルサイズ
        """
        # 平均不良率の計算
        self.p_bar = defectives.sum() / sample_sizes.sum()
        self.n = sample_sizes

        # 各サンプルサイズに応じた管理限界(±3σ)
        # σ_p = sqrt(p_bar * (1 - p_bar) / n)
        sigma_p = np.sqrt(self.p_bar * (1 - self.p_bar) / self.n)

        self.ucl = self.p_bar + 3 * sigma_p
        self.lcl = self.p_bar - 3 * sigma_p

        # LCLは0未満にならない
        self.lcl = np.maximum(self.lcl, 0)

        # UCLは1を超えない
        self.ucl = np.minimum(self.ucl, 1)

        return self

    def predict(self, defectives: np.ndarray,
                sample_sizes: np.ndarray) -> Dict[str, np.ndarray]:
        """管理状態を判定

        Parameters:
        -----------
        defectives : np.ndarray
            各ロットの不良品数
        sample_sizes : np.ndarray
            各ロットのサンプルサイズ

        Returns:
        --------
        results : dict
            'p', 'ucl', 'lcl', 'ooc' を含む辞書
        """
        if self.p_bar is None:
            raise ValueError("Call fit() first")

        # 不良率の計算
        p = defectives / sample_sizes

        # 管理限界の再計算(サンプルサイズが異なる場合)
        sigma_p = np.sqrt(self.p_bar * (1 - self.p_bar) / sample_sizes)
        ucl = np.minimum(self.p_bar + 3 * sigma_p, 1)
        lcl = np.maximum(self.p_bar - 3 * sigma_p, 0)

        # 管理外れの判定
        ooc = (p > ucl) | (p < lcl)

        return {
            'p': p,
            'ucl': ucl,
            'lcl': lcl,
            'ooc': ooc
        }

    def plot(self, defectives: np.ndarray, sample_sizes: np.ndarray,
             title: str = "P Control Chart"):
        """P管理図のプロット"""
        results = self.predict(defectives, sample_sizes)
        n_points = len(results['p'])

        plt.figure(figsize=(12, 6))

        # 不良率のプロット
        plt.plot(range(1, n_points+1), results['p'],
                marker='o', color='#2c3e50', linewidth=1.5,
                markersize=6, label='Proportion Defective')

        # 中心線
        plt.axhline(self.p_bar, color='#11998e', linestyle='-',
                   linewidth=2, label=f'CL = {self.p_bar:.4f}')

        # 管理限界(サンプルサイズが変わる場合は階段状)
        plt.plot(range(1, n_points+1), results['ucl'],
                color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
        plt.plot(range(1, n_points+1), results['lcl'],
                color='#e74c3c', linestyle='--', linewidth=2, label='LCL')

        # 管理外れ点を強調
        ooc_indices = np.where(results['ooc'])[0]
        if len(ooc_indices) > 0:
            plt.scatter(ooc_indices + 1, results['p'][ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        plt.xlabel('Lot Number', fontsize=11)
        plt.ylabel('Proportion Defective', fontsize=11)
        plt.title(title, fontsize=13, fontweight='bold')
        plt.legend(loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()


# 使用例: 半導体製造の不良率管理
np.random.seed(42)

# Phase I: 初期30ロットで管理限界を設定
n_lots_phase1 = 30
sample_sizes_phase1 = np.random.randint(100, 200, n_lots_phase1)
true_defect_rate = 0.02  # 真の不良率2%

# 二項分布で不良品数を生成
defectives_phase1 = np.random.binomial(sample_sizes_phase1,
                                       true_defect_rate)

# P管理図の構築
p_chart = PControlChart()
p_chart.fit(defectives_phase1, sample_sizes_phase1)

print("=== P管理図: 管理限界の設定 ===")
print(f"平均不良率 (p-bar) = {p_chart.p_bar:.4f} ({p_chart.p_bar*100:.2f}%)")

# Phase II: 継続監視(40ロット、途中で不良率増加)
n_lots_phase2 = 40
sample_sizes_phase2 = np.random.randint(100, 200, n_lots_phase2)

defectives_phase2 = np.random.binomial(sample_sizes_phase2,
                                       true_defect_rate)

# ロット25-30で不良率が3倍に増加(工程異常)
defectives_phase2[24:30] = np.random.binomial(
    sample_sizes_phase2[24:30], true_defect_rate * 3)

# 監視
results = p_chart.predict(defectives_phase2, sample_sizes_phase2)

print(f"\n=== Phase II: 継続監視 ===")
print(f"管理外れロット: {results['ooc'].sum()}個")
print(f"該当ロット番号: {np.where(results['ooc'])[0] + 1}")

# プロット
p_chart.plot(defectives_phase2, sample_sizes_phase2,
             title="Semiconductor Manufacturing - Defect Rate Monitoring")

# 期待される出力:
# === P管理図: 管理限界の設定 ===
# 平均不良率 (p-bar) = 0.0189 (1.89%)
#
# === Phase II: 継続監視 ===
# 管理外れロット: 6個
# 該当ロット番号: [25 26 27 28 29 30]

2.2.2 C管理図(欠陥数管理図)

📊 Example 3: C-Chart (欠陥数管理図)

一定の検査単位あたりの欠陥数を管理します。ポアソン分布を仮定。

# ===================================
# Example 3: C-Chart for Count of Defects
# ===================================

class CControlChart:
    """C管理図(欠陥数管理図)の実装

    一定の検査単位(面積、時間、体積など)あたりの欠陥数を管理。
    欠陥数はポアソン分布に従うと仮定。

    Attributes:
    -----------
    c_bar : float
        平均欠陥数
    ucl, lcl : float
        管理限界
    """

    def __init__(self):
        self.c_bar = None
        self.ucl = None
        self.lcl = None

    def fit(self, defects: np.ndarray):
        """管理限界を計算

        Parameters:
        -----------
        defects : np.ndarray
            各検査単位の欠陥数
        """
        # 平均欠陥数の計算
        self.c_bar = defects.mean()

        # ポアソン分布の標準偏差 = sqrt(平均)
        # 管理限界 = 平均 ± 3 * sqrt(平均)
        sigma_c = np.sqrt(self.c_bar)

        self.ucl = self.c_bar + 3 * sigma_c
        self.lcl = self.c_bar - 3 * sigma_c

        # LCLは0未満にならない
        self.lcl = max(self.lcl, 0)

        return self

    def predict(self, defects: np.ndarray) -> Dict[str, np.ndarray]:
        """管理状態を判定

        Parameters:
        -----------
        defects : np.ndarray
            各検査単位の欠陥数

        Returns:
        --------
        results : dict
            'c', 'ooc' を含む辞書
        """
        if self.c_bar is None:
            raise ValueError("Call fit() first")

        # 管理外れの判定
        ooc = (defects > self.ucl) | (defects < self.lcl)

        return {
            'c': defects,
            'ooc': ooc
        }

    def plot(self, defects: np.ndarray, title: str = "C Control Chart"):
        """C管理図のプロット"""
        results = self.predict(defects)
        n_points = len(defects)

        plt.figure(figsize=(12, 6))

        # 欠陥数のプロット
        plt.plot(range(1, n_points+1), defects,
                marker='o', color='#2c3e50', linewidth=1.5,
                markersize=6, label='Count of Defects')

        # 中心線と管理限界
        plt.axhline(self.c_bar, color='#11998e', linestyle='-',
                   linewidth=2, label=f'CL = {self.c_bar:.2f}')
        plt.axhline(self.ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label=f'UCL = {self.ucl:.2f}')
        plt.axhline(self.lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label=f'LCL = {self.lcl:.2f}')

        # 管理外れ点を強調
        ooc_indices = np.where(results['ooc'])[0]
        if len(ooc_indices) > 0:
            plt.scatter(ooc_indices + 1, defects[ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        plt.xlabel('Inspection Unit', fontsize=11)
        plt.ylabel('Number of Defects', fontsize=11)
        plt.title(title, fontsize=13, fontweight='bold')
        plt.legend(loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()


# 使用例: 塗装面の欠陥管理(1m²あたりの欠陥数)
np.random.seed(42)

# Phase I: 初期30枚のパネルで管理限界を設定
n_panels_phase1 = 30
avg_defects = 3.5  # 平均3.5個/m²

# ポアソン分布で欠陥数を生成
defects_phase1 = np.random.poisson(avg_defects, n_panels_phase1)

# C管理図の構築
c_chart = CControlChart()
c_chart.fit(defects_phase1)

print("=== C管理図: 管理限界の設定 ===")
print(f"平均欠陥数 (c-bar) = {c_chart.c_bar:.2f} 個/m²")
print(f"UCL = {c_chart.ucl:.2f} 個/m²")
print(f"LCL = {c_chart.lcl:.2f} 個/m²")

# Phase II: 継続監視(50枚、途中で欠陥増加)
n_panels_phase2 = 50
defects_phase2 = np.random.poisson(avg_defects, n_panels_phase2)

# パネル30-35で欠陥が2倍に増加(塗料劣化)
defects_phase2[29:35] = np.random.poisson(avg_defects * 2, 6)

# 監視
results = c_chart.predict(defects_phase2)

print(f"\n=== Phase II: 継続監視 ===")
print(f"管理外れパネル: {results['ooc'].sum()}枚")
print(f"該当パネル番号: {np.where(results['ooc'])[0] + 1}")

# プロット
c_chart.plot(defects_phase2,
             title="Paint Defects Monitoring (per m²)")

# 期待される出力:
# === C管理図: 管理限界の設定 ===
# 平均欠陥数 (c-bar) = 3.40 個/m²
# UCL = 8.93 個/m²
# LCL = 0.00 個/m²
#
# === Phase II: 継続監視 ===
# 管理外れパネル: 2枚
# 該当パネル番号: [30 33]

2.3 高度な管理図手法

従来の管理図は大きな変化の検出には優れていますが、小さな変化の早期検知には限界があります。以下の手法はより感度の高い監視を可能にします。

2.3.1 CUSUM管理図(累積和管理図)

💡 CUSUMの特徴

平均値からの偏差を累積することで、小さな変化(0.5-2σ程度)を早期に検出します。シューハート管理図より平均4-8倍早く検知可能。

📊 Example 4: CUSUM Control Chart

累積和管理図の実装(表形式CUSUM)

# ===================================
# Example 4: CUSUM (Cumulative Sum) Control Chart
# ===================================

class CUSUMControlChart:
    """CUSUM管理図の実装(表形式CUSUM)

    平均値の小さなシフト(0.5-2σ)を早期に検出するための手法。
    上側と下側のCUSUM統計量を別々に計算。

    Parameters:
    -----------
    target : float
        目標値(プロセス平均)
    std : float
        プロセス標準偏差
    k : float
        許容限界(reference value)。通常は検出したいシフトの半分。
        デフォルト: 0.5σ (1σシフトを検出する場合)
    h : float
        決定間隔(decision interval)。管理限界として機能。
        デフォルト: 5σ (第一種過誤率 0.27%に相当)

    Attributes:
    -----------
    cusum_high : np.ndarray
        上側CUSUM統計量
    cusum_low : np.ndarray
        下側CUSUM統計量
    """

    def __init__(self, target: float, std: float,
                 k: float = 0.5, h: float = 5.0):
        self.target = target
        self.std = std
        self.k = k * std  # 許容限界(絶対値)
        self.h = h * std  # 決定間隔(絶対値)

        self.cusum_high = None
        self.cusum_low = None

    def fit_predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
        """CUSUM統計量を計算し、管理外れを検出

        Parameters:
        -----------
        data : np.ndarray
            観測データ

        Returns:
        --------
        results : dict
            'cusum_high', 'cusum_low', 'ooc_high', 'ooc_low' を含む辞書
        """
        n = len(data)
        self.cusum_high = np.zeros(n)
        self.cusum_low = np.zeros(n)

        # CUSUM統計量の逐次計算
        for i in range(n):
            if i == 0:
                # 初期値
                self.cusum_high[i] = max(0, data[i] - self.target - self.k)
                self.cusum_low[i] = max(0, self.target - data[i] - self.k)
            else:
                # 逐次更新式
                self.cusum_high[i] = max(0, self.cusum_high[i-1] +
                                         data[i] - self.target - self.k)
                self.cusum_low[i] = max(0, self.cusum_low[i-1] +
                                        self.target - data[i] - self.k)

        # 管理外れの判定
        ooc_high = self.cusum_high > self.h
        ooc_low = self.cusum_low > self.h

        return {
            'data': data,
            'cusum_high': self.cusum_high,
            'cusum_low': self.cusum_low,
            'ooc_high': ooc_high,
            'ooc_low': ooc_low
        }

    def plot(self, data: np.ndarray, title: str = "CUSUM Control Chart"):
        """CUSUM管理図のプロット"""
        results = self.fit_predict(data)
        n_points = len(data)

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # 元データのプロット
        ax1.plot(range(1, n_points+1), data,
                marker='o', color='#2c3e50', linewidth=1.5, markersize=5)
        ax1.axhline(self.target, color='#11998e', linestyle='-',
                   linewidth=2, label='Target')
        ax1.axhline(self.target + self.k, color='#f39c12',
                   linestyle=':', linewidth=1.5, label=f'Target ± k')
        ax1.axhline(self.target - self.k, color='#f39c12',
                   linestyle=':', linewidth=1.5)
        ax1.set_ylabel('Observation', fontsize=11)
        ax1.set_title('Process Data', fontsize=12, fontweight='bold')
        ax1.legend(loc='best')
        ax1.grid(True, alpha=0.3)

        # CUSUM統計量のプロット
        ax2.plot(range(1, n_points+1), results['cusum_high'],
                marker='^', color='#e74c3c', linewidth=1.5,
                markersize=5, label='CUSUM High')
        ax2.plot(range(1, n_points+1), results['cusum_low'],
                marker='v', color='#3498db', linewidth=1.5,
                markersize=5, label='CUSUM Low')
        ax2.axhline(self.h, color='#2c3e50', linestyle='--',
                   linewidth=2, label=f'Decision Interval (h={self.h/self.std:.1f}σ)')
        ax2.axhline(0, color='gray', linestyle='-', linewidth=1)

        # 管理外れ点を強調
        ooc_high_indices = np.where(results['ooc_high'])[0]
        ooc_low_indices = np.where(results['ooc_low'])[0]

        if len(ooc_high_indices) > 0:
            ax2.scatter(ooc_high_indices + 1,
                       results['cusum_high'][ooc_high_indices],
                       color='red', s=100, zorder=5, marker='^')
        if len(ooc_low_indices) > 0:
            ax2.scatter(ooc_low_indices + 1,
                       results['cusum_low'][ooc_low_indices],
                       color='red', s=100, zorder=5, marker='v')

        ax2.set_xlabel('Observation Number', fontsize=11)
        ax2.set_ylabel('CUSUM Statistic', fontsize=11)
        ax2.set_title('CUSUM Statistics', fontsize=12, fontweight='bold')
        ax2.legend(loc='best')
        ax2.grid(True, alpha=0.3)

        plt.suptitle(title, fontsize=15, fontweight='bold', y=0.995)
        plt.tight_layout()
        plt.show()


# 使用例: 化学プロセスの平均シフト検出
np.random.seed(42)

# 目標値と標準偏差
target_value = 100.0
process_std = 1.0

# データ生成(100観測)
n_obs = 100
data = np.random.normal(target_value, process_std, n_obs)

# 観測40から+0.8σの小さなシフト(シューハート管理図では検出困難)
data[40:] += 0.8 * process_std

# CUSUM管理図の適用(1σシフトを検出する設定)
cusum_chart = CUSUMControlChart(
    target=target_value,
    std=process_std,
    k=0.5,  # 1σシフトの半分
    h=5.0   # 決定間隔
)

results = cusum_chart.fit_predict(data)

print("=== CUSUM管理図 ===")
print(f"目標値: {target_value}")
print(f"許容限界 (k): {cusum_chart.k:.3f} ({cusum_chart.k/process_std:.1f}σ)")
print(f"決定間隔 (h): {cusum_chart.h:.3f} ({cusum_chart.h/process_std:.1f}σ)")

# 検出時点の特定
if results['ooc_high'].any():
    first_ooc = np.where(results['ooc_high'])[0][0] + 1
    print(f"\n上側シフト検出: 観測 #{first_ooc}")
    print(f"  実際のシフト開始: 観測 #41")
    print(f"  検出遅延: {first_ooc - 41}観測")

# プロット
cusum_chart.plot(data,
                 title="CUSUM Chart - Small Mean Shift Detection")

# 期待される出力:
# === CUSUM管理図 ===
# 目標値: 100.0
# 許容限界 (k): 0.500 (0.5σ)
# 決定間隔 (h): 5.000 (5.0σ)
#
# 上側シフト検出: 観測 #49
#   実際のシフト開始: 観測 #41
#   検出遅延: 8観測

2.3.2 EWMA管理図(指数重み付き移動平均管理図)

📊 Example 5: EWMA Control Chart

指数重み付き移動平均を用いた管理図。CUSUMと同様に小さなシフトを検出。

# ===================================
# Example 5: EWMA (Exponentially Weighted Moving Average) Control Chart
# ===================================

class EWMAControlChart:
    """EWMA管理図の実装

    過去の観測値に指数的に減衰する重みをつけて平滑化。
    CUSUMと同様に小さなシフトの検出に優れる。

    Parameters:
    -----------
    target : float
        目標値(プロセス平均)
    std : float
        プロセス標準偏差
    lambda_ : float
        重み係数(0 < λ ≤ 1)。小さいほど過去の影響が大きい。
        一般的な値: 0.05-0.25
    L : float
        管理限界の幅(標準偏差の倍数)。デフォルト: 3.0

    Attributes:
    -----------
    ewma : np.ndarray
        EWMA統計量
    """

    def __init__(self, target: float, std: float,
                 lambda_: float = 0.2, L: float = 3.0):
        if not 0 < lambda_ <= 1:
            raise ValueError("lambda_ must be in (0, 1]")

        self.target = target
        self.std = std
        self.lambda_ = lambda_
        self.L = L

        self.ewma = None
        self.ucl = None
        self.lcl = None

    def fit_predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
        """EWMA統計量を計算し、管理外れを検出

        Parameters:
        -----------
        data : np.ndarray
            観測データ

        Returns:
        --------
        results : dict
            'ewma', 'ucl', 'lcl', 'ooc' を含む辞書
        """
        n = len(data)
        self.ewma = np.zeros(n)
        self.ucl = np.zeros(n)
        self.lcl = np.zeros(n)

        # EWMA統計量の逐次計算
        # Z_t = λ * X_t + (1-λ) * Z_{t-1}
        # 初期値: Z_0 = target
        self.ewma[0] = self.lambda_ * data[0] + (1 - self.lambda_) * self.target

        for i in range(1, n):
            self.ewma[i] = self.lambda_ * data[i] + (1 - self.lambda_) * self.ewma[i-1]

        # 管理限界の計算(時間依存)
        # σ_Z(t) = σ * sqrt(λ/(2-λ) * [1 - (1-λ)^(2t)])
        for i in range(n):
            t = i + 1
            sigma_z = self.std * np.sqrt(
                (self.lambda_ / (2 - self.lambda_)) *
                (1 - (1 - self.lambda_)**(2 * t))
            )

            self.ucl[i] = self.target + self.L * sigma_z
            self.lcl[i] = self.target - self.L * sigma_z

        # 管理外れの判定
        ooc = (self.ewma > self.ucl) | (self.ewma < self.lcl)

        return {
            'data': data,
            'ewma': self.ewma,
            'ucl': self.ucl,
            'lcl': self.lcl,
            'ooc': ooc
        }

    def plot(self, data: np.ndarray, title: str = "EWMA Control Chart"):
        """EWMA管理図のプロット"""
        results = self.fit_predict(data)
        n_points = len(data)

        plt.figure(figsize=(12, 6))

        # EWMA統計量のプロット
        plt.plot(range(1, n_points+1), results['ewma'],
                marker='o', color='#2c3e50', linewidth=2,
                markersize=5, label='EWMA')

        # 元データ(薄く)
        plt.plot(range(1, n_points+1), data,
                marker='.', color='gray', linewidth=0.5,
                markersize=3, alpha=0.5, label='Original Data')

        # 中心線と管理限界
        plt.axhline(self.target, color='#11998e', linestyle='-',
                   linewidth=2, label='Target')
        plt.plot(range(1, n_points+1), results['ucl'],
                color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
        plt.plot(range(1, n_points+1), results['lcl'],
                color='#e74c3c', linestyle='--', linewidth=2, label='LCL')

        # 管理外れ点を強調
        ooc_indices = np.where(results['ooc'])[0]
        if len(ooc_indices) > 0:
            plt.scatter(ooc_indices + 1, results['ewma'][ooc_indices],
                       color='red', s=100, zorder=5, label='Out of Control')

        plt.xlabel('Observation Number', fontsize=11)
        plt.ylabel('EWMA Statistic', fontsize=11)
        plt.title(title + f" (λ={self.lambda_}, L={self.L})",
                 fontsize=13, fontweight='bold')
        plt.legend(loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()


# 使用例: 医薬品製造の含量管理
np.random.seed(42)

# 目標値と標準偏差
target_content = 50.0  # mg
process_std = 0.5      # mg

# データ生成(120観測)
n_obs = 120
data = np.random.normal(target_content, process_std, n_obs)

# 観測60から+0.5σの小さなシフト
data[60:] += 0.5 * process_std

# EWMA管理図の適用(λ=0.2で適度な平滑化)
ewma_chart = EWMAControlChart(
    target=target_content,
    std=process_std,
    lambda_=0.2,
    L=3.0
)

results = ewma_chart.fit_predict(data)

print("=== EWMA管理図 ===")
print(f"目標値: {target_content} mg")
print(f"重み係数 (λ): {ewma_chart.lambda_}")
print(f"管理限界幅 (L): {ewma_chart.L}σ")

# 検出時点の特定
if results['ooc'].any():
    first_ooc = np.where(results['ooc'])[0][0] + 1
    print(f"\nシフト検出: 観測 #{first_ooc}")
    print(f"  実際のシフト開始: 観測 #61")
    print(f"  検出遅延: {first_ooc - 61}観測")

# プロット
ewma_chart.plot(data,
                title="EWMA Chart - Drug Content Monitoring")

# 期待される出力:
# === EWMA管理図 ===
# 目標値: 50.0 mg
# 重み係数 (λ): 0.2
# 管理限界幅 (L): 3.0σ
#
# シフト検出: 観測 #68
#   実際のシフト開始: 観測 #61
#   検出遅延: 7観測

2.4 プロセス能力と管理図の統合

2.4.1 プロセス能力指数と管理図

管理図でプロセスが統計的に安定していることを確認した上で、プロセス能力指数を計算します。

📊 Example 6: Process Capability Analysis with Control Charts

管理図とプロセス能力分析の統合実装

# ===================================
# Example 6: Integrated Process Capability Analysis
# ===================================

from scipy import stats

class ProcessCapabilityAnalysis:
    """管理図とプロセス能力分析の統合クラス

    1. 管理図でプロセスの安定性を確認
    2. 安定したプロセスの能力指数を計算
    3. 総合評価レポートを生成
    """

    def __init__(self, LSL: float, USL: float, target: float = None):
        """
        Parameters:
        -----------
        LSL : float
            下方規格限界 (Lower Specification Limit)
        USL : float
            上方規格限界 (Upper Specification Limit)
        target : float, optional
            目標値。省略時は規格の中央値
        """
        self.LSL = LSL
        self.USL = USL
        self.target = target if target is not None else (LSL + USL) / 2

        self.control_chart = None
        self.capability_results = None

    def analyze(self, data: np.ndarray,
                subgroup_size: int = 5) -> Dict:
        """統合分析の実行

        Parameters:
        -----------
        data : np.ndarray
            shape (n_subgroups, subgroup_size) のデータ
        subgroup_size : int
            サブグループのサイズ

        Returns:
        --------
        results : dict
            管理図情報とプロセス能力指数を含む辞書
        """
        # Step 1: X-bar and R管理図で安定性を確認
        self.control_chart = XbarRControlChart(subgroup_size)
        self.control_chart.fit(data)

        chart_results = self.control_chart.predict(data)

        # 管理外れ点の確認
        n_ooc_xbar = chart_results['xbar_ooc'].sum()
        n_ooc_r = chart_results['r_ooc'].sum()
        is_stable = (n_ooc_xbar == 0) and (n_ooc_r == 0)

        # Step 2: プロセス能力指数の計算(安定している場合のみ推奨)
        all_data = data.flatten()
        process_mean = all_data.mean()
        process_std = all_data.std(ddof=1)

        # Cp: プロセス能力指数(中心化されている前提)
        Cp = (self.USL - self.LSL) / (6 * process_std)

        # Cpk: 修正プロセス能力指数(実際の平均位置を考慮)
        Cpu = (self.USL - process_mean) / (3 * process_std)
        Cpl = (process_mean - self.LSL) / (3 * process_std)
        Cpk = min(Cpu, Cpl)

        # Cpm: タグチ能力指数(目標値からのずれを考慮)
        tau = np.sqrt(process_std**2 + (process_mean - self.target)**2)
        Cpm = (self.USL - self.LSL) / (6 * tau)

        # 規格外れ率の推定(正規分布を仮定)
        z_lower = (self.LSL - process_mean) / process_std
        z_upper = (self.USL - process_mean) / process_std

        ppm_lower = stats.norm.cdf(z_lower) * 1e6
        ppm_upper = (1 - stats.norm.cdf(z_upper)) * 1e6
        ppm_total = ppm_lower + ppm_upper

        # Step 3: 総合評価
        self.capability_results = {
            'stable': is_stable,
            'n_ooc_xbar': n_ooc_xbar,
            'n_ooc_r': n_ooc_r,
            'process_mean': process_mean,
            'process_std': process_std,
            'Cp': Cp,
            'Cpk': Cpk,
            'Cpm': Cpm,
            'Cpu': Cpu,
            'Cpl': Cpl,
            'ppm_lower': ppm_lower,
            'ppm_upper': ppm_upper,
            'ppm_total': ppm_total,
            'LSL': self.LSL,
            'USL': self.USL,
            'target': self.target
        }

        return self.capability_results

    def print_report(self):
        """総合評価レポートの出力"""
        if self.capability_results is None:
            raise ValueError("Call analyze() first")

        r = self.capability_results

        print("=" * 60)
        print("統合プロセス能力分析レポート")
        print("=" * 60)

        print("\n【1. プロセス安定性(管理図)】")
        if r['stable']:
            print("  ✅ プロセスは統計的に安定しています")
        else:
            print("  ⚠️ プロセスが不安定です(管理外れ点あり)")
            print(f"    X-bar管理外れ: {r['n_ooc_xbar']}点")
            print(f"    R管理外れ: {r['n_ooc_r']}点")
            print("  → まずプロセスを安定化させてください")

        print("\n【2. プロセス特性】")
        print(f"  プロセス平均: {r['process_mean']:.4f}")
        print(f"  プロセス標準偏差: {r['process_std']:.4f}")
        print(f"  規格範囲: [{r['LSL']:.2f}, {r['USL']:.2f}]")
        print(f"  目標値: {r['target']:.2f}")

        print("\n【3. プロセス能力指数】")
        print(f"  Cp  = {r['Cp']:.3f}  ", end="")
        print(self._evaluate_index(r['Cp']))

        print(f"  Cpk = {r['Cpk']:.3f}  ", end="")
        print(self._evaluate_index(r['Cpk']))
        print(f"    (Cpu={r['Cpu']:.3f}, Cpl={r['Cpl']:.3f})")

        print(f"  Cpm = {r['Cpm']:.3f}  ", end="")
        print(self._evaluate_index(r['Cpm']))

        print("\n【4. 規格外れ率推定】")
        print(f"  下限外れ: {r['ppm_lower']:.1f} ppm")
        print(f"  上限外れ: {r['ppm_upper']:.1f} ppm")
        print(f"  合計: {r['ppm_total']:.1f} ppm ({r['ppm_total']/1e4:.3f}%)")

        print("\n【5. 総合判定】")
        if r['stable'] and r['Cpk'] >= 1.33:
            print("  ✅ プロセスは安定かつ十分な能力があります")
        elif r['stable'] and r['Cpk'] >= 1.0:
            print("  ⚠️ プロセスは安定していますが、能力改善が必要です")
        else:
            print("  ❌ プロセス改善が必要です")

        print("=" * 60)

    def _evaluate_index(self, value: float) -> str:
        """能力指数の評価"""
        if value >= 1.67:
            return "(優秀)"
        elif value >= 1.33:
            return "(良好)"
        elif value >= 1.0:
            return "(最低限)"
        else:
            return "(不十分)"

    def plot_capability(self, data: np.ndarray):
        """プロセス能力のヒストグラムと正規分布フィット"""
        if self.capability_results is None:
            raise ValueError("Call analyze() first")

        r = self.capability_results
        all_data = data.flatten()

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

        # ヒストグラムと正規分布フィット
        ax1.hist(all_data, bins=30, density=True, alpha=0.6,
                color='#3498db', edgecolor='black', label='Data')

        # 正規分布曲線
        x = np.linspace(all_data.min(), all_data.max(), 200)
        pdf = stats.norm.pdf(x, r['process_mean'], r['process_std'])
        ax1.plot(x, pdf, 'r-', linewidth=2, label='Normal Fit')

        # 規格限界と目標値
        ax1.axvline(r['LSL'], color='#e74c3c', linestyle='--',
                   linewidth=2, label='LSL')
        ax1.axvline(r['USL'], color='#e74c3c', linestyle='--',
                   linewidth=2, label='USL')
        ax1.axvline(r['target'], color='#11998e', linestyle='-',
                   linewidth=2, label='Target')

        ax1.set_xlabel('Value', fontsize=11)
        ax1.set_ylabel('Density', fontsize=11)
        ax1.set_title('Process Capability Histogram',
                     fontsize=13, fontweight='bold')
        ax1.legend(loc='best')
        ax1.grid(True, alpha=0.3)

        # 能力指数の棒グラフ
        indices = ['Cp', 'Cpk', 'Cpm']
        values = [r['Cp'], r['Cpk'], r['Cpm']]
        colors = ['#3498db', '#e74c3c', '#11998e']

        bars = ax2.bar(indices, values, color=colors, alpha=0.7,
                      edgecolor='black', linewidth=1.5)

        # 基準線
        ax2.axhline(1.33, color='green', linestyle='--',
                   linewidth=2, label='Good (1.33)', alpha=0.7)
        ax2.axhline(1.0, color='orange', linestyle='--',
                   linewidth=2, label='Minimum (1.0)', alpha=0.7)

        # 値をバーの上に表示
        for bar, value in zip(bars, values):
            height = bar.get_height()
            ax2.text(bar.get_x() + bar.get_width()/2., height,
                    f'{value:.3f}',
                    ha='center', va='bottom', fontweight='bold')

        ax2.set_ylabel('Index Value', fontsize=11)
        ax2.set_title('Process Capability Indices',
                     fontsize=13, fontweight='bold')
        ax2.legend(loc='best')
        ax2.grid(True, alpha=0.3, axis='y')
        ax2.set_ylim(0, max(values) * 1.3)

        plt.tight_layout()
        plt.show()


# 使用例: 射出成形品の寸法管理
np.random.seed(42)

# 規格設定
LSL = 49.5  # mm
USL = 50.5  # mm
target = 50.0  # mm

# データ生成(40サブグループ、サイズ5)
n_subgroups = 40
subgroup_size = 5

# 最初の30サブグループは安定(平均50.0、標準偏差0.15)
data_stable = np.random.normal(50.0, 0.15, (30, subgroup_size))

# 後の10サブグループで平均が若干シフト(50.0 → 50.1)
data_shift = np.random.normal(50.1, 0.15, (10, subgroup_size))

data = np.vstack([data_stable, data_shift])

# 統合分析の実行
analysis = ProcessCapabilityAnalysis(LSL, USL, target)
results = analysis.analyze(data, subgroup_size)

# レポート出力
analysis.print_report()

# 可視化
analysis.control_chart.plot(data,
    title="Injection Molding - Dimension Control")
analysis.plot_capability(data)

# 期待される出力:
# ============================================================
# 統合プロセス能力分析レポート
# ============================================================
#
# 【1. プロセス安定性(管理図)】
#   ⚠️ プロセスが不安定です(管理外れ点あり)
#     X-bar管理外れ: 6点
#     R管理外れ: 0点
#   → まずプロセスを安定化させてください
#
# 【2. プロセス特性】
#   プロセス平均: 50.0266
#   プロセス標準偏差: 0.1505
#   規格範囲: [49.50, 50.50]
#   目標値: 50.00
#
# 【3. プロセス能力指数】
#   Cp  = 1.107  (最低限)
#   Cpk = 1.049  (最低限)
#     (Cpu=1.049, Cpl=1.165)
#   Cpm = 1.089  (最低限)
#
# 【4. 規格外れ率推定】
#   下限外れ: 0.1 ppm
#   上限外れ: 8.6 ppm
#   合計: 8.7 ppm (0.001%)
#
# 【5. 総合判定】
#   ⚠️ プロセスは安定していますが、能力改善が必要です
# ============================================================

2.5 管理図の判定ルール

2.5.1 Western Electric Rules(ウエスタン・エレクトリック・ルール)

単一点の管理限界超過だけでなく、データのパターンから異常を検知する高度なルール集です。

📊 Example 7: Western Electric Rules Implementation

8つの判定ルールの実装

# ===================================
# Example 7: Western Electric Rules for Pattern Detection
# ===================================

class WesternElectricRules:
    """Western Electric Rulesの実装

    管理図上のデータパターンから異常を検知する8つのルール。
    単一点の管理限界超過だけでなく、傾向やランなども検出。

    Rules:
    ------
    Rule 1: 1点が3σを超える
    Rule 2: 連続3点中2点が2σと3σの間
    Rule 3: 連続5点中4点が1σと2σの間
    Rule 4: 連続8点が同じ側(中心線から)
    Rule 5: 連続6点が単調増加または減少
    Rule 6: 連続14点が交互に上下
    Rule 7: 連続15点が1σ以内
    Rule 8: 連続8点が1σ外側
    """

    def __init__(self, center_line: float, ucl: float, lcl: float):
        """
        Parameters:
        -----------
        center_line : float
            中心線(平均)
        ucl : float
            上方管理限界(CL + 3σ)
        lcl : float
            下方管理限界(CL - 3σ)
        """
        self.cl = center_line
        self.ucl = ucl
        self.lcl = lcl

        # ゾーン境界の計算
        sigma = (ucl - center_line) / 3

        self.zone_a_upper = center_line + 2 * sigma
        self.zone_a_lower = center_line - 2 * sigma
        self.zone_b_upper = center_line + sigma
        self.zone_b_lower = center_line - sigma

    def apply_rules(self, data: np.ndarray) -> Dict[str, np.ndarray]:
        """全ルールを適用

        Parameters:
        -----------
        data : np.ndarray
            時系列データ

        Returns:
        --------
        violations : dict
            各ルールの違反点を示すブール配列
        """
        n = len(data)
        violations = {f'Rule{i}': np.zeros(n, dtype=bool) for i in range(1, 9)}

        for i in range(n):
            # Rule 1: 1点が3σを超える
            if data[i] > self.ucl or data[i] < self.lcl:
                violations['Rule1'][i] = True

            # Rule 2: 連続3点中2点が2σと3σの間
            if i >= 2:
                window = data[i-2:i+1]
                in_zone_a = np.sum((window > self.zone_a_upper) |
                                   (window < self.zone_a_lower))
                if in_zone_a >= 2:
                    violations['Rule2'][i] = True

            # Rule 3: 連続5点中4点が1σと2σの間
            if i >= 4:
                window = data[i-4:i+1]
                in_zone_b = np.sum(
                    ((window > self.zone_b_upper) & (window <= self.zone_a_upper)) |
                    ((window < self.zone_b_lower) & (window >= self.zone_a_lower))
                )
                if in_zone_b >= 4:
                    violations['Rule3'][i] = True

            # Rule 4: 連続8点が同じ側
            if i >= 7:
                window = data[i-7:i+1]
                if np.all(window > self.cl) or np.all(window < self.cl):
                    violations['Rule4'][i] = True

            # Rule 5: 連続6点が単調増加または減少
            if i >= 5:
                window = data[i-5:i+1]
                diffs = np.diff(window)
                if np.all(diffs > 0) or np.all(diffs < 0):
                    violations['Rule5'][i] = True

            # Rule 6: 連続14点が交互に上下
            if i >= 13:
                window = data[i-13:i+1]
                diffs = np.diff(window)
                signs = np.sign(diffs)
                # 符号が交互に変わるか確認
                alternating = True
                for j in range(len(signs) - 1):
                    if signs[j] * signs[j+1] >= 0:
                        alternating = False
                        break
                if alternating:
                    violations['Rule6'][i] = True

            # Rule 7: 連続15点が1σ以内
            if i >= 14:
                window = data[i-14:i+1]
                if np.all((window >= self.zone_b_lower) &
                         (window <= self.zone_b_upper)):
                    violations['Rule7'][i] = True

            # Rule 8: 連続8点が1σ外側
            if i >= 7:
                window = data[i-7:i+1]
                if np.all((window > self.zone_b_upper) |
                         (window < self.zone_b_lower)):
                    violations['Rule8'][i] = True

        return violations

    def plot_with_zones(self, data: np.ndarray,
                        title: str = "Control Chart with Western Electric Rules"):
        """ゾーン付き管理図のプロット"""
        violations = self.apply_rules(data)
        n = len(data)

        # 全違反点を統合
        any_violation = np.zeros(n, dtype=bool)
        for rule_violations in violations.values():
            any_violation |= rule_violations

        plt.figure(figsize=(14, 7))

        # ゾーンを色分け表示
        plt.axhspan(self.ucl, self.zone_a_upper,
                   color='#e74c3c', alpha=0.1, label='Zone A (2-3σ)')
        plt.axhspan(self.zone_a_upper, self.zone_b_upper,
                   color='#f39c12', alpha=0.1, label='Zone B (1-2σ)')
        plt.axhspan(self.zone_b_upper, self.cl,
                   color='#2ecc71', alpha=0.1, label='Zone C (0-1σ)')
        plt.axhspan(self.cl, self.zone_b_lower,
                   color='#2ecc71', alpha=0.1)
        plt.axhspan(self.zone_b_lower, self.zone_a_lower,
                   color='#f39c12', alpha=0.1)
        plt.axhspan(self.zone_a_lower, self.lcl,
                   color='#e74c3c', alpha=0.1)

        # データプロット
        plt.plot(range(1, n+1), data, marker='o', color='#2c3e50',
                linewidth=1.5, markersize=5, label='Data', zorder=3)

        # 管理限界
        plt.axhline(self.cl, color='#11998e', linestyle='-',
                   linewidth=2, label='CL', zorder=2)
        plt.axhline(self.ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='UCL (3σ)', zorder=2)
        plt.axhline(self.lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='LCL (3σ)', zorder=2)

        # ゾーン境界(点線)
        plt.axhline(self.zone_a_upper, color='gray', linestyle=':',
                   linewidth=1, alpha=0.5, zorder=1)
        plt.axhline(self.zone_a_lower, color='gray', linestyle=':',
                   linewidth=1, alpha=0.5, zorder=1)
        plt.axhline(self.zone_b_upper, color='gray', linestyle=':',
                   linewidth=1, alpha=0.5, zorder=1)
        plt.axhline(self.zone_b_lower, color='gray', linestyle=':',
                   linewidth=1, alpha=0.5, zorder=1)

        # 違反点を強調
        if any_violation.any():
            violation_indices = np.where(any_violation)[0]
            plt.scatter(violation_indices + 1, data[violation_indices],
                       color='red', s=150, marker='X', zorder=5,
                       label='Rule Violation')

        plt.xlabel('Observation Number', fontsize=11)
        plt.ylabel('Value', fontsize=11)
        plt.title(title, fontsize=13, fontweight='bold')
        plt.legend(loc='best', ncol=2)
        plt.grid(True, alpha=0.3, zorder=0)
        plt.tight_layout()
        plt.show()

        # 違反サマリーの出力
        print("\n=== Western Electric Rules Violations ===")
        for rule_name, rule_violations in violations.items():
            n_violations = rule_violations.sum()
            if n_violations > 0:
                indices = np.where(rule_violations)[0] + 1
                print(f"{rule_name}: {n_violations}回 "
                      f"(観測 {list(indices[:5])}{'...' if n_violations > 5 else ''})")


# 使用例: 様々な異常パターンの検出
np.random.seed(42)

# データ生成(100観測)
n = 100
data = np.random.normal(0, 1, n)

# 異常パターンを挿入
# Rule 1違反: 3σ超過
data[20] = 3.5

# Rule 4違反: 連続8点が上側
data[40:48] += 0.5

# Rule 5違反: 単調増加傾向
data[65:71] = np.linspace(0, 2, 6)

# Rule 7違反: 変動縮小(1σ以内)
data[85:100] = np.random.normal(0, 0.2, 15)

# Western Electric Rulesの適用
cl = 0.0
ucl = 3.0
lcl = -3.0

we_rules = WesternElectricRules(cl, ucl, lcl)
we_rules.plot_with_zones(data,
    title="Western Electric Rules - Anomaly Pattern Detection")

# 期待される出力:
# === Western Electric Rules Violations ===
# Rule1: 1回 (観測 [21])
# Rule4: 1回 (観測 [48])
# Rule5: 1回 (観測 [71])
# Rule7: 1回 (観測 [100])

2.6 リアルタイム監視システムの実装

📊 Example 8: Real-time SPC Monitoring System

実際の製造現場で使える逐次更新型の監視システム

# ===================================
# Example 8: Real-time SPC Monitoring System
# ===================================

import time
from datetime import datetime
from collections import deque

class RealTimeSPCMonitor:
    """リアルタイムSPC監視システム

    新しいデータが到着するたびに逐次的に管理図を更新し、
    異常を即座に検出・通知するシステム。

    Features:
    ---------
    - EWMA、CUSUM、Shewhart管理図を同時監視
    - Western Electric Rulesによる高感度検出
    - アラート通知機能
    - 履歴データの保持と可視化
    """

    def __init__(self, target: float, std: float,
                 window_size: int = 100):
        """
        Parameters:
        -----------
        target : float
            目標値
        std : float
            プロセス標準偏差
        window_size : int
            表示する履歴データの最大数
        """
        self.target = target
        self.std = std
        self.window_size = window_size

        # 管理限界
        self.ucl = target + 3 * std
        self.lcl = target - 3 * std

        # 各手法の初期化
        self.ewma_chart = EWMAControlChart(target, std, lambda_=0.2)
        self.cusum_chart = CUSUMControlChart(target, std, k=0.5, h=5.0)
        self.we_rules = WesternElectricRules(target, self.ucl, self.lcl)

        # データ履歴(dequeで効率的に管理)
        self.data_history = deque(maxlen=window_size)
        self.timestamps = deque(maxlen=window_size)
        self.ewma_history = deque(maxlen=window_size)

        # アラート履歴
        self.alerts = []

    def add_observation(self, value: float,
                       timestamp: datetime = None) -> Dict:
        """新しい観測値を追加し、異常検出を実行

        Parameters:
        -----------
        value : float
            新しい観測値
        timestamp : datetime, optional
            タイムスタンプ(省略時は現在時刻)

        Returns:
        --------
        status : dict
            検出結果とアラート情報
        """
        if timestamp is None:
            timestamp = datetime.now()

        # データを追加
        self.data_history.append(value)
        self.timestamps.append(timestamp)

        # 異常検出
        alerts = []

        # 1. Shewhart管理図(3σルール)
        if value > self.ucl:
            alerts.append({
                'method': 'Shewhart',
                'type': 'UCL violation',
                'value': value,
                'severity': 'HIGH'
            })
        elif value < self.lcl:
            alerts.append({
                'method': 'Shewhart',
                'type': 'LCL violation',
                'value': value,
                'severity': 'HIGH'
            })

        # 2. EWMA管理図
        if len(self.data_history) >= 1:
            data_array = np.array(self.data_history)
            ewma_results = self.ewma_chart.fit_predict(data_array)

            current_ewma = ewma_results['ewma'][-1]
            current_ucl = ewma_results['ucl'][-1]
            current_lcl = ewma_results['lcl'][-1]

            self.ewma_history.append(current_ewma)

            if ewma_results['ooc'][-1]:
                alerts.append({
                    'method': 'EWMA',
                    'type': 'EWMA limit violation',
                    'value': current_ewma,
                    'severity': 'MEDIUM'
                })

        # 3. CUSUM管理図
        if len(self.data_history) >= 1:
            data_array = np.array(self.data_history)
            cusum_results = self.cusum_chart.fit_predict(data_array)

            if cusum_results['ooc_high'][-1]:
                alerts.append({
                    'method': 'CUSUM',
                    'type': 'Upward shift detected',
                    'value': cusum_results['cusum_high'][-1],
                    'severity': 'MEDIUM'
                })

            if cusum_results['ooc_low'][-1]:
                alerts.append({
                    'method': 'CUSUM',
                    'type': 'Downward shift detected',
                    'value': cusum_results['cusum_low'][-1],
                    'severity': 'MEDIUM'
                })

        # 4. Western Electric Rules
        if len(self.data_history) >= 15:  # 最低限のデータが必要
            data_array = np.array(self.data_history)
            violations = self.we_rules.apply_rules(data_array)

            for rule_name, rule_violations in violations.items():
                if rule_violations[-1]:  # 最新点で違反
                    alerts.append({
                        'method': 'Western Electric',
                        'type': f'{rule_name} violation',
                        'value': value,
                        'severity': 'LOW'
                    })

        # アラート保存
        if alerts:
            for alert in alerts:
                alert['timestamp'] = timestamp
                self.alerts.append(alert)

        # ステータス返却
        status = {
            'value': value,
            'timestamp': timestamp,
            'in_control': len(alerts) == 0,
            'alerts': alerts,
            'n_observations': len(self.data_history)
        }

        return status

    def get_summary(self) -> Dict:
        """現在の監視状況のサマリー"""
        if len(self.data_history) == 0:
            return {'status': 'No data'}

        data_array = np.array(self.data_history)

        return {
            'n_observations': len(self.data_history),
            'current_value': data_array[-1],
            'process_mean': data_array.mean(),
            'process_std': data_array.std(ddof=1),
            'target': self.target,
            'n_alerts_total': len(self.alerts),
            'n_alerts_last_10': sum(1 for a in self.alerts
                                   if len(self.data_history) -
                                   list(self.timestamps).index(a['timestamp']) <= 10),
            'alert_methods': list(set(a['method'] for a in self.alerts))
        }

    def plot_dashboard(self):
        """リアルタイム監視ダッシュボード"""
        if len(self.data_history) < 2:
            print("データが不足しています(最低2点必要)")
            return

        data_array = np.array(self.data_history)
        n = len(data_array)

        fig, axes = plt.subplots(3, 1, figsize=(14, 10))

        # 1. Shewhart管理図
        ax1 = axes[0]
        x_indices = range(1, n+1)

        ax1.plot(x_indices, data_array, marker='o', color='#2c3e50',
                linewidth=1.5, markersize=4, label='Data')
        ax1.axhline(self.target, color='#11998e', linestyle='-',
                   linewidth=2, label='Target')
        ax1.axhline(self.ucl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='UCL')
        ax1.axhline(self.lcl, color='#e74c3c', linestyle='--',
                   linewidth=2, label='LCL')

        # Shewhart違反点を強調
        ooc_indices = (data_array > self.ucl) | (data_array < self.lcl)
        if ooc_indices.any():
            ooc_x = np.where(ooc_indices)[0] + 1
            ax1.scatter(ooc_x, data_array[ooc_indices],
                       color='red', s=100, zorder=5)

        ax1.set_ylabel('Value', fontsize=10)
        ax1.set_title('Shewhart Control Chart (3σ)',
                     fontsize=12, fontweight='bold')
        ax1.legend(loc='best', fontsize=9)
        ax1.grid(True, alpha=0.3)

        # 2. EWMA管理図
        ax2 = axes[1]

        if len(self.ewma_history) > 0:
            ewma_array = np.array(self.ewma_history)
            ewma_results = self.ewma_chart.fit_predict(data_array)

            ax2.plot(x_indices, data_array, marker='.', color='gray',
                    linewidth=0.5, markersize=3, alpha=0.5, label='Original')
            ax2.plot(x_indices, ewma_array, marker='o', color='#3498db',
                    linewidth=2, markersize=4, label='EWMA')
            ax2.axhline(self.target, color='#11998e', linestyle='-',
                       linewidth=2, label='Target')
            ax2.plot(x_indices, ewma_results['ucl'],
                    color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
            ax2.plot(x_indices, ewma_results['lcl'],
                    color='#e74c3c', linestyle='--', linewidth=2, label='LCL')

            # EWMA違反点を強調
            if ewma_results['ooc'].any():
                ooc_x = np.where(ewma_results['ooc'])[0] + 1
                ax2.scatter(ooc_x, ewma_array[ewma_results['ooc']],
                           color='red', s=100, zorder=5)

        ax2.set_ylabel('EWMA Value', fontsize=10)
        ax2.set_title('EWMA Control Chart (λ=0.2)',
                     fontsize=12, fontweight='bold')
        ax2.legend(loc='best', fontsize=9)
        ax2.grid(True, alpha=0.3)

        # 3. CUSUM管理図
        ax3 = axes[2]

        cusum_results = self.cusum_chart.fit_predict(data_array)

        ax3.plot(x_indices, cusum_results['cusum_high'],
                marker='^', color='#e74c3c', linewidth=1.5,
                markersize=4, label='CUSUM High')
        ax3.plot(x_indices, cusum_results['cusum_low'],
                marker='v', color='#3498db', linewidth=1.5,
                markersize=4, label='CUSUM Low')
        ax3.axhline(self.cusum_chart.h, color='#2c3e50',
                   linestyle='--', linewidth=2, label='Decision Interval (h)')
        ax3.axhline(0, color='gray', linestyle='-', linewidth=1)

        # CUSUM違反点を強調
        if cusum_results['ooc_high'].any():
            ooc_x = np.where(cusum_results['ooc_high'])[0] + 1
            ax3.scatter(ooc_x, cusum_results['cusum_high'][cusum_results['ooc_high']],
                       color='red', s=100, zorder=5, marker='^')
        if cusum_results['ooc_low'].any():
            ooc_x = np.where(cusum_results['ooc_low'])[0] + 1
            ax3.scatter(ooc_x, cusum_results['cusum_low'][cusum_results['ooc_low']],
                       color='red', s=100, zorder=5, marker='v')

        ax3.set_xlabel('Observation Number', fontsize=10)
        ax3.set_ylabel('CUSUM Value', fontsize=10)
        ax3.set_title('CUSUM Control Chart',
                     fontsize=12, fontweight='bold')
        ax3.legend(loc='best', fontsize=9)
        ax3.grid(True, alpha=0.3)

        plt.suptitle('Real-time SPC Monitoring Dashboard',
                    fontsize=15, fontweight='bold', y=0.995)
        plt.tight_layout()
        plt.show()


# 使用例: リアルタイム監視のシミュレーション
np.random.seed(42)

# 監視システムの初期化
target = 100.0
std = 1.0
monitor = RealTimeSPCMonitor(target, std, window_size=150)

print("=== リアルタイムSPC監視システム ===")
print(f"目標値: {target}")
print(f"標準偏差: {std}")
print("\n観測データを逐次追加中...\n")

# シミュレーション: 150観測を逐次追加
for i in range(150):
    # 正常データ
    if i < 60:
        value = np.random.normal(target, std)
    # 観測60-80: +1σシフト
    elif i < 80:
        value = np.random.normal(target + std, std)
    # 観測80-120: 正常に戻る
    elif i < 120:
        value = np.random.normal(target, std)
    # 観測120-150: 変動増大
    else:
        value = np.random.normal(target, std * 1.5)

    # 観測値を追加
    status = monitor.add_observation(value)

    # アラートがあれば表示
    if not status['in_control']:
        print(f"⚠️ 観測 #{i+1}: アラート発生")
        for alert in status['alerts']:
            print(f"  - {alert['method']}: {alert['type']} "
                  f"(重要度: {alert['severity']})")

# サマリー表示
print("\n=== 監視サマリー ===")
summary = monitor.get_summary()
for key, value in summary.items():
    print(f"{key}: {value}")

# ダッシュボード表示
monitor.plot_dashboard()

# 期待される出力:
# === リアルタイムSPC監視システム ===
# 目標値: 100.0
# 標準偏差: 1.0
#
# 観測データを逐次追加中...
#
# ⚠️ 観測 #67: アラート発生
#   - EWMA: EWMA limit violation (重要度: MEDIUM)
#   - CUSUM: Upward shift detected (重要度: MEDIUM)
# ...(複数のアラート)
#
# === 監視サマリー ===
# n_observations: 150
# current_value: 101.234
# process_mean: 100.567
# process_std: 1.234
# target: 100.0
# n_alerts_total: 45
# n_alerts_last_10: 8
# alert_methods: ['EWMA', 'CUSUM', 'Western Electric']

学習目標の確認

この章を完了すると、以下を説明・実装できるようになります:

基本理解

実践スキル

応用力

演習問題

Easy(基礎確認)

Q1: X-bar管理図の上方管理限界(UCL)が80.5、中心線(CL)が79.0の場合、下方管理限界(LCL)はいくつですか?

解答を見る

正解: 77.5

解説:
UCL = CL + 3σ
LCL = CL - 3σ
UCL - CL = 80.5 - 79.0 = 1.5 = 3σ
したがって、LCL = CL - 1.5 = 79.0 - 1.5 = 77.5

Medium(応用)

Q2: あるプロセスの平均不良率が2%、サンプルサイズが100個の場合、P管理図の上方管理限界(UCL)を計算してください。

解答を見る

正解: UCL = 0.062 (6.2%)

計算:
p̄ = 0.02
n = 100
σ_p = √(p̄(1-p̄)/n) = √(0.02 × 0.98 / 100) = √0.000196 = 0.014
UCL = p̄ + 3σ_p = 0.02 + 3 × 0.014 = 0.02 + 0.042 = 0.062

重要ポイント: サンプルサイズが大きいほど管理限界は中心線に近づきます(検出力向上)。

Hard(発展)

Q3: CUSUMとEWMAはどちらも小さなシフト検出に優れていますが、それぞれの長所と短所を比較し、どのような状況でどちらを選ぶべきか説明してください。

解答を見る

比較表:

項目 CUSUM EWMA
検出速度 非常に高速(ARL優秀) 高速
パラメータ設定 やや難(k, h) 比較的容易(λ)
解釈のしやすさ 難(累積統計量) 易(平滑化された値)
シフト方向検出 上下別々(明確) 単一統計量
実装の複雑さ 中程度 シンプル

選択基準:

  • CUSUMを選ぶ場合:
    • 最速の検出が必要(医薬品、半導体など)
    • シフト方向の特定が重要
    • 専門的なSPC知識がある
  • EWMAを選ぶ場合:
    • 実装のシンプルさが重要
    • 現場オペレーターが理解しやすい
    • パラメータ調整が頻繁に必要

ベストプラクティス: 重要なプロセスでは両方を併用し、相互補完的に監視することを推奨します。