第5章:Pythonによる品質管理の自動化

リアルタイム監視、ダッシュボード、機械学習による予測

📚 シリーズ: 品質管理・QA入門 ⏱️ 読了時間: 40-45分 🎯 難易度: 中級-上級

この章で学ぶこと

5.1 リアルタイム品質監視システム

5.1.1 自動SPC監視とアラートシステム

製造現場でリアルタイムにプロセスを監視し、管理限界を超えた際に自動でアラートを発生させるシステムを構築します。

例1: リアルタイムSPC監視システム
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Optional
import json

class RealTimeSPCMonitor:
    """リアルタイムSPC監視システム

    製造プロセスをリアルタイムで監視し、管理限界超過時に
    自動アラートを発生させる。異常パターンの検出も実施。
    """

    def __init__(self, process_name: str, target: float,
                 ucl: float, lcl: float,
                 email_recipients: List[str] = None):
        """
        Args:
            process_name: プロセス名
            target: 目標値
            ucl: 上方管理限界
            lcl: 下方管理限界
            email_recipients: アラート送信先メールアドレスリスト
        """
        self.process_name = process_name
        self.target = target
        self.ucl = ucl
        self.lcl = lcl
        self.email_recipients = email_recipients or []

        self.data_points = []
        self.alerts = []

        # 異常パターン検出用のルール設定(Western Electric Rules)
        self.rules = {
            "rule1": "1点が管理限界外",
            "rule2": "連続3点中2点が2σ外(片側)",
            "rule3": "連続5点中4点が1σ外(片側)",
            "rule4": "連続8点が片側",
            "rule5": "連続6点が単調増加または減少",
            "rule6": "連続14点が交互に上下"
        }

        # σ(標準偏差)を計算
        self.sigma = (ucl - target) / 3

    def add_measurement(self, value: float, timestamp: datetime = None) -> Dict:
        """測定値を追加してリアルタイム評価

        Args:
            value: 測定値
            timestamp: 測定時刻(省略時は現在時刻)

        Returns:
            評価結果
        """
        if timestamp is None:
            timestamp = datetime.now()

        data_point = {
            "timestamp": timestamp,
            "value": value,
            "in_control": self.lcl <= value <= self.ucl
        }

        self.data_points.append(data_point)

        # 異常検出
        violations = self._check_violations()

        if violations:
            alert = self._generate_alert(data_point, violations)
            self.alerts.append(alert)

            # メール送信
            if self.email_recipients:
                self._send_alert_email(alert)

            return {
                "status": "ALERT",
                "value": value,
                "violations": violations,
                "alert_id": alert["alert_id"]
            }
        else:
            return {
                "status": "OK",
                "value": value,
                "violations": []
            }

    def _check_violations(self) -> List[str]:
        """Western Electric Rulesに基づく異常検出

        Returns:
            違反したルールのリスト
        """
        violations = []

        if len(self.data_points) == 0:
            return violations

        recent_data = self.data_points[-14:]  # 最新14点を評価

        # Rule 1: 1点が管理限界外
        if not recent_data[-1]["in_control"]:
            violations.append(self.rules["rule1"])

        if len(recent_data) < 3:
            return violations

        # Rule 2: 連続3点中2点が2σ外(片側)
        last_3 = recent_data[-3:]
        values_3 = [p["value"] for p in last_3]

        upper_2sigma = self.target + 2 * self.sigma
        lower_2sigma = self.target - 2 * self.sigma

        upper_violations = sum(1 for v in values_3 if v > upper_2sigma)
        lower_violations = sum(1 for v in values_3 if v < lower_2sigma)

        if upper_violations >= 2 or lower_violations >= 2:
            violations.append(self.rules["rule2"])

        if len(recent_data) < 5:
            return violations

        # Rule 3: 連続5点中4点が1σ外(片側)
        last_5 = recent_data[-5:]
        values_5 = [p["value"] for p in last_5]

        upper_1sigma = self.target + self.sigma
        lower_1sigma = self.target - self.sigma

        upper_violations_5 = sum(1 for v in values_5 if v > upper_1sigma)
        lower_violations_5 = sum(1 for v in values_5 if v < lower_1sigma)

        if upper_violations_5 >= 4 or lower_violations_5 >= 4:
            violations.append(self.rules["rule3"])

        if len(recent_data) < 8:
            return violations

        # Rule 4: 連続8点が片側
        last_8 = recent_data[-8:]
        values_8 = [p["value"] for p in last_8]

        all_above = all(v > self.target for v in values_8)
        all_below = all(v < self.target for v in values_8)

        if all_above or all_below:
            violations.append(self.rules["rule4"])

        # Rule 5: 連続6点が単調増加または減少
        if len(recent_data) >= 6:
            last_6 = recent_data[-6:]
            values_6 = [p["value"] for p in last_6]

            increasing = all(values_6[i] < values_6[i+1] for i in range(5))
            decreasing = all(values_6[i] > values_6[i+1] for i in range(5))

            if increasing or decreasing:
                violations.append(self.rules["rule5"])

        # Rule 6: 連続14点が交互に上下
        if len(recent_data) >= 14:
            values_14 = [p["value"] for p in recent_data]

            alternating = all(
                (values_14[i] - values_14[i-1]) * (values_14[i+1] - values_14[i]) < 0
                for i in range(1, 13)
            )

            if alternating:
                violations.append(self.rules["rule6"])

        return violations

    def _generate_alert(self, data_point: Dict, violations: List[str]) -> Dict:
        """アラートを生成

        Args:
            data_point: データポイント
            violations: 違反ルールのリスト

        Returns:
            アラート情報
        """
        alert_id = f"ALERT-{datetime.now().strftime('%Y%m%d%H%M%S')}"

        alert = {
            "alert_id": alert_id,
            "timestamp": data_point["timestamp"],
            "process_name": self.process_name,
            "value": data_point["value"],
            "target": self.target,
            "ucl": self.ucl,
            "lcl": self.lcl,
            "violations": violations,
            "severity": "Critical" if self.rules["rule1"] in violations else "Warning",
            "acknowledged": False
        }

        return alert

    def _send_alert_email(self, alert: Dict):
        """アラートメールを送信

        Args:
            alert: アラート情報
        """
        # 実際の実装ではSMTPサーバー設定が必要
        # ここではログ出力のみ
        print(f"\n{'='*60}")
        print(f"📧 アラートメール送信")
        print(f"{'='*60}")
        print(f"送信先: {', '.join(self.email_recipients)}")
        print(f"件名: [{alert['severity']}] {self.process_name} プロセス異常検出")
        print(f"\n--- メール本文 ---")
        print(f"アラートID: {alert['alert_id']}")
        print(f"プロセス: {self.process_name}")
        print(f"発生時刻: {alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"測定値: {alert['value']:.3f}")
        print(f"目標値: {self.target:.3f}")
        print(f"管理限界: {self.lcl:.3f} - {self.ucl:.3f}")
        print(f"\n検出された異常パターン:")
        for violation in alert['violations']:
            print(f"  • {violation}")
        print(f"\n即座にプロセスを確認し、必要に応じて処置を実施してください。")
        print(f"{'='*60}\n")

    def get_control_chart_data(self) -> pd.DataFrame:
        """管理図用データを取得

        Returns:
            管理図描画用のDataFrame
        """
        if not self.data_points:
            return pd.DataFrame()

        df = pd.DataFrame(self.data_points)

        # 管理限界線を追加
        df['target'] = self.target
        df['ucl'] = self.ucl
        df['lcl'] = self.lcl
        df['upper_2sigma'] = self.target + 2 * self.sigma
        df['lower_2sigma'] = self.target - 2 * self.sigma
        df['upper_1sigma'] = self.target + self.sigma
        df['lower_1sigma'] = self.target - self.sigma

        return df

    def get_alert_summary(self) -> Dict:
        """アラートサマリーを取得

        Returns:
            アラート統計情報
        """
        if not self.alerts:
            return {"total_alerts": 0}

        df = pd.DataFrame(self.alerts)

        summary = {
            "total_alerts": len(self.alerts),
            "critical_alerts": (df['severity'] == 'Critical').sum(),
            "warning_alerts": (df['severity'] == 'Warning').sum(),
            "latest_alert": self.alerts[-1]['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
            "most_common_violation": df['violations'].explode().mode()[0]
                                     if len(df) > 0 else None
        }

        return summary


# 使用例
# リアルタイムSPC監視システムを起動
monitor = RealTimeSPCMonitor(
    process_name="成形温度",
    target=200.0,
    ucl=210.0,
    lcl=190.0,
    email_recipients=["quality@example.com", "production@example.com"]
)

# シミュレーション: 測定値を連続追加
np.random.seed(42)

print("=== リアルタイムSPC監視開始 ===\n")

# 正常なデータ
for i in range(10):
    value = np.random.normal(200.0, 2.5)
    result = monitor.add_measurement(value, datetime.now() + timedelta(minutes=i))
    print(f"[{i+1:2d}] 測定値: {value:.2f} → {result['status']}")

# 異常発生: 管理限界超過
for i in range(10, 13):
    value = np.random.normal(212.0, 1.0)  # UCL超過
    result = monitor.add_measurement(value, datetime.now() + timedelta(minutes=i))
    print(f"[{i+1:2d}] 測定値: {value:.2f} → {result['status']}")
    if result['status'] == 'ALERT':
        print(f"     ⚠️  アラート発生: {result['alert_id']}")

# アラートサマリー表示
print("\n=== アラートサマリー ===")
summary = monitor.get_alert_summary()
for key, value in summary.items():
    print(f"{key}: {value}")

💡 実装のポイント

5.1.2 インタラクティブ品質ダッシュボード(Plotly)

Plotlyを使用して、リアルタイムで更新されるインタラクティブな品質ダッシュボードを構築します。

例2: Plotlyインタラクティブダッシュボード
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class QualityDashboard:
    """インタラクティブ品質ダッシュボード

    Plotlyを使用してリアルタイム品質メトリクスを可視化。
    管理図、ヒストグラム、パレート図、時系列トレンドを統合。
    """

    def __init__(self):
        self.data = {
            "control_chart": [],
            "defect_data": [],
            "kpi_data": []
        }

    def add_control_chart_data(self, timestamp: datetime, value: float,
                              target: float, ucl: float, lcl: float):
        """管理図データを追加

        Args:
            timestamp: 時刻
            value: 測定値
            target: 目標値
            ucl: 上方管理限界
            lcl: 下方管理限界
        """
        self.data["control_chart"].append({
            "timestamp": timestamp,
            "value": value,
            "target": target,
            "ucl": ucl,
            "lcl": lcl
        })

    def add_defect_data(self, defect_type: str, count: int):
        """不良データを追加

        Args:
            defect_type: 不良タイプ
            count: 件数
        """
        self.data["defect_data"].append({
            "type": defect_type,
            "count": count
        })

    def add_kpi_data(self, kpi_name: str, actual: float,
                    target: float, unit: str):
        """KPIデータを追加

        Args:
            kpi_name: KPI名称
            actual: 実績値
            target: 目標値
            unit: 単位
        """
        self.data["kpi_data"].append({
            "kpi": kpi_name,
            "actual": actual,
            "target": target,
            "unit": unit,
            "achievement": (actual / target * 100) if target != 0 else 0
        })

    def create_dashboard(self, title: str = "品質ダッシュボード") -> go.Figure:
        """ダッシュボードを生成

        Args:
            title: ダッシュボードタイトル

        Returns:
            Plotly Figureオブジェクト
        """
        # 2x2のサブプロットを作成
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=(
                "Xbar管理図",
                "不良パレート図",
                "KPI達成状況",
                "時系列トレンド(7日間)"
            ),
            specs=[
                [{"type": "scatter"}, {"type": "bar"}],
                [{"type": "bar"}, {"type": "scatter"}]
            ],
            vertical_spacing=0.12,
            horizontal_spacing=0.1
        )

        # 1. 管理図(左上)
        if self.data["control_chart"]:
            self._add_control_chart(fig, row=1, col=1)

        # 2. パレート図(右上)
        if self.data["defect_data"]:
            self._add_pareto_chart(fig, row=1, col=2)

        # 3. KPI達成状況(左下)
        if self.data["kpi_data"]:
            self._add_kpi_chart(fig, row=2, col=1)

        # 4. 時系列トレンド(右下)
        if self.data["control_chart"]:
            self._add_trend_chart(fig, row=2, col=2)

        # レイアウト設定
        fig.update_layout(
            title_text=title,
            title_font_size=20,
            showlegend=True,
            height=800,
            template="plotly_white"
        )

        return fig

    def _add_control_chart(self, fig: go.Figure, row: int, col: int):
        """管理図を追加(内部用)"""
        df = pd.DataFrame(self.data["control_chart"])

        # 測定値
        fig.add_trace(
            go.Scatter(
                x=df['timestamp'],
                y=df['value'],
                mode='lines+markers',
                name='測定値',
                line=dict(color='blue', width=2),
                marker=dict(size=6)
            ),
            row=row, col=col
        )

        # 目標値
        fig.add_trace(
            go.Scatter(
                x=df['timestamp'],
                y=df['target'],
                mode='lines',
                name='目標値',
                line=dict(color='green', width=2, dash='dash')
            ),
            row=row, col=col
        )

        # UCL
        fig.add_trace(
            go.Scatter(
                x=df['timestamp'],
                y=df['ucl'],
                mode='lines',
                name='UCL',
                line=dict(color='red', width=1, dash='dot')
            ),
            row=row, col=col
        )

        # LCL
        fig.add_trace(
            go.Scatter(
                x=df['timestamp'],
                y=df['lcl'],
                mode='lines',
                name='LCL',
                line=dict(color='red', width=1, dash='dot')
            ),
            row=row, col=col
        )

        fig.update_xaxes(title_text="時刻", row=row, col=col)
        fig.update_yaxes(title_text="測定値", row=row, col=col)

    def _add_pareto_chart(self, fig: go.Figure, row: int, col: int):
        """パレート図を追加(内部用)"""
        df = pd.DataFrame(self.data["defect_data"])
        df = df.sort_values('count', ascending=False)

        # 累積比率を計算
        df['cumulative'] = df['count'].cumsum()
        df['cumulative_pct'] = df['cumulative'] / df['count'].sum() * 100

        # 棒グラフ
        fig.add_trace(
            go.Bar(
                x=df['type'],
                y=df['count'],
                name='不良件数',
                marker_color='lightblue',
                yaxis='y'
            ),
            row=row, col=col
        )

        # 累積線グラフ
        fig.add_trace(
            go.Scatter(
                x=df['type'],
                y=df['cumulative_pct'],
                name='累積比率',
                mode='lines+markers',
                line=dict(color='red', width=2),
                marker=dict(size=8),
                yaxis='y2'
            ),
            row=row, col=col
        )

        fig.update_xaxes(title_text="不良タイプ", row=row, col=col)
        fig.update_yaxes(title_text="件数", row=row, col=col)

    def _add_kpi_chart(self, fig: go.Figure, row: int, col: int):
        """KPI達成状況を追加(内部用)"""
        df = pd.DataFrame(self.data["kpi_data"])

        # 色分け(達成率に応じて)
        colors = ['green' if ach >= 100 else 'orange' if ach >= 80 else 'red'
                 for ach in df['achievement']]

        fig.add_trace(
            go.Bar(
                x=df['kpi'],
                y=df['achievement'],
                name='達成率',
                marker_color=colors,
                text=df['achievement'].round(1).astype(str) + '%',
                textposition='outside'
            ),
            row=row, col=col
        )

        # 100%ライン
        fig.add_hline(
            y=100,
            line_dash="dash",
            line_color="green",
            row=row, col=col
        )

        fig.update_xaxes(title_text="KPI", row=row, col=col)
        fig.update_yaxes(title_text="達成率 (%)", row=row, col=col)

    def _add_trend_chart(self, fig: go.Figure, row: int, col: int):
        """時系列トレンドを追加(内部用)"""
        df = pd.DataFrame(self.data["control_chart"])

        # 直近7日間のデータのみ
        if len(df) > 0:
            cutoff = datetime.now() - timedelta(days=7)
            df = df[df['timestamp'] >= cutoff]

        if len(df) > 0:
            fig.add_trace(
                go.Scatter(
                    x=df['timestamp'],
                    y=df['value'],
                    mode='lines',
                    name='トレンド',
                    line=dict(color='purple', width=2),
                    fill='tonexty'
                ),
                row=row, col=col
            )

            fig.update_xaxes(title_text="日時", row=row, col=col)
            fig.update_yaxes(title_text="測定値", row=row, col=col)

    def export_html(self, filename: str):
        """HTMLファイルとしてエクスポート

        Args:
            filename: 出力ファイル名
        """
        fig = self.create_dashboard()
        fig.write_html(filename)
        print(f"ダッシュボードを {filename} に出力しました")


# 使用例
dashboard = QualityDashboard()

# サンプルデータを追加
np.random.seed(42)

# 管理図データ
for i in range(20):
    timestamp = datetime.now() - timedelta(hours=20-i)
    value = np.random.normal(100, 2)
    dashboard.add_control_chart_data(
        timestamp=timestamp,
        value=value,
        target=100.0,
        ucl=106.0,
        lcl=94.0
    )

# 不良データ
defect_types = ["寸法不良", "外観不良", "機能不良", "梱包不良", "その他"]
defect_counts = [45, 28, 15, 8, 4]

for defect_type, count in zip(defect_types, defect_counts):
    dashboard.add_defect_data(defect_type, count)

# KPIデータ
kpis = [
    ("良品率", 98.5, 99.0, "%"),
    ("納期遵守率", 96.0, 95.0, "%"),
    ("歩留まり", 92.0, 90.0, "%"),
    ("クレーム件数", 8, 10, "件")
]

for kpi_name, actual, target, unit in kpis:
    dashboard.add_kpi_data(kpi_name, actual, target, unit)

# ダッシュボードを表示
fig = dashboard.create_dashboard(title="製造品質ダッシュボード - 2025年10月")
# fig.show()  # Jupyter等で表示

# HTMLファイルとして保存
dashboard.export_html("quality_dashboard.html")
print("インタラクティブダッシュボードを生成しました")

5.2 自動レポート生成とデータ統合

5.2.1 PDF/Excel自動レポート生成

品質データから自動的にレポートを生成し、PDFまたはExcel形式で出力するシステムです。

例3: 自動レポート生成システム
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.units import cm
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from datetime import datetime
import pandas as pd
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.chart import BarChart, Reference

class AutoReportGenerator:
    """自動品質レポート生成システム

    品質データを基にPDF/Excel形式のレポートを自動生成。
    管理図、統計サマリー、是正処置リストを含む。
    """

    def __init__(self, report_period: str):
        self.report_period = report_period
        self.data = {
            "summary": {},
            "spc_data": pd.DataFrame(),
            "capa_data": [],
            "kpi_data": []
        }

    def add_summary_data(self, total_production: int, defect_count: int,
                        yield_rate: float, customer_complaints: int):
        """サマリーデータを追加

        Args:
            total_production: 総生産数
            defect_count: 不良数
            yield_rate: 歩留まり率
            customer_complaints: 顧客クレーム件数
        """
        self.data["summary"] = {
            "total_production": total_production,
            "defect_count": defect_count,
            "yield_rate": yield_rate,
            "customer_complaints": customer_complaints,
            "defect_rate": (defect_count / total_production * 100)
                          if total_production > 0 else 0
        }

    def add_spc_data(self, df: pd.DataFrame):
        """SPCデータを追加

        Args:
            df: SPC測定データのDataFrame
        """
        self.data["spc_data"] = df

    def add_capa_data(self, capa_list: list):
        """CAPAデータを追加

        Args:
            capa_list: CAPA情報のリスト
        """
        self.data["capa_data"] = capa_list

    def add_kpi_data(self, kpi_list: list):
        """KPIデータを追加

        Args:
            kpi_list: KPI情報のリスト
        """
        self.data["kpi_data"] = kpi_list

    def generate_excel_report(self, filename: str):
        """Excelレポートを生成

        Args:
            filename: 出力ファイル名
        """
        wb = openpyxl.Workbook()

        # シート1: サマリー
        ws_summary = wb.active
        ws_summary.title = "サマリー"

        # ヘッダー
        ws_summary['A1'] = f"品質レポート - {self.report_period}"
        ws_summary['A1'].font = Font(size=16, bold=True)
        ws_summary['A1'].fill = PatternFill(start_color="4472C4", fill_type="solid")
        ws_summary['A1'].font = Font(size=16, bold=True, color="FFFFFF")

        # サマリーデータ
        summary_data = [
            ["項目", "値"],
            ["総生産数", self.data["summary"]["total_production"]],
            ["不良数", self.data["summary"]["defect_count"]],
            ["不良率", f"{self.data['summary']['defect_rate']:.2f}%"],
            ["歩留まり率", f"{self.data['summary']['yield_rate']:.2f}%"],
            ["顧客クレーム", self.data["summary"]["customer_complaints"]]
        ]

        for row_idx, row_data in enumerate(summary_data, start=3):
            for col_idx, value in enumerate(row_data, start=1):
                cell = ws_summary.cell(row=row_idx, column=col_idx, value=value)
                if row_idx == 3:  # ヘッダー行
                    cell.font = Font(bold=True)
                    cell.fill = PatternFill(start_color="D9E1F2", fill_type="solid")
                cell.alignment = Alignment(horizontal="left")

        # シート2: SPC データ
        if not self.data["spc_data"].empty:
            ws_spc = wb.create_sheet(title="SPC データ")

            # DataFrameをシートに書き込み
            for r_idx, row in enumerate(
                dataframe_to_rows(self.data["spc_data"], index=False, header=True),
                start=1
            ):
                for c_idx, value in enumerate(row, start=1):
                    cell = ws_spc.cell(row=r_idx, column=c_idx, value=value)
                    if r_idx == 1:  # ヘッダー
                        cell.font = Font(bold=True)
                        cell.fill = PatternFill(start_color="D9E1F2", fill_type="solid")

        # シート3: CAPA
        if self.data["capa_data"]:
            ws_capa = wb.create_sheet(title="CAPA")

            capa_headers = ["CAPA ID", "説明", "ステータス", "期限"]
            for col_idx, header in enumerate(capa_headers, start=1):
                cell = ws_capa.cell(row=1, column=col_idx, value=header)
                cell.font = Font(bold=True)
                cell.fill = PatternFill(start_color="D9E1F2", fill_type="solid")

            for row_idx, capa in enumerate(self.data["capa_data"], start=2):
                ws_capa.cell(row=row_idx, column=1, value=capa["capa_id"])
                ws_capa.cell(row=row_idx, column=2, value=capa["description"])
                ws_capa.cell(row=row_idx, column=3, value=capa["status"])
                ws_capa.cell(row=row_idx, column=4, value=capa["due_date"])

        # シート4: KPI
        if self.data["kpi_data"]:
            ws_kpi = wb.create_sheet(title="KPI")

            kpi_headers = ["KPI名", "目標値", "実績値", "達成率"]
            for col_idx, header in enumerate(kpi_headers, start=1):
                cell = ws_kpi.cell(row=1, column=col_idx, value=header)
                cell.font = Font(bold=True)
                cell.fill = PatternFill(start_color="D9E1F2", fill_type="solid")

            for row_idx, kpi in enumerate(self.data["kpi_data"], start=2):
                ws_kpi.cell(row=row_idx, column=1, value=kpi["name"])
                ws_kpi.cell(row=row_idx, column=2, value=kpi["target"])
                ws_kpi.cell(row=row_idx, column=3, value=kpi["actual"])
                ws_kpi.cell(row=row_idx, column=4,
                           value=f"{kpi['achievement']:.1f}%")

                # 達成率に応じて色付け
                achievement = kpi['achievement']
                cell = ws_kpi.cell(row=row_idx, column=4)
                if achievement >= 100:
                    cell.fill = PatternFill(start_color="C6EFCE", fill_type="solid")
                elif achievement >= 80:
                    cell.fill = PatternFill(start_color="FFEB9C", fill_type="solid")
                else:
                    cell.fill = PatternFill(start_color="FFC7CE", fill_type="solid")

        wb.save(filename)
        print(f"Excelレポートを {filename} に出力しました")

    def generate_pdf_report(self, filename: str):
        """PDFレポートを生成

        Args:
            filename: 出力ファイル名
        """
        doc = SimpleDocTemplate(filename, pagesize=A4)
        story = []
        styles = getSampleStyleSheet()

        # タイトル
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=24,
            textColor=colors.HexColor("#2c3e50"),
            spaceAfter=30
        )

        title = Paragraph(f"品質レポート
{self.report_period}", title_style) story.append(title) story.append(Spacer(1, 0.5*cm)) # サマリーセクション heading_style = styles['Heading2'] story.append(Paragraph("1. サマリー", heading_style)) story.append(Spacer(1, 0.3*cm)) summary_data = [ ["項目", "値"], ["総生産数", f"{self.data['summary']['total_production']:,}"], ["不良数", f"{self.data['summary']['defect_count']:,}"], ["不良率", f"{self.data['summary']['defect_rate']:.2f}%"], ["歩留まり率", f"{self.data['summary']['yield_rate']:.2f}%"], ["顧客クレーム", f"{self.data['summary']['customer_complaints']}件"] ] summary_table = Table(summary_data, colWidths=[8*cm, 6*cm]) summary_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 12), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.beige), ('GRID', (0, 0), (-1, -1), 1, colors.black) ])) story.append(summary_table) story.append(Spacer(1, 1*cm)) # CAPAセクション if self.data["capa_data"]: story.append(Paragraph("2. 是正処置(CAPA)", heading_style)) story.append(Spacer(1, 0.3*cm)) capa_data = [["CAPA ID", "説明", "ステータス", "期限"]] for capa in self.data["capa_data"]: capa_data.append([ capa["capa_id"], capa["description"][:40] + "..." if len(capa["description"]) > 40 else capa["description"], capa["status"], capa["due_date"] ]) capa_table = Table(capa_data, colWidths=[3*cm, 7*cm, 2.5*cm, 2.5*cm]) capa_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, -1), 8), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.lightgrey), ('GRID', (0, 0), (-1, -1), 1, colors.black) ])) story.append(capa_table) story.append(Spacer(1, 1*cm)) # KPIセクション if self.data["kpi_data"]: story.append(Paragraph("3. KPI達成状況", heading_style)) story.append(Spacer(1, 0.3*cm)) kpi_data = [["KPI名", "目標", "実績", "達成率"]] for kpi in self.data["kpi_data"]: kpi_data.append([ kpi["name"], str(kpi["target"]), str(kpi["actual"]), f"{kpi['achievement']:.1f}%" ]) kpi_table = Table(kpi_data, colWidths=[5*cm, 3*cm, 3*cm, 3*cm]) kpi_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 10), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.lightblue), ('GRID', (0, 0), (-1, -1), 1, colors.black) ])) story.append(kpi_table) # PDFビルド doc.build(story) print(f"PDFレポートを {filename} に出力しました") # openpyxl用のヘルパー関数 from openpyxl.utils.dataframe import dataframe_to_rows # 使用例 report_gen = AutoReportGenerator(report_period="2025年10月") # サマリーデータを追加 report_gen.add_summary_data( total_production=50000, defect_count=750, yield_rate=98.5, customer_complaints=8 ) # CAPAデータを追加 capa_list = [ { "capa_id": "CAPA-20251001-001", "description": "成形温度の管理限界超過", "status": "進行中", "due_date": "2025-10-31" }, { "capa_id": "CAPA-20251005-002", "description": "検査漏れによる不良流出", "status": "完了", "due_date": "2025-10-25" } ] report_gen.add_capa_data(capa_list) # KPIデータを追加 kpi_list = [ {"name": "良品率", "target": 99.0, "actual": 98.5, "achievement": 99.5}, {"name": "納期遵守率", "target": 95.0, "actual": 96.5, "achievement": 101.6}, {"name": "クレーム削減", "target": 10, "actual": 8, "achievement": 125.0} ] report_gen.add_kpi_data(kpi_list) # レポート生成 report_gen.generate_excel_report("quality_report_202510.xlsx") # report_gen.generate_pdf_report("quality_report_202510.pdf") # 日本語フォント設定が必要 print("\n自動レポート生成完了")

5.2.2 データベース統合による品質データ管理

SQLiteデータベースを使用して品質データを永続化し、履歴管理とクエリ機能を実装します。

例4: データベース統合品質管理システム
import sqlite3
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional

class QualityDatabase:
    """品質データベース管理システム

    SQLiteを使用して品質データを永続化。
    測定データ、不適合、CAPA、監査結果を一元管理。
    """

    def __init__(self, db_file: str = "quality_data.db"):
        self.db_file = db_file
        self.conn = None
        self._initialize_database()

    def _initialize_database(self):
        """データベースとテーブルを初期化"""
        self.conn = sqlite3.connect(self.db_file)
        cursor = self.conn.cursor()

        # 測定データテーブル
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS measurements (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp DATETIME NOT NULL,
            process_name TEXT NOT NULL,
            parameter_name TEXT NOT NULL,
            value REAL NOT NULL,
            target REAL,
            ucl REAL,
            lcl REAL,
            in_control BOOLEAN,
            operator TEXT,
            lot_number TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """)

        # 不適合テーブル
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS nonconformances (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            nc_id TEXT UNIQUE NOT NULL,
            date_detected DATE NOT NULL,
            product TEXT,
            defect_type TEXT NOT NULL,
            quantity INTEGER,
            severity TEXT,
            detected_by TEXT,
            status TEXT DEFAULT 'Open',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """)

        # CAPAテーブル
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS capas (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            capa_id TEXT UNIQUE NOT NULL,
            nc_id TEXT,
            description TEXT NOT NULL,
            root_cause TEXT,
            corrective_action TEXT,
            preventive_action TEXT,
            assigned_to TEXT,
            due_date DATE,
            status TEXT DEFAULT 'Open',
            effectiveness_verified BOOLEAN DEFAULT 0,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (nc_id) REFERENCES nonconformances(nc_id)
        )
        """)

        # 監査テーブル
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS audits (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            audit_id TEXT UNIQUE NOT NULL,
            audit_date DATE NOT NULL,
            department TEXT NOT NULL,
            auditor TEXT NOT NULL,
            findings_count INTEGER DEFAULT 0,
            nc_count INTEGER DEFAULT 0,
            status TEXT DEFAULT 'Planned',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """)

        self.conn.commit()
        print(f"データベース初期化完了: {self.db_file}")

    def add_measurement(self, timestamp: datetime, process_name: str,
                       parameter_name: str, value: float,
                       target: float = None, ucl: float = None,
                       lcl: float = None, operator: str = None,
                       lot_number: str = None):
        """測定データを記録

        Args:
            timestamp: 測定時刻
            process_name: プロセス名
            parameter_name: パラメータ名
            value: 測定値
            target: 目標値
            ucl: 上方管理限界
            lcl: 下方管理限界
            operator: 作業者
            lot_number: ロット番号
        """
        cursor = self.conn.cursor()

        in_control = True
        if ucl is not None and lcl is not None:
            in_control = lcl <= value <= ucl

        cursor.execute("""
        INSERT INTO measurements
        (timestamp, process_name, parameter_name, value, target, ucl, lcl,
         in_control, operator, lot_number)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (timestamp, process_name, parameter_name, value, target, ucl, lcl,
              in_control, operator, lot_number))

        self.conn.commit()

    def add_nonconformance(self, nc_id: str, date_detected: datetime,
                          product: str, defect_type: str,
                          quantity: int, severity: str,
                          detected_by: str) -> int:
        """不適合を記録

        Args:
            nc_id: 不適合ID
            date_detected: 検出日
            product: 製品名
            defect_type: 不良タイプ
            quantity: 数量
            severity: 重大度
            detected_by: 検出者

        Returns:
            挿入されたレコードのID
        """
        cursor = self.conn.cursor()

        cursor.execute("""
        INSERT INTO nonconformances
        (nc_id, date_detected, product, defect_type, quantity, severity, detected_by)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (nc_id, date_detected, product, defect_type, quantity, severity, detected_by))

        self.conn.commit()
        return cursor.lastrowid

    def add_capa(self, capa_id: str, nc_id: str, description: str,
                assigned_to: str, due_date: datetime) -> int:
        """CAPAを記録

        Args:
            capa_id: CAPA ID
            nc_id: 関連する不適合ID
            description: 説明
            assigned_to: 担当者
            due_date: 期限

        Returns:
            挿入されたレコードのID
        """
        cursor = self.conn.cursor()

        cursor.execute("""
        INSERT INTO capas
        (capa_id, nc_id, description, assigned_to, due_date)
        VALUES (?, ?, ?, ?, ?)
        """, (capa_id, nc_id, description, assigned_to, due_date))

        self.conn.commit()
        return cursor.lastrowid

    def update_capa(self, capa_id: str, root_cause: str = None,
                   corrective_action: str = None,
                   preventive_action: str = None,
                   status: str = None,
                   effectiveness_verified: bool = None):
        """CAPAを更新

        Args:
            capa_id: CAPA ID
            root_cause: 根本原因
            corrective_action: 是正処置
            preventive_action: 予防処置
            status: ステータス
            effectiveness_verified: 有効性確認済み
        """
        cursor = self.conn.cursor()

        updates = []
        params = []

        if root_cause is not None:
            updates.append("root_cause = ?")
            params.append(root_cause)

        if corrective_action is not None:
            updates.append("corrective_action = ?")
            params.append(corrective_action)

        if preventive_action is not None:
            updates.append("preventive_action = ?")
            params.append(preventive_action)

        if status is not None:
            updates.append("status = ?")
            params.append(status)

        if effectiveness_verified is not None:
            updates.append("effectiveness_verified = ?")
            params.append(effectiveness_verified)

        if updates:
            params.append(capa_id)
            sql = f"UPDATE capas SET {', '.join(updates)} WHERE capa_id = ?"
            cursor.execute(sql, params)
            self.conn.commit()

    def get_measurements(self, process_name: str = None,
                        start_date: datetime = None,
                        end_date: datetime = None) -> pd.DataFrame:
        """測定データを取得

        Args:
            process_name: プロセス名(フィルタ用)
            start_date: 開始日
            end_date: 終了日

        Returns:
            測定データのDataFrame
        """
        query = "SELECT * FROM measurements WHERE 1=1"
        params = []

        if process_name:
            query += " AND process_name = ?"
            params.append(process_name)

        if start_date:
            query += " AND timestamp >= ?"
            params.append(start_date)

        if end_date:
            query += " AND timestamp <= ?"
            params.append(end_date)

        query += " ORDER BY timestamp"

        df = pd.read_sql_query(query, self.conn, params=params)
        return df

    def get_open_capas(self) -> pd.DataFrame:
        """未完了CAPAを取得

        Returns:
            未完了CAPAのDataFrame
        """
        query = """
        SELECT c.capa_id, c.description, c.assigned_to, c.due_date,
               c.status, n.defect_type, n.severity
        FROM capas c
        LEFT JOIN nonconformances n ON c.nc_id = n.nc_id
        WHERE c.status != 'Closed'
        ORDER BY c.due_date
        """

        df = pd.read_sql_query(query, self.conn)
        return df

    def get_defect_analysis(self, start_date: datetime = None,
                           end_date: datetime = None) -> pd.DataFrame:
        """不良分析(パレート図用データ)を取得

        Args:
            start_date: 開始日
            end_date: 終了日

        Returns:
            不良タイプ別集計のDataFrame
        """
        query = """
        SELECT defect_type, SUM(quantity) as total_quantity, COUNT(*) as occurrence
        FROM nonconformances
        WHERE 1=1
        """
        params = []

        if start_date:
            query += " AND date_detected >= ?"
            params.append(start_date)

        if end_date:
            query += " AND date_detected <= ?"
            params.append(end_date)

        query += " GROUP BY defect_type ORDER BY total_quantity DESC"

        df = pd.read_sql_query(query, self.conn, params=params)
        return df

    def get_dashboard_metrics(self) -> Dict:
        """ダッシュボード用メトリクスを取得

        Returns:
            主要メトリクスの辞書
        """
        cursor = self.conn.cursor()

        # 今月のデータのみ
        start_of_month = datetime.now().replace(day=1, hour=0, minute=0, second=0)

        # 総測定数
        cursor.execute("""
        SELECT COUNT(*) FROM measurements WHERE timestamp >= ?
        """, (start_of_month,))
        total_measurements = cursor.fetchone()[0]

        # 管理外の測定数
        cursor.execute("""
        SELECT COUNT(*) FROM measurements
        WHERE timestamp >= ? AND in_control = 0
        """, (start_of_month,))
        out_of_control = cursor.fetchone()[0]

        # 不適合件数
        cursor.execute("""
        SELECT COUNT(*), COALESCE(SUM(quantity), 0)
        FROM nonconformances WHERE date_detected >= ?
        """, (start_of_month,))
        nc_count, nc_quantity = cursor.fetchone()

        # 未完了CAPA
        cursor.execute("""
        SELECT COUNT(*) FROM capas WHERE status != 'Closed'
        """)
        open_capas = cursor.fetchone()[0]

        metrics = {
            "total_measurements": total_measurements,
            "out_of_control_rate": (out_of_control / total_measurements * 100)
                                   if total_measurements > 0 else 0,
            "nc_count": nc_count,
            "nc_quantity": nc_quantity,
            "open_capas": open_capas
        }

        return metrics

    def close(self):
        """データベース接続を閉じる"""
        if self.conn:
            self.conn.close()
            print("データベース接続を閉じました")


# 使用例
db = QualityDatabase("quality_management.db")

# 測定データを記録
for i in range(10):
    timestamp = datetime.now() - timedelta(hours=10-i)
    value = np.random.normal(100, 2)

    db.add_measurement(
        timestamp=timestamp,
        process_name="成形温度",
        parameter_name="温度",
        value=value,
        target=100.0,
        ucl=106.0,
        lcl=94.0,
        operator="作業者A",
        lot_number=f"LOT-2025{i:03d}"
    )

# 不適合を記録
nc_id = "NC-20251026-001"
db.add_nonconformance(
    nc_id=nc_id,
    date_detected=datetime.now(),
    product="製品X",
    defect_type="寸法不良",
    quantity=50,
    severity="Major",
    detected_by="検査員B"
)

# CAPAを記録
capa_id = "CAPA-20251026-001"
db.add_capa(
    capa_id=capa_id,
    nc_id=nc_id,
    description="成形金型の摩耗による寸法不良",
    assigned_to="製造部長",
    due_date=datetime.now() + timedelta(days=30)
)

# CAPAを更新
db.update_capa(
    capa_id=capa_id,
    root_cause="金型のメンテナンス不足",
    corrective_action="金型を交換し、測定を実施",
    preventive_action="定期メンテナンススケジュールを確立",
    status="進行中"
)

# データ取得
print("\n=== 測定データ ===")
measurements_df = db.get_measurements(process_name="成形温度")
print(measurements_df.head())

print("\n=== 未完了CAPA ===")
open_capas_df = db.get_open_capas()
print(open_capas_df)

print("\n=== ダッシュボードメトリクス ===")
metrics = db.get_dashboard_metrics()
for key, value in metrics.items():
    print(f"{key}: {value}")

# データベース接続を閉じる
db.close()

5.3 機械学習による品質予測

5.3.1 不良予測システム(機械学習モデル)

製造パラメータから不良を予測する機械学習モデルを構築します。

例5: 機械学習による不良予測システム
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.preprocessing import StandardScaler
import joblib
from typing import Dict, Tuple

class DefectPredictionSystem:
    """機械学習による不良予測システム

    製造プロセスパラメータから不良発生を予測。
    Random Forestモデルで高精度な予測を実現。
    """

    def __init__(self):
        self.model = None
        self.scaler = None
        self.feature_names = None
        self.is_trained = False

    def prepare_training_data(self, df: pd.DataFrame,
                             target_column: str = 'is_defect',
                             feature_columns: List[str] = None) -> Tuple:
        """訓練データを準備

        Args:
            df: 製造データのDataFrame
            target_column: 目的変数カラム名
            feature_columns: 特徴量カラム名のリスト

        Returns:
            (X_train, X_test, y_train, y_test)のタプル
        """
        if feature_columns is None:
            # target_columnを除く全数値カラムを使用
            feature_columns = df.select_dtypes(include=[np.number]).columns.tolist()
            feature_columns.remove(target_column)

        self.feature_names = feature_columns

        X = df[feature_columns]
        y = df[target_column]

        # データ分割
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # 標準化
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        return X_train_scaled, X_test_scaled, y_train, y_test

    def train(self, X_train, y_train, n_estimators: int = 100,
             max_depth: int = 10):
        """モデルを訓練

        Args:
            X_train: 訓練データ(特徴量)
            y_train: 訓練データ(目的変数)
            n_estimators: 決定木の数
            max_depth: 木の最大深さ
        """
        self.model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=42,
            class_weight='balanced'  # 不均衡データ対策
        )

        self.model.fit(X_train, y_train)
        self.is_trained = True

        print(f"モデル訓練完了")
        print(f"  決定木数: {n_estimators}")
        print(f"  最大深さ: {max_depth}")

    def evaluate(self, X_test, y_test) -> Dict:
        """モデルを評価

        Args:
            X_test: テストデータ(特徴量)
            y_test: テストデータ(目的変数)

        Returns:
            評価メトリクスの辞書
        """
        if not self.is_trained:
            raise ValueError("モデルが訓練されていません")

        y_pred = self.model.predict(X_test)
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]

        # 混同行列
        cm = confusion_matrix(y_test, y_pred)
        tn, fp, fn, tp = cm.ravel()

        # メトリクス計算
        accuracy = (tp + tn) / (tp + tn + fp + fn)
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1_score = (2 * precision * recall / (precision + recall)
                   if (precision + recall) > 0 else 0)
        roc_auc = roc_auc_score(y_test, y_pred_proba)

        metrics = {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
            "roc_auc": roc_auc,
            "confusion_matrix": cm
        }

        # 結果を表示
        print("\n=== モデル評価結果 ===")
        print(f"正解率: {accuracy:.3f}")
        print(f"適合率: {precision:.3f}")
        print(f"再現率: {recall:.3f}")
        print(f"F1スコア: {f1_score:.3f}")
        print(f"ROC-AUC: {roc_auc:.3f}")
        print("\n混同行列:")
        print(f"  真陰性: {tn}, 偽陽性: {fp}")
        print(f"  偽陰性: {fn}, 真陽性: {tp}")

        return metrics

    def predict(self, X: pd.DataFrame) -> Dict:
        """不良を予測

        Args:
            X: 製造パラメータのDataFrame

        Returns:
            予測結果(予測クラス、確率、リスクレベル)
        """
        if not self.is_trained:
            raise ValueError("モデルが訓練されていません")

        # 特徴量の順序を確認
        X_ordered = X[self.feature_names]

        # 標準化
        X_scaled = self.scaler.transform(X_ordered)

        # 予測
        prediction = self.model.predict(X_scaled)[0]
        probability = self.model.predict_proba(X_scaled)[0, 1]

        # リスクレベルを判定
        if probability >= 0.7:
            risk_level = "High"
        elif probability >= 0.4:
            risk_level = "Medium"
        else:
            risk_level = "Low"

        result = {
            "prediction": "不良" if prediction == 1 else "良品",
            "defect_probability": probability,
            "risk_level": risk_level
        }

        return result

    def get_feature_importance(self) -> pd.DataFrame:
        """特徴量の重要度を取得

        Returns:
            特徴量重要度のDataFrame
        """
        if not self.is_trained:
            raise ValueError("モデルが訓練されていません")

        importance_df = pd.DataFrame({
            'feature': self.feature_names,
            'importance': self.model.feature_importances_
        })

        importance_df = importance_df.sort_values('importance', ascending=False)

        return importance_df

    def save_model(self, filename: str):
        """モデルを保存

        Args:
            filename: 保存ファイル名
        """
        if not self.is_trained:
            raise ValueError("モデルが訓練されていません")

        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_names': self.feature_names
        }

        joblib.dump(model_data, filename)
        print(f"モデルを {filename} に保存しました")

    def load_model(self, filename: str):
        """モデルを読み込み

        Args:
            filename: モデルファイル名
        """
        model_data = joblib.load(filename)

        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.feature_names = model_data['feature_names']
        self.is_trained = True

        print(f"モデルを {filename} から読み込みました")


# 使用例
# サンプルデータ生成
np.random.seed(42)

n_samples = 1000

# 製造パラメータ(特徴量)
temperature = np.random.normal(200, 5, n_samples)
pressure = np.random.normal(50, 3, n_samples)
speed = np.random.normal(100, 10, n_samples)
humidity = np.random.normal(60, 10, n_samples)

# 不良発生ロジック(温度が高く、圧力が低い場合に不良が発生しやすい)
defect_score = (
    (temperature - 200) * 0.3 +
    (50 - pressure) * 0.4 +
    (speed - 100) * 0.1 +
    np.random.normal(0, 2, n_samples)
)

is_defect = (defect_score > 2).astype(int)

# DataFrameを作成
df = pd.DataFrame({
    'temperature': temperature,
    'pressure': pressure,
    'speed': speed,
    'humidity': humidity,
    'is_defect': is_defect
})

print(f"データ生成完了: {len(df)}サンプル")
print(f"不良率: {is_defect.mean() * 100:.2f}%")

# 不良予測システムを初期化
defect_predictor = DefectPredictionSystem()

# 訓練データを準備
X_train, X_test, y_train, y_test = defect_predictor.prepare_training_data(df)

# モデルを訓練
defect_predictor.train(X_train, y_train, n_estimators=100, max_depth=10)

# モデルを評価
metrics = defect_predictor.evaluate(X_test, y_test)

# 特徴量の重要度を表示
print("\n=== 特徴量の重要度 ===")
importance_df = defect_predictor.get_feature_importance()
print(importance_df)

# 新しいデータで予測
new_data = pd.DataFrame({
    'temperature': [210],  # 高温
    'pressure': [45],      # 低圧
    'speed': [105],
    'humidity': [65]
})

prediction = defect_predictor.predict(new_data)
print("\n=== 予測結果 ===")
print(f"予測: {prediction['prediction']}")
print(f"不良確率: {prediction['defect_probability']:.3f}")
print(f"リスクレベル: {prediction['risk_level']}")

# モデルを保存
defect_predictor.save_model("defect_prediction_model.pkl")

5.3.2 API統合とデータ収集自動化

外部API(MES、ERPシステム)との統合により、データ収集を自動化します。

例6: API統合データ収集システム
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import json
import time

class MESAPIIntegration:
    """MES(製造実行システム)API統合

    MESシステムから製造データを自動収集し、
    品質データベースに統合。
    """

    def __init__(self, api_base_url: str, api_key: str):
        """
        Args:
            api_base_url: MES API のベースURL
            api_key: 認証用APIキー
        """
        self.api_base_url = api_base_url
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def get_production_data(self, start_time: datetime,
                           end_time: datetime,
                           line_id: str = None) -> pd.DataFrame:
        """生産データを取得

        Args:
            start_time: 開始時刻
            end_time: 終了時刻
            line_id: 製造ラインID(オプション)

        Returns:
            生産データのDataFrame
        """
        endpoint = f"{self.api_base_url}/production/data"

        params = {
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat()
        }

        if line_id:
            params["line_id"] = line_id

        try:
            response = requests.get(
                endpoint,
                headers=self.headers,
                params=params,
                timeout=30
            )

            response.raise_for_status()
            data = response.json()

            # DataFrameに変換
            df = pd.DataFrame(data['records'])

            print(f"生産データ取得成功: {len(df)}レコード")
            return df

        except requests.exceptions.RequestException as e:
            print(f"API呼び出しエラー: {e}")
            return pd.DataFrame()

    def get_quality_inspections(self, start_date: datetime,
                               end_date: datetime) -> pd.DataFrame:
        """品質検査データを取得

        Args:
            start_date: 開始日
            end_date: 終了日

        Returns:
            検査データのDataFrame
        """
        endpoint = f"{self.api_base_url}/quality/inspections"

        params = {
            "start_date": start_date.strftime("%Y-%m-%d"),
            "end_date": end_date.strftime("%Y-%m-%d")
        }

        try:
            response = requests.get(
                endpoint,
                headers=self.headers,
                params=params,
                timeout=30
            )

            response.raise_for_status()
            data = response.json()

            df = pd.DataFrame(data['inspections'])

            print(f"検査データ取得成功: {len(df)}レコード")
            return df

        except requests.exceptions.RequestException as e:
            print(f"API呼び出しエラー: {e}")
            return pd.DataFrame()

    def post_quality_alert(self, alert_data: Dict) -> bool:
        """品質アラートをMESに送信

        Args:
            alert_data: アラート情報

        Returns:
            送信成功の可否
        """
        endpoint = f"{self.api_base_url}/quality/alerts"

        try:
            response = requests.post(
                endpoint,
                headers=self.headers,
                json=alert_data,
                timeout=30
            )

            response.raise_for_status()

            print(f"アラート送信成功: {alert_data['alert_id']}")
            return True

        except requests.exceptions.RequestException as e:
            print(f"アラート送信エラー: {e}")
            return False


class AutomatedDataCollector:
    """自動データ収集システム

    定期的にMES APIからデータを収集し、
    品質データベースに格納。異常検出と通知も実施。
    """

    def __init__(self, mes_api: MESAPIIntegration,
                quality_db: QualityDatabase,
                spc_monitor: RealTimeSPCMonitor):
        self.mes_api = mes_api
        self.quality_db = quality_db
        self.spc_monitor = spc_monitor

    def collect_and_process(self, interval_minutes: int = 15):
        """データ収集と処理を実行

        Args:
            interval_minutes: データ収集間隔(分)
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(minutes=interval_minutes)

        print(f"\n{'='*60}")
        print(f"データ収集開始: {start_time} - {end_time}")
        print(f"{'='*60}")

        # 生産データを取得
        production_df = self.mes_api.get_production_data(
            start_time, end_time
        )

        if production_df.empty:
            print("収集するデータがありません")
            return

        # 各レコードを処理
        for idx, row in production_df.iterrows():
            # データベースに保存
            self.quality_db.add_measurement(
                timestamp=pd.to_datetime(row['timestamp']),
                process_name=row['process_name'],
                parameter_name=row['parameter'],
                value=row['value'],
                target=row.get('target'),
                ucl=row.get('ucl'),
                lcl=row.get('lcl'),
                operator=row.get('operator'),
                lot_number=row.get('lot_number')
            )

            # SPCモニタリング
            result = self.spc_monitor.add_measurement(
                value=row['value'],
                timestamp=pd.to_datetime(row['timestamp'])
            )

            # アラート発生時はMESに通知
            if result['status'] == 'ALERT':
                alert_data = {
                    "alert_id": result['alert_id'],
                    "timestamp": datetime.now().isoformat(),
                    "process_name": row['process_name'],
                    "severity": "Critical",
                    "violations": result['violations'],
                    "value": row['value']
                }

                self.mes_api.post_quality_alert(alert_data)

        print(f"データ処理完了: {len(production_df)}レコード")

    def run_continuous_collection(self, interval_minutes: int = 15,
                                 max_iterations: int = None):
        """継続的データ収集を実行

        Args:
            interval_minutes: 収集間隔(分)
            max_iterations: 最大繰り返し回数(None = 無限)
        """
        iteration = 0

        print("継続的データ収集を開始します")
        print(f"収集間隔: {interval_minutes}分")

        try:
            while True:
                iteration += 1

                if max_iterations and iteration > max_iterations:
                    print(f"{max_iterations}回の収集が完了しました")
                    break

                self.collect_and_process(interval_minutes)

                # 次の収集まで待機
                print(f"\n次の収集まで{interval_minutes}分待機...")
                time.sleep(interval_minutes * 60)

        except KeyboardInterrupt:
            print("\n\n収集を手動で停止しました")


# 使用例(シミュレーション)
# 注意: 実際のAPI URLとキーが必要

# MES APIを初期化(ダミーURL)
# mes_api = MESAPIIntegration(
#     api_base_url="https://mes.example.com/api/v1",
#     api_key="your_api_key_here"
# )

# 品質データベースとSPCモニターを初期化
# quality_db = QualityDatabase("quality_data.db")
# spc_monitor = RealTimeSPCMonitor(
#     process_name="成形温度",
#     target=200.0,
#     ucl=210.0,
#     lcl=190.0
# )

# 自動収集システムを初期化
# collector = AutomatedDataCollector(mes_api, quality_db, spc_monitor)

# 継続的データ収集を開始
# collector.run_continuous_collection(interval_minutes=15)

print("\nAPI統合システムの例を示しました")
print("実際の使用には、MESシステムのAPI仕様に合わせた実装が必要です")

5.3.3 自動サンプリング計画生成

統計的手法に基づいて、最適なサンプリング計画を自動生成します。

例7: 自動サンプリング計画生成システム
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, Tuple

class SamplingPlanGenerator:
    """自動サンプリング計画生成システム

    AQL(Acceptable Quality Level)に基づいて、
    統計的に有意なサンプリング計画を自動生成。
    """

    def __init__(self):
        # MIL-STD-105E サンプリングコード表(簡略版)
        self.sample_size_codes = {
            "S-1": {"range": (2, 8), "code_letter": "A"},
            "S-2": {"range": (9, 15), "code_letter": "A"},
            "S-3": {"range": (16, 25), "code_letter": "B"},
            "S-4": {"range": (26, 50), "code_letter": "C"},
            "S-5": {"range": (51, 90), "code_letter": "D"},
            "S-6": {"range": (91, 150), "code_letter": "E"},
            "S-7": {"range": (151, 280), "code_letter": "F"},
            "S-8": {"range": (281, 500), "code_letter": "G"},
            "S-9": {"range": (501, 1200), "code_letter": "H"},
            "S-10": {"range": (1201, 3200), "code_letter": "J"},
            "S-11": {"range": (3201, 10000), "code_letter": "K"},
        }

        # シングルサンプリング計画(簡略版)
        self.single_sampling_plans = {
            "A": {"1.0": {"n": 2, "Ac": 0, "Re": 1}},
            "B": {"1.0": {"n": 3, "Ac": 0, "Re": 1}},
            "C": {"1.0": {"n": 5, "Ac": 0, "Re": 1}},
            "D": {"1.0": {"n": 8, "Ac": 0, "Re": 1}},
            "E": {"1.0": {"n": 13, "Ac": 0, "Re": 1}},
            "F": {"1.0": {"n": 20, "Ac": 0, "Re": 1}},
            "G": {"1.0": {"n": 32, "Ac": 1, "Re": 2}},
            "H": {"1.0": {"n": 50, "Ac": 1, "Re": 2}},
            "J": {"1.0": {"n": 80, "Ac": 2, "Re": 3}},
            "K": {"1.0": {"n": 125, "Ac": 3, "Re": 4}},
        }

    def generate_single_sampling_plan(self, lot_size: int,
                                     aql: float = 1.0,
                                     inspection_level: str = "II") -> Dict:
        """シングルサンプリング計画を生成

        Args:
            lot_size: ロットサイズ
            aql: 合格品質水準(%)
            inspection_level: 検査レベル("I", "II", "III")

        Returns:
            サンプリング計画(サンプルサイズ、合格判定個数、不合格判定個数)
        """
        # サンプルサイズコード文字を決定
        code_letter = None
        for code_info in self.sample_size_codes.values():
            lot_range = code_info["range"]
            if lot_range[0] <= lot_size <= lot_range[1]:
                code_letter = code_info["code_letter"]
                break

        if code_letter is None:
            raise ValueError(f"ロットサイズ {lot_size} が範囲外です")

        # サンプリング計画を取得
        aql_str = str(aql)
        if code_letter in self.single_sampling_plans:
            if aql_str in self.single_sampling_plans[code_letter]:
                plan = self.single_sampling_plans[code_letter][aql_str]
            else:
                # デフォルトとして1.0を使用
                plan = self.single_sampling_plans[code_letter]["1.0"]
        else:
            raise ValueError(f"コード文字 {code_letter} のプランがありません")

        sampling_plan = {
            "lot_size": lot_size,
            "aql": aql,
            "inspection_level": inspection_level,
            "code_letter": code_letter,
            "sample_size": plan["n"],
            "acceptance_number": plan["Ac"],
            "rejection_number": plan["Re"]
        }

        return sampling_plan

    def calculate_oc_curve(self, n: int, Ac: int,
                          p_range: np.ndarray = None) -> pd.DataFrame:
        """OC曲線(検査特性曲線)を計算

        Args:
            n: サンプルサイズ
            Ac: 合格判定個数
            p_range: 不良率の範囲(配列)

        Returns:
            OC曲線データのDataFrame
        """
        if p_range is None:
            p_range = np.linspace(0, 0.15, 100)

        # 各不良率での合格確率を計算(二項分布)
        pa_values = []
        for p in p_range:
            pa = sum(stats.binom.pmf(k, n, p) for k in range(Ac + 1))
            pa_values.append(pa)

        oc_df = pd.DataFrame({
            "defect_rate": p_range * 100,  # %表示
            "probability_acceptance": pa_values
        })

        return oc_df

    def generate_double_sampling_plan(self, lot_size: int,
                                     aql: float = 1.0) -> Dict:
        """ダブルサンプリング計画を生成

        Args:
            lot_size: ロットサイズ
            aql: 合格品質水準(%)

        Returns:
            ダブルサンプリング計画
        """
        # シングルプランから導出(簡易版)
        single_plan = self.generate_single_sampling_plan(lot_size, aql)

        n1 = int(single_plan["sample_size"] * 0.6)
        n2 = int(single_plan["sample_size"] * 0.6)

        Ac1 = 0
        Re1 = max(2, single_plan["rejection_number"])
        Ac2 = single_plan["acceptance_number"]
        Re2 = Ac2 + 1

        double_plan = {
            "lot_size": lot_size,
            "aql": aql,
            "first_sample_size": n1,
            "first_acceptance_number": Ac1,
            "first_rejection_number": Re1,
            "second_sample_size": n2,
            "second_acceptance_number": Ac2,
            "second_rejection_number": Re2,
            "total_max_sample_size": n1 + n2
        }

        return double_plan

    def calculate_sample_size_for_estimation(self,
                                            confidence_level: float = 0.95,
                                            margin_of_error: float = 0.05,
                                            population_std: float = 1.0,
                                            population_size: int = None) -> int:
        """母集団推定のためのサンプルサイズを計算

        Args:
            confidence_level: 信頼水準(0.9, 0.95, 0.99)
            margin_of_error: 許容誤差
            population_std: 母集団の標準偏差(推定値)
            population_size: 母集団サイズ(有限母集団補正用)

        Returns:
            必要サンプルサイズ
        """
        # Z値を取得
        z = stats.norm.ppf((1 + confidence_level) / 2)

        # 無限母集団の場合のサンプルサイズ
        n = (z * population_std / margin_of_error) ** 2

        # 有限母集団補正
        if population_size:
            n = n / (1 + (n - 1) / population_size)

        return int(np.ceil(n))


# 使用例
sampler = SamplingPlanGenerator()

# シングルサンプリング計画を生成
print("=== シングルサンプリング計画 ===")
plan = sampler.generate_single_sampling_plan(
    lot_size=500,
    aql=1.0,
    inspection_level="II"
)

print(f"ロットサイズ: {plan['lot_size']}")
print(f"AQL: {plan['aql']}%")
print(f"コード文字: {plan['code_letter']}")
print(f"サンプルサイズ: {plan['sample_size']}")
print(f"合格判定個数 (Ac): {plan['acceptance_number']}")
print(f"不合格判定個数 (Re): {plan['rejection_number']}")

# OC曲線を計算
print("\n=== OC曲線データ ===")
oc_curve = sampler.calculate_oc_curve(
    n=plan['sample_size'],
    Ac=plan['acceptance_number']
)
print(oc_curve.head(10))

# ダブルサンプリング計画を生成
print("\n=== ダブルサンプリング計画 ===")
double_plan = sampler.generate_double_sampling_plan(
    lot_size=500,
    aql=1.0
)

print(f"第1サンプル: n1={double_plan['first_sample_size']}, "
      f"Ac1={double_plan['first_acceptance_number']}, "
      f"Re1={double_plan['first_rejection_number']}")
print(f"第2サンプル: n2={double_plan['second_sample_size']}, "
      f"Ac2={double_plan['second_acceptance_number']}, "
      f"Re2={double_plan['second_rejection_number']}")
print(f"最大サンプル総数: {double_plan['total_max_sample_size']}")

# 推定のためのサンプルサイズ計算
print("\n=== 推定用サンプルサイズ ===")
sample_size_estimation = sampler.calculate_sample_size_for_estimation(
    confidence_level=0.95,
    margin_of_error=0.05,
    population_std=2.0,
    population_size=1000
)
print(f"必要サンプルサイズ: {sample_size_estimation}")

5.3.4 統合品質マネジメントシステムワークフロー

これまでの全コンポーネントを統合した、完全な品質マネジメントシステムのワークフローを示します。

例8: 統合品質マネジメントシステム
from datetime import datetime, timedelta
import pandas as pd
import numpy as np

class IntegratedQualityManagementSystem:
    """統合品質マネジメントシステム

    リアルタイム監視、データベース管理、機械学習予測、
    レポート生成を統合したエンドツーエンドのQMSワークフロー。
    """

    def __init__(self):
        # 各コンポーネントを初期化
        self.quality_db = QualityDatabase("integrated_qms.db")

        self.spc_monitor = RealTimeSPCMonitor(
            process_name="統合プロセス",
            target=100.0,
            ucl=106.0,
            lcl=94.0,
            email_recipients=["quality@example.com"]
        )

        self.defect_predictor = DefectPredictionSystem()

        self.dashboard = QualityDashboard()

        self.report_generator = AutoReportGenerator(
            report_period=datetime.now().strftime("%Y年%m月")
        )

        self.sampler = SamplingPlanGenerator()

        print("統合品質マネジメントシステムを初期化しました")

    def workflow_step1_data_collection(self):
        """ステップ1: データ収集"""
        print("\n" + "="*60)
        print("ステップ1: データ収集")
        print("="*60)

        # シミュレーション: 製造データ収集
        np.random.seed(42)

        for i in range(20):
            timestamp = datetime.now() - timedelta(hours=20-i)
            value = np.random.normal(100, 2.5)

            # データベースに保存
            self.quality_db.add_measurement(
                timestamp=timestamp,
                process_name="統合プロセス",
                parameter_name="主要パラメータ",
                value=value,
                target=100.0,
                ucl=106.0,
                lcl=94.0,
                operator="作業者A",
                lot_number=f"LOT-{i:03d}"
            )

            # ダッシュボードにデータ追加
            self.dashboard.add_control_chart_data(
                timestamp=timestamp,
                value=value,
                target=100.0,
                ucl=106.0,
                lcl=94.0
            )

        print(f"データ収集完了: 20レコード")

    def workflow_step2_realtime_monitoring(self):
        """ステップ2: リアルタイム監視"""
        print("\n" + "="*60)
        print("ステップ2: リアルタイム監視とアラート")
        print("="*60)

        # 測定データを取得
        df = self.quality_db.get_measurements(process_name="統合プロセス")

        # SPC監視を実行
        alert_count = 0
        for _, row in df.iterrows():
            result = self.spc_monitor.add_measurement(
                value=row['value'],
                timestamp=pd.to_datetime(row['timestamp'])
            )

            if result['status'] == 'ALERT':
                alert_count += 1

        print(f"SPC監視完了: {alert_count}件のアラート発生")

    def workflow_step3_defect_prediction(self):
        """ステップ3: 不良予測(機械学習)"""
        print("\n" + "="*60)
        print("ステップ3: 機械学習による不良予測")
        print("="*60)

        # 訓練データ生成(簡易版)
        np.random.seed(42)
        n_samples = 1000

        temperature = np.random.normal(200, 5, n_samples)
        pressure = np.random.normal(50, 3, n_samples)
        speed = np.random.normal(100, 10, n_samples)

        defect_score = (
            (temperature - 200) * 0.3 +
            (50 - pressure) * 0.4 +
            np.random.normal(0, 2, n_samples)
        )
        is_defect = (defect_score > 2).astype(int)

        train_df = pd.DataFrame({
            'temperature': temperature,
            'pressure': pressure,
            'speed': speed,
            'is_defect': is_defect
        })

        # モデル訓練
        X_train, X_test, y_train, y_test = \
            self.defect_predictor.prepare_training_data(train_df)

        self.defect_predictor.train(X_train, y_train)

        # モデル評価
        metrics = self.defect_predictor.evaluate(X_test, y_test)

        print(f"予測モデル訓練完了: ROC-AUC = {metrics['roc_auc']:.3f}")

    def workflow_step4_sampling_plan(self):
        """ステップ4: サンプリング計画生成"""
        print("\n" + "="*60)
        print("ステップ4: 自動サンプリング計画")
        print("="*60)

        plan = self.sampler.generate_single_sampling_plan(
            lot_size=500,
            aql=1.0
        )

        print(f"サンプルサイズ: {plan['sample_size']}")
        print(f"合格判定個数: {plan['acceptance_number']}")

    def workflow_step5_dashboard_generation(self):
        """ステップ5: ダッシュボード生成"""
        print("\n" + "="*60)
        print("ステップ5: インタラクティブダッシュボード生成")
        print("="*60)

        # 不良データを追加
        defect_types = ["寸法不良", "外観不良", "機能不良", "梱包不良"]
        defect_counts = [45, 28, 15, 8]

        for defect_type, count in zip(defect_types, defect_counts):
            self.dashboard.add_defect_data(defect_type, count)

        # KPIデータを追加
        kpis = [
            ("良品率", 98.5, 99.0, "%"),
            ("納期遵守率", 96.0, 95.0, "%"),
        ]

        for kpi_name, actual, target, unit in kpis:
            self.dashboard.add_kpi_data(kpi_name, actual, target, unit)

        # ダッシュボードをエクスポート
        self.dashboard.export_html("integrated_dashboard.html")

        print("ダッシュボード生成完了: integrated_dashboard.html")

    def workflow_step6_report_generation(self):
        """ステップ6: 自動レポート生成"""
        print("\n" + "="*60)
        print("ステップ6: 自動レポート生成")
        print("="*60)

        # サマリーデータを追加
        self.report_generator.add_summary_data(
            total_production=50000,
            defect_count=750,
            yield_rate=98.5,
            customer_complaints=8
        )

        # KPIデータを追加
        kpi_list = [
            {"name": "良品率", "target": 99.0, "actual": 98.5, "achievement": 99.5},
            {"name": "納期遵守率", "target": 95.0, "actual": 96.5, "achievement": 101.6}
        ]
        self.report_generator.add_kpi_data(kpi_list)

        # Excelレポート生成
        self.report_generator.generate_excel_report("integrated_report.xlsx")

        print("レポート生成完了: integrated_report.xlsx")

    def run_complete_workflow(self):
        """完全ワークフローを実行"""
        print("\n" + "#"*60)
        print("# 統合品質マネジメントシステム - 完全ワークフロー")
        print("#"*60)

        # 全ステップを順次実行
        self.workflow_step1_data_collection()
        self.workflow_step2_realtime_monitoring()
        self.workflow_step3_defect_prediction()
        self.workflow_step4_sampling_plan()
        self.workflow_step5_dashboard_generation()
        self.workflow_step6_report_generation()

        print("\n" + "="*60)
        print("ワークフロー完了")
        print("="*60)

        # メトリクス表示
        metrics = self.quality_db.get_dashboard_metrics()
        print("\n=== 最終メトリクス ===")
        for key, value in metrics.items():
            print(f"{key}: {value}")

        # データベース接続を閉じる
        self.quality_db.close()


# 使用例
qms = IntegratedQualityManagementSystem()

# 完全ワークフローを実行
qms.run_complete_workflow()

print("\n統合品質マネジメントシステムの実行が完了しました")

まとめ

この章では、Pythonを使った品質管理の自動化について、実践的な実装方法を学びました。

📚 重要ポイント

💡 実務への応用

これらのシステムを組み合わせることで、以下のような高度な品質マネジメントが実現できます: