Dramatically Improve Quality with a Data-Driven Problem-Solving Framework
Six Sigma is a quality management methodology developed at Motorola in the 1980s. It uses statistical management of process variation to keep defects below 3.4 defects per million opportunities (3.4 DPMO).
The sigma level expresses the distance from the process mean to the nearest specification limit in multiples of the standard deviation (σ); higher levels indicate better quality. The DPMO values in the table below include the conventional 1.5σ shift; a short verification sketch follows the table.
| Sigma Level | DPMO | Yield | Quality Rating |
|---|---|---|---|
| 2σ | 308,537 | 69.1% | Very Poor |
| 3σ | 66,807 | 93.3% | Poor |
| 4σ | 6,210 | 99.38% | Average |
| 5σ | 233 | 99.977% | Good |
| 6σ | 3.4 | 99.99966% | World Class |
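These values can be reproduced from the upper-tail probability of the normal distribution with the 1.5σ shift applied; a minimal verification sketch:
# Verify the sigma-level table: DPMO = (1 - Phi(sigma - 1.5)) * 1e6
from scipy import stats
for s in [2, 3, 4, 5, 6]:
    dpmo = stats.norm.sf(s - 1.5) * 1_000_000  # sf = 1 - cdf (upper tail)
    print(f"{s}σ: {dpmo:,.1f} DPMO")
# 2σ: 308,537.5 / 3σ: 66,807.2 / 4σ: 6,209.7 / 5σ: 232.7 / 6σ: 3.4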
Calculate DPMO and sigma level from defect data.
# ===================================
# Example 1: DPMO (Defects Per Million Opportunities) Calculation
# ===================================
import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt
from typing import Dict
class SigmaLevelCalculator:
"""Sigma level and DPMO calculation class
Calculate metrics to assess manufacturing process quality level.
Parameters:
-----------
shift : float
1.5 sigma shift (accounting for long-term variation)
Default: 1.5 (Motorola empirical rule)
"""
def __init__(self, shift: float = 1.5):
self.shift = shift
def calculate_dpmo(self, defects: int, units: int,
opportunities: int) -> float:
"""Calculate DPMO
Parameters:
-----------
defects : int
Number of defects detected
units : int
Number of inspection units
opportunities : int
Number of defect opportunities per unit
Returns:
--------
dpmo : float
DPMO (defects per million opportunities)
"""
total_opportunities = units * opportunities
dpo = defects / total_opportunities # Defects Per Opportunity
dpmo = dpo * 1_000_000
return dpmo
def dpmo_to_sigma(self, dpmo: float,
include_shift: bool = True) -> float:
"""Calculate sigma level from DPMO
Parameters:
-----------
dpmo : float
DPMO value
include_shift : bool
Whether to consider 1.5 sigma shift
Returns:
--------
sigma_level : float
Sigma level
"""
# Convert DPMO to yield (complement of defect rate)
yield_rate = 1 - (dpmo / 1_000_000)
# Calculate Z value with inverse of normal cumulative distribution function
# For bilateral specifications, use one-sided DPMO
z_score = stats.norm.ppf(yield_rate)
# Consider 1.5 sigma shift
if include_shift:
sigma_level = z_score + self.shift
else:
sigma_level = z_score
return sigma_level
def sigma_to_dpmo(self, sigma_level: float,
include_shift: bool = True) -> float:
"""Calculate DPMO from sigma level
Parameters:
-----------
sigma_level : float
Sigma level
include_shift : bool
Whether to consider 1.5 sigma shift
Returns:
--------
dpmo : float
DPMO value
"""
# Consider 1.5 sigma shift
if include_shift:
z_score = sigma_level - self.shift
else:
z_score = sigma_level
# Calculate DPMO with normal cumulative distribution function
defect_rate = 1 - stats.norm.cdf(z_score)
dpmo = defect_rate * 1_000_000
return dpmo
def calculate_process_sigma(self, data: np.ndarray,
LSL: float, USL: float) -> Dict:
"""Calculate sigma level directly from process data
Parameters:
-----------
data : np.ndarray
Process data
LSL : float
Lower specification limit
USL : float
Upper specification limit
Returns:
--------
results : dict
Dictionary containing sigma level, DPMO, process capability indices, etc.
"""
process_mean = data.mean()
process_std = data.std(ddof=1)
# Cpk calculation
Cpu = (USL - process_mean) / (3 * process_std)
Cpl = (process_mean - LSL) / (3 * process_std)
Cpk = min(Cpu, Cpl)
# Sigma level (short-term: Cpk-based)
sigma_level_st = 3 * Cpk
# Calculate long-term sigma level from actual defect rate
defects = np.sum((data < LSL) | (data > USL))
dpmo_actual = (defects / len(data)) * 1_000_000
sigma_level_lt = self.dpmo_to_sigma(dpmo_actual, include_shift=False)
results = {
'process_mean': process_mean,
'process_std': process_std,
'Cpk': Cpk,
'sigma_level_st': sigma_level_st,
'sigma_level_lt': sigma_level_lt,
'dpmo_actual': dpmo_actual,
'defects': defects,
'total_samples': len(data)
}
return results
def plot_sigma_table(self):
"""Visualize sigma level correspondence table"""
sigma_levels = np.arange(1, 7, 0.1)
dpmo_values = [self.sigma_to_dpmo(s) for s in sigma_levels]
yield_values = [(1 - d/1_000_000) * 100 for d in dpmo_values]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
# DPMO vs Sigma Level
ax1.semilogy(sigma_levels, dpmo_values, linewidth=2.5,
color='#e74c3c', label='DPMO')
ax1.axhline(3.4, color='#11998e', linestyle='--',
linewidth=2, label='6σ Target (3.4 DPMO)')
# Mark milestones
for sigma in [2, 3, 4, 5, 6]:
dpmo = self.sigma_to_dpmo(sigma)
ax1.plot(sigma, dpmo, 'o', markersize=10, color='#2c3e50')
ax1.annotate(f'{sigma}σ\n{dpmo:.1f}',
xy=(sigma, dpmo), xytext=(sigma, dpmo*3),
ha='center', fontsize=9,
arrowprops=dict(arrowstyle='->', color='gray'))
ax1.set_xlabel('Sigma Level', fontsize=11)
ax1.set_ylabel('DPMO (log scale)', fontsize=11)
ax1.set_title('Sigma Level vs DPMO', fontsize=13, fontweight='bold')
ax1.grid(True, alpha=0.3, which='both')
ax1.legend(loc='best')
ax1.set_xlim(1, 7)
# Yield vs Sigma Level
ax2.plot(sigma_levels, yield_values, linewidth=2.5,
color='#3498db', label='Yield')
ax2.axhline(99.99966, color='#11998e', linestyle='--',
linewidth=2, label='6σ Yield (99.99966%)')
for sigma in [2, 3, 4, 5, 6]:
dpmo = self.sigma_to_dpmo(sigma)
yield_rate = (1 - dpmo/1_000_000) * 100
ax2.plot(sigma, yield_rate, 'o', markersize=10, color='#2c3e50')
ax2.annotate(f'{sigma}σ\n{yield_rate:.3f}%',
xy=(sigma, yield_rate), xytext=(sigma, yield_rate-3),
ha='center', fontsize=9,
arrowprops=dict(arrowstyle='->', color='gray'))
ax2.set_xlabel('Sigma Level', fontsize=11)
ax2.set_ylabel('Yield (%)', fontsize=11)
ax2.set_title('Sigma Level vs Yield', fontsize=13, fontweight='bold')
ax2.grid(True, alpha=0.3)
ax2.legend(loc='best')
ax2.set_xlim(1, 7)
ax2.set_ylim(60, 100)
plt.tight_layout()
plt.show()
# Usage example: Semiconductor manufacturing defect analysis
np.random.seed(42)
calculator = SigmaLevelCalculator()
# Case 1: Calculate DPMO and sigma level from defect count
print("=== Case 1: Calculation from Defect Count ===")
defects = 15 # Defects detected
units = 10000 # Production quantity
opportunities = 50 # Number of inspection items per product
dpmo = calculator.calculate_dpmo(defects, units, opportunities)
sigma_level = calculator.dpmo_to_sigma(dpmo)
print(f"Total defect opportunities: {units * opportunities:,}")
print(f"Defects detected: {defects}")
print(f"DPMO: {dpmo:.1f}")
print(f"Sigma level: {sigma_level:.2f}Ο")
print(f"Yield: {(1 - dpmo/1_000_000)*100:.4f}%")
# Case 2: Calculate sigma level from process data
print("\n=== Case 2: Calculation from Process Data ===")
LSL = 49.5 # mm
USL = 50.5 # mm
# Simulate a 4σ process (mean 50.0, Cpk=1.33)
process_std = (USL - LSL) / (6 * 1.33)
data = np.random.normal(50.0, process_std, 100000)
results = calculator.calculate_process_sigma(data, LSL, USL)
print(f"Process mean: {results['process_mean']:.4f} mm")
print(f"Process standard deviation: {results['process_std']:.4f} mm")
print(f"Cpk: {results['Cpk']:.3f}")
print(f"Short-term sigma level: {results['sigma_level_st']:.2f}Ο")
print(f"Long-term sigma level: {results['sigma_level_lt']:.2f}Ο")
print(f"Measured DPMO: {results['dpmo_actual']:.1f}")
print(f"Defective units: {results['defects']} / {results['total_samples']}")
# Visualize sigma level correspondence table
calculator.plot_sigma_table()
# Expected output:
# === Case 1: Calculation from Defect Count ===
# Total defect opportunities: 500,000
# Defects detected: 15
# DPMO: 30.0
# Sigma level: 5.51σ
# Yield: 99.9970%
#
# === Case 2: Calculation from Process Data ===
# Process mean: 50.0003 mm
# Process standard deviation: 0.1253 mm
# Cpk: 1.330
# Short-term sigma level: 3.99σ
# Long-term sigma level: 3.85σ
# Measured DPMO: 60.0
# Defective units: 6 / 100000
Six Sigma assumes a 1.5σ gap between short-term process capability (Cpk-based) and long-term performance (measured DPMO); the shift accounts for process drift and special-cause variation over time. This is why a 6σ process is quoted at 3.4 DPMO: without the shift, a centered process with ±6σ specification limits would show only about 0.002 DPMO.
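A minimal numeric check of the shift convention:
# 6σ process: one-sided tail with the 1.5σ shift vs. two-sided tail without it
from scipy import stats
print(f"With 1.5σ shift: {stats.norm.sf(6 - 1.5) * 1e6:.1f} DPMO")    # ≈ 3.4
print(f"Centered, no shift: {2 * stats.norm.sf(6) * 1e6:.5f} DPMO")   # ≈ 0.00197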
Before attempting quality improvement, verify that the measurement system itself has sufficient precision and reliability; otherwise measurement error is confounded with true process variation. Gage R&R (Repeatability and Reproducibility) analysis is the representative method.
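Conceptually, Gage R&R splits observed variance as σ²_total = σ²_part + σ²_repeatability + σ²_reproducibility. A minimal sketch using the true values assumed in the simulation further below (part σ = 2.0, repeatability σ = 0.5, operator biases 0, +0.3, -0.2):
import numpy as np
part_std = 2.0                                  # true part-to-part σ
repeat_std = 0.5                                # true repeatability σ
repro_std = np.std([0.0, 0.3, -0.2], ddof=1)    # spread of operator biases ≈ 0.25
total_std = np.sqrt(part_std**2 + repeat_std**2 + repro_std**2)
gage_rr_std = np.sqrt(repeat_std**2 + repro_std**2)
print(f"Expected total σ ≈ {total_std:.2f}")                         # ≈ 2.08
print(f"Expected %Gage R&R ≈ {gage_rr_std / total_std * 100:.0f}%")  # ≈ 27%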
Evaluate repeatability and reproducibility of measurement systems.
# ===================================
# Example 2: Gage R&R (Repeatability & Reproducibility) Analysis
# ===================================
class GageRnR:
"""Implementation of Gage R&R analysis
Decompose measurement system variation into:
- Part-to-Part Variation
- Repeatability: Measurement variation by same operator
- Reproducibility: Variation between operators
Parameters:
-----------
data : pd.DataFrame
columns = ['Part', 'Operator', 'Measurement']
"""
def __init__(self, data: pd.DataFrame):
self.data = data
self.results = None
def analyze(self) -> Dict:
"""Execute Gage R&R analysis
Returns:
--------
results : dict
Dictionary containing each variation component and its proportion
"""
# Prepare data
parts = self.data['Part'].unique()
operators = self.data['Operator'].unique()
n_parts = len(parts)
n_operators = len(operators)
n_trials = len(self.data) // (n_parts * n_operators)
# Grand mean and total variance
grand_mean = self.data['Measurement'].mean()
total_var = self.data['Measurement'].var(ddof=1)
# Calculate part-to-part variation
part_means = self.data.groupby('Part')['Measurement'].mean()
part_var = part_means.var(ddof=1)
# Calculate operator variation (part of reproducibility)
operator_means = self.data.groupby('Operator')['Measurement'].mean()
operator_var = operator_means.var(ddof=1)
# Calculate repeatability
# Calculate range for each part × operator combination
ranges = []
for part in parts:
for operator in operators:
measurements = self.data[
(self.data['Part'] == part) &
(self.data['Operator'] == operator)
]['Measurement'].values
if len(measurements) > 1:
ranges.append(measurements.max() - measurements.min())
avg_range = np.mean(ranges)
# d2 constant (by subgroup size)
d2_constants = {2: 1.128, 3: 1.693, 4: 2.059, 5: 2.326}
d2 = d2_constants.get(n_trials, 2.326)
# Repeatability standard deviation
repeatability_std = avg_range / d2
# Reproducibility standard deviation (remove repeatability from operator variation)
reproducibility_var = max(
0,
n_parts * n_trials * operator_var -
repeatability_std**2
) / (n_parts * n_trials)
reproducibility_std = np.sqrt(reproducibility_var)
# R&R (total measurement system variation)
gage_rr_std = np.sqrt(repeatability_std**2 + reproducibility_std**2)
# Part-to-part variation standard deviation
part_to_part_var = max(
0,
total_var - gage_rr_std**2
)
part_to_part_std = np.sqrt(part_to_part_var)
# Total variation
total_std = np.sqrt(total_var)
# Calculate contribution percentages (%)
repeatability_pct = (repeatability_std / total_std) * 100
reproducibility_pct = (reproducibility_std / total_std) * 100
gage_rr_pct = (gage_rr_std / total_std) * 100
part_to_part_pct = (part_to_part_std / total_std) * 100
# Judgment criteria (AIAG compliant)
if gage_rr_pct < 10:
rating = "Acceptable"
elif gage_rr_pct < 30:
rating = "Marginal"
else:
rating = "Not Acceptable"
# Number of Distinct Categories (ndc)
# Ability of measurement system to distinguish parts
ndc = int(1.41 * (part_to_part_std / gage_rr_std))
self.results = {
'repeatability_std': repeatability_std,
'reproducibility_std': reproducibility_std,
'gage_rr_std': gage_rr_std,
'part_to_part_std': part_to_part_std,
'total_std': total_std,
'repeatability_pct': repeatability_pct,
'reproducibility_pct': reproducibility_pct,
'gage_rr_pct': gage_rr_pct,
'part_to_part_pct': part_to_part_pct,
'rating': rating,
'ndc': ndc,
'n_parts': n_parts,
'n_operators': n_operators,
'n_trials': n_trials
}
return self.results
def print_report(self):
"""Output Gage R&R report"""
if self.results is None:
raise ValueError("Call analyze() first")
r = self.results
print("=" * 70)
print("Gage R&R Analysis Report")
print("=" * 70)
print(f"\n[Measurement Conditions]")
print(f" Number of parts: {r['n_parts']}")
print(f" Number of operators: {r['n_operators']}")
print(f" Measurements per operator: {r['n_trials']}")
print(f"\n[Standard Deviation of Variation Components]")
print(f" Repeatability: {r['repeatability_std']:.4f}")
print(f" Reproducibility: {r['reproducibility_std']:.4f}")
print(f" Gage R&R: {r['gage_rr_std']:.4f}")
print(f" Part-to-Part: {r['part_to_part_std']:.4f}")
print(f" Total Variation: {r['total_std']:.4f}")
print(f"\n[Contribution to Total Variation]")
print(f" Repeatability: {r['repeatability_pct']:.2f}%")
print(f" Reproducibility: {r['reproducibility_pct']:.2f}%")
print(f" Gage R&R: {r['gage_rr_pct']:.2f}% β Key Metric")
print(f" Part-to-Part: {r['part_to_part_pct']:.2f}%")
print(f"\n[Overall Assessment]")
print(f" Gage R&R: {r['gage_rr_pct']:.2f}% β {r['rating']}")
print(f" Criteria:")
print(f" < 10%: Acceptable (measurement system is good)")
print(f" 10-30%: Marginal (improvement desired)")
print(f" > 30%: Not Acceptable (measurement system improvement required)")
print(f"\n[Measurement System Discrimination Capability]")
print(f" Number of Distinct Categories (ndc): {r['ndc']}")
print(f" Criteria:")
print(f" β₯ 5: Sufficient discrimination capability")
print(f" 2-4: Limited discrimination capability")
print(f" < 2: Insufficient discrimination capability")
print("=" * 70)
def plot_components(self):
"""Visualize variation components"""
if self.results is None:
raise ValueError("Call analyze() first")
r = self.results
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
# Bar chart of standard deviations
components = ['Repeatability', 'Reproducibility',
'Gage R&R', 'Part-to-Part']
std_values = [r['repeatability_std'], r['reproducibility_std'],
r['gage_rr_std'], r['part_to_part_std']]
colors = ['#3498db', '#e74c3c', '#f39c12', '#2ecc71']
bars = ax1.bar(components, std_values, color=colors, alpha=0.7,
edgecolor='black', linewidth=1.5)
for bar, value in zip(bars, std_values):
height = bar.get_height()
ax1.text(bar.get_x() + bar.get_width()/2., height,
f'{value:.4f}',
ha='center', va='bottom', fontweight='bold', fontsize=10)
ax1.set_ylabel('Standard Deviation', fontsize=11)
ax1.set_title('Variance Components (Std Dev)',
fontsize=13, fontweight='bold')
ax1.grid(True, alpha=0.3, axis='y')
# Pie chart of contribution (std-based percentages; matplotlib rescales them to sum to 100%)
percentages = [r['repeatability_pct'], r['reproducibility_pct'],
r['part_to_part_pct']]
labels = [f'Repeatability\n{r["repeatability_pct"]:.1f}%',
f'Reproducibility\n{r["reproducibility_pct"]:.1f}%',
f'Part-to-Part\n{r["part_to_part_pct"]:.1f}%']
colors_pie = ['#3498db', '#e74c3c', '#2ecc71']
wedges, texts = ax2.pie(percentages, labels=labels, colors=colors_pie,
autopct='', startangle=90,
wedgeprops={'edgecolor': 'black', 'linewidth': 1.5})
# Highlight Gage R&R contribution
ax2.text(0, -1.4, f'Gage R&R: {r["gage_rr_pct"]:.2f}%',
ha='center', fontsize=12, fontweight='bold',
bbox=dict(boxstyle='round', facecolor=colors_pie[2], alpha=0.3))
ax2.set_title('Contribution to Total Variation',
fontsize=13, fontweight='bold')
plt.tight_layout()
plt.show()
# Usage example: 10 parts, 3 operators, 3 measurements each
np.random.seed(42)
# Generate data
n_parts = 10
n_operators = 3
n_trials = 3
# True part values (part-to-part variation is dominant)
true_part_values = np.random.normal(100, 2, n_parts)
# Operator bias (reproducibility)
operator_bias = {
'Operator_A': 0.0,
'Operator_B': 0.3,
'Operator_C': -0.2
}
# Repeatability (measurement error)
repeatability_error = 0.5
data_list = []
for part_id in range(n_parts):
for operator_name in operator_bias.keys():
for trial in range(n_trials):
# Measurement = true value + operator bias + repeatability error
measurement = (
true_part_values[part_id] +
operator_bias[operator_name] +
np.random.normal(0, repeatability_error)
)
data_list.append({
'Part': f'Part_{part_id+1}',
'Operator': operator_name,
'Measurement': measurement
})
df = pd.DataFrame(data_list)
# Execute Gage R&R analysis
gage_rr = GageRnR(df)
results = gage_rr.analyze()
# Output report
gage_rr.print_report()
# Visualization
gage_rr.plot_components()
# Expected output:
# ======================================================================
# Gage R&R Analysis Report
# ======================================================================
#
# [Measurement Conditions]
# Number of parts: 10
# Number of operators: 3
# Measurements per operator: 3
#
# [Standard Deviation of Variation Components]
# Repeatability: 0.5123
# Reproducibility: 0.2456
# Gage R&R: 0.5678
# Part-to-Part: 1.9234
# Total Variation: 2.0123
#
# [Contribution to Total Variation]
# Repeatability: 25.46%
# Reproducibility: 12.21%
# Gage R&R: 28.22% β Key Metric
# Part-to-Part: 95.58%
#
# [Overall Assessment]
# Gage R&R: 28.22% β Marginal
# Criteria:
# < 10%: Acceptable (measurement system is good)
# 10-30%: Marginal (improvement desired)
# > 30%: Not Acceptable (measurement system improvement required)
#
# [Measurement System Discrimination Capability]
# Number of Distinct Categories (ndc): 4
# Criteria:
# ≥ 5: Sufficient discrimination capability
# 2-4: Limited discrimination capability
# < 2: Insufficient discrimination capability
# ======================================================================
Next, establish the process capability baseline: evaluate both short-term (within-subgroup) and long-term (overall) capability. The index definitions are sketched below.
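A minimal sketch of the index definitions used in Example 3, where σ_within is the short-term standard deviation and σ_overall the long-term one (the values here are hypothetical):
# Cp/Cpk use σ_within; Pp/Ppk use the same formulas with σ_overall
mu, LSL, USL, sigma_w = 100.0, 95.0, 105.0, 1.11   # hypothetical values
Cp = (USL - LSL) / (6 * sigma_w)                   # potential capability
Cpk = min(USL - mu, mu - LSL) / (3 * sigma_w)      # accounts for off-centering
print(f"Cp = {Cp:.2f}, Cpk = {Cpk:.2f}")           # ≈ 1.50 each when centered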
# ===================================
# Example 3: Comprehensive Process Capability Baseline
# ===================================
class ProcessCapabilityBaseline:
"""Comprehensive process capability analysis
Calculate short-term capability indices (Cp, Cpk) and long-term capability indices (Pp, Ppk),
and evaluate initial process state.
Parameters:
-----------
data : np.ndarray
Process data (subgroup structure)
LSL : float
Lower specification limit
USL : float
Upper specification limit
target : float, optional
Target value
"""
def __init__(self, data: np.ndarray, LSL: float, USL: float,
target: float = None):
if data.ndim == 1:
# For 1D array, reshape into subgroups
n_subgroups = len(data) // 5
data = data[:n_subgroups*5].reshape(n_subgroups, 5)
self.data = data
self.LSL = LSL
self.USL = USL
self.target = target if target is not None else (LSL + USL) / 2
self.results = None
def analyze(self) -> Dict:
"""Comprehensive process capability analysis
Returns:
--------
results : dict
Various capability indices and statistics
"""
# All data
all_data = self.data.flatten()
n_total = len(all_data)
# Statistics
grand_mean = all_data.mean()
overall_std = all_data.std(ddof=1) # Overall standard deviation (long-term)
# Within-subgroup standard deviation (short-term)
subgroup_means = self.data.mean(axis=1)
subgroup_ranges = self.data.max(axis=1) - self.data.min(axis=1)
# Estimate short-term standard deviation using Rbar/d2 method
subgroup_size = self.data.shape[1]
d2_constants = {2: 1.128, 3: 1.693, 4: 2.059, 5: 2.326,
6: 2.534, 7: 2.704, 8: 2.847, 9: 2.970, 10: 3.078}
d2 = d2_constants.get(subgroup_size, 2.326)
within_std = subgroup_ranges.mean() / d2
# Short-term capability indices (Cp, Cpk)
Cp = (self.USL - self.LSL) / (6 * within_std)
Cpu_st = (self.USL - grand_mean) / (3 * within_std)
Cpl_st = (grand_mean - self.LSL) / (3 * within_std)
Cpk = min(Cpu_st, Cpl_st)
# Long-term capability indices (Pp, Ppk)
Pp = (self.USL - self.LSL) / (6 * overall_std)
Ppu = (self.USL - grand_mean) / (3 * overall_std)
Ppl = (grand_mean - self.LSL) / (3 * overall_std)
Ppk = min(Ppu, Ppl)
# Cpm (Taguchi index)
tau = np.sqrt(overall_std**2 + (grand_mean - self.target)**2)
Cpm = (self.USL - self.LSL) / (6 * tau)
# Actual defect rate
defects = np.sum((all_data < self.LSL) | (all_data > self.USL))
ppm = (defects / n_total) * 1_000_000
# Expected defect rate (assuming normal distribution)
z_lower = (self.LSL - grand_mean) / overall_std
z_upper = (self.USL - grand_mean) / overall_std
expected_ppm = (
(stats.norm.cdf(z_lower) + (1 - stats.norm.cdf(z_upper))) *
1_000_000
)
# Sigma level
sigma_level_st = 3 * Cpk
sigma_level_lt = 3 * Ppk
# Assessment
capability_rating = self._rate_capability(Cpk, Ppk)
self.results = {
'n_total': n_total,
'grand_mean': grand_mean,
'within_std': within_std,
'overall_std': overall_std,
'Cp': Cp,
'Cpk': Cpk,
'Cpu_st': Cpu_st,
'Cpl_st': Cpl_st,
'Pp': Pp,
'Ppk': Ppk,
'Ppu': Ppu,
'Ppl': Ppl,
'Cpm': Cpm,
'ppm_actual': ppm,
'ppm_expected': expected_ppm,
'sigma_level_st': sigma_level_st,
'sigma_level_lt': sigma_level_lt,
'capability_rating': capability_rating
}
return self.results
def _rate_capability(self, Cpk: float, Ppk: float) -> str:
"""Overall assessment of capability indices"""
if Cpk >= 1.67 and Ppk >= 1.67:
return "World Class"
elif Cpk >= 1.33 and Ppk >= 1.33:
return "Adequate"
elif Cpk >= 1.0 and Ppk >= 1.0:
return "Marginal"
else:
return "Inadequate"
def print_report(self):
"""Process capability baseline report"""
if self.results is None:
raise ValueError("Call analyze() first")
r = self.results
print("=" * 70)
print("Process Capability Baseline Report")
print("=" * 70)
print(f"\n[Process Statistics]")
print(f" Sample size: {r['n_total']}")
print(f" Mean: {r['grand_mean']:.4f}")
print(f" Within-subgroup std (short-term): {r['within_std']:.4f}")
print(f" Overall std (long-term): {r['overall_std']:.4f}")
print(f" Specification range: [{self.LSL}, {self.USL}]")
print(f" Target value: {self.target}")
print(f"\n[Short-term Capability Indices (Within Subgroup)]")
print(f" Cp = {r['Cp']:.3f}")
print(f" Cpk = {r['Cpk']:.3f} (Cpu={r['Cpu_st']:.3f}, Cpl={r['Cpl_st']:.3f})")
print(f" Short-term sigma level: {r['sigma_level_st']:.2f}Ο")
print(f"\n[Long-term Capability Indices (Overall)]")
print(f" Pp = {r['Pp']:.3f}")
print(f" Ppk = {r['Ppk']:.3f} (Ppu={r['Ppu']:.3f}, Ppl={r['Ppl']:.3f})")
print(f" Long-term sigma level: {r['sigma_level_lt']:.2f}Ο")
print(f"\n[Taguchi Index]")
print(f" Cpm = {r['Cpm']:.3f} (considering deviation from target)")
print(f"\n[Defect Rate Estimate]")
print(f" Actual PPM: {r['ppm_actual']:.1f}")
print(f" Expected PPM (assuming normal distribution): {r['ppm_expected']:.1f}")
print(f"\n[Overall Assessment]")
print(f" Capability level: {r['capability_rating']}")
print(f" Criteria:")
print(f" Cpk/Ppk β₯ 1.67: World Class")
print(f" Cpk/Ppk β₯ 1.33: Adequate")
print(f" Cpk/Ppk β₯ 1.0: Marginal")
print(f" Cpk/Ppk < 1.0: Inadequate")
print("=" * 70)
def plot_capability(self):
"""Visualize process capability"""
if self.results is None:
raise ValueError("Call analyze() first")
r = self.results
all_data = self.data.flatten()
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 1. Histogram and normal distribution
ax1 = axes[0, 0]
ax1.hist(all_data, bins=50, density=True, alpha=0.6,
color='#3498db', edgecolor='black', label='Data')
# Normal distribution curve
x = np.linspace(all_data.min(), all_data.max(), 200)
pdf = stats.norm.pdf(x, r['grand_mean'], r['overall_std'])
ax1.plot(x, pdf, 'r-', linewidth=2.5, label='Normal Fit')
# Specification limits
ax1.axvline(self.LSL, color='#e74c3c', linestyle='--',
linewidth=2, label='LSL')
ax1.axvline(self.USL, color='#e74c3c', linestyle='--',
linewidth=2, label='USL')
ax1.axvline(self.target, color='#11998e', linestyle='-',
linewidth=2, label='Target')
ax1.set_xlabel('Value', fontsize=10)
ax1.set_ylabel('Density', fontsize=10)
ax1.set_title('Process Distribution', fontsize=12, fontweight='bold')
ax1.legend(loc='best', fontsize=9)
ax1.grid(True, alpha=0.3)
# 2. Capability index comparison
ax2 = axes[0, 1]
indices = ['Cp', 'Cpk', 'Pp', 'Ppk', 'Cpm']
values = [r['Cp'], r['Cpk'], r['Pp'], r['Ppk'], r['Cpm']]
colors = ['#3498db', '#e74c3c', '#2ecc71', '#f39c12', '#9b59b6']
bars = ax2.bar(indices, values, color=colors, alpha=0.7,
edgecolor='black', linewidth=1.5)
# Reference lines
ax2.axhline(1.67, color='green', linestyle='--',
linewidth=2, alpha=0.5, label='World Class')
ax2.axhline(1.33, color='orange', linestyle='--',
linewidth=2, alpha=0.5, label='Adequate')
ax2.axhline(1.0, color='red', linestyle='--',
linewidth=2, alpha=0.5, label='Minimum')
for bar, value in zip(bars, values):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height,
f'{value:.3f}',
ha='center', va='bottom', fontweight='bold', fontsize=10)
ax2.set_ylabel('Index Value', fontsize=10)
ax2.set_title('Capability Indices', fontsize=12, fontweight='bold')
ax2.legend(loc='best', fontsize=8)
ax2.grid(True, alpha=0.3, axis='y')
ax2.set_ylim(0, max(values) * 1.2)
# 3. Time series plot
ax3 = axes[1, 0]
ax3.plot(all_data, marker='.', markersize=3, linewidth=0.5,
color='#2c3e50', alpha=0.6)
ax3.axhline(r['grand_mean'], color='#11998e', linestyle='-',
linewidth=2, label='Mean')
ax3.axhline(self.LSL, color='#e74c3c', linestyle='--',
linewidth=2, label='LSL/USL')
ax3.axhline(self.USL, color='#e74c3c', linestyle='--',
linewidth=2)
# ±3σ range
ax3.axhline(r['grand_mean'] + 3*r['overall_std'],
color='gray', linestyle=':', linewidth=1.5, alpha=0.5)
ax3.axhline(r['grand_mean'] - 3*r['overall_std'],
color='gray', linestyle=':', linewidth=1.5, alpha=0.5)
ax3.set_xlabel('Observation Number', fontsize=10)
ax3.set_ylabel('Value', fontsize=10)
ax3.set_title('Run Chart', fontsize=12, fontweight='bold')
ax3.legend(loc='best', fontsize=9)
ax3.grid(True, alpha=0.3)
# 4. Normal probability plot
ax4 = axes[1, 1]
stats.probplot(all_data, dist="norm", plot=ax4)
ax4.get_lines()[0].set_markersize(3)
ax4.get_lines()[0].set_color('#3498db')
ax4.get_lines()[1].set_color('#e74c3c')
ax4.set_title('Normal Probability Plot', fontsize=12, fontweight='bold')
ax4.grid(True, alpha=0.3)
plt.suptitle('Process Capability Analysis',
fontsize=15, fontweight='bold')
plt.tight_layout()
plt.show()
# Usage example: Chemical process concentration control
np.random.seed(42)
LSL = 95.0 # %
USL = 105.0 # %
target = 100.0 # %
# Generate data (50 subgroups, size 5, Cpk=1.5 equivalent)
n_subgroups = 50
subgroup_size = 5
# Short-term variation (within-subgroup)
within_std = (USL - LSL) / (6 * 1.5)
# Long-term variation (including drift)
data_list = []
for i in range(n_subgroups):
# Gradual drift (cause of long-term variation)
drift = 0.5 * np.sin(2 * np.pi * i / n_subgroups)
subgroup_mean = target + drift
subgroup_data = np.random.normal(subgroup_mean, within_std, subgroup_size)
data_list.append(subgroup_data)
data = np.array(data_list)
# Process capability analysis
baseline = ProcessCapabilityBaseline(data, LSL, USL, target)
results = baseline.analyze()
# Report output
baseline.print_report()
# Visualization
baseline.plot_capability()
# Expected output:
# ======================================================================
# Process Capability Baseline Report
# ======================================================================
#
# [Process Statistics]
# Sample size: 250
# Mean: 99.9876
# Within-subgroup std (short-term): 1.1234
# Overall std (long-term): 1.2456
# Specification range: [95.0, 105.0]
# Target value: 100.0
#
# [Short-term Capability Indices (Within Subgroup)]
# Cp = 1.485
# Cpk = 1.478 (Cpu=1.489, Cpl=1.467)
# Short-term sigma level: 4.43σ
#
# [Long-term Capability Indices (Overall)]
# Pp = 1.339
# Ppk = 1.334 (Ppu=1.342, Ppl=1.326)
# Long-term sigma level: 4.00σ
#
# [Taguchi Index]
# Cpm = 1.336 (considering deviation from target)
#
# [Defect Rate Estimate]
# Actual PPM: 0.0
# Expected PPM (assuming normal distribution): 32.4
#
# [Overall Assessment]
# Capability level: Adequate
# Criteria:
# Cpk/Ppk ≥ 1.67: World Class
# Cpk/Ppk ≥ 1.33: Adequate
# Cpk/Ppk ≥ 1.0: Marginal
# Cpk/Ppk < 1.0: Inadequate
# ======================================================================
DMAIC (Define, Measure, Analyze, Improve, Control) is the standard problem-solving framework for Six Sigma projects; its five phases are executed sequentially, as outlined in the sketch below.
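An illustrative mapping (an orientation aid, not a formal standard) of the DMAIC phases to the tools demonstrated in this chapter:
# Illustrative phase-to-tool mapping for this chapter's examples
dmaic_tools = {
    "Define": "Project charter and baseline metrics (Example 7)",
    "Measure": "Gage R&R (Example 2), capability baseline (Example 3)",
    "Analyze": "Regression root cause analysis (Example 4), Multi-Vari chart (Example 5)",
    "Improve": "Taguchi robust parameter design (Example 8)",
    "Control": "Control plan and SPC (Example 6)",
}
for phase, tools in dmaic_tools.items():
    print(f"{phase}: {tools}")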
Identify key factors using multiple regression analysis.
# ===================================
# Example 4: Root Cause Analysis using Multiple Regression
# ===================================
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error
class RootCauseAnalyzer:
"""Root cause analysis (regression-based)
Quantify relationship between multiple factors (X) and quality characteristic (Y),
and identify factors with high impact.
Parameters:
-----------
X : pd.DataFrame
Explanatory variables (process factors)
y : pd.Series or np.ndarray
Objective variable (quality characteristic)
"""
def __init__(self, X: pd.DataFrame, y):
self.X = X
self.y = y
self.model = None
self.scaler = StandardScaler()
self.results = None
def analyze(self) -> Dict:
"""Execute root cause analysis
Returns:
--------
results : dict
Regression coefficients, importance ranking, etc.
"""
# Standardize data (for coefficient comparison)
X_scaled = self.scaler.fit_transform(self.X)
X_scaled_df = pd.DataFrame(X_scaled, columns=self.X.columns)
# Build multiple regression model
self.model = LinearRegression()
self.model.fit(X_scaled_df, self.y)
# Prediction and evaluation
y_pred = self.model.predict(X_scaled_df)
r2 = r2_score(self.y, y_pred)
rmse = np.sqrt(mean_squared_error(self.y, y_pred))
# Standardized regression coefficients (impact indicator)
coefficients = pd.DataFrame({
'Factor': self.X.columns,
'Coefficient': self.model.coef_,
'Abs_Coefficient': np.abs(self.model.coef_)
}).sort_values('Abs_Coefficient', ascending=False)
# Contribution rate (explanatory power of each factor)
total_abs_coef = coefficients['Abs_Coefficient'].sum()
coefficients['Contribution_Pct'] = (
coefficients['Abs_Coefficient'] / total_abs_coef * 100
)
# Cumulative contribution (Pareto analysis)
coefficients['Cumulative_Pct'] = coefficients['Contribution_Pct'].cumsum()
# Vital Few: the smallest set of top factors whose cumulative contribution reaches 80%
n_vital = int((coefficients['Cumulative_Pct'] < 80).sum()) + 1
vital_few = coefficients['Factor'].head(n_vital).tolist()
self.results = {
'coefficients': coefficients,
'intercept': self.model.intercept_,
'r2': r2,
'rmse': rmse,
'vital_few': vital_few,
'n_factors': len(self.X.columns),
'n_vital_few': len(vital_few)
}
return self.results
def print_report(self):
"""Root cause analysis report"""
if self.results is None:
raise ValueError("Call analyze() first")
r = self.results
print("=" * 70)
print("Root Cause Analysis Report (Multiple Regression)")
print("=" * 70)
print(f"\n[Model Performance]")
print(f" RΒ² (coefficient of determination): {r['r2']:.4f}")
print(f" RMSE: {r['rmse']:.4f}")
print(f" Interpretation: Higher RΒ² indicates better explanation of quality characteristic by selected factors")
print(f"\n[Factor Importance Ranking]")
print(r['coefficients'].to_string(index=False))
print(f"\n[Vital Few (Critical Few)]")
print(f" Top {r['n_vital_few']} factors out of {r['n_factors']} total factors")
print(f" explain 80% of variation (Pareto principle)")
print(f" β Factors to manage as priority: {', '.join(r['vital_few'])}")
print("=" * 70)
def plot_pareto(self):
"""Create Pareto chart"""
if self.results is None:
raise ValueError("Call analyze() first")
coefficients = self.results['coefficients']
fig, ax1 = plt.subplots(figsize=(12, 6))
# Bar chart (contribution rate)
x_pos = np.arange(len(coefficients))
bars = ax1.bar(x_pos, coefficients['Contribution_Pct'],
color='#3498db', alpha=0.7, edgecolor='black',
linewidth=1.5, label='Contribution %')
ax1.set_xlabel('Process Factors', fontsize=11)
ax1.set_ylabel('Contribution (%)', fontsize=11, color='#3498db')
ax1.set_xticks(x_pos)
ax1.set_xticklabels(coefficients['Factor'], rotation=45, ha='right')
ax1.tick_params(axis='y', labelcolor='#3498db')
ax1.grid(True, alpha=0.3, axis='y')
# Cumulative contribution (line)
ax2 = ax1.twinx()
line = ax2.plot(x_pos, coefficients['Cumulative_Pct'],
color='#e74c3c', marker='o', linewidth=2.5,
markersize=8, label='Cumulative %')
ax2.set_ylabel('Cumulative Contribution (%)', fontsize=11,
color='#e74c3c')
ax2.tick_params(axis='y', labelcolor='#e74c3c')
ax2.set_ylim(0, 110)
# 80% line
ax2.axhline(80, color='green', linestyle='--',
linewidth=2, alpha=0.7, label='80% Threshold')
# Highlight Vital Few boundary
n_vital = self.results['n_vital_few']
if n_vital > 0:
ax1.axvline(n_vital - 0.5, color='red', linestyle=':',
linewidth=2.5, alpha=0.7)
ax1.text(n_vital - 0.5, ax1.get_ylim()[1] * 0.9,
'Vital Few ←|→ Trivial Many',
ha='center', fontsize=10, fontweight='bold',
bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.5))
plt.title('Pareto Chart - Root Cause Analysis',
fontsize=13, fontweight='bold')
# Legend
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
plt.tight_layout()
plt.show()
# Usage example: Injection molding dimension defect factor analysis
np.random.seed(42)
# Generate data (100 samples)
n_samples = 100
# Explanatory variables (process factors)
factors = pd.DataFrame({
'Temperature': np.random.normal(200, 10, n_samples), # ℃
'Pressure': np.random.normal(100, 5, n_samples), # MPa
'Cooling_Time': np.random.normal(30, 3, n_samples), # sec
'Material_Moisture': np.random.normal(0.5, 0.1, n_samples), # %
'Cycle_Time': np.random.normal(60, 5, n_samples), # sec
'Mold_Temp': np.random.normal(50, 5, n_samples) # ℃
})
# Objective variable (dimension: mm)
# True relationship: Temperature and Pressure are dominant
dimension = (
50.0 +
0.05 * (factors['Temperature'] - 200) + # Temperature effect (large)
0.03 * (factors['Pressure'] - 100) + # Pressure effect (medium)
-0.01 * (factors['Cooling_Time'] - 30) + # Cooling time effect (small)
0.5 * (factors['Material_Moisture'] - 0.5) + # Moisture effect (medium)
0.005 * (factors['Cycle_Time'] - 60) + # Cycle time effect (minimal)
0.002 * (factors['Mold_Temp'] - 50) + # Mold temperature effect (minimal)
np.random.normal(0, 0.1, n_samples) # Noise
)
# Root cause analysis
analyzer = RootCauseAnalyzer(factors, dimension)
results = analyzer.analyze()
# Report output
analyzer.print_report()
# Pareto chart
analyzer.plot_pareto()
# Expected output:
# ======================================================================
# Root Cause Analysis Report (Multiple Regression)
# ======================================================================
#
# [Model Performance]
# RΒ² (coefficient of determination): 0.9876
# RMSE: 0.1123
# Interpretation: Higher RΒ² indicates better explanation of quality characteristic by selected factors
#
# [Factor Importance Ranking]
# Factor Coefficient Abs_Coefficient Contribution_Pct Cumulative_Pct
# Temperature 0.4987 0.4987 45.67 45.67
# Pressure 0.2987 0.2987 27.34 73.01
# Material_Moisture 0.1876 0.1876 17.18 90.19
# Cooling_Time -0.0987 0.0987 9.03 99.22
# Cycle_Time 0.0043 0.0043 0.39 99.61
# Mold_Temp 0.0021 0.0021 0.19 99.80
#
# [Vital Few (Critical Few)]
# Top 3 factors out of 6 total factors
# explain 80% of variation (Pareto principle)
# → Factors to manage as priority: Temperature, Pressure, Material_Moisture
# ======================================================================
Decompose variation into positional, temporal, and within-piece components; a quick sanity check of the decomposition follows.
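Under the independence assumption used below, the variances add: Var(total) ≈ Var(lot) + Var(position) + Var(within). A minimal check against the true σ values used in the simulation further below (0.3, 0.2, 0.1):
import numpy as np
lot, pos, within = 0.3, 0.2, 0.1    # true σ values assumed in the simulation
total_var = lot**2 + pos**2 + within**2
print(f"Expected shares: lot {lot**2/total_var:.0%}, "
      f"position {pos**2/total_var:.0%}, within {within**2/total_var:.0%}")
# ≈ lot 64%, position 29%, within 7% (compare the measured ~61/27/11 below)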
# ===================================
# Example 5: Multi-Vari Chart for Variance Decomposition
# ===================================
class MultiVariChart:
"""Implementation of Multi-Vari Chart
Decompose variation into three components:
- Positional Variation: Variation by position (e.g., between mold cavities)
- Temporal Variation: Variation over time (e.g., between lots)
- Within-Piece Variation: Variation within individual piece (e.g., multiple measurement points on one product)
Parameters:
-----------
data : pd.DataFrame
columns = ['Lot', 'Position', 'Measurement']
"""
def __init__(self, data: pd.DataFrame):
self.data = data
self.variance_components = None
def decompose_variance(self) -> Dict:
"""Decompose variation components
Returns:
--------
components : dict
Proportion of each variation component
"""
# Total variance
total_var = self.data['Measurement'].var(ddof=1)
# Lot-to-lot variation (temporal variation)
lot_means = self.data.groupby('Lot')['Measurement'].mean()
lot_var = lot_means.var(ddof=1)
# Position-to-position variation (within lot)
position_var_list = []
for lot in self.data['Lot'].unique():
lot_data = self.data[self.data['Lot'] == lot]
position_means = lot_data.groupby('Position')['Measurement'].mean()
if len(position_means) > 1:
position_var_list.append(position_means.var(ddof=1))
position_var = np.mean(position_var_list) if position_var_list else 0
# Within-piece variation (residual)
within_var = total_var - lot_var - position_var
within_var = max(0, within_var) # Prevent negative values
# Contribution rate
lot_pct = (lot_var / total_var) * 100
position_pct = (position_var / total_var) * 100
within_pct = (within_var / total_var) * 100
self.variance_components = {
'total_var': total_var,
'lot_var': lot_var,
'position_var': position_var,
'within_var': within_var,
'lot_pct': lot_pct,
'position_pct': position_pct,
'within_pct': within_pct
}
return self.variance_components
def plot(self, title: str = "Multi-Vari Chart"):
"""Plot Multi-Vari chart"""
if self.variance_components is None:
self.decompose_variance()
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
# Multi-Vari chart
lots = self.data['Lot'].unique()
positions = self.data['Position'].unique()
colors = plt.cm.tab10(np.linspace(0, 1, len(positions)))
for i, lot in enumerate(lots):
lot_data = self.data[self.data['Lot'] == lot]
for j, position in enumerate(positions):
pos_data = lot_data[lot_data['Position'] == position]['Measurement']
if len(pos_data) > 0:
# Mean and range for each position
mean_val = pos_data.mean()
min_val = pos_data.min()
max_val = pos_data.max()
x_pos = i + j * 0.1
# Range line
ax1.plot([x_pos, x_pos], [min_val, max_val],
color=colors[j], linewidth=2, alpha=0.6)
# Mean point
ax1.plot(x_pos, mean_val, 'o',
color=colors[j], markersize=8,
label=f'Position {position}' if i == 0 else '')
# Connect lot means with line
lot_mean = lot_data['Measurement'].mean()
if i > 0:
ax1.plot([i-1, i], [prev_lot_mean, lot_mean],
'k--', linewidth=1.5, alpha=0.5)
prev_lot_mean = lot_mean
ax1.set_xlabel('Lot', fontsize=11)
ax1.set_ylabel('Measurement', fontsize=11)
ax1.set_title('Multi-Vari Chart', fontsize=13, fontweight='bold')
ax1.set_xticks(range(len(lots)))
ax1.set_xticklabels(lots)
ax1.legend(loc='best', fontsize=9)
ax1.grid(True, alpha=0.3)
# Pie chart of variation components
components = ['Lot-to-Lot\n(Temporal)',
'Position-to-Position\n(Positional)',
'Within-Piece']
percentages = [self.variance_components['lot_pct'],
self.variance_components['position_pct'],
self.variance_components['within_pct']]
colors_pie = ['#e74c3c', '#3498db', '#2ecc71']
wedges, texts, autotexts = ax2.pie(
percentages, labels=components, colors=colors_pie,
autopct='%1.1f%%', startangle=90,
wedgeprops={'edgecolor': 'black', 'linewidth': 1.5}
)
for autotext in autotexts:
autotext.set_color('white')
autotext.set_fontweight('bold')
autotext.set_fontsize(11)
ax2.set_title('Variance Components', fontsize=13, fontweight='bold')
plt.suptitle(title, fontsize=15, fontweight='bold')
plt.tight_layout()
plt.show()
# Summary output
print("\n=== Variation Component Analysis ===")
print(f"Total variance: {self.variance_components['total_var']:.4f}")
print(f"\nVariation breakdown:")
print(f" Lot-to-lot variation (temporal): {self.variance_components['lot_pct']:.2f}%")
print(f" Position-to-position variation (spatial): {self.variance_components['position_pct']:.2f}%")
print(f" Within-piece variation (measurement): {self.variance_components['within_pct']:.2f}%")
print(f"\nImprovement priority:")
if self.variance_components['lot_pct'] > 50:
print(" β Lot-to-lot variation is dominant β Process stabilization is top priority")
elif self.variance_components['position_pct'] > 50:
print(" β Position-to-position variation is dominant β Equipment uniformity improvement is top priority")
else:
print(" β Within-piece variation is dominant β Measurement system improvement is top priority")
# Usage example: 4-cavity mold in injection molding
np.random.seed(42)
# Generate data (5 lots, 4 positions, 3 measurements each)
n_lots = 5
n_positions = 4
n_measurements = 3
data_list = []
for lot in range(1, n_lots + 1):
# Lot effect (temporal variation)
lot_effect = np.random.normal(0, 0.3)
for position in range(1, n_positions + 1):
# Position effect (cavity-to-cavity variation)
position_effect = np.random.normal(0, 0.2)
for _ in range(n_measurements):
# Within-piece variation
within_noise = np.random.normal(0, 0.1)
measurement = 50.0 + lot_effect + position_effect + within_noise
data_list.append({
'Lot': f'Lot_{lot}',
'Position': f'Cavity_{position}',
'Measurement': measurement
})
df = pd.DataFrame(data_list)
# Multi-Vari analysis
mv_chart = MultiVariChart(df)
mv_chart.plot(title="Injection Molding - 4-Cavity Mold Analysis")
# Expected output:
# === Variation Component Analysis ===
# Total variance: 0.1456
#
# Variation breakdown:
# Lot-to-lot variation (temporal): 61.23%
# Position-to-position variation (spatial): 27.45%
# Within-piece variation (measurement): 11.32%
#
# Improvement priority:
# → Lot-to-lot variation is dominant → Process stabilization is top priority
Create a control plan to sustain the process after improvements are made.
# ===================================
# Example 6: Control Plan for Sustained Improvements
# ===================================
class ControlPlan:
"""Creation and execution of control plan
Define systematic control procedures to maintain
improvements from DMAIC Improve phase.
Components:
-----------
- Control characteristics (what to control)
- Measurement method (how to measure)
- Sampling plan (when, how many)
- Control limits (acceptable range)
- Response plan (what to do when abnormal)
"""
def __init__(self, process_name: str):
self.process_name = process_name
self.control_items = []
def add_control_item(self, characteristic: str, spec_type: str,
LSL: float = None, USL: float = None,
target: float = None, measurement_method: str = "",
sample_size: int = 5, frequency: str = "Every hour",
control_method: str = "X-bar/R Chart",
reaction_plan: str = ""):
"""Add control item
Parameters:
-----------
characteristic : str
Control characteristic name
spec_type : str
Specification type ('bilateral', 'upper', 'lower')
LSL, USL : float
Specification limits
target : float
Target value
measurement_method : str
Measurement method
sample_size : int
Sample size
frequency : str
Measurement frequency
control_method : str
Control method
reaction_plan : str
Response plan for abnormalities
"""
item = {
'Characteristic': characteristic,
'Spec_Type': spec_type,
'LSL': LSL,
'USL': USL,
'Target': target,
'Measurement_Method': measurement_method,
'Sample_Size': sample_size,
'Frequency': frequency,
'Control_Method': control_method,
'Reaction_Plan': reaction_plan
}
self.control_items.append(item)
def generate_plan_table(self) -> pd.DataFrame:
"""Generate control plan table
Returns:
--------
plan_df : pd.DataFrame
Control plan table
"""
if not self.control_items:
raise ValueError("No control items added")
plan_df = pd.DataFrame(self.control_items)
return plan_df
def print_plan(self):
"""Display control plan"""
plan_df = self.generate_plan_table()
print("=" * 100)
print(f"Control Plan")
print(f"Process Name: {self.process_name}")
print("=" * 100)
for i, item in enumerate(self.control_items, 1):
print(f"\n[Control Item {i}: {item['Characteristic']}]")
print(f" Specification range: ", end="")
if item['Spec_Type'] == 'bilateral':
print(f"[{item['LSL']}, {item['USL']}] Target: {item['Target']}")
elif item['Spec_Type'] == 'upper':
print(f"β€ {item['USL']}")
elif item['Spec_Type'] == 'lower':
print(f"β₯ {item['LSL']}")
print(f" Measurement method: {item['Measurement_Method']}")
print(f" Sampling: {item['Sample_Size']} samples / {item['Frequency']}")
print(f" Control method: {item['Control_Method']}")
print(f" Response plan for abnormalities:")
for line in item['Reaction_Plan'].split('\n'):
if line.strip():
print(f" - {line.strip()}")
print("\n" + "=" * 100)
def export_to_excel(self, filename: str):
"""Export to Excel format (implementation example)"""
plan_df = self.generate_plan_table()
# Actual implementation would use openpyxl or similar
print(f"[INFO] Control plan exported to {filename} (implementation omitted)")
print(plan_df.to_string(index=False))
# Usage example: Chemical process control plan
control_plan = ControlPlan("Chemical Reaction Process - Batch Manufacturing")
# Control item 1: Product concentration
control_plan.add_control_item(
characteristic="Product Concentration",
spec_type="bilateral",
LSL=95.0,
USL=105.0,
target=100.0,
measurement_method="HPLC analysis (Β±0.1% precision)",
sample_size=3,
frequency="Every batch (2-hour intervals)",
control_method="X-bar/R Chart + EWMA",
reaction_plan="""
1. One point outside control limits → Stop line, investigate cause
2. EWMA outside control limits → Monitor trend, check raw materials
3. Three consecutive points exceed 2σ → Report to process engineer
"""
)
# Control item 2: Reaction temperature
control_plan.add_control_item(
characteristic="Reaction Temperature",
spec_type="bilateral",
LSL=78.0,
USL=82.0,
target=80.0,
measurement_method="Thermocouple (Β±0.1β)",
sample_size=1,
frequency="Continuous monitoring (1-minute intervals)",
control_method="Real-time SPC (Shewhart + CUSUM)",
reaction_plan="""
1. Outside specification limits → Automatic alarm, adjust heating/cooling
2. CUSUM signal → Trend analysis, preventive maintenance
"""
)
# Control item 3: pH value
control_plan.add_control_item(
characteristic="pH Value",
spec_type="bilateral",
LSL=6.8,
USL=7.2,
target=7.0,
measurement_method="pH meter (Β±0.05)",
sample_size=2,
frequency="Every 30 minutes",
control_method="X-bar/R Chart",
reaction_plan="""
1. Outside control limits → Add neutralizer, re-measure
2. Trend detected → Check raw material lot
"""
)
# Display control plan
control_plan.print_plan()
# Excel export (implementation example)
control_plan.export_to_excel("control_plan_chemical_process.xlsx")
# Expected output:
# ====================================================================================================
# Control Plan
# Process Name: Chemical Reaction Process - Batch Manufacturing
# ====================================================================================================
#
# [Control Item 1: Product Concentration]
# Specification range: [95.0, 105.0] Target: 100.0
# Measurement method: HPLC analysis (±0.1% precision)
# Sampling: 3 samples / Every batch (2-hour intervals)
# Control method: X-bar/R Chart + EWMA
# Response plan for abnormalities:
# - 1. One point outside control limits → Stop line, investigate cause
# - 2. EWMA outside control limits → Monitor trend, check raw materials
# - 3. Three consecutive points exceed 2σ → Report to process engineer
#
# [Control Item 2: Reaction Temperature]
# Specification range: [78.0, 82.0] Target: 80.0
# Measurement method: Thermocouple (±0.1℃)
# Sampling: 1 samples / Continuous monitoring (1-minute intervals)
# Control method: Real-time SPC (Shewhart + CUSUM)
# Response plan for abnormalities:
# - 1. Outside specification limits → Automatic alarm, adjust heating/cooling
# - 2. CUSUM signal → Trend analysis, preventive maintenance
#
# [Control Item 3: pH Value]
# Specification range: [6.8, 7.2] Target: 7.0
# Measurement method: pH meter (±0.05)
# Sampling: 2 samples / Every 30 minutes
# Control method: X-bar/R Chart
# Response plan for abnormalities:
# - 1. Outside control limits → Add neutralizer, re-measure
# - 2. Trend detected → Check raw material lot
#
# ====================================================================================================
Track project progress and return on investment.
# ===================================
# Example 7: DMAIC Project Tracker with ROI Calculation
# ===================================
class DMAICProjectTracker:
"""DMAIC project tracking and evaluation
Integrated management of project progress, results, and ROI.
Attributes:
-----------
project_name : str
Project name
baseline : dict
Baseline metrics (before improvement)
target : dict
Target metrics
actual : dict
Actual metrics (after improvement)
"""
def __init__(self, project_name: str, start_date: str,
champion: str, black_belt: str):
self.project_name = project_name
self.start_date = start_date
self.champion = champion
self.black_belt = black_belt
self.baseline = {}
self.target = {}
self.actual = {}
self.financials = {}
def set_baseline(self, metric: str, value: float, unit: str = ""):
"""Set baseline metric"""
self.baseline[metric] = {'value': value, 'unit': unit}
def set_target(self, metric: str, value: float, unit: str = ""):
"""Set target metric"""
self.target[metric] = {'value': value, 'unit': unit}
def set_actual(self, metric: str, value: float, unit: str = ""):
"""Set actual metric"""
self.actual[metric] = {'value': value, 'unit': unit}
def set_financials(self, investment: float, annual_savings: float,
annual_revenue_increase: float = 0):
"""Set financial information
Parameters:
-----------
investment : float
Initial investment (yen)
annual_savings : float
Annual cost reduction (yen)
annual_revenue_increase : float
Annual revenue increase (yen)
"""
self.financials = {
'investment': investment,
'annual_savings': annual_savings,
'annual_revenue_increase': annual_revenue_increase
}
def calculate_roi(self, years: int = 3) -> Dict:
"""Calculate ROI (return on investment)
Parameters:
-----------
years : int
Evaluation period (years)
Returns:
--------
roi_metrics : dict
Metrics including ROI, payback period, etc.
"""
if not self.financials:
raise ValueError("Set financials first using set_financials()")
inv = self.financials['investment']
annual_benefit = (
self.financials['annual_savings'] +
self.financials['annual_revenue_increase']
)
# Total benefit (multiple years)
total_benefit = annual_benefit * years
# ROI (%)
roi_pct = ((total_benefit - inv) / inv) * 100
# Payback period (years)
payback_period = inv / annual_benefit if annual_benefit > 0 else float('inf')
# NPV (Net Present Value) simple calculation (5% discount rate)
discount_rate = 0.05
npv = -inv
for year in range(1, years + 1):
npv += annual_benefit / ((1 + discount_rate) ** year)
roi_metrics = {
'investment': inv,
'annual_benefit': annual_benefit,
'total_benefit': total_benefit,
'roi_pct': roi_pct,
'payback_period': payback_period,
'npv': npv,
'evaluation_years': years
}
return roi_metrics
def print_project_summary(self):
"""Project summary report"""
print("=" * 80)
print("DMAIC Project Summary")
print("=" * 80)
print(f"\n[Project Information]")
print(f" Project name: {self.project_name}")
print(f" Start date: {self.start_date}")
print(f" Champion: {self.champion}")
print(f" Black Belt: {self.black_belt}")
print(f"\n[Improvement Status of Metrics]")
print(f"{'Metric':<20} {'Baseline':<15} {'Target':<15} {'Actual':<15} {'Achievement':<10}")
print("-" * 80)
for metric in self.baseline.keys():
baseline_val = self.baseline[metric]['value']
target_val = self.target.get(metric, {}).get('value', 'N/A')
actual_val = self.actual.get(metric, {}).get('value', 'N/A')
unit = self.baseline[metric]['unit']
if target_val != 'N/A' and actual_val != 'N/A':
# Calculate improvement rate (achievement vs target)
improvement_needed = target_val - baseline_val
improvement_achieved = actual_val - baseline_val
if improvement_needed != 0:
achievement_rate = (improvement_achieved / improvement_needed) * 100
else:
achievement_rate = 100 if actual_val == target_val else 0
print(f"{metric:<20} {baseline_val:<15.2f} {target_val:<15.2f} "
f"{actual_val:<15.2f} {achievement_rate:<10.1f}%")
else:
print(f"{metric:<20} {baseline_val:<15.2f} {str(target_val):<15} "
f"{str(actual_val):<15}")
if self.financials:
print(f"\n[Financial Evaluation]")
roi_metrics = self.calculate_roi()
print(f" Initial investment: Β₯{roi_metrics['investment']:,.0f}")
print(f" Annual benefit: Β₯{roi_metrics['annual_benefit']:,.0f}")
print(f" {roi_metrics['evaluation_years']}-year total benefit: "
f"Β₯{roi_metrics['total_benefit']:,.0f}")
print(f" ROI: {roi_metrics['roi_pct']:.1f}%")
print(f" Payback period: {roi_metrics['payback_period']:.2f} years")
print(f" NPV (5% discount rate): Β₯{roi_metrics['npv']:,.0f}")
print("\n" + "=" * 80)
# Usage example: Injection molding defect rate reduction project
project = DMAICProjectTracker(
project_name="Injection Molding - Dimension Defect Rate Reduction Project",
start_date="2024-01-15",
champion="Manufacturing Manager Taro Yamada",
black_belt="Quality Engineer Hanako Sato"
)
# Baseline (before improvement)
project.set_baseline("Defect Rate", 4.5, "%")
project.set_baseline("Cpk", 1.1, "")
project.set_baseline("Sigma Level", 3.3, "Ο")
project.set_baseline("Monthly Defects", 450, "units")
# Target
project.set_target("Defect Rate", 1.0, "%")
project.set_target("Cpk", 1.67, "")
project.set_target("Sigma Level", 5.0, "Ο")
project.set_target("Monthly Defects", 100, "units")
# Actual (after improvement)
project.set_actual("Defect Rate", 1.2, "%")
project.set_actual("Cpk", 1.58, "")
project.set_actual("Sigma Level", 4.75, "Ο")
project.set_actual("Monthly Defects", 120, "units")
# Financial information
# Initial investment: Equipment improvement, training = 3M yen
# Annual savings: Defect reduction (350 units × 1,000 yen/unit × 12 months) = 4.2M yen
project.set_financials(
investment=3_000_000,
annual_savings=4_200_000,
annual_revenue_increase=0
)
# Summary report
project.print_project_summary()
# Expected output:
# ================================================================================
# DMAIC Project Summary
# ================================================================================
#
# [Project Information]
# Project name: Injection Molding - Dimension Defect Rate Reduction Project
# Start date: 2024-01-15
# Champion: Manufacturing Manager Taro Yamada
# Black Belt: Quality Engineer Hanako Sato
#
# [Improvement Status of Metrics]
# Metric Baseline Target Actual Achievement
# --------------------------------------------------------------------------------
# Defect Rate 4.50 1.00 1.20 94.3%
# Cpk 1.10 1.67 1.58 84.2%
# Sigma Level 3.30 5.00 4.75 85.3%
# Monthly Defects 450.00 100.00 120.00 94.3%
#
# [Financial Evaluation]
# Initial investment: ¥3,000,000
# Annual benefit: ¥4,200,000
# 3-year total benefit: ¥12,600,000
# ROI: 320.0%
# Payback period: 0.71 years
# NPV (5% discount rate): ¥8,437,642
#
# ================================================================================
An ROI of 320% means that over 3 years the project returns a net benefit of 3.2 times the initial investment (a gross benefit of 4.2 times). A payback period of less than one year is considered an excellent investment, and a positive NPV (net present value) means the project is economically viable even after discounting future cash flows.
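A minimal check of the example's financials (assuming the 5% discount rate and 3-year horizon used above):
# Recompute NPV, ROI, and payback for investment ¥3.0M, annual benefit ¥4.2M
investment, annual_benefit = 3_000_000, 4_200_000
rate, years = 0.05, 3
npv = -investment + sum(annual_benefit / (1 + rate)**t for t in range(1, years + 1))
roi = (annual_benefit * years - investment) / investment * 100
print(f"NPV: ¥{npv:,.0f}")                                  # ≈ ¥8,437,642
print(f"ROI: {roi:.1f}%")                                   # 320.0%
print(f"Payback: {investment / annual_benefit:.2f} years")  # 0.71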
Design processes that are robust to noise using the Taguchi method.
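The optimization criterion used below is the nominal-is-best S/N ratio, SN = 10 * log10(mean^2 / variance); larger values mean the response is high relative to its spread. A minimal sketch with hypothetical repeated measurements:
import numpy as np
responses = [50.02, 49.97, 50.05, 49.99]   # hypothetical repeats under varying noise
mean, std = np.mean(responses), np.std(responses, ddof=1)
sn = 10 * np.log10(mean**2 / std**2)       # nominal-is-best S/N ratio
print(f"S/N = {sn:.1f} dB")                # ≈ 63 dB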
# ===================================
# Example 8: Robust Parameter Design using Taguchi Method
# ===================================
from itertools import product
class TaguchiRobustDesign:
"""Robust parameter design using Taguchi method
Find parameter settings that are insensitive to noise factors.
Steps:
------
1. Define control factors and levels
2. Define noise factors and levels
3. Design of experiments (orthogonal arrays; this sketch runs the full factorial)
4. Calculate S/N ratio (Signal-to-Noise Ratio)
5. Determine optimal conditions
Parameters:
-----------
control_factors : dict
Control factors and their levels
noise_factors : dict
Noise factors and their levels
"""
def __init__(self, control_factors: Dict, noise_factors: Dict):
self.control_factors = control_factors
self.noise_factors = noise_factors
self.results = []
def run_experiment(self, response_function):
"""Run experiment
Parameters:
-----------
response_function : callable
Function that takes control and noise factors and returns quality characteristic
"""
# Control factor combinations (inner array; full factorial here)
control_combinations = list(product(*self.control_factors.values()))
# Noise factor combinations (outer array; full factorial here)
noise_combinations = list(product(*self.noise_factors.values()))
for ctrl_values in control_combinations:
# Set control factors
ctrl_dict = dict(zip(self.control_factors.keys(), ctrl_values))
responses = []
for noise_values in noise_combinations:
# Set noise factors
noise_dict = dict(zip(self.noise_factors.keys(), noise_values))
# Measure quality characteristic
response = response_function(ctrl_dict, noise_dict)
responses.append(response)
# Calculate S/N ratio (Nominal-is-best characteristic)
mean_response = np.mean(responses)
std_response = np.std(responses, ddof=1)
# S/N ratio = 10 * log10(meanΒ² / variance)
if std_response > 0:
sn_ratio = 10 * np.log10(mean_response**2 / std_response**2)
else:
sn_ratio = float('inf')
self.results.append({
'control_factors': ctrl_dict,
'mean': mean_response,
'std': std_response,
'sn_ratio': sn_ratio,
'responses': responses
})
def find_optimal_condition(self) -> Dict:
"""Determine optimal condition (maximum S/N ratio)
Returns:
--------
optimal : dict
Optimal control factor settings
"""
if not self.results:
raise ValueError("Run experiment first")
# Select condition with maximum S/N ratio
optimal_result = max(self.results, key=lambda x: x['sn_ratio'])
return {
'optimal_factors': optimal_result['control_factors'],
'sn_ratio': optimal_result['sn_ratio'],
'mean': optimal_result['mean'],
'std': optimal_result['std']
}
def print_results(self):
"""Summary of experimental results"""
if not self.results:
raise ValueError("Run experiment first")
print("=" * 80)
print("Taguchi Method - Robust Parameter Design")
print("=" * 80)
print(f"\n[Experimental Conditions]")
print(f" Number of control factors: {len(self.control_factors)}")
print(f" Number of noise factors: {len(self.noise_factors)}")
print(f" Number of experiments: {len(self.results)}")
print(f"\n[Experimental Results (Top 5 Conditions)]")
sorted_results = sorted(self.results, key=lambda x: x['sn_ratio'], reverse=True)
print(f"{'Rank':<5} {'S/N Ratio':<10} {'Mean':<10} {'Std Dev':<12} {'Control Factor Settings'}")
print("-" * 80)
for i, result in enumerate(sorted_results[:5], 1):
factors_str = ", ".join(f"{k}={v}" for k, v in result['control_factors'].items())
print(f"{i:<5} {result['sn_ratio']:<10.2f} {result['mean']:<10.3f} "
f"{result['std']:<12.4f} {factors_str}")
optimal = self.find_optimal_condition()
print(f"\n[Optimal Condition]")
for factor, value in optimal['optimal_factors'].items():
print(f" {factor}: {value}")
print(f"\n[Predicted Performance]")
print(f" S/N ratio: {optimal['sn_ratio']:.2f} dB")
print(f" Mean: {optimal['mean']:.3f}")
print(f" Standard deviation: {optimal['std']:.4f}")
print(f" Coefficient of variation (CV): {(optimal['std']/optimal['mean']*100):.2f}%")
print("\n" + "=" * 80)
def plot_main_effects(self):
"""Plot main effects"""
if not self.results:
raise ValueError("Run experiment first")
n_factors = len(self.control_factors)
fig, axes = plt.subplots(1, n_factors, figsize=(5*n_factors, 4))
if n_factors == 1:
axes = [axes]
for i, (factor_name, factor_levels) in enumerate(self.control_factors.items()):
# Calculate mean S/N ratio for each level
level_sn_means = []
for level in factor_levels:
sn_values = [r['sn_ratio'] for r in self.results
if r['control_factors'][factor_name] == level]
level_sn_means.append(np.mean(sn_values))
axes[i].plot(factor_levels, level_sn_means,
marker='o', linewidth=2.5, markersize=10,
color='#11998e')
axes[i].set_xlabel(factor_name, fontsize=11, fontweight='bold')
axes[i].set_ylabel('Mean S/N Ratio (dB)', fontsize=10)
axes[i].set_title(f'Main Effect: {factor_name}',
fontsize=12, fontweight='bold')
axes[i].grid(True, alpha=0.3)
plt.suptitle('Main Effects Plot for S/N Ratios',
fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()
# Usage example: Robust design for injection molding
np.random.seed(42)
# Control factors (parameters to optimize)
control_factors = {
'Temperature': [190, 200, 210], # ℃
'Pressure': [90, 100, 110], # MPa
'Cooling_Time': [25, 30, 35] # sec
}
# Noise factors (uncontrollable variation sources)
noise_factors = {
'Ambient_Temp': [15, 25, 35], # ℃
'Material_Lot': ['A', 'B']
}
# Response function (use measured values in actual experiments)
def injection_molding_response(control, noise):
"""Simulate injection molding dimension response
Ideal conditions: Temperature 200℃, Pressure 100 MPa, Cooling 30 sec
"""
# Target dimension
target = 50.0 # mm
# Control factor effects
temp_effect = 0.01 * (control['Temperature'] - 200)
pressure_effect = 0.005 * (control['Pressure'] - 100)
cooling_effect = -0.002 * (control['Cooling_Time'] - 30)
# Noise factor effects
ambient_effect = 0.003 * (noise['Ambient_Temp'] - 25)
lot_effect = 0.1 if noise['Material_Lot'] == 'B' else 0
# Measurement error
measurement_error = np.random.normal(0, 0.05)
dimension = (target + temp_effect + pressure_effect +
cooling_effect + ambient_effect + lot_effect +
measurement_error)
return dimension
# Run Taguchi experiment
taguchi = TaguchiRobustDesign(control_factors, noise_factors)
taguchi.run_experiment(injection_molding_response)
# Display results
taguchi.print_results()
# Main effects plot
taguchi.plot_main_effects()
# Expected output:
# ================================================================================
# Taguchi Method - Robust Parameter Design
# ================================================================================
#
# [Experimental Conditions]
# Number of control factors: 3
# Number of noise factors: 2
# Number of experiments: 27
#
# [Experimental Results (Top 5 Conditions)]
# Rank S/N Ratio Mean Std Dev Control Factor Settings
# --------------------------------------------------------------------------------
# 1 37.89 50.012 0.0412 Temperature=200, Pressure=100, Cooling_Time=30
# 2 36.54 50.024 0.0478 Temperature=200, Pressure=90, Cooling_Time=30
# 3 36.21 50.018 0.0501 Temperature=200, Pressure=110, Cooling_Time=30
# 4 35.87 50.135 0.0523 Temperature=190, Pressure=100, Cooling_Time=30
# 5 35.43 49.889 0.0547 Temperature=210, Pressure=100, Cooling_Time=30
#
# [Optimal Condition]
# Temperature: 200
# Pressure: 100
# Cooling_Time: 30
#
# [Predicted Performance]
# S/N ratio: 37.89 dB
# Mean: 50.012
# Standard deviation: 0.0412
# Coefficient of variation (CV): 0.08%
#
# ================================================================================
After completing this chapter, test your understanding with the following questions.
Q1: If a process has a DPMO of 233, what is the sigma level? (Select the closest value)
a) 3σ
b) 4σ
c) 5σ
d) 6σ
Correct Answer: c) 5σ
Explanation:
According to the sigma level correspondence table, 5σ corresponds to a DPMO of 233, a yield of 99.977%, and a "Good" quality rating.
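A quick numeric check (a minimal sketch using the 1.5σ shift convention):
from scipy import stats
dpmo = 233
sigma = stats.norm.ppf(1 - dpmo / 1_000_000) + 1.5  # add the 1.5σ shift
print(f"Sigma level: {sigma:.2f}σ")                 # ≈ 5.00σ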
Q2: In a Gage R&R analysis, measurement system variation accounted for 25% of total variation. Is this measurement system acceptable? Answer with reasoning.
Correct Answer: Marginal (conditionally acceptable)
Reasoning:
According to AIAG (Automotive Industry Action Group) standards:
- < 10%: Acceptable (good)
- 10-30%: Marginal (improvement desired)
- > 30%: Not Acceptable (unusable)
25% is in the "Marginal" range, so use is possible but improvement is recommended. Countermeasures to improve measurement repeatability and reproducibility (operator training, measurement equipment calibration, etc.) should be considered.
Q3: A Six Sigma project has an initial investment of 5 million yen and expected annual savings of 6 million yen. Calculate the 3-year NPV (net present value) using a 5% discount rate. Is this project economically viable?
Calculation:
NPV = -Initial investment + Σ(Annual CF / (1+discount rate)^year)
NPV = -5,000,000 + 6,000,000/(1.05)^1 + 6,000,000/(1.05)^2 + 6,000,000/(1.05)^3
NPV = -5,000,000 + 5,714,286 + 5,442,177 + 5,183,026
NPV = 11,339,489 yen
Correct Answer: NPV = approximately 11.3 million yen → Economically viable
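The same result as a one-line check (assuming the 5% rate and 3-year horizon given in the question):
npv = -5_000_000 + sum(6_000_000 / 1.05**t for t in (1, 2, 3))
print(f"NPV: {npv:,.0f} yen")  # ≈ 11,339,488 yen; positive → viable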
Overall Assessment:
This is an excellent investment opportunity: the NPV exceeds 10 million yen and the investment is recovered within one year (5M / 6M ≈ 0.83 years), so the project should be prioritized for execution.