プロセス変動を可視化し、異常を早期検知するための実践的手法
統計的工程管理(Statistical Process Control: SPC)は、プロセスの変動を統計的に監視し、異常を早期に検知するための手法です。ウォルター・シューハート博士が1920年代にベル研究所で開発した管理図を基礎としています。
SPCの目的は、偶然原因による正常な変動と異常原因による異常な変動を統計的に区別し、異常原因を早期に発見して対処することです。
管理図は以下の要素で構成されます:
最も基本的な管理図。サブグループの平均と範囲を同時に管理します。
# ===================================
# Example 1: X-bar and R Control Charts Implementation
# ===================================
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Tuple, Dict
import warnings
warnings.filterwarnings('ignore')
class XbarRControlChart:
"""X-bar and R管理図の実装
連続変数のプロセス管理に使用。サブグループの平均(X-bar)と
範囲(R)を同時に監視し、プロセスの中心と散らばりを管理する。
Parameters:
-----------
subgroup_size : int
サブグループのサンプル数 (通常2-10)
Attributes:
-----------
A2, D3, D4 : float
管理限界計算用の定数(サブグループサイズに依存)
"""
# サブグループサイズ別の定数(JIS Z 9020-2より)
CONSTANTS = {
2: {'A2': 1.880, 'D3': 0, 'D4': 3.267},
3: {'A2': 1.023, 'D3': 0, 'D4': 2.575},
4: {'A2': 0.729, 'D3': 0, 'D4': 2.282},
5: {'A2': 0.577, 'D3': 0, 'D4': 2.115},
6: {'A2': 0.483, 'D3': 0, 'D4': 2.004},
7: {'A2': 0.419, 'D3': 0.076, 'D4': 1.924},
8: {'A2': 0.373, 'D3': 0.136, 'D4': 1.864},
9: {'A2': 0.337, 'D3': 0.184, 'D4': 1.816},
10: {'A2': 0.308, 'D3': 0.223, 'D4': 1.777}
}
def __init__(self, subgroup_size: int = 5):
if subgroup_size not in self.CONSTANTS:
raise ValueError(f"Subgroup size must be 2-10, got {subgroup_size}")
self.n = subgroup_size
self.A2 = self.CONSTANTS[subgroup_size]['A2']
self.D3 = self.CONSTANTS[subgroup_size]['D3']
self.D4 = self.CONSTANTS[subgroup_size]['D4']
self.xbar_cl = None
self.xbar_ucl = None
self.xbar_lcl = None
self.r_cl = None
self.r_ucl = None
self.r_lcl = None
def fit(self, data: np.ndarray) -> 'XbarRControlChart':
"""管理限界を計算(フェーズI: 初期データから基準を設定)
Parameters:
-----------
data : np.ndarray
shape (n_subgroups, subgroup_size) のデータ
Returns:
--------
self : XbarRControlChart
"""
if data.shape[1] != self.n:
raise ValueError(f"Data subgroup size {data.shape[1]} != {self.n}")
# 各サブグループの平均と範囲を計算
xbar = data.mean(axis=1)
r = data.max(axis=1) - data.min(axis=1)
# 中心線と管理限界の計算
self.xbar_cl = xbar.mean()
self.r_cl = r.mean()
# X-bar管理図の管理限界
self.xbar_ucl = self.xbar_cl + self.A2 * self.r_cl
self.xbar_lcl = self.xbar_cl - self.A2 * self.r_cl
# R管理図の管理限界
self.r_ucl = self.D4 * self.r_cl
self.r_lcl = self.D3 * self.r_cl
return self
def predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
"""新しいデータに対して管理状態を判定(フェーズII: 継続監視)
Parameters:
-----------
data : np.ndarray
shape (n_subgroups, subgroup_size) のデータ
Returns:
--------
results : dict
'xbar', 'r', 'xbar_ooc', 'r_ooc' を含む辞書
ooc = out of control (管理外れ)
"""
if self.xbar_cl is None:
raise ValueError("Call fit() first to establish control limits")
xbar = data.mean(axis=1)
r = data.max(axis=1) - data.min(axis=1)
# 管理外れの判定
xbar_ooc = (xbar > self.xbar_ucl) | (xbar < self.xbar_lcl)
r_ooc = (r > self.r_ucl) | (r < self.r_lcl)
return {
'xbar': xbar,
'r': r,
'xbar_ooc': xbar_ooc,
'r_ooc': r_ooc
}
def plot(self, data: np.ndarray, title: str = "X-bar and R Control Charts"):
"""管理図のプロット
Parameters:
-----------
data : np.ndarray
shape (n_subgroups, subgroup_size) のデータ
title : str
グラフタイトル
"""
results = self.predict(data)
n_points = len(results['xbar'])
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
# X-bar管理図
ax1.plot(range(1, n_points+1), results['xbar'],
marker='o', color='#2c3e50', linewidth=1.5, markersize=6)
ax1.axhline(self.xbar_cl, color='#11998e', linestyle='-',
linewidth=2, label='CL')
ax1.axhline(self.xbar_ucl, color='#e74c3c', linestyle='--',
linewidth=2, label='UCL')
ax1.axhline(self.xbar_lcl, color='#e74c3c', linestyle='--',
linewidth=2, label='LCL')
# 管理外れ点を強調
ooc_indices = np.where(results['xbar_ooc'])[0]
if len(ooc_indices) > 0:
ax1.scatter(ooc_indices + 1, results['xbar'][ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
ax1.set_xlabel('Subgroup Number', fontsize=11)
ax1.set_ylabel('Subgroup Mean (X-bar)', fontsize=11)
ax1.set_title('X-bar Control Chart', fontsize=13, fontweight='bold')
ax1.legend(loc='best')
ax1.grid(True, alpha=0.3)
# R管理図
ax2.plot(range(1, n_points+1), results['r'],
marker='s', color='#2c3e50', linewidth=1.5, markersize=6)
ax2.axhline(self.r_cl, color='#11998e', linestyle='-',
linewidth=2, label='CL')
ax2.axhline(self.r_ucl, color='#e74c3c', linestyle='--',
linewidth=2, label='UCL')
ax2.axhline(self.r_lcl, color='#e74c3c', linestyle='--',
linewidth=2, label='LCL')
# 管理外れ点を強調
ooc_indices = np.where(results['r_ooc'])[0]
if len(ooc_indices) > 0:
ax2.scatter(ooc_indices + 1, results['r'][ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
ax2.set_xlabel('Subgroup Number', fontsize=11)
ax2.set_ylabel('Subgroup Range (R)', fontsize=11)
ax2.set_title('R Control Chart', fontsize=13, fontweight='bold')
ax2.legend(loc='best')
ax2.grid(True, alpha=0.3)
plt.suptitle(title, fontsize=15, fontweight='bold', y=0.995)
plt.tight_layout()
plt.show()
# 使用例: 化学プロセスの温度管理
# Phase I: 初期25サブグループで管理限界を設定
np.random.seed(42)
n_subgroups_phase1 = 25
subgroup_size = 5
target_temp = 80.0 # 目標温度 [℃]
# 正常運転データ(平均80℃、標準偏差0.5℃)
phase1_data = np.random.normal(target_temp, 0.5,
(n_subgroups_phase1, subgroup_size))
# 管理図の構築
xbar_r_chart = XbarRControlChart(subgroup_size=subgroup_size)
xbar_r_chart.fit(phase1_data)
print("=== Phase I: 管理限界の設定 ===")
print(f"X-bar管理図:")
print(f" CL = {xbar_r_chart.xbar_cl:.3f} ℃")
print(f" UCL = {xbar_r_chart.xbar_ucl:.3f} ℃")
print(f" LCL = {xbar_r_chart.xbar_lcl:.3f} ℃")
print(f"\nR管理図:")
print(f" CL = {xbar_r_chart.r_cl:.3f} ℃")
print(f" UCL = {xbar_r_chart.r_ucl:.3f} ℃")
print(f" LCL = {xbar_r_chart.r_lcl:.3f} ℃")
# Phase II: 継続監視(30サブグループ、途中で異常発生)
n_subgroups_phase2 = 30
phase2_data = np.random.normal(target_temp, 0.5,
(n_subgroups_phase2, subgroup_size))
# サブグループ15-20で平均シフト(+1.5℃)
phase2_data[14:20, :] += 1.5
# サブグループ25で散らばり増大
phase2_data[24, :] = np.random.normal(target_temp, 2.0, subgroup_size)
# 監視
results = xbar_r_chart.predict(phase2_data)
print("\n=== Phase II: 継続監視 ===")
print(f"X-bar管理外れ: {results['xbar_ooc'].sum()}個 "
f"(サブグループ {np.where(results['xbar_ooc'])[0] + 1})")
print(f"R管理外れ: {results['r_ooc'].sum()}個 "
f"(サブグループ {np.where(results['r_ooc'])[0] + 1})")
# プロット
xbar_r_chart.plot(phase2_data,
title="Chemical Process Temperature Monitoring")
# 期待される出力:
# === Phase I: 管理限界の設定 ===
# X-bar管理図:
# CL = 79.995 ℃
# UCL = 80.666 ℃
# LCL = 79.323 ℃
#
# R管理図:
# CL = 1.165 ℃
# UCL = 2.464 ℃
# LCL = 0.000 ℃
#
# === Phase II: 継続監視 ===
# X-bar管理外れ: 5個 (サブグループ [15 16 17 18 19])
# R管理外れ: 1個 (サブグループ [25])
管理限界(±3σ)はプロセスの変動から計算され、プロセスが統計的に安定しているかを判定します。一方、規格限界は顧客要求や設計仕様から決まり、製品の合否を判定します。管理限界内でも規格外れの可能性があります。
計量値(連続変数)ではなく、不良品数や欠陥数などの計数値データに対しては、異なる管理図を使用します。
検査ロットから抜き取った製品の不良率を管理します。
# ===================================
# Example 2: P-Chart for Proportion Defective
# ===================================
class PControlChart:
"""P管理図(不良率管理図)の実装
二項分布に従う不良率データの管理に使用。
サンプルサイズが一定でなくても適用可能。
Attributes:
-----------
p_bar : float
全体の平均不良率
ucl, lcl : np.ndarray
各サンプルサイズに応じた管理限界
"""
def __init__(self):
self.p_bar = None
self.ucl = None
self.lcl = None
self.n = None
def fit(self, defectives: np.ndarray, sample_sizes: np.ndarray):
"""管理限界を計算
Parameters:
-----------
defectives : np.ndarray
各ロットの不良品数
sample_sizes : np.ndarray
各ロットのサンプルサイズ
"""
# 平均不良率の計算
self.p_bar = defectives.sum() / sample_sizes.sum()
self.n = sample_sizes
# 各サンプルサイズに応じた管理限界(±3σ)
# σ_p = sqrt(p_bar * (1 - p_bar) / n)
sigma_p = np.sqrt(self.p_bar * (1 - self.p_bar) / self.n)
self.ucl = self.p_bar + 3 * sigma_p
self.lcl = self.p_bar - 3 * sigma_p
# LCLは0未満にならない
self.lcl = np.maximum(self.lcl, 0)
# UCLは1を超えない
self.ucl = np.minimum(self.ucl, 1)
return self
def predict(self, defectives: np.ndarray,
sample_sizes: np.ndarray) -> Dict[str, np.ndarray]:
"""管理状態を判定
Parameters:
-----------
defectives : np.ndarray
各ロットの不良品数
sample_sizes : np.ndarray
各ロットのサンプルサイズ
Returns:
--------
results : dict
'p', 'ucl', 'lcl', 'ooc' を含む辞書
"""
if self.p_bar is None:
raise ValueError("Call fit() first")
# 不良率の計算
p = defectives / sample_sizes
# 管理限界の再計算(サンプルサイズが異なる場合)
sigma_p = np.sqrt(self.p_bar * (1 - self.p_bar) / sample_sizes)
ucl = np.minimum(self.p_bar + 3 * sigma_p, 1)
lcl = np.maximum(self.p_bar - 3 * sigma_p, 0)
# 管理外れの判定
ooc = (p > ucl) | (p < lcl)
return {
'p': p,
'ucl': ucl,
'lcl': lcl,
'ooc': ooc
}
def plot(self, defectives: np.ndarray, sample_sizes: np.ndarray,
title: str = "P Control Chart"):
"""P管理図のプロット"""
results = self.predict(defectives, sample_sizes)
n_points = len(results['p'])
plt.figure(figsize=(12, 6))
# 不良率のプロット
plt.plot(range(1, n_points+1), results['p'],
marker='o', color='#2c3e50', linewidth=1.5,
markersize=6, label='Proportion Defective')
# 中心線
plt.axhline(self.p_bar, color='#11998e', linestyle='-',
linewidth=2, label=f'CL = {self.p_bar:.4f}')
# 管理限界(サンプルサイズが変わる場合は階段状)
plt.plot(range(1, n_points+1), results['ucl'],
color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
plt.plot(range(1, n_points+1), results['lcl'],
color='#e74c3c', linestyle='--', linewidth=2, label='LCL')
# 管理外れ点を強調
ooc_indices = np.where(results['ooc'])[0]
if len(ooc_indices) > 0:
plt.scatter(ooc_indices + 1, results['p'][ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
plt.xlabel('Lot Number', fontsize=11)
plt.ylabel('Proportion Defective', fontsize=11)
plt.title(title, fontsize=13, fontweight='bold')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 使用例: 半導体製造の不良率管理
np.random.seed(42)
# Phase I: 初期30ロットで管理限界を設定
n_lots_phase1 = 30
sample_sizes_phase1 = np.random.randint(100, 200, n_lots_phase1)
true_defect_rate = 0.02 # 真の不良率2%
# 二項分布で不良品数を生成
defectives_phase1 = np.random.binomial(sample_sizes_phase1,
true_defect_rate)
# P管理図の構築
p_chart = PControlChart()
p_chart.fit(defectives_phase1, sample_sizes_phase1)
print("=== P管理図: 管理限界の設定 ===")
print(f"平均不良率 (p-bar) = {p_chart.p_bar:.4f} ({p_chart.p_bar*100:.2f}%)")
# Phase II: 継続監視(40ロット、途中で不良率増加)
n_lots_phase2 = 40
sample_sizes_phase2 = np.random.randint(100, 200, n_lots_phase2)
defectives_phase2 = np.random.binomial(sample_sizes_phase2,
true_defect_rate)
# ロット25-30で不良率が3倍に増加(工程異常)
defectives_phase2[24:30] = np.random.binomial(
sample_sizes_phase2[24:30], true_defect_rate * 3)
# 監視
results = p_chart.predict(defectives_phase2, sample_sizes_phase2)
print(f"\n=== Phase II: 継続監視 ===")
print(f"管理外れロット: {results['ooc'].sum()}個")
print(f"該当ロット番号: {np.where(results['ooc'])[0] + 1}")
# プロット
p_chart.plot(defectives_phase2, sample_sizes_phase2,
title="Semiconductor Manufacturing - Defect Rate Monitoring")
# 期待される出力:
# === P管理図: 管理限界の設定 ===
# 平均不良率 (p-bar) = 0.0189 (1.89%)
#
# === Phase II: 継続監視 ===
# 管理外れロット: 6個
# 該当ロット番号: [25 26 27 28 29 30]
一定の検査単位あたりの欠陥数を管理します。ポアソン分布を仮定。
# ===================================
# Example 3: C-Chart for Count of Defects
# ===================================
class CControlChart:
"""C管理図(欠陥数管理図)の実装
一定の検査単位(面積、時間、体積など)あたりの欠陥数を管理。
欠陥数はポアソン分布に従うと仮定。
Attributes:
-----------
c_bar : float
平均欠陥数
ucl, lcl : float
管理限界
"""
def __init__(self):
self.c_bar = None
self.ucl = None
self.lcl = None
def fit(self, defects: np.ndarray):
"""管理限界を計算
Parameters:
-----------
defects : np.ndarray
各検査単位の欠陥数
"""
# 平均欠陥数の計算
self.c_bar = defects.mean()
# ポアソン分布の標準偏差 = sqrt(平均)
# 管理限界 = 平均 ± 3 * sqrt(平均)
sigma_c = np.sqrt(self.c_bar)
self.ucl = self.c_bar + 3 * sigma_c
self.lcl = self.c_bar - 3 * sigma_c
# LCLは0未満にならない
self.lcl = max(self.lcl, 0)
return self
def predict(self, defects: np.ndarray) -> Dict[str, np.ndarray]:
"""管理状態を判定
Parameters:
-----------
defects : np.ndarray
各検査単位の欠陥数
Returns:
--------
results : dict
'c', 'ooc' を含む辞書
"""
if self.c_bar is None:
raise ValueError("Call fit() first")
# 管理外れの判定
ooc = (defects > self.ucl) | (defects < self.lcl)
return {
'c': defects,
'ooc': ooc
}
def plot(self, defects: np.ndarray, title: str = "C Control Chart"):
"""C管理図のプロット"""
results = self.predict(defects)
n_points = len(defects)
plt.figure(figsize=(12, 6))
# 欠陥数のプロット
plt.plot(range(1, n_points+1), defects,
marker='o', color='#2c3e50', linewidth=1.5,
markersize=6, label='Count of Defects')
# 中心線と管理限界
plt.axhline(self.c_bar, color='#11998e', linestyle='-',
linewidth=2, label=f'CL = {self.c_bar:.2f}')
plt.axhline(self.ucl, color='#e74c3c', linestyle='--',
linewidth=2, label=f'UCL = {self.ucl:.2f}')
plt.axhline(self.lcl, color='#e74c3c', linestyle='--',
linewidth=2, label=f'LCL = {self.lcl:.2f}')
# 管理外れ点を強調
ooc_indices = np.where(results['ooc'])[0]
if len(ooc_indices) > 0:
plt.scatter(ooc_indices + 1, defects[ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
plt.xlabel('Inspection Unit', fontsize=11)
plt.ylabel('Number of Defects', fontsize=11)
plt.title(title, fontsize=13, fontweight='bold')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 使用例: 塗装面の欠陥管理(1m²あたりの欠陥数)
np.random.seed(42)
# Phase I: 初期30枚のパネルで管理限界を設定
n_panels_phase1 = 30
avg_defects = 3.5 # 平均3.5個/m²
# ポアソン分布で欠陥数を生成
defects_phase1 = np.random.poisson(avg_defects, n_panels_phase1)
# C管理図の構築
c_chart = CControlChart()
c_chart.fit(defects_phase1)
print("=== C管理図: 管理限界の設定 ===")
print(f"平均欠陥数 (c-bar) = {c_chart.c_bar:.2f} 個/m²")
print(f"UCL = {c_chart.ucl:.2f} 個/m²")
print(f"LCL = {c_chart.lcl:.2f} 個/m²")
# Phase II: 継続監視(50枚、途中で欠陥増加)
n_panels_phase2 = 50
defects_phase2 = np.random.poisson(avg_defects, n_panels_phase2)
# パネル30-35で欠陥が2倍に増加(塗料劣化)
defects_phase2[29:35] = np.random.poisson(avg_defects * 2, 6)
# 監視
results = c_chart.predict(defects_phase2)
print(f"\n=== Phase II: 継続監視 ===")
print(f"管理外れパネル: {results['ooc'].sum()}枚")
print(f"該当パネル番号: {np.where(results['ooc'])[0] + 1}")
# プロット
c_chart.plot(defects_phase2,
title="Paint Defects Monitoring (per m²)")
# 期待される出力:
# === C管理図: 管理限界の設定 ===
# 平均欠陥数 (c-bar) = 3.40 個/m²
# UCL = 8.93 個/m²
# LCL = 0.00 個/m²
#
# === Phase II: 継続監視 ===
# 管理外れパネル: 2枚
# 該当パネル番号: [30 33]
従来の管理図は大きな変化の検出には優れていますが、小さな変化の早期検知には限界があります。以下の手法はより感度の高い監視を可能にします。
平均値からの偏差を累積することで、小さな変化(0.5-2σ程度)を早期に検出します。シューハート管理図より平均4-8倍早く検知可能。
累積和管理図の実装(表形式CUSUM)
# ===================================
# Example 4: CUSUM (Cumulative Sum) Control Chart
# ===================================
class CUSUMControlChart:
"""CUSUM管理図の実装(表形式CUSUM)
平均値の小さなシフト(0.5-2σ)を早期に検出するための手法。
上側と下側のCUSUM統計量を別々に計算。
Parameters:
-----------
target : float
目標値(プロセス平均)
std : float
プロセス標準偏差
k : float
許容限界(reference value)。通常は検出したいシフトの半分。
デフォルト: 0.5σ (1σシフトを検出する場合)
h : float
決定間隔(decision interval)。管理限界として機能。
デフォルト: 5σ (第一種過誤率 0.27%に相当)
Attributes:
-----------
cusum_high : np.ndarray
上側CUSUM統計量
cusum_low : np.ndarray
下側CUSUM統計量
"""
def __init__(self, target: float, std: float,
k: float = 0.5, h: float = 5.0):
self.target = target
self.std = std
self.k = k * std # 許容限界(絶対値)
self.h = h * std # 決定間隔(絶対値)
self.cusum_high = None
self.cusum_low = None
def fit_predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
"""CUSUM統計量を計算し、管理外れを検出
Parameters:
-----------
data : np.ndarray
観測データ
Returns:
--------
results : dict
'cusum_high', 'cusum_low', 'ooc_high', 'ooc_low' を含む辞書
"""
n = len(data)
self.cusum_high = np.zeros(n)
self.cusum_low = np.zeros(n)
# CUSUM統計量の逐次計算
for i in range(n):
if i == 0:
# 初期値
self.cusum_high[i] = max(0, data[i] - self.target - self.k)
self.cusum_low[i] = max(0, self.target - data[i] - self.k)
else:
# 逐次更新式
self.cusum_high[i] = max(0, self.cusum_high[i-1] +
data[i] - self.target - self.k)
self.cusum_low[i] = max(0, self.cusum_low[i-1] +
self.target - data[i] - self.k)
# 管理外れの判定
ooc_high = self.cusum_high > self.h
ooc_low = self.cusum_low > self.h
return {
'data': data,
'cusum_high': self.cusum_high,
'cusum_low': self.cusum_low,
'ooc_high': ooc_high,
'ooc_low': ooc_low
}
def plot(self, data: np.ndarray, title: str = "CUSUM Control Chart"):
"""CUSUM管理図のプロット"""
results = self.fit_predict(data)
n_points = len(data)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
# 元データのプロット
ax1.plot(range(1, n_points+1), data,
marker='o', color='#2c3e50', linewidth=1.5, markersize=5)
ax1.axhline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax1.axhline(self.target + self.k, color='#f39c12',
linestyle=':', linewidth=1.5, label=f'Target ± k')
ax1.axhline(self.target - self.k, color='#f39c12',
linestyle=':', linewidth=1.5)
ax1.set_ylabel('Observation', fontsize=11)
ax1.set_title('Process Data', fontsize=12, fontweight='bold')
ax1.legend(loc='best')
ax1.grid(True, alpha=0.3)
# CUSUM統計量のプロット
ax2.plot(range(1, n_points+1), results['cusum_high'],
marker='^', color='#e74c3c', linewidth=1.5,
markersize=5, label='CUSUM High')
ax2.plot(range(1, n_points+1), results['cusum_low'],
marker='v', color='#3498db', linewidth=1.5,
markersize=5, label='CUSUM Low')
ax2.axhline(self.h, color='#2c3e50', linestyle='--',
linewidth=2, label=f'Decision Interval (h={self.h/self.std:.1f}σ)')
ax2.axhline(0, color='gray', linestyle='-', linewidth=1)
# 管理外れ点を強調
ooc_high_indices = np.where(results['ooc_high'])[0]
ooc_low_indices = np.where(results['ooc_low'])[0]
if len(ooc_high_indices) > 0:
ax2.scatter(ooc_high_indices + 1,
results['cusum_high'][ooc_high_indices],
color='red', s=100, zorder=5, marker='^')
if len(ooc_low_indices) > 0:
ax2.scatter(ooc_low_indices + 1,
results['cusum_low'][ooc_low_indices],
color='red', s=100, zorder=5, marker='v')
ax2.set_xlabel('Observation Number', fontsize=11)
ax2.set_ylabel('CUSUM Statistic', fontsize=11)
ax2.set_title('CUSUM Statistics', fontsize=12, fontweight='bold')
ax2.legend(loc='best')
ax2.grid(True, alpha=0.3)
plt.suptitle(title, fontsize=15, fontweight='bold', y=0.995)
plt.tight_layout()
plt.show()
# 使用例: 化学プロセスの平均シフト検出
np.random.seed(42)
# 目標値と標準偏差
target_value = 100.0
process_std = 1.0
# データ生成(100観測)
n_obs = 100
data = np.random.normal(target_value, process_std, n_obs)
# 観測40から+0.8σの小さなシフト(シューハート管理図では検出困難)
data[40:] += 0.8 * process_std
# CUSUM管理図の適用(1σシフトを検出する設定)
cusum_chart = CUSUMControlChart(
target=target_value,
std=process_std,
k=0.5, # 1σシフトの半分
h=5.0 # 決定間隔
)
results = cusum_chart.fit_predict(data)
print("=== CUSUM管理図 ===")
print(f"目標値: {target_value}")
print(f"許容限界 (k): {cusum_chart.k:.3f} ({cusum_chart.k/process_std:.1f}σ)")
print(f"決定間隔 (h): {cusum_chart.h:.3f} ({cusum_chart.h/process_std:.1f}σ)")
# 検出時点の特定
if results['ooc_high'].any():
first_ooc = np.where(results['ooc_high'])[0][0] + 1
print(f"\n上側シフト検出: 観測 #{first_ooc}")
print(f" 実際のシフト開始: 観測 #41")
print(f" 検出遅延: {first_ooc - 41}観測")
# プロット
cusum_chart.plot(data,
title="CUSUM Chart - Small Mean Shift Detection")
# 期待される出力:
# === CUSUM管理図 ===
# 目標値: 100.0
# 許容限界 (k): 0.500 (0.5σ)
# 決定間隔 (h): 5.000 (5.0σ)
#
# 上側シフト検出: 観測 #49
# 実際のシフト開始: 観測 #41
# 検出遅延: 8観測
指数重み付き移動平均を用いた管理図。CUSUMと同様に小さなシフトを検出。
# ===================================
# Example 5: EWMA (Exponentially Weighted Moving Average) Control Chart
# ===================================
class EWMAControlChart:
"""EWMA管理図の実装
過去の観測値に指数的に減衰する重みをつけて平滑化。
CUSUMと同様に小さなシフトの検出に優れる。
Parameters:
-----------
target : float
目標値(プロセス平均)
std : float
プロセス標準偏差
lambda_ : float
重み係数(0 < λ ≤ 1)。小さいほど過去の影響が大きい。
一般的な値: 0.05-0.25
L : float
管理限界の幅(標準偏差の倍数)。デフォルト: 3.0
Attributes:
-----------
ewma : np.ndarray
EWMA統計量
"""
def __init__(self, target: float, std: float,
lambda_: float = 0.2, L: float = 3.0):
if not 0 < lambda_ <= 1:
raise ValueError("lambda_ must be in (0, 1]")
self.target = target
self.std = std
self.lambda_ = lambda_
self.L = L
self.ewma = None
self.ucl = None
self.lcl = None
def fit_predict(self, data: np.ndarray) -> Dict[str, np.ndarray]:
"""EWMA統計量を計算し、管理外れを検出
Parameters:
-----------
data : np.ndarray
観測データ
Returns:
--------
results : dict
'ewma', 'ucl', 'lcl', 'ooc' を含む辞書
"""
n = len(data)
self.ewma = np.zeros(n)
self.ucl = np.zeros(n)
self.lcl = np.zeros(n)
# EWMA統計量の逐次計算
# Z_t = λ * X_t + (1-λ) * Z_{t-1}
# 初期値: Z_0 = target
self.ewma[0] = self.lambda_ * data[0] + (1 - self.lambda_) * self.target
for i in range(1, n):
self.ewma[i] = self.lambda_ * data[i] + (1 - self.lambda_) * self.ewma[i-1]
# 管理限界の計算(時間依存)
# σ_Z(t) = σ * sqrt(λ/(2-λ) * [1 - (1-λ)^(2t)])
for i in range(n):
t = i + 1
sigma_z = self.std * np.sqrt(
(self.lambda_ / (2 - self.lambda_)) *
(1 - (1 - self.lambda_)**(2 * t))
)
self.ucl[i] = self.target + self.L * sigma_z
self.lcl[i] = self.target - self.L * sigma_z
# 管理外れの判定
ooc = (self.ewma > self.ucl) | (self.ewma < self.lcl)
return {
'data': data,
'ewma': self.ewma,
'ucl': self.ucl,
'lcl': self.lcl,
'ooc': ooc
}
def plot(self, data: np.ndarray, title: str = "EWMA Control Chart"):
"""EWMA管理図のプロット"""
results = self.fit_predict(data)
n_points = len(data)
plt.figure(figsize=(12, 6))
# EWMA統計量のプロット
plt.plot(range(1, n_points+1), results['ewma'],
marker='o', color='#2c3e50', linewidth=2,
markersize=5, label='EWMA')
# 元データ(薄く)
plt.plot(range(1, n_points+1), data,
marker='.', color='gray', linewidth=0.5,
markersize=3, alpha=0.5, label='Original Data')
# 中心線と管理限界
plt.axhline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
plt.plot(range(1, n_points+1), results['ucl'],
color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
plt.plot(range(1, n_points+1), results['lcl'],
color='#e74c3c', linestyle='--', linewidth=2, label='LCL')
# 管理外れ点を強調
ooc_indices = np.where(results['ooc'])[0]
if len(ooc_indices) > 0:
plt.scatter(ooc_indices + 1, results['ewma'][ooc_indices],
color='red', s=100, zorder=5, label='Out of Control')
plt.xlabel('Observation Number', fontsize=11)
plt.ylabel('EWMA Statistic', fontsize=11)
plt.title(title + f" (λ={self.lambda_}, L={self.L})",
fontsize=13, fontweight='bold')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 使用例: 医薬品製造の含量管理
np.random.seed(42)
# 目標値と標準偏差
target_content = 50.0 # mg
process_std = 0.5 # mg
# データ生成(120観測)
n_obs = 120
data = np.random.normal(target_content, process_std, n_obs)
# 観測60から+0.5σの小さなシフト
data[60:] += 0.5 * process_std
# EWMA管理図の適用(λ=0.2で適度な平滑化)
ewma_chart = EWMAControlChart(
target=target_content,
std=process_std,
lambda_=0.2,
L=3.0
)
results = ewma_chart.fit_predict(data)
print("=== EWMA管理図 ===")
print(f"目標値: {target_content} mg")
print(f"重み係数 (λ): {ewma_chart.lambda_}")
print(f"管理限界幅 (L): {ewma_chart.L}σ")
# 検出時点の特定
if results['ooc'].any():
first_ooc = np.where(results['ooc'])[0][0] + 1
print(f"\nシフト検出: 観測 #{first_ooc}")
print(f" 実際のシフト開始: 観測 #61")
print(f" 検出遅延: {first_ooc - 61}観測")
# プロット
ewma_chart.plot(data,
title="EWMA Chart - Drug Content Monitoring")
# 期待される出力:
# === EWMA管理図 ===
# 目標値: 50.0 mg
# 重み係数 (λ): 0.2
# 管理限界幅 (L): 3.0σ
#
# シフト検出: 観測 #68
# 実際のシフト開始: 観測 #61
# 検出遅延: 7観測
管理図でプロセスが統計的に安定していることを確認した上で、プロセス能力指数を計算します。
管理図とプロセス能力分析の統合実装
# ===================================
# Example 6: Integrated Process Capability Analysis
# ===================================
from scipy import stats
class ProcessCapabilityAnalysis:
"""管理図とプロセス能力分析の統合クラス
1. 管理図でプロセスの安定性を確認
2. 安定したプロセスの能力指数を計算
3. 総合評価レポートを生成
"""
def __init__(self, LSL: float, USL: float, target: float = None):
"""
Parameters:
-----------
LSL : float
下方規格限界 (Lower Specification Limit)
USL : float
上方規格限界 (Upper Specification Limit)
target : float, optional
目標値。省略時は規格の中央値
"""
self.LSL = LSL
self.USL = USL
self.target = target if target is not None else (LSL + USL) / 2
self.control_chart = None
self.capability_results = None
def analyze(self, data: np.ndarray,
subgroup_size: int = 5) -> Dict:
"""統合分析の実行
Parameters:
-----------
data : np.ndarray
shape (n_subgroups, subgroup_size) のデータ
subgroup_size : int
サブグループのサイズ
Returns:
--------
results : dict
管理図情報とプロセス能力指数を含む辞書
"""
# Step 1: X-bar and R管理図で安定性を確認
self.control_chart = XbarRControlChart(subgroup_size)
self.control_chart.fit(data)
chart_results = self.control_chart.predict(data)
# 管理外れ点の確認
n_ooc_xbar = chart_results['xbar_ooc'].sum()
n_ooc_r = chart_results['r_ooc'].sum()
is_stable = (n_ooc_xbar == 0) and (n_ooc_r == 0)
# Step 2: プロセス能力指数の計算(安定している場合のみ推奨)
all_data = data.flatten()
process_mean = all_data.mean()
process_std = all_data.std(ddof=1)
# Cp: プロセス能力指数(中心化されている前提)
Cp = (self.USL - self.LSL) / (6 * process_std)
# Cpk: 修正プロセス能力指数(実際の平均位置を考慮)
Cpu = (self.USL - process_mean) / (3 * process_std)
Cpl = (process_mean - self.LSL) / (3 * process_std)
Cpk = min(Cpu, Cpl)
# Cpm: タグチ能力指数(目標値からのずれを考慮)
tau = np.sqrt(process_std**2 + (process_mean - self.target)**2)
Cpm = (self.USL - self.LSL) / (6 * tau)
# 規格外れ率の推定(正規分布を仮定)
z_lower = (self.LSL - process_mean) / process_std
z_upper = (self.USL - process_mean) / process_std
ppm_lower = stats.norm.cdf(z_lower) * 1e6
ppm_upper = (1 - stats.norm.cdf(z_upper)) * 1e6
ppm_total = ppm_lower + ppm_upper
# Step 3: 総合評価
self.capability_results = {
'stable': is_stable,
'n_ooc_xbar': n_ooc_xbar,
'n_ooc_r': n_ooc_r,
'process_mean': process_mean,
'process_std': process_std,
'Cp': Cp,
'Cpk': Cpk,
'Cpm': Cpm,
'Cpu': Cpu,
'Cpl': Cpl,
'ppm_lower': ppm_lower,
'ppm_upper': ppm_upper,
'ppm_total': ppm_total,
'LSL': self.LSL,
'USL': self.USL,
'target': self.target
}
return self.capability_results
def print_report(self):
"""総合評価レポートの出力"""
if self.capability_results is None:
raise ValueError("Call analyze() first")
r = self.capability_results
print("=" * 60)
print("統合プロセス能力分析レポート")
print("=" * 60)
print("\n【1. プロセス安定性(管理図)】")
if r['stable']:
print(" ✅ プロセスは統計的に安定しています")
else:
print(" ⚠️ プロセスが不安定です(管理外れ点あり)")
print(f" X-bar管理外れ: {r['n_ooc_xbar']}点")
print(f" R管理外れ: {r['n_ooc_r']}点")
print(" → まずプロセスを安定化させてください")
print("\n【2. プロセス特性】")
print(f" プロセス平均: {r['process_mean']:.4f}")
print(f" プロセス標準偏差: {r['process_std']:.4f}")
print(f" 規格範囲: [{r['LSL']:.2f}, {r['USL']:.2f}]")
print(f" 目標値: {r['target']:.2f}")
print("\n【3. プロセス能力指数】")
print(f" Cp = {r['Cp']:.3f} ", end="")
print(self._evaluate_index(r['Cp']))
print(f" Cpk = {r['Cpk']:.3f} ", end="")
print(self._evaluate_index(r['Cpk']))
print(f" (Cpu={r['Cpu']:.3f}, Cpl={r['Cpl']:.3f})")
print(f" Cpm = {r['Cpm']:.3f} ", end="")
print(self._evaluate_index(r['Cpm']))
print("\n【4. 規格外れ率推定】")
print(f" 下限外れ: {r['ppm_lower']:.1f} ppm")
print(f" 上限外れ: {r['ppm_upper']:.1f} ppm")
print(f" 合計: {r['ppm_total']:.1f} ppm ({r['ppm_total']/1e4:.3f}%)")
print("\n【5. 総合判定】")
if r['stable'] and r['Cpk'] >= 1.33:
print(" ✅ プロセスは安定かつ十分な能力があります")
elif r['stable'] and r['Cpk'] >= 1.0:
print(" ⚠️ プロセスは安定していますが、能力改善が必要です")
else:
print(" ❌ プロセス改善が必要です")
print("=" * 60)
def _evaluate_index(self, value: float) -> str:
"""能力指数の評価"""
if value >= 1.67:
return "(優秀)"
elif value >= 1.33:
return "(良好)"
elif value >= 1.0:
return "(最低限)"
else:
return "(不十分)"
def plot_capability(self, data: np.ndarray):
"""プロセス能力のヒストグラムと正規分布フィット"""
if self.capability_results is None:
raise ValueError("Call analyze() first")
r = self.capability_results
all_data = data.flatten()
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
# ヒストグラムと正規分布フィット
ax1.hist(all_data, bins=30, density=True, alpha=0.6,
color='#3498db', edgecolor='black', label='Data')
# 正規分布曲線
x = np.linspace(all_data.min(), all_data.max(), 200)
pdf = stats.norm.pdf(x, r['process_mean'], r['process_std'])
ax1.plot(x, pdf, 'r-', linewidth=2, label='Normal Fit')
# 規格限界と目標値
ax1.axvline(r['LSL'], color='#e74c3c', linestyle='--',
linewidth=2, label='LSL')
ax1.axvline(r['USL'], color='#e74c3c', linestyle='--',
linewidth=2, label='USL')
ax1.axvline(r['target'], color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax1.set_xlabel('Value', fontsize=11)
ax1.set_ylabel('Density', fontsize=11)
ax1.set_title('Process Capability Histogram',
fontsize=13, fontweight='bold')
ax1.legend(loc='best')
ax1.grid(True, alpha=0.3)
# 能力指数の棒グラフ
indices = ['Cp', 'Cpk', 'Cpm']
values = [r['Cp'], r['Cpk'], r['Cpm']]
colors = ['#3498db', '#e74c3c', '#11998e']
bars = ax2.bar(indices, values, color=colors, alpha=0.7,
edgecolor='black', linewidth=1.5)
# 基準線
ax2.axhline(1.33, color='green', linestyle='--',
linewidth=2, label='Good (1.33)', alpha=0.7)
ax2.axhline(1.0, color='orange', linestyle='--',
linewidth=2, label='Minimum (1.0)', alpha=0.7)
# 値をバーの上に表示
for bar, value in zip(bars, values):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height,
f'{value:.3f}',
ha='center', va='bottom', fontweight='bold')
ax2.set_ylabel('Index Value', fontsize=11)
ax2.set_title('Process Capability Indices',
fontsize=13, fontweight='bold')
ax2.legend(loc='best')
ax2.grid(True, alpha=0.3, axis='y')
ax2.set_ylim(0, max(values) * 1.3)
plt.tight_layout()
plt.show()
# 使用例: 射出成形品の寸法管理
np.random.seed(42)
# 規格設定
LSL = 49.5 # mm
USL = 50.5 # mm
target = 50.0 # mm
# データ生成(40サブグループ、サイズ5)
n_subgroups = 40
subgroup_size = 5
# 最初の30サブグループは安定(平均50.0、標準偏差0.15)
data_stable = np.random.normal(50.0, 0.15, (30, subgroup_size))
# 後の10サブグループで平均が若干シフト(50.0 → 50.1)
data_shift = np.random.normal(50.1, 0.15, (10, subgroup_size))
data = np.vstack([data_stable, data_shift])
# 統合分析の実行
analysis = ProcessCapabilityAnalysis(LSL, USL, target)
results = analysis.analyze(data, subgroup_size)
# レポート出力
analysis.print_report()
# 可視化
analysis.control_chart.plot(data,
title="Injection Molding - Dimension Control")
analysis.plot_capability(data)
# 期待される出力:
# ============================================================
# 統合プロセス能力分析レポート
# ============================================================
#
# 【1. プロセス安定性(管理図)】
# ⚠️ プロセスが不安定です(管理外れ点あり)
# X-bar管理外れ: 6点
# R管理外れ: 0点
# → まずプロセスを安定化させてください
#
# 【2. プロセス特性】
# プロセス平均: 50.0266
# プロセス標準偏差: 0.1505
# 規格範囲: [49.50, 50.50]
# 目標値: 50.00
#
# 【3. プロセス能力指数】
# Cp = 1.107 (最低限)
# Cpk = 1.049 (最低限)
# (Cpu=1.049, Cpl=1.165)
# Cpm = 1.089 (最低限)
#
# 【4. 規格外れ率推定】
# 下限外れ: 0.1 ppm
# 上限外れ: 8.6 ppm
# 合計: 8.7 ppm (0.001%)
#
# 【5. 総合判定】
# ⚠️ プロセスは安定していますが、能力改善が必要です
# ============================================================
単一点の管理限界超過だけでなく、データのパターンから異常を検知する高度なルール集です。
8つの判定ルールの実装
# ===================================
# Example 7: Western Electric Rules for Pattern Detection
# ===================================
class WesternElectricRules:
"""Western Electric Rulesの実装
管理図上のデータパターンから異常を検知する8つのルール。
単一点の管理限界超過だけでなく、傾向やランなども検出。
Rules:
------
Rule 1: 1点が3σを超える
Rule 2: 連続3点中2点が2σと3σの間
Rule 3: 連続5点中4点が1σと2σの間
Rule 4: 連続8点が同じ側(中心線から)
Rule 5: 連続6点が単調増加または減少
Rule 6: 連続14点が交互に上下
Rule 7: 連続15点が1σ以内
Rule 8: 連続8点が1σ外側
"""
def __init__(self, center_line: float, ucl: float, lcl: float):
"""
Parameters:
-----------
center_line : float
中心線(平均)
ucl : float
上方管理限界(CL + 3σ)
lcl : float
下方管理限界(CL - 3σ)
"""
self.cl = center_line
self.ucl = ucl
self.lcl = lcl
# ゾーン境界の計算
sigma = (ucl - center_line) / 3
self.zone_a_upper = center_line + 2 * sigma
self.zone_a_lower = center_line - 2 * sigma
self.zone_b_upper = center_line + sigma
self.zone_b_lower = center_line - sigma
def apply_rules(self, data: np.ndarray) -> Dict[str, np.ndarray]:
"""全ルールを適用
Parameters:
-----------
data : np.ndarray
時系列データ
Returns:
--------
violations : dict
各ルールの違反点を示すブール配列
"""
n = len(data)
violations = {f'Rule{i}': np.zeros(n, dtype=bool) for i in range(1, 9)}
for i in range(n):
# Rule 1: 1点が3σを超える
if data[i] > self.ucl or data[i] < self.lcl:
violations['Rule1'][i] = True
# Rule 2: 連続3点中2点が2σと3σの間
if i >= 2:
window = data[i-2:i+1]
in_zone_a = np.sum((window > self.zone_a_upper) |
(window < self.zone_a_lower))
if in_zone_a >= 2:
violations['Rule2'][i] = True
# Rule 3: 連続5点中4点が1σと2σの間
if i >= 4:
window = data[i-4:i+1]
in_zone_b = np.sum(
((window > self.zone_b_upper) & (window <= self.zone_a_upper)) |
((window < self.zone_b_lower) & (window >= self.zone_a_lower))
)
if in_zone_b >= 4:
violations['Rule3'][i] = True
# Rule 4: 連続8点が同じ側
if i >= 7:
window = data[i-7:i+1]
if np.all(window > self.cl) or np.all(window < self.cl):
violations['Rule4'][i] = True
# Rule 5: 連続6点が単調増加または減少
if i >= 5:
window = data[i-5:i+1]
diffs = np.diff(window)
if np.all(diffs > 0) or np.all(diffs < 0):
violations['Rule5'][i] = True
# Rule 6: 連続14点が交互に上下
if i >= 13:
window = data[i-13:i+1]
diffs = np.diff(window)
signs = np.sign(diffs)
# 符号が交互に変わるか確認
alternating = True
for j in range(len(signs) - 1):
if signs[j] * signs[j+1] >= 0:
alternating = False
break
if alternating:
violations['Rule6'][i] = True
# Rule 7: 連続15点が1σ以内
if i >= 14:
window = data[i-14:i+1]
if np.all((window >= self.zone_b_lower) &
(window <= self.zone_b_upper)):
violations['Rule7'][i] = True
# Rule 8: 連続8点が1σ外側
if i >= 7:
window = data[i-7:i+1]
if np.all((window > self.zone_b_upper) |
(window < self.zone_b_lower)):
violations['Rule8'][i] = True
return violations
def plot_with_zones(self, data: np.ndarray,
title: str = "Control Chart with Western Electric Rules"):
"""ゾーン付き管理図のプロット"""
violations = self.apply_rules(data)
n = len(data)
# 全違反点を統合
any_violation = np.zeros(n, dtype=bool)
for rule_violations in violations.values():
any_violation |= rule_violations
plt.figure(figsize=(14, 7))
# ゾーンを色分け表示
plt.axhspan(self.ucl, self.zone_a_upper,
color='#e74c3c', alpha=0.1, label='Zone A (2-3σ)')
plt.axhspan(self.zone_a_upper, self.zone_b_upper,
color='#f39c12', alpha=0.1, label='Zone B (1-2σ)')
plt.axhspan(self.zone_b_upper, self.cl,
color='#2ecc71', alpha=0.1, label='Zone C (0-1σ)')
plt.axhspan(self.cl, self.zone_b_lower,
color='#2ecc71', alpha=0.1)
plt.axhspan(self.zone_b_lower, self.zone_a_lower,
color='#f39c12', alpha=0.1)
plt.axhspan(self.zone_a_lower, self.lcl,
color='#e74c3c', alpha=0.1)
# データプロット
plt.plot(range(1, n+1), data, marker='o', color='#2c3e50',
linewidth=1.5, markersize=5, label='Data', zorder=3)
# 管理限界
plt.axhline(self.cl, color='#11998e', linestyle='-',
linewidth=2, label='CL', zorder=2)
plt.axhline(self.ucl, color='#e74c3c', linestyle='--',
linewidth=2, label='UCL (3σ)', zorder=2)
plt.axhline(self.lcl, color='#e74c3c', linestyle='--',
linewidth=2, label='LCL (3σ)', zorder=2)
# ゾーン境界(点線)
plt.axhline(self.zone_a_upper, color='gray', linestyle=':',
linewidth=1, alpha=0.5, zorder=1)
plt.axhline(self.zone_a_lower, color='gray', linestyle=':',
linewidth=1, alpha=0.5, zorder=1)
plt.axhline(self.zone_b_upper, color='gray', linestyle=':',
linewidth=1, alpha=0.5, zorder=1)
plt.axhline(self.zone_b_lower, color='gray', linestyle=':',
linewidth=1, alpha=0.5, zorder=1)
# 違反点を強調
if any_violation.any():
violation_indices = np.where(any_violation)[0]
plt.scatter(violation_indices + 1, data[violation_indices],
color='red', s=150, marker='X', zorder=5,
label='Rule Violation')
plt.xlabel('Observation Number', fontsize=11)
plt.ylabel('Value', fontsize=11)
plt.title(title, fontsize=13, fontweight='bold')
plt.legend(loc='best', ncol=2)
plt.grid(True, alpha=0.3, zorder=0)
plt.tight_layout()
plt.show()
# 違反サマリーの出力
print("\n=== Western Electric Rules Violations ===")
for rule_name, rule_violations in violations.items():
n_violations = rule_violations.sum()
if n_violations > 0:
indices = np.where(rule_violations)[0] + 1
print(f"{rule_name}: {n_violations}回 "
f"(観測 {list(indices[:5])}{'...' if n_violations > 5 else ''})")
# 使用例: 様々な異常パターンの検出
np.random.seed(42)
# データ生成(100観測)
n = 100
data = np.random.normal(0, 1, n)
# 異常パターンを挿入
# Rule 1違反: 3σ超過
data[20] = 3.5
# Rule 4違反: 連続8点が上側
data[40:48] += 0.5
# Rule 5違反: 単調増加傾向
data[65:71] = np.linspace(0, 2, 6)
# Rule 7違反: 変動縮小(1σ以内)
data[85:100] = np.random.normal(0, 0.2, 15)
# Western Electric Rulesの適用
cl = 0.0
ucl = 3.0
lcl = -3.0
we_rules = WesternElectricRules(cl, ucl, lcl)
we_rules.plot_with_zones(data,
title="Western Electric Rules - Anomaly Pattern Detection")
# 期待される出力:
# === Western Electric Rules Violations ===
# Rule1: 1回 (観測 [21])
# Rule4: 1回 (観測 [48])
# Rule5: 1回 (観測 [71])
# Rule7: 1回 (観測 [100])
実際の製造現場で使える逐次更新型の監視システム
# ===================================
# Example 8: Real-time SPC Monitoring System
# ===================================
import time
from datetime import datetime
from collections import deque
class RealTimeSPCMonitor:
"""リアルタイムSPC監視システム
新しいデータが到着するたびに逐次的に管理図を更新し、
異常を即座に検出・通知するシステム。
Features:
---------
- EWMA、CUSUM、Shewhart管理図を同時監視
- Western Electric Rulesによる高感度検出
- アラート通知機能
- 履歴データの保持と可視化
"""
def __init__(self, target: float, std: float,
window_size: int = 100):
"""
Parameters:
-----------
target : float
目標値
std : float
プロセス標準偏差
window_size : int
表示する履歴データの最大数
"""
self.target = target
self.std = std
self.window_size = window_size
# 管理限界
self.ucl = target + 3 * std
self.lcl = target - 3 * std
# 各手法の初期化
self.ewma_chart = EWMAControlChart(target, std, lambda_=0.2)
self.cusum_chart = CUSUMControlChart(target, std, k=0.5, h=5.0)
self.we_rules = WesternElectricRules(target, self.ucl, self.lcl)
# データ履歴(dequeで効率的に管理)
self.data_history = deque(maxlen=window_size)
self.timestamps = deque(maxlen=window_size)
self.ewma_history = deque(maxlen=window_size)
# アラート履歴
self.alerts = []
def add_observation(self, value: float,
timestamp: datetime = None) -> Dict:
"""新しい観測値を追加し、異常検出を実行
Parameters:
-----------
value : float
新しい観測値
timestamp : datetime, optional
タイムスタンプ(省略時は現在時刻)
Returns:
--------
status : dict
検出結果とアラート情報
"""
if timestamp is None:
timestamp = datetime.now()
# データを追加
self.data_history.append(value)
self.timestamps.append(timestamp)
# 異常検出
alerts = []
# 1. Shewhart管理図(3σルール)
if value > self.ucl:
alerts.append({
'method': 'Shewhart',
'type': 'UCL violation',
'value': value,
'severity': 'HIGH'
})
elif value < self.lcl:
alerts.append({
'method': 'Shewhart',
'type': 'LCL violation',
'value': value,
'severity': 'HIGH'
})
# 2. EWMA管理図
if len(self.data_history) >= 1:
data_array = np.array(self.data_history)
ewma_results = self.ewma_chart.fit_predict(data_array)
current_ewma = ewma_results['ewma'][-1]
current_ucl = ewma_results['ucl'][-1]
current_lcl = ewma_results['lcl'][-1]
self.ewma_history.append(current_ewma)
if ewma_results['ooc'][-1]:
alerts.append({
'method': 'EWMA',
'type': 'EWMA limit violation',
'value': current_ewma,
'severity': 'MEDIUM'
})
# 3. CUSUM管理図
if len(self.data_history) >= 1:
data_array = np.array(self.data_history)
cusum_results = self.cusum_chart.fit_predict(data_array)
if cusum_results['ooc_high'][-1]:
alerts.append({
'method': 'CUSUM',
'type': 'Upward shift detected',
'value': cusum_results['cusum_high'][-1],
'severity': 'MEDIUM'
})
if cusum_results['ooc_low'][-1]:
alerts.append({
'method': 'CUSUM',
'type': 'Downward shift detected',
'value': cusum_results['cusum_low'][-1],
'severity': 'MEDIUM'
})
# 4. Western Electric Rules
if len(self.data_history) >= 15: # 最低限のデータが必要
data_array = np.array(self.data_history)
violations = self.we_rules.apply_rules(data_array)
for rule_name, rule_violations in violations.items():
if rule_violations[-1]: # 最新点で違反
alerts.append({
'method': 'Western Electric',
'type': f'{rule_name} violation',
'value': value,
'severity': 'LOW'
})
# アラート保存
if alerts:
for alert in alerts:
alert['timestamp'] = timestamp
self.alerts.append(alert)
# ステータス返却
status = {
'value': value,
'timestamp': timestamp,
'in_control': len(alerts) == 0,
'alerts': alerts,
'n_observations': len(self.data_history)
}
return status
def get_summary(self) -> Dict:
"""現在の監視状況のサマリー"""
if len(self.data_history) == 0:
return {'status': 'No data'}
data_array = np.array(self.data_history)
return {
'n_observations': len(self.data_history),
'current_value': data_array[-1],
'process_mean': data_array.mean(),
'process_std': data_array.std(ddof=1),
'target': self.target,
'n_alerts_total': len(self.alerts),
'n_alerts_last_10': sum(1 for a in self.alerts
if len(self.data_history) -
list(self.timestamps).index(a['timestamp']) <= 10),
'alert_methods': list(set(a['method'] for a in self.alerts))
}
def plot_dashboard(self):
"""リアルタイム監視ダッシュボード"""
if len(self.data_history) < 2:
print("データが不足しています(最低2点必要)")
return
data_array = np.array(self.data_history)
n = len(data_array)
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
# 1. Shewhart管理図
ax1 = axes[0]
x_indices = range(1, n+1)
ax1.plot(x_indices, data_array, marker='o', color='#2c3e50',
linewidth=1.5, markersize=4, label='Data')
ax1.axhline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax1.axhline(self.ucl, color='#e74c3c', linestyle='--',
linewidth=2, label='UCL')
ax1.axhline(self.lcl, color='#e74c3c', linestyle='--',
linewidth=2, label='LCL')
# Shewhart違反点を強調
ooc_indices = (data_array > self.ucl) | (data_array < self.lcl)
if ooc_indices.any():
ooc_x = np.where(ooc_indices)[0] + 1
ax1.scatter(ooc_x, data_array[ooc_indices],
color='red', s=100, zorder=5)
ax1.set_ylabel('Value', fontsize=10)
ax1.set_title('Shewhart Control Chart (3σ)',
fontsize=12, fontweight='bold')
ax1.legend(loc='best', fontsize=9)
ax1.grid(True, alpha=0.3)
# 2. EWMA管理図
ax2 = axes[1]
if len(self.ewma_history) > 0:
ewma_array = np.array(self.ewma_history)
ewma_results = self.ewma_chart.fit_predict(data_array)
ax2.plot(x_indices, data_array, marker='.', color='gray',
linewidth=0.5, markersize=3, alpha=0.5, label='Original')
ax2.plot(x_indices, ewma_array, marker='o', color='#3498db',
linewidth=2, markersize=4, label='EWMA')
ax2.axhline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax2.plot(x_indices, ewma_results['ucl'],
color='#e74c3c', linestyle='--', linewidth=2, label='UCL')
ax2.plot(x_indices, ewma_results['lcl'],
color='#e74c3c', linestyle='--', linewidth=2, label='LCL')
# EWMA違反点を強調
if ewma_results['ooc'].any():
ooc_x = np.where(ewma_results['ooc'])[0] + 1
ax2.scatter(ooc_x, ewma_array[ewma_results['ooc']],
color='red', s=100, zorder=5)
ax2.set_ylabel('EWMA Value', fontsize=10)
ax2.set_title('EWMA Control Chart (λ=0.2)',
fontsize=12, fontweight='bold')
ax2.legend(loc='best', fontsize=9)
ax2.grid(True, alpha=0.3)
# 3. CUSUM管理図
ax3 = axes[2]
cusum_results = self.cusum_chart.fit_predict(data_array)
ax3.plot(x_indices, cusum_results['cusum_high'],
marker='^', color='#e74c3c', linewidth=1.5,
markersize=4, label='CUSUM High')
ax3.plot(x_indices, cusum_results['cusum_low'],
marker='v', color='#3498db', linewidth=1.5,
markersize=4, label='CUSUM Low')
ax3.axhline(self.cusum_chart.h, color='#2c3e50',
linestyle='--', linewidth=2, label='Decision Interval (h)')
ax3.axhline(0, color='gray', linestyle='-', linewidth=1)
# CUSUM違反点を強調
if cusum_results['ooc_high'].any():
ooc_x = np.where(cusum_results['ooc_high'])[0] + 1
ax3.scatter(ooc_x, cusum_results['cusum_high'][cusum_results['ooc_high']],
color='red', s=100, zorder=5, marker='^')
if cusum_results['ooc_low'].any():
ooc_x = np.where(cusum_results['ooc_low'])[0] + 1
ax3.scatter(ooc_x, cusum_results['cusum_low'][cusum_results['ooc_low']],
color='red', s=100, zorder=5, marker='v')
ax3.set_xlabel('Observation Number', fontsize=10)
ax3.set_ylabel('CUSUM Value', fontsize=10)
ax3.set_title('CUSUM Control Chart',
fontsize=12, fontweight='bold')
ax3.legend(loc='best', fontsize=9)
ax3.grid(True, alpha=0.3)
plt.suptitle('Real-time SPC Monitoring Dashboard',
fontsize=15, fontweight='bold', y=0.995)
plt.tight_layout()
plt.show()
# 使用例: リアルタイム監視のシミュレーション
np.random.seed(42)
# 監視システムの初期化
target = 100.0
std = 1.0
monitor = RealTimeSPCMonitor(target, std, window_size=150)
print("=== リアルタイムSPC監視システム ===")
print(f"目標値: {target}")
print(f"標準偏差: {std}")
print("\n観測データを逐次追加中...\n")
# シミュレーション: 150観測を逐次追加
for i in range(150):
# 正常データ
if i < 60:
value = np.random.normal(target, std)
# 観測60-80: +1σシフト
elif i < 80:
value = np.random.normal(target + std, std)
# 観測80-120: 正常に戻る
elif i < 120:
value = np.random.normal(target, std)
# 観測120-150: 変動増大
else:
value = np.random.normal(target, std * 1.5)
# 観測値を追加
status = monitor.add_observation(value)
# アラートがあれば表示
if not status['in_control']:
print(f"⚠️ 観測 #{i+1}: アラート発生")
for alert in status['alerts']:
print(f" - {alert['method']}: {alert['type']} "
f"(重要度: {alert['severity']})")
# サマリー表示
print("\n=== 監視サマリー ===")
summary = monitor.get_summary()
for key, value in summary.items():
print(f"{key}: {value}")
# ダッシュボード表示
monitor.plot_dashboard()
# 期待される出力:
# === リアルタイムSPC監視システム ===
# 目標値: 100.0
# 標準偏差: 1.0
#
# 観測データを逐次追加中...
#
# ⚠️ 観測 #67: アラート発生
# - EWMA: EWMA limit violation (重要度: MEDIUM)
# - CUSUM: Upward shift detected (重要度: MEDIUM)
# ...(複数のアラート)
#
# === 監視サマリー ===
# n_observations: 150
# current_value: 101.234
# process_mean: 100.567
# process_std: 1.234
# target: 100.0
# n_alerts_total: 45
# n_alerts_last_10: 8
# alert_methods: ['EWMA', 'CUSUM', 'Western Electric']
この章を完了すると、以下を説明・実装できるようになります:
Q1: X-bar管理図の上方管理限界(UCL)が80.5、中心線(CL)が79.0の場合、下方管理限界(LCL)はいくつですか?
正解: 77.5
解説:
UCL = CL + 3σ
LCL = CL - 3σ
UCL - CL = 80.5 - 79.0 = 1.5 = 3σ
したがって、LCL = CL - 1.5 = 79.0 - 1.5 = 77.5
Q2: あるプロセスの平均不良率が2%、サンプルサイズが100個の場合、P管理図の上方管理限界(UCL)を計算してください。
正解: UCL = 0.062 (6.2%)
計算:
p̄ = 0.02
n = 100
σ_p = √(p̄(1-p̄)/n) = √(0.02 × 0.98 / 100) = √0.000196 = 0.014
UCL = p̄ + 3σ_p = 0.02 + 3 × 0.014 = 0.02 + 0.042 = 0.062
重要ポイント: サンプルサイズが大きいほど管理限界は中心線に近づきます(検出力向上)。
Q3: CUSUMとEWMAはどちらも小さなシフト検出に優れていますが、それぞれの長所と短所を比較し、どのような状況でどちらを選ぶべきか説明してください。
比較表:
| 項目 | CUSUM | EWMA |
|---|---|---|
| 検出速度 | 非常に高速(ARL優秀) | 高速 |
| パラメータ設定 | やや難(k, h) | 比較的容易(λ) |
| 解釈のしやすさ | 難(累積統計量) | 易(平滑化された値) |
| シフト方向検出 | 上下別々(明確) | 単一統計量 |
| 実装の複雑さ | 中程度 | シンプル |
選択基準:
ベストプラクティス: 重要なプロセスでは両方を併用し、相互補完的に監視することを推奨します。