Last sync: 2025-11-16

Chapter 4: Quality Standards and ISO 9001

Understanding International Quality Management Standards and Practical Python Implementation

📚 Series: Introduction to Quality Management and QA ⏱️ Reading Time: 35-40 minutes 🎯 Difficulty: Intermediate

What You'll Learn in This Chapter

4.1 Overview of ISO 9001

4.1.1 What is ISO 9001

ISO 9001 is the international standard for Quality Management Systems (QMS), published by the International Organization for Standardization (ISO). It specifies the requirements an organization must meet to consistently provide products and services that satisfy customer requirements, and to raise customer satisfaction through continual improvement.

📊 Global Impact of ISO 9001

4.1.2 Seven Quality Management Principles

ISO 9001:2015 is based on the following seven quality management principles:

Principle | Description | Practice Examples
1. Customer Focus | Understanding customer requirements and exceeding expectations | Customer satisfaction surveys, VOC analysis
2. Leadership | Top management establishes direction and environment | Quality policy formulation, goal setting
3. Engagement of People | Everyone demonstrates their capabilities and contributes | Training and education, suggestion systems
4. Process Approach | Managing activities as processes | Creating flow diagrams, setting KPIs
5. Improvement | Pursuing continuous improvement | PDCA, Kaizen activities
6. Evidence-based Decision Making | Decisions based on data and analysis | Statistical analysis, dashboards
7. Relationship Management | Mutually beneficial relationships with stakeholders | Supplier evaluation, partnerships

4.2 ISO 9001 Requirements and Python Implementation

4.2.1 Compliance Checklist System

This section implements a checklist system for recording and tracking compliance with each ISO 9001 requirement.

Example 1: ISO 9001 Compliance Checker
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import json

class ISO9001ComplianceChecker:
    """System for checking and managing compliance with ISO 9001 requirements

    Evaluates compliance with the requirement clauses (4 to 10) of ISO 9001:2015,
    records the supporting evidence, and flags items that need follow-up action.
    """

    def __init__(self):
        # ISO 9001:2015 requirements definition
        self.requirements = {
            "4": {"title": "Context of the Organization", "subclauses": [
                "4.1 Understanding the organization and its context",
                "4.2 Understanding the needs and expectations of interested parties",
                "4.3 Determining the scope of the quality management system",
                "4.4 Quality management system and its processes"
            ]},
            "5": {"title": "Leadership", "subclauses": [
                "5.1 Leadership and commitment",
                "5.2 Policy",
                "5.3 Organizational roles, responsibilities and authorities"
            ]},
            "6": {"title": "Planning", "subclauses": [
                "6.1 Actions to address risks and opportunities",
                "6.2 Quality objectives and planning to achieve them",
                "6.3 Planning of changes"
            ]},
            "7": {"title": "Support", "subclauses": [
                "7.1 Resources",
                "7.2 Competence",
                "7.3 Awareness",
                "7.4 Communication",
                "7.5 Documented information"
            ]},
            "8": {"title": "Operation", "subclauses": [
                "8.1 Operational planning and control",
                "8.2 Requirements for products and services",
                "8.3 Design and development of products and services",
                "8.4 Control of externally provided processes, products and services",
                "8.5 Production and service provision",
                "8.6 Release of products and services",
                "8.7 Control of nonconforming outputs"
            ]},
            "9": {"title": "Performance Evaluation", "subclauses": [
                "9.1 Monitoring, measurement, analysis and evaluation",
                "9.2 Internal audit",
                "9.3 Management review"
            ]},
            "10": {"title": "Improvement", "subclauses": [
                "10.1 General",
                "10.2 Nonconformity and corrective action",
                "10.3 Continual improvement"
            ]}
        }

        self.compliance_data = []

    def add_compliance_record(self, clause: str, subclause: str,
                            status: str, evidence: str,
                            auditor: str, notes: str = "") -> Dict:
        """Add compliance record

        Args:
            clause: Clause number (e.g., "4", "5")
            subclause: Detailed requirement
            status: Compliance status ("Compliant", "Partial", "Non-Compliant", "Not Applicable")
            evidence: Evidence (document number, reference)
            auditor: Auditor name
            notes: Notes

        Returns:
            Added record
        """
        record = {
            "date": datetime.now().strftime("%Y-%m-%d"),
            "clause": clause,
            "subclause": subclause,
            "status": status,
            "evidence": evidence,
            "auditor": auditor,
            "notes": notes,
            "action_required": status in ["Partial", "Non-Compliant"]
        }

        self.compliance_data.append(record)
        return record

    def get_compliance_summary(self) -> pd.DataFrame:
        """Get compliance summary

        Returns:
            DataFrame containing compliance rate by clause
        """
        if not self.compliance_data:
            return pd.DataFrame()

        df = pd.DataFrame(self.compliance_data)

        # Aggregate by clause
        summary = df.groupby('clause').agg({
            'status': lambda x: (x == 'Compliant').sum() / len(x) * 100,
            'subclause': 'count',
            'action_required': 'sum'
        }).round(2)

        summary.columns = ['Compliance Rate (%)', 'Number of Requirements', 'Items Requiring Action']
        summary.index.name = 'Clause'

        return summary

    def get_non_compliant_items(self) -> List[Dict]:
        """Get list of non-compliant items

        Returns:
            List of non-compliant and partially compliant items
        """
        return [
            record for record in self.compliance_data
            if record['status'] in ['Partial', 'Non-Compliant']
        ]

    def export_audit_report(self, filename: str):
        """Export audit report

        Args:
            filename: Output filename (JSON format)
        """
        report = {
            "report_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "total_requirements": len(self.compliance_data),
            "summary": self.get_compliance_summary().to_dict(),
            "non_compliant_items": self.get_non_compliant_items(),
            "overall_compliance": (
                sum(1 for r in self.compliance_data if r['status'] == 'Compliant')
                / len(self.compliance_data) * 100
                if self.compliance_data else 0
            )
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        print(f"Audit report exported to {filename}")
        print(f"Overall compliance rate: {report['overall_compliance']:.2f}%")


# Usage example
checker = ISO9001ComplianceChecker()

# Add compliance records
checker.add_compliance_record(
    clause="4",
    subclause="4.1 Understanding the organization and its context",
    status="Compliant",
    evidence="DOC-001: SWOT analysis document",
    auditor="Taro Tanaka"
)

checker.add_compliance_record(
    clause="6",
    subclause="6.1 Actions to address risks and opportunities",
    status="Partial",
    evidence="DOC-015: Risk register (needs update)",
    auditor="Hanako Sato",
    notes="Risk assessment has not been updated for over 6 months"
)

checker.add_compliance_record(
    clause="9",
    subclause="9.2 Internal audit",
    status="Non-Compliant",
    evidence="None",
    auditor="Ichiro Suzuki",
    notes="Annual internal audit has not been conducted"
)

# Display summary
print("\n=== ISO 9001 Compliance Summary ===")
print(checker.get_compliance_summary())

# Check non-compliant items
print("\n=== Items Requiring Action ===")
for item in checker.get_non_compliant_items():
    print(f"[{item['clause']}] {item['subclause']}")
    print(f"  Status: {item['status']}")
    print(f"  Notes: {item['notes']}\n")

# Export report
checker.export_audit_report("iso9001_audit_report.json")
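
The summary in Example 1 only reflects subclauses that have a record, so a clause with a single compliant record reports 100% even if its other subclauses were never assessed. A minimal sketch of a coverage check follows, assuming data shaped like the `requirements` dict and `compliance_data` list above. The helper name `find_unassessed_subclauses` and the trimmed sample data are illustrative and are not part of the original class.

```python
from typing import Dict, List


def find_unassessed_subclauses(requirements: Dict[str, dict],
                               records: List[dict]) -> Dict[str, List[str]]:
    """Return, per clause, the subclauses that have no compliance record yet.

    Hypothetical helper: `requirements` and `records` mirror the
    `requirements` dict and `compliance_data` list from Example 1.
    """
    assessed = {r["subclause"] for r in records}
    missing = {}
    for clause, info in requirements.items():
        gaps = [s for s in info["subclauses"] if s not in assessed]
        if gaps:
            missing[clause] = gaps
    return missing


# Trimmed sample data (illustrative only)
sample_requirements = {
    "9": {"title": "Performance Evaluation", "subclauses": [
        "9.1 Monitoring, measurement, analysis and evaluation",
        "9.2 Internal audit",
        "9.3 Management review",
    ]},
    "10": {"title": "Improvement", "subclauses": [
        "10.1 General",
    ]},
}
sample_records = [
    {"clause": "9", "subclause": "9.2 Internal audit", "status": "Non-Compliant"},
    {"clause": "10", "subclause": "10.1 General", "status": "Compliant"},
]

unassessed = find_unassessed_subclauses(sample_requirements, sample_records)
for clause, gaps in unassessed.items():
    print(f"Clause {clause}: {len(gaps)} subclause(s) not yet assessed")
```

Reporting the unassessed subclauses next to the compliance rate makes it harder to misread a partially audited clause as fully compliant.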

💡 Implementation Points

4.2.2 Document Management and Version Control System

This section implements a document management system corresponding to ISO 9001 clause 7.5, "Documented information".

Example 2: Document Control System
import hashlib
from datetime import datetime
from typing import List, Dict, Optional

import pandas as pd  # required by the pd.DataFrame annotation and pd.Timedelta in the class below

class DocumentControlSystem:
    """ISO 9001 compliant document management system

    Manages document version control, approval workflows, revision history,
    and expiration date management.
    """

    def __init__(self):
        self.documents = []
        self.revision_history = []

    def create_document(self, doc_id: str, title: str, doc_type: str,
                       content: str, owner: str,
                       review_period_days: int = 365) -> Dict:
        """Create new document

        Args:
            doc_id: Document number (e.g., "QP-001")
            title: Document title
            doc_type: Document type ("Procedure", "Form", "Record")
            content: Document content
            owner: Document owner
            review_period_days: Review period (days)

        Returns:
            Created document information
        """
        # Calculate document hash (for tamper detection)
        content_hash = hashlib.sha256(content.encode()).hexdigest()

        document = {
            "doc_id": doc_id,
            "title": title,
            "type": doc_type,
            "version": "1.0",
            "status": "Draft",  # Draft, Under Review, Approved, Obsolete
            "content_hash": content_hash,
            "owner": owner,
            "created_date": datetime.now(),
            "approved_date": None,
            "approved_by": None,
            "next_review_date": None,
            "review_period_days": review_period_days,
            "access_level": "Internal"  # Public, Internal, Confidential
        }

        self.documents.append(document)

        # Record in revision history
        self._add_revision_history(
            doc_id, "1.0", "Draft", "New creation", owner
        )

        return document

    def approve_document(self, doc_id: str, approver: str) -> bool:
        """Approve document

        Args:
            doc_id: Document number
            approver: Approver name

        Returns:
            Approval success status
        """
        doc = self._find_document(doc_id)
        if not doc:
            print(f"Document {doc_id} not found")
            return False

        if doc['status'] not in ('Draft', 'Under Review'):
            print(f"Document {doc_id} cannot be approved in current status: {doc['status']}")
            return False

        doc['status'] = 'Approved'
        doc['approved_date'] = datetime.now()
        doc['approved_by'] = approver
        doc['next_review_date'] = (
            datetime.now() +
            pd.Timedelta(days=doc['review_period_days'])
        )

        self._add_revision_history(
            doc_id, doc['version'], "Approved",
            f"Approved by {approver}", approver
        )

        print(f"Document {doc_id} v{doc['version']} approved")
        return True

    def revise_document(self, doc_id: str, new_content: str,
                       revision_note: str, revised_by: str) -> Dict:
        """Revise document (create new version)

        Args:
            doc_id: Document number
            new_content: New document content
            revision_note: Revision reason
            revised_by: Revisor

        Returns:
            Revised document information
        """
        doc = self._find_document(doc_id)
        if not doc:
            raise ValueError(f"Document {doc_id} not found")

        # Update version number
        major, minor = map(int, doc['version'].split('.'))
        doc['version'] = f"{major}.{minor + 1}"

        # Return document to Draft status
        doc['status'] = 'Draft'
        doc['content_hash'] = hashlib.sha256(new_content.encode()).hexdigest()
        doc['approved_date'] = None
        doc['approved_by'] = None

        # Record in revision history
        self._add_revision_history(
            doc_id, doc['version'], "Draft", revision_note, revised_by
        )

        print(f"Document {doc_id} revised to v{doc['version']}")
        return doc

    def get_documents_for_review(self) -> List[Dict]:
        """Get list of documents requiring review

        Returns:
            List of documents with approaching or past review dates
        """
        now = datetime.now()
        review_needed = []

        for doc in self.documents:
            if doc['status'] != 'Approved':
                continue

            if doc['next_review_date'] and doc['next_review_date'] <= now:
                days_overdue = (now - doc['next_review_date']).days
                review_needed.append({
                    **doc,
                    "days_overdue": days_overdue,
                    "priority": "High" if days_overdue > 30 else "Medium"
                })

        return sorted(review_needed, key=lambda x: x['days_overdue'], reverse=True)

    def get_revision_history(self, doc_id: str) -> pd.DataFrame:
        """Get document revision history

        Args:
            doc_id: Document number

        Returns:
            Revision history DataFrame
        """
        history = [
            h for h in self.revision_history if h['doc_id'] == doc_id
        ]

        if not history:
            return pd.DataFrame()

        return pd.DataFrame(history)

    def _find_document(self, doc_id: str) -> Optional[Dict]:
        """Search document by document ID (internal use)"""
        for doc in self.documents:
            if doc['doc_id'] == doc_id:
                return doc
        return None

    def _add_revision_history(self, doc_id: str, version: str,
                             status: str, note: str, user: str):
        """Add revision history (internal use)"""
        self.revision_history.append({
            "doc_id": doc_id,
            "version": version,
            "status": status,
            "note": note,
            "user": user,
            "timestamp": datetime.now()
        })


# Usage example
import pandas as pd

dcs = DocumentControlSystem()

# Create quality procedure
doc = dcs.create_document(
    doc_id="QP-001",
    title="Internal Audit Procedure",
    doc_type="Procedure",
    content="1. Purpose\n2. Scope\n3. Procedure...",
    owner="Quality Manager",
    review_period_days=365
)

print(f"Document created: {doc['doc_id']} - {doc['title']}")

# Approve document
dcs.approve_document("QP-001", "Factory Manager")

# Revise document
dcs.revise_document(
    doc_id="QP-001",
    new_content="1. Purpose\n2. Scope (updated)\n3. Procedure (added)...",
    revision_note="Expanded scope, detailed procedures",
    revised_by="Quality Manager"
)

# Re-approve after revision
dcs.approve_document("QP-001", "Factory Manager")

# Display revision history
print("\n=== Revision History ===")
print(dcs.get_revision_history("QP-001"))
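
Example 2 stores a SHA-256 hash of each document's content but never compares against it. The sketch below shows one way such a check could look, assuming the hash was produced the same way as `content_hash` above. The function name `content_matches_hash` is hypothetical. Note that a plain hash only detects changes to the content; it offers no protection if someone can also overwrite the stored hash.

```python
import hashlib
import hmac


def content_matches_hash(content: str, stored_hash: str) -> bool:
    """Return True if `content` hashes to `stored_hash` (SHA-256, hex digest)."""
    current_hash = hashlib.sha256(content.encode()).hexdigest()
    # compare_digest performs a constant-time comparison of the two digests
    return hmac.compare_digest(current_hash, stored_hash)


# Illustrative content, matching the shape used in Example 2
approved_text = "1. Purpose\n2. Scope\n3. Procedure..."
stored = hashlib.sha256(approved_text.encode()).hexdigest()

unchanged_ok = content_matches_hash(approved_text, stored)
edited_ok = content_matches_hash(approved_text + " (edited)", stored)
print(f"Unchanged content matches: {unchanged_ok}")
print(f"Edited content matches: {edited_ok}")
```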

4.2.3 Internal Audit Planning and Tracking System

This section implements an audit management system corresponding to ISO 9001 clause 9.2, "Internal audit".

Example 3: Internal Audit Management System
from datetime import datetime, timedelta
import pandas as pd
from typing import List, Dict, Optional  # Optional is used by _find_audit

class InternalAuditSystem:
    """Manages internal audit planning, execution, and follow-up

    Creates annual audit plans, records audit execution, tracks findings,
    and verifies corrective actions.
    """

    def __init__(self):
        self.audit_plan = []
        self.audit_findings = []
        self.corrective_actions = []

    def create_annual_audit_plan(self, year: int,
                                departments: List[str],
                                frequency: str = "Annual") -> List[Dict]:
        """Create annual audit plan

        Args:
            year: Target year
            departments: List of departments to audit
            frequency: Audit frequency ("Annual", "Semi-Annual", "Quarterly")

        Returns:
            List of created audit plans
        """
        freq_map = {
            "Annual": 1,
            "Semi-Annual": 2,
            "Quarterly": 4
        }

        audits_per_year = freq_map.get(frequency, 1)

        for dept in departments:
            for i in range(audits_per_year):
                # Distribute audit timing evenly
                month = (i * 12 // audits_per_year) + 1
                planned_date = datetime(year, month, 15)

                audit = {
                    "audit_id": f"IA-{year}-{dept[:3].upper()}-{i+1:02d}",
                    "department": dept,
                    "audit_type": "Internal",
                    "scope": f"Quality management system of {dept} department",
                    "planned_date": planned_date,
                    "status": "Planned",  # Planned, In Progress, Completed
                    "lead_auditor": None,
                    "audit_team": [],
                    "completion_date": None
                }

                self.audit_plan.append(audit)

        print(f"Created audit plan for fiscal year {year}: {len(self.audit_plan)} audits")
        return self.audit_plan

    def assign_auditors(self, audit_id: str, lead_auditor: str,
                       team_members: List[str]):
        """Assign audit team

        Args:
            audit_id: Audit ID
            lead_auditor: Lead auditor
            team_members: Audit team members
        """
        audit = self._find_audit(audit_id)
        if audit:
            audit['lead_auditor'] = lead_auditor
            audit['audit_team'] = team_members
            print(f"Team assigned to audit {audit_id}")

    def record_finding(self, audit_id: str, finding_type: str,
                      clause: str, description: str,
                      severity: str) -> Dict:
        """Record audit finding

        Args:
            audit_id: Audit ID
            finding_type: "Non-Conformance", "Observation", "Opportunity"
            clause: Related ISO clause
            description: Finding details
            severity: Severity ("Major", "Minor")

        Returns:
            Recorded finding
        """
        finding = {
            "finding_id": f"F-{len(self.audit_findings) + 1:04d}",
            "audit_id": audit_id,
            "type": finding_type,
            "clause": clause,
            "description": description,
            "severity": severity,
            "recorded_date": datetime.now(),
            "status": "Open",  # Open, Under Review, Closed
            "ca_required": finding_type == "Non-Conformance"
        }

        self.audit_findings.append(finding)
        return finding

    def complete_audit(self, audit_id: str):
        """Complete audit

        Args:
            audit_id: Audit ID
        """
        audit = self._find_audit(audit_id)
        if audit:
            audit['status'] = 'Completed'
            audit['completion_date'] = datetime.now()

            # Count related findings
            findings = [f for f in self.audit_findings
                       if f['audit_id'] == audit_id]

            print(f"Audit {audit_id} completed")
            print(f"  Findings: {len(findings)}")
            print(f"  Non-conformances: {sum(1 for f in findings if f['type'] == 'Non-Conformance')}")

    def get_audit_dashboard(self) -> pd.DataFrame:
        """Get audit dashboard data

        Returns:
            DataFrame showing audit plan progress
        """
        if not self.audit_plan:
            return pd.DataFrame()

        df = pd.DataFrame(self.audit_plan)

        # Aggregate by status
        status_summary = df['status'].value_counts()

        # Findings statistics
        findings_df = pd.DataFrame(self.audit_findings)
        if not findings_df.empty:
            nc_count = (findings_df['type'] == 'Non-Conformance').sum()
            open_findings = (findings_df['status'] == 'Open').sum()
        else:
            nc_count = 0
            open_findings = 0

        summary = pd.DataFrame({
            "Metric": ["Planned", "Completed", "In Progress", "Non-Conformances", "Open Findings"],
            "Value": [
                (df['status'] == 'Planned').sum(),
                (df['status'] == 'Completed').sum(),
                (df['status'] == 'In Progress').sum(),
                nc_count,
                open_findings
            ]
        })

        return summary

    def get_overdue_audits(self) -> List[Dict]:
        """Get overdue audits

        Returns:
            List of audits past their planned date
        """
        now = datetime.now()
        overdue = []

        for audit in self.audit_plan:
            if (audit['status'] == 'Planned' and
                audit['planned_date'] < now):
                days_overdue = (now - audit['planned_date']).days
                overdue.append({
                    **audit,
                    "days_overdue": days_overdue
                })

        return sorted(overdue, key=lambda x: x['days_overdue'], reverse=True)

    def _find_audit(self, audit_id: str) -> Optional[Dict]:
        """Search audit by audit ID (internal use)"""
        for audit in self.audit_plan:
            if audit['audit_id'] == audit_id:
                return audit
        return None


# Usage example
ias = InternalAuditSystem()

# Create annual audit plan
departments = ["Manufacturing", "Quality Control", "Purchasing", "Design", "Sales"]
ias.create_annual_audit_plan(2025, departments, frequency="Annual")

# Assign auditors
ias.assign_auditors(
    "IA-2025-MAN-01",
    lead_auditor="Taro Tanaka (Lead Auditor)",
    team_members=["Hanako Sato", "Ichiro Suzuki"]
)

# Record findings
finding1 = ias.record_finding(
    audit_id="IA-2025-MAN-01",
    finding_type="Non-Conformance",
    clause="8.5 Production and service provision",
    description="Work procedures are outdated (last revised 3 years ago)",
    severity="Major"
)

finding2 = ias.record_finding(
    audit_id="IA-2025-MAN-01",
    finding_type="Observation",
    clause="7.2 Competence",
    description="Training records partially incomplete",
    severity="Minor"
)

print(f"\nFindings recorded: {finding1['finding_id']}, {finding2['finding_id']}")

# Complete audit
ias.complete_audit("IA-2025-MAN-01")

# Display dashboard
print("\n=== Audit Dashboard ===")
print(ias.get_audit_dashboard())
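
The scheduling arithmetic in `create_annual_audit_plan` is easy to misread, so the sketch below isolates it. It reuses the expression `(i * 12 // audits_per_year) + 1` from Example 3; the function name `planned_months` is an illustrative assumption.

```python
from typing import List


def planned_months(audits_per_year: int) -> List[int]:
    """Months (1-12) in which audits are planned, using the Example 3 formula."""
    return [(i * 12 // audits_per_year) + 1 for i in range(audits_per_year)]


schedule = {
    label: planned_months(n)
    for label, n in [("Annual", 1), ("Semi-Annual", 2), ("Quarterly", 4)]
}
for label, months in schedule.items():
    print(f"{label}: {months}")
# Annual: [1]
# Semi-Annual: [1, 7]
# Quarterly: [1, 4, 7, 10]
```

Because the formula depends only on the frequency, every department is scheduled in the same months. A real plan would probably stagger departments so that auditors are not needed everywhere at once.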

4.2.4 Nonconformance Management and CAPA System

This section implements a system corresponding to ISO 9001 clause 10.2, "Nonconformity and corrective action".

Example 4: CAPA Management System
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd

class CAPASystem:
    """Corrective Action and Preventive Action (CAPA) management system

    Manages the lifecycle of nonconformance recording, root cause analysis,
    corrective action planning and implementation, and effectiveness verification.
    """

    def __init__(self):
        self.capas = []
        self.root_cause_analyses = []

    def create_capa(self, source: str, description: str,
                   category: str, severity: str,
                   reported_by: str) -> Dict:
        """Create CAPA (Corrective Action/Preventive Action)

        Args:
            source: Source ("Internal Audit", "Customer Complaint",
                            "Process Monitoring", "Supplier Issue")
            description: Nonconformance details
            category: Classification ("Product", "Process", "Documentation", "Training")
            severity: Severity ("Critical", "Major", "Minor")
            reported_by: Reporter

        Returns:
            Created CAPA information
        """
        capa_id = f"CAPA-{datetime.now().strftime('%Y%m%d')}-{len(self.capas) + 1:03d}"

        # Set response deadline based on severity
        due_days = {"Critical": 7, "Major": 30, "Minor": 60}

        capa = {
            "capa_id": capa_id,
            "source": source,
            "description": description,
            "category": category,
            "severity": severity,
            "reported_by": reported_by,
            "reported_date": datetime.now(),
            "status": "Open",  # Open, Investigation, Action, Verification, Closed
            "assigned_to": None,
            "due_date": datetime.now() + timedelta(days=due_days[severity]),
            "root_cause": None,
            "corrective_action": None,
            "preventive_action": None,
            "effectiveness_verified": False,
            "closed_date": None
        }

        self.capas.append(capa)
        print(f"CAPA created: {capa_id} (due date: {capa['due_date'].strftime('%Y-%m-%d')})")
        return capa

    def assign_capa(self, capa_id: str, assignee: str):
        """Assign CAPA to responsible person

        Args:
            capa_id: CAPA number
            assignee: Assignee name
        """
        capa = self._find_capa(capa_id)
        if capa:
            capa['assigned_to'] = assignee
            capa['status'] = 'Investigation'
            print(f"CAPA {capa_id} assigned to {assignee}")

    def record_root_cause_analysis(self, capa_id: str,
                                   method: str, findings: str,
                                   root_cause: str, analyst: str) -> Dict:
        """Record root cause analysis

        Args:
            capa_id: CAPA number
            method: Analysis method ("5 Whys", "Fishbone", "FTA", "FMEA")
            findings: Analysis results
            root_cause: Identified root cause
            analyst: Analyst

        Returns:
            Recorded analysis information
        """
        analysis = {
            "capa_id": capa_id,
            "method": method,
            "findings": findings,
            "root_cause": root_cause,
            "analyst": analyst,
            "analysis_date": datetime.now()
        }

        self.root_cause_analyses.append(analysis)

        # Record root cause in CAPA
        capa = self._find_capa(capa_id)
        if capa:
            capa['root_cause'] = root_cause

        return analysis

    def define_corrective_action(self, capa_id: str,
                                corrective_action: str,
                                preventive_action: str = ""):
        """Define corrective and preventive actions

        Args:
            capa_id: CAPA number
            corrective_action: Corrective action (removes the cause of the detected nonconformity so that it does not recur)
            preventive_action: Preventive action (removes causes of potential nonconformities before they occur)
        """
        capa = self._find_capa(capa_id)
        if not capa:
            print(f"CAPA {capa_id} not found")
            return

        capa['corrective_action'] = corrective_action
        capa['preventive_action'] = preventive_action
        capa['status'] = 'Action'

        print(f"Actions defined for CAPA {capa_id}")

    def verify_effectiveness(self, capa_id: str,
                           verified_by: str,
                           is_effective: bool,
                           verification_note: str):
        """Verify effectiveness of corrective action

        Args:
            capa_id: CAPA number
            verified_by: Verifier
            is_effective: Effectiveness verification result
            verification_note: Verification notes
        """
        capa = self._find_capa(capa_id)
        if not capa:
            print(f"CAPA {capa_id} not found")
            return

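        capa['effectiveness_verified'] = is_effective
        # Keep the verifier and notes so the verification is traceable
        capa['verified_by'] = verified_by
        capa['verification_note'] = verification_note
        capa['status'] = 'Closed' if is_effective else 'Action'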

        if is_effective:
            capa['closed_date'] = datetime.now()
            print(f"CAPA {capa_id} completed (effectiveness verified)")
        else:
            print(f"Effectiveness of CAPA {capa_id} is insufficient. Additional actions required")
            print(f"  Reason: {verification_note}")

    def get_open_capas(self) -> pd.DataFrame:
        """Get list of open CAPAs

        Returns:
            DataFrame of open CAPAs
        """
        open_capas = [c for c in self.capas if c['status'] != 'Closed']

        if not open_capas:
            return pd.DataFrame()

        df = pd.DataFrame(open_capas)
        df['days_open'] = df['reported_date'].apply(
            lambda x: (datetime.now() - x).days
        )
        df['overdue'] = df['due_date'] < datetime.now()

        return df[['capa_id', 'severity', 'category', 'status',
                  'assigned_to', 'due_date', 'days_open', 'overdue']]

    def get_capa_metrics(self) -> Dict:
        """Get CAPA metrics

        Returns:
            Key CAPA metrics
        """
        if not self.capas:
            return {}

        df = pd.DataFrame(self.capas)

        # Average resolution days (closed only)
        closed_capas = df[df['status'] == 'Closed'].copy()
        if not closed_capas.empty:
            closed_capas['resolution_days'] = (
                closed_capas['closed_date'] - closed_capas['reported_date']
            ).dt.days
            avg_resolution_days = closed_capas['resolution_days'].mean()
        else:
            avg_resolution_days = 0

        # Overdue rate
        now = datetime.now()
        overdue_count = sum(1 for c in self.capas
                           if c['status'] != 'Closed' and c['due_date'] < now)

        metrics = {
            "Total CAPAs": len(self.capas),
            "Open": (df['status'] != 'Closed').sum(),
            "Closed": (df['status'] == 'Closed').sum(),
            "Overdue": overdue_count,
            "Average Resolution Days": round(avg_resolution_days, 1),
            "Closure Rate (%)": round((df['status'] == 'Closed').sum() / len(self.capas) * 100, 1)
        }

        return metrics

    def _find_capa(self, capa_id: str) -> Optional[Dict]:
        """Search CAPA by CAPA ID (internal use)"""
        for capa in self.capas:
            if capa['capa_id'] == capa_id:
                return capa
        return None


# Usage example
capa_system = CAPASystem()

# Create CAPA
capa1 = capa_system.create_capa(
    source="Internal Audit",
    description="Non-compliance with work procedures found in production line A. Step 3 of procedure is being skipped.",
    category="Process",
    severity="Major",
    reported_by="Auditor Tanaka"
)

# Assign to responsible person
capa_system.assign_capa(capa1['capa_id'], "Manufacturing Manager Sato")

# Conduct root cause analysis
capa_system.record_root_cause_analysis(
    capa_id=capa1['capa_id'],
    method="5 Whys",
    findings="""
    Why was the step skipped? → Because it takes time
    Why does it take time? → Equipment setup is slow
    Why is setup slow? → Jig preparation is complicated
    Why is jig preparation complicated? → Jigs are not well organized
    Why are jigs not well organized? → 5S activities have become a mere formality
    """,
    root_cause="5S activities had become a mere formality, which led to poor jig management. Operators then skipped the step to save time.",
    analyst="Quality Control Dept. Suzuki"
)

# Define corrective and preventive actions
capa_system.define_corrective_action(
    capa_id=capa1['capa_id'],
    corrective_action="Immediately organize jigs. Re-train on procedure compliance.",
    preventive_action="Revitalize 5S activities. Implement monthly audits. Introduce jig management system."
)

# Verify effectiveness
capa_system.verify_effectiveness(
    capa_id=capa1['capa_id'],
    verified_by="Quality Manager",
    is_effective=True,
    verification_note="Re-audit conducted after 1 month. Confirmed 100% procedure compliance. Jig management also improved."
)

# Display metrics
print("\n=== CAPA Metrics ===")
metrics = capa_system.get_capa_metrics()
for key, value in metrics.items():
    print(f"{key}: {value}")

4.3 Risk-Based Thinking and FMEA

4.3.1 Implementation of FMEA (Failure Mode and Effects Analysis)

ISO 9001:2015 clause 6.1, "Actions to address risks and opportunities", requires risk-based thinking but does not prescribe a specific method. FMEA (Failure Mode and Effects Analysis) is one widely used method for identifying potential failure modes in processes or products and prioritizing them by risk.

Example 5: FMEA Implementation System
import pandas as pd
from typing import List, Dict
import numpy as np

class FMEASystem:
    """FMEA (Failure Mode and Effects Analysis) implementation system

    Conducts Process FMEA or Design FMEA and prioritizes risks
    based on RPN (Risk Priority Number).
    """

    def __init__(self, fmea_type: str = "Process"):
        """
        Args:
            fmea_type: FMEA type ("Process" or "Design")
        """
        self.fmea_type = fmea_type
        self.failure_modes = []

    def add_failure_mode(self, process_step: str,
                        potential_failure: str,
                        effects: str, severity: int,
                        causes: str, occurrence: int,
                        current_controls: str, detection: int,
                        responsible_person: str = "") -> Dict:
        """Add failure mode

        Args:
            process_step: Process step/function
            potential_failure: Potential failure mode
            effects: Effects of failure
            severity: Severity (1-10, 10 is most severe)
            causes: Causes of failure
            occurrence: Occurrence frequency (1-10, 10 is most frequent)
            current_controls: Current control methods
            detection: Detection rating (1-10, 10 is hardest to detect)
            responsible_person: Responsible person

        Returns:
            Added failure mode information
        """
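        # Validate ratings (each must be on the 1-10 scale)
        for name, value in (("severity", severity),
                            ("occurrence", occurrence),
                            ("detection", detection)):
            if not 1 <= value <= 10:
                raise ValueError(f"{name} must be between 1 and 10, got {value}")

        # Calculate RPN (Risk Priority Number)
        rpn = severity * occurrence * detection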

        failure_mode = {
            "process_step": process_step,
            "potential_failure": potential_failure,
            "effects": effects,
            "severity": severity,
            "causes": causes,
            "occurrence": occurrence,
            "current_controls": current_controls,
            "detection": detection,
            "rpn": rpn,
            "responsible_person": responsible_person,
            "recommended_actions": "",
            "actions_taken": "",
            "new_severity": None,
            "new_occurrence": None,
            "new_detection": None,
            "new_rpn": None
        }

        self.failure_modes.append(failure_mode)
        return failure_mode

    def prioritize_risks(self, threshold: int = 100) -> pd.DataFrame:
        """Prioritize risks

        Args:
            threshold: RPN threshold requiring action

        Returns:
            DataFrame sorted by RPN
        """
        if not self.failure_modes:
            return pd.DataFrame()

        df = pd.DataFrame(self.failure_modes)

        # Sort by RPN
        df = df.sort_values('rpn', ascending=False)

        # Classify risk level
        df['risk_level'] = df['rpn'].apply(lambda x:
            'High' if x >= 200 else
            'Medium' if x >= 100 else
            'Low'
        )

        # Flag items requiring action
        df['action_required'] = df['rpn'] >= threshold

        return df

    def recommend_actions(self, index: int, actions: str):
        """Record recommended actions

        Args:
            index: Failure mode index
            actions: Recommended actions content
        """
        if 0 <= index < len(self.failure_modes):
            self.failure_modes[index]['recommended_actions'] = actions

    def record_actions_taken(self, index: int, actions: str,
                           new_severity: int, new_occurrence: int,
                           new_detection: int):
        """Record actions taken and post-improvement evaluation

        Args:
            index: Failure mode index
            actions: Actions taken
            new_severity: Post-improvement severity
            new_occurrence: Post-improvement occurrence
            new_detection: Post-improvement detection
        """
        if 0 <= index < len(self.failure_modes):
            fm = self.failure_modes[index]
            fm['actions_taken'] = actions
            fm['new_severity'] = new_severity
            fm['new_occurrence'] = new_occurrence
            fm['new_detection'] = new_detection
            fm['new_rpn'] = new_severity * new_occurrence * new_detection

            print(f"RPN improvement: {fm['rpn']} → {fm['new_rpn']} "
                  f"({(1 - fm['new_rpn']/fm['rpn'])*100:.1f}% reduction)")

    def generate_fmea_report(self, filename: str):
        """Generate FMEA report

        Args:
            filename: Output filename (Excel format)
        """
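        df = self.prioritize_risks()

        # Nothing to report if no failure modes have been added
        if df.empty:
            print("No failure modes registered; report not generated")
            return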

        # Excel output
        with pd.ExcelWriter(filename, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='FMEA', index=False)

            # Create summary sheet
            summary = pd.DataFrame({
                'Item': [
                    'Total Failure Modes',
                    'High Risk Items (RPN≥200)',
                    'Medium Risk Items (100≤RPN<200)',
                    'Low Risk Items (RPN<100)',
                    'Average RPN',
                    'Maximum RPN'
                ],
                'Value': [
                    len(df),
                    (df['rpn'] >= 200).sum(),
                    ((df['rpn'] >= 100) & (df['rpn'] < 200)).sum(),
                    (df['rpn'] < 100).sum(),
                    round(df['rpn'].mean(), 1),
                    df['rpn'].max()
                ]
            })

            summary.to_excel(writer, sheet_name='Summary', index=False)

        print(f"FMEA report exported to {filename}")


# Usage example
fmea = FMEASystem(fmea_type="Process")

# Add failure modes (manufacturing process example)
fmea.add_failure_mode(
    process_step="Raw material receiving inspection",
    potential_failure="Acceptance of non-conforming raw materials",
    effects="Production of defective products, customer complaints, recall",
    severity=9,  # Very severe
    causes="Unclear inspection criteria, insufficient inspector competence",
    occurrence=4,  # Occurs occasionally
    current_controls="Sampling inspection",
    detection=5,  # Somewhat difficult to detect
    responsible_person="Quality Control Dept."
)

fmea.add_failure_mode(
    process_step="Molding process",
    potential_failure="Dimensional defects",
    effects="Assembly defects, performance degradation",
    severity=7,
    causes="Mold wear, poor temperature control",
    occurrence=6,
    current_controls="First article inspection, periodic measurement",
    detection=3,  # Relatively easy to detect
    responsible_person="Manufacturing Dept."
)

fmea.add_failure_mode(
    process_step="Final inspection",
    potential_failure="Inspection oversight",
    effects="Shipment of defective products",
    severity=10,  # Most severe
    causes="Too many inspection items, time pressure",
    occurrence=2,  # Rare
    current_controls="Double check",
    detection=7,  # Difficult to detect
    responsible_person="Quality Assurance Dept."
)

# Prioritize risks
print("=== FMEA Risk Analysis ===")
risk_df = fmea.prioritize_risks(threshold=100)
print(risk_df[['process_step', 'potential_failure', 'rpn', 'risk_level']])

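# Record recommended actions for the highest-severity item (index 2: severity 10, RPN 140).
# Note: the highest RPN (180) belongs to index 0, raw material receiving inspection.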
fmea.recommend_actions(
    index=2,  # Final inspection oversight
    actions="Introduce automated inspection equipment, simplify inspection procedures, ensure traceability with checklists"
)

# Post-action evaluation
fmea.record_actions_taken(
    index=2,
    actions="Introduced automated visual inspection equipment. Classified inspection items by importance.",
    new_severity=10,  # Severity unchanged
    new_occurrence=1,  # Occurrence greatly reduced
    new_detection=2    # Detection improved (through automation)
)

# Generate report
fmea.generate_fmea_report("fmea_report.xlsx")

📊 RPN Evaluation Criteria

RPN Range | Risk Level | Response
200-1000 | High Risk | Immediate action required
100-199 | Medium Risk | Implement planned actions
1-99 | Low Risk | Continue monitoring
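
The criteria above map directly onto a small lookup function. The following is a minimal sketch, not part of `FMEASystem`: `classify_rpn` and `RESPONSES` are hypothetical names, and the thresholds simply mirror the table. It is applied to the three failure modes from Example 5.

```python
from typing import Tuple

# Hypothetical helper; thresholds mirror the RPN evaluation criteria table.
RESPONSES = {
    "High": "Immediate action required",
    "Medium": "Implement planned actions",
    "Low": "Continue monitoring",
}


def classify_rpn(severity: int, occurrence: int, detection: int) -> Tuple[int, str, str]:
    """Return (rpn, risk_level, response) for one failure mode."""
    ratings = (("severity", severity), ("occurrence", occurrence), ("detection", detection))
    for name, value in ratings:
        if not 1 <= value <= 10:
            raise ValueError(f"{name} must be between 1 and 10, got {value}")

    rpn = severity * occurrence * detection
    if rpn >= 200:
        level = "High"
    elif rpn >= 100:
        level = "Medium"
    else:
        level = "Low"
    return rpn, level, RESPONSES[level]


# (severity, occurrence, detection) as entered in Example 5
failure_modes = {
    "Raw material receiving inspection": (9, 4, 5),
    "Molding process": (7, 6, 3),
    "Final inspection": (10, 2, 7),
}

for step, ratings in failure_modes.items():
    rpn, level, response = classify_rpn(*ratings)
    print(f"{step}: RPN={rpn} ({level}) -> {response}")
```

All three sample failure modes fall in the Medium band (RPN 180, 126, and 140), even though the final inspection item has severity 10. This is one reason some teams also review high-severity items regardless of their RPN.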

4.3.2 Supplier Quality Evaluation System

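This system addresses ISO 9001:2015 clause 8.4, "Control of externally provided processes, products and services". It records incoming quality data per supplier and derives a rating from the defect rate in PPM (parts per million).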

Example 6: Supplier Quality Management System
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional
import numpy as np

class SupplierQualitySystem:
    """Supplier quality management system

    Manages supplier evaluation, quality performance tracking,
    and improvement activities.
    """

    def __init__(self):
        self.suppliers = []
        self.quality_records = []

    def register_supplier(self, supplier_id: str, name: str,
                         product_category: str,
                         criticality: str = "Medium") -> Dict:
        """Register supplier

        Args:
            supplier_id: Supplier ID
            name: Supplier name
            product_category: Product category supplied
            criticality: Criticality level ("Critical", "High", "Medium", "Low")

        Returns:
            Registered supplier information
        """
        supplier = {
            "supplier_id": supplier_id,
            "name": name,
            "product_category": product_category,
            "criticality": criticality,
            "registration_date": datetime.now(),
            "status": "Approved",  # Approved, Conditional, Suspended
            "quality_score": None,
            "delivery_score": None,
            "overall_rating": None
        }

        self.suppliers.append(supplier)
        print(f"Supplier registered: {name} ({supplier_id})")
        return supplier

    def record_quality_data(self, supplier_id: str,
                           delivery_date: datetime,
                           lot_number: str,
                           quantity_received: int,
                           quantity_accepted: int,
                           defect_count: int,
                           defect_types: str = "") -> Dict:
        """Record quality data

        Args:
            supplier_id: Supplier ID
            delivery_date: Delivery date
            lot_number: Lot number
            quantity_received: Quantity received
            quantity_accepted: Quantity accepted
            defect_count: Defect count
            defect_types: Defect types

        Returns:
            Recorded quality data
        """
        # Calculate PPM (Parts Per Million)
        ppm = (defect_count / quantity_received * 1_000_000) if quantity_received > 0 else 0

        record = {
            "supplier_id": supplier_id,
            "delivery_date": delivery_date,
            "lot_number": lot_number,
            "quantity_received": quantity_received,
            "quantity_accepted": quantity_accepted,
            "defect_count": defect_count,
            "defect_types": defect_types,
            "ppm": ppm,
            "acceptance_rate": (quantity_accepted / quantity_received * 100)
                               if quantity_received > 0 else 0
        }

        self.quality_records.append(record)
        return record

    def calculate_supplier_rating(self, supplier_id: str,
                                 months: int = 6) -> Dict:
        """Calculate supplier rating

        Args:
            supplier_id: Supplier ID
            months: Evaluation period (months)

        Returns:
            Evaluation results
        """
        # Get quality records within period
        cutoff_date = datetime.now() - pd.Timedelta(days=months * 30)
        records = [
            r for r in self.quality_records
            if r['supplier_id'] == supplier_id and r['delivery_date'] >= cutoff_date
        ]

        if not records:
            return {"error": "No data available for evaluation period"}

        df = pd.DataFrame(records)

        # Quality score (0-100)
        avg_ppm = df['ppm'].mean()
        quality_score = max(0, 100 - (avg_ppm / 100))  # 10,000 PPM = 0 points

        # Overall rating (A-D)
        if quality_score >= 95:
            overall_rating = "A"
        elif quality_score >= 85:
            overall_rating = "B"
        elif quality_score >= 70:
            overall_rating = "C"
        else:
            overall_rating = "D"

        rating = {
            "supplier_id": supplier_id,
            "evaluation_period": f"{months} months",
            "total_deliveries": len(records),
            "total_quantity": df['quantity_received'].sum(),
            "total_defects": df['defect_count'].sum(),
            "avg_ppm": round(avg_ppm, 2),
            "quality_score": round(quality_score, 2),
            "overall_rating": overall_rating
        }

        # Update supplier information
        supplier = self._find_supplier(supplier_id)
        if supplier:
            supplier['quality_score'] = quality_score
            supplier['overall_rating'] = overall_rating

        return rating

    def get_supplier_performance_summary(self) -> pd.DataFrame:
        """Get performance summary for all suppliers

        Returns:
            DataFrame of performance by supplier
        """
        if not self.suppliers:
            return pd.DataFrame()

        # Calculate rating for each supplier
        for supplier in self.suppliers:
            self.calculate_supplier_rating(supplier['supplier_id'])

        df = pd.DataFrame(self.suppliers)

        # Sort by criticality
        criticality_order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3}
        df['criticality_rank'] = df['criticality'].map(criticality_order)
        df = df.sort_values(['criticality_rank', 'quality_score'], ascending=[True, False])

        return df[['supplier_id', 'name', 'criticality',
                  'quality_score', 'overall_rating', 'status']]

    def identify_improvement_targets(self, threshold_rating: str = "C") -> List[Dict]:
        """Identify suppliers requiring improvement

        Args:
            threshold_rating: Suppliers rated at or below this rating ("A" to "D") are returned

        Returns:
            List of suppliers requiring improvement
        """
        rating_map = {"A": 4, "B": 3, "C": 2, "D": 1}
        threshold_value = rating_map.get(threshold_rating, 2)

        targets = []
        for supplier in self.suppliers:
            if supplier['overall_rating']:
                rating_value = rating_map.get(supplier['overall_rating'], 0)
                if rating_value <= threshold_value:
                    targets.append({
                        "supplier_id": supplier['supplier_id'],
                        "name": supplier['name'],
                        "rating": supplier['overall_rating'],
                        "quality_score": supplier['quality_score'],
                        "action": "Quality improvement plan development and implementation required"
                    })

        return targets

    def _find_supplier(self, supplier_id: str) -> Optional[Dict]:
        """Search supplier by supplier ID (internal use)"""
        for supplier in self.suppliers:
            if supplier['supplier_id'] == supplier_id:
                return supplier
        return None


# Usage example
sqs = SupplierQualitySystem()

# Register suppliers
sqs.register_supplier("SUP-001", "ABC Parts Industry", "Electronic Components", criticality="Critical")
sqs.register_supplier("SUP-002", "XYZ Resin", "Resin Materials", criticality="High")
sqs.register_supplier("SUP-003", "123 Packaging Materials", "Packaging Materials", criticality="Low")

# Record quality data
# Delivery dates are relative to today so that they fall inside the evaluation
# window, which calculate_supplier_rating counts back from datetime.now().
# Lot numbers are illustrative labels only.
today = datetime.now()

sqs.record_quality_data(
    supplier_id="SUP-001",
    delivery_date=today - pd.Timedelta(days=120),
    lot_number="LOT-20250115-001",
    quantity_received=10000,
    quantity_accepted=9950,
    defect_count=50,
    defect_types="Dimensional defects"
)

sqs.record_quality_data(
    supplier_id="SUP-001",
    delivery_date=today - pd.Timedelta(days=85),
    lot_number="LOT-20250220-002",
    quantity_received=15000,
    quantity_accepted=14800,
    defect_count=200,
    defect_types="Visual defects, dimensional defects"
)

sqs.record_quality_data(
    supplier_id="SUP-002",
    delivery_date=today - pd.Timedelta(days=125),
    lot_number="LOT-20250110-001",
    quantity_received=5000,
    quantity_accepted=4990,
    defect_count=10,
    defect_types="Color unevenness"
)

# Supplier evaluation
print("=== Supplier Evaluation ===")
rating = sqs.calculate_supplier_rating("SUP-001", months=6)
for key, value in rating.items():
    print(f"{key}: {value}")

# Performance summary
print("\n=== Supplier Performance Summary ===")
print(sqs.get_supplier_performance_summary())

# Identify improvement targets
print("\n=== Suppliers Requiring Improvement ===")
targets = sqs.identify_improvement_targets(threshold_rating="B")
for target in targets:
    print(f"{target['name']} - Rating: {target['rating']}, Score: {target['quality_score']}")
    print(f"  → {target['action']}")
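
Example 6 averages the PPM of individual lots, so a small lot weighs as much as a large one. An alternative is to pool all defects over all parts received. The sketch below compares the two on the SUP-001 quantities from Example 6; `lot_average_ppm` and `pooled_ppm` are hypothetical helper names, not methods of `SupplierQualitySystem`.

```python
from typing import List, Tuple

Lot = Tuple[int, int]  # (quantity_received, defect_count)


def lot_average_ppm(lots: List[Lot]) -> float:
    """Mean of per-lot PPM values (each lot counts equally)."""
    ppms = [defects / received * 1_000_000
            for received, defects in lots if received > 0]
    return sum(ppms) / len(ppms) if ppms else 0.0


def pooled_ppm(lots: List[Lot]) -> float:
    """Total defects over total quantity (each part counts equally)."""
    total_received = sum(received for received, _ in lots)
    total_defects = sum(defects for _, defects in lots)
    if total_received <= 0:
        return 0.0
    return total_defects / total_received * 1_000_000


# The two SUP-001 lots from Example 6
sup_001_lots = [(10000, 50), (15000, 200)]

print(f"Lot-average PPM: {lot_average_ppm(sup_001_lots):.2f}")
print(f"Pooled PPM:      {pooled_ppm(sup_001_lots):.2f}")
```

For these two lots the lot average is about 9,167 PPM, while the pooled figure is 10,000 PPM, because the larger lot has the higher defect rate. Which definition to use is a design choice; the point is to apply it consistently across suppliers.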

4.3.3 Quality Objective KPI Tracking System

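This system addresses ISO 9001:2015 clause 6.2, "Quality objectives and planning to achieve them". It records measurable objectives, tracks actual values against targets, and classifies the status of each objective.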

Example 7: Quality Objective KPI Management System
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional
import matplotlib.pyplot as plt

class QualityKPISystem:
    """Quality objectives and KPI management system

    Manages quality objective setting, KPI tracking, and achievement visualization.
    """

    def __init__(self, fiscal_year: int):
        self.fiscal_year = fiscal_year
        self.objectives = []
        self.kpi_data = []

    def define_objective(self, objective_id: str, title: str,
                        kpi_name: str, target_value: float,
                        unit: str, measurement_frequency: str,
                        responsible_dept: str) -> Dict:
        """Define quality objective

        Args:
            objective_id: Objective ID
            title: Objective title
            kpi_name: KPI name
            target_value: Target value
            unit: Unit
            measurement_frequency: Measurement frequency ("Daily", "Weekly", "Monthly")
            responsible_dept: Responsible department

        Returns:
            Defined objective information
        """
        objective = {
            "objective_id": objective_id,
            "title": title,
            "kpi_name": kpi_name,
            "target_value": target_value,
            "unit": unit,
            "measurement_frequency": measurement_frequency,
            "responsible_dept": responsible_dept,
            "fiscal_year": self.fiscal_year,
            "status": "Active"  # Active, Achieved, On Track, At Risk, Not Achieved
        }

        self.objectives.append(objective)
        print(f"Quality objective set: {title} (target value: {target_value}{unit})")
        return objective

    def record_kpi_data(self, objective_id: str,
                       measurement_date: datetime,
                       actual_value: float,
                       notes: str = "") -> Dict:
        """Record KPI actual data

        Args:
            objective_id: Objective ID
            measurement_date: Measurement date
            actual_value: Actual value
            notes: Notes

        Returns:
            Recorded KPI data
        """
        objective = self._find_objective(objective_id)
        if not objective:
            raise ValueError(f"Objective {objective_id} not found")

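        # Calculate achievement rate as actual / target.
        # Note: this assumes a higher-is-better KPI. For lower-is-better KPIs
        # (e.g. complaint counts, defect rates), a rate above 100% means the
        # actual value is worse than the target.
        achievement_rate = (actual_value / objective['target_value'] * 100)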

        kpi_record = {
            "objective_id": objective_id,
            "measurement_date": measurement_date,
            "actual_value": actual_value,
            "target_value": objective['target_value'],
            "achievement_rate": achievement_rate,
            "notes": notes
        }

        self.kpi_data.append(kpi_record)
        return kpi_record

    def get_kpi_trend(self, objective_id: str) -> pd.DataFrame:
        """Get KPI trend data

        Args:
            objective_id: Objective ID

        Returns:
            Trend data DataFrame
        """
        records = [
            r for r in self.kpi_data if r['objective_id'] == objective_id
        ]

        if not records:
            return pd.DataFrame()

        df = pd.DataFrame(records)
        df = df.sort_values('measurement_date')

        return df

    def evaluate_objective_status(self, objective_id: str) -> str:
        """Evaluate objective achievement status

        Args:
            objective_id: Objective ID

        Returns:
            Status ("Achieved", "On Track", "At Risk", "Not Achieved", or "No Data")
        """
        df = self.get_kpi_trend(objective_id)

        if df.empty:
            return "No Data"

        # Evaluate average achievement rate of last 3 measurements
        recent_achievement = df.tail(3)['achievement_rate'].mean()

        if recent_achievement >= 100:
            status = "Achieved"
        elif recent_achievement >= 90:
            status = "On Track"
        elif recent_achievement >= 70:
            status = "At Risk"
        else:
            status = "Not Achieved"

        # Update objective information
        objective = self._find_objective(objective_id)
        if objective:
            objective['status'] = status

        return status

    def get_objectives_dashboard(self) -> pd.DataFrame:
        """Get quality objectives dashboard

        Returns:
            DataFrame showing achievement status of all objectives
        """
        if not self.objectives:
            return pd.DataFrame()

        # Evaluate status of each objective
        for obj in self.objectives:
            self.evaluate_objective_status(obj['objective_id'])

        df = pd.DataFrame(self.objectives)

        # Add latest actual value
        for idx, obj in df.iterrows():
            trend_df = self.get_kpi_trend(obj['objective_id'])
            if not trend_df.empty:
                latest = trend_df.iloc[-1]
                df.at[idx, 'latest_value'] = latest['actual_value']
                df.at[idx, 'latest_achievement_rate'] = latest['achievement_rate']
            else:
                df.at[idx, 'latest_value'] = None
                df.at[idx, 'latest_achievement_rate'] = None

        return df[['objective_id', 'title', 'target_value', 'unit',
                  'latest_value', 'latest_achievement_rate', 'status',
                  'responsible_dept']]

    def generate_kpi_report(self, objective_id: str, filename: str):
        """Generate KPI report (with trend graph)

        Args:
            objective_id: Objective ID
            filename: Output filename (PNG format)
        """
        df = self.get_kpi_trend(objective_id)
        objective = self._find_objective(objective_id)

        if df.empty or not objective:
            print("Insufficient data")
            return

        # Create graph
        fig, ax = plt.subplots(figsize=(10, 6))

        # Plot actual values
        ax.plot(df['measurement_date'], df['actual_value'],
               marker='o', label='Actual Value', linewidth=2)

        # Add target value line
        ax.axhline(y=objective['target_value'], color='r',
                  linestyle='--', label='Target Value')

        ax.set_xlabel('Measurement Date')
        ax.set_ylabel(f"{objective['kpi_name']} ({objective['unit']})")
        ax.set_title(f"{objective['title']} - KPI Trend")
        ax.legend()
        ax.grid(True, alpha=0.3)

        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig(filename, dpi=300)
        plt.close()

        print(f"KPI report exported to {filename}")

    def _find_objective(self, objective_id: str) -> Optional[Dict]:
        """Search objective by objective ID (internal use)"""
        for obj in self.objectives:
            if obj['objective_id'] == objective_id:
                return obj
        return None


# Usage example
kpi_system = QualityKPISystem(fiscal_year=2025)

# Define quality objectives
kpi_system.define_objective(
    objective_id="OBJ-2025-001",
    title="Reduce Customer Complaints",
    kpi_name="Customer Complaint Count",
    target_value=10,
    unit="complaints/month",
    measurement_frequency="Monthly",
    responsible_dept="Quality Assurance Dept."
)

kpi_system.define_objective(
    objective_id="OBJ-2025-002",
    title="Reduce In-Process Defect Rate",
    kpi_name="In-Process Defect Rate",
    target_value=0.5,
    unit="%",
    measurement_frequency="Monthly",
    responsible_dept="Manufacturing Dept."
)

# Record KPI actual values
for month in range(1, 7):
    kpi_system.record_kpi_data(
        objective_id="OBJ-2025-001",
        measurement_date=datetime(2025, month, 1),
        actual_value=15 - month,  # Improvement trend
        notes=f"Month {month} actual"
    )

    kpi_system.record_kpi_data(
        objective_id="OBJ-2025-002",
        measurement_date=datetime(2025, month, 1),
        actual_value=1.0 - (month * 0.08),  # Improvement trend
        notes=f"Month {month} actual"
    )

# Display dashboard
print("=== Quality Objectives Dashboard ===")
print(kpi_system.get_objectives_dashboard())

# Status evaluation
print("\n=== Achievement Status Evaluation ===")
for obj_id in ["OBJ-2025-001", "OBJ-2025-002"]:
    status = kpi_system.evaluate_objective_status(obj_id)
    print(f"{obj_id}: {status}")
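
Example 7 computes the achievement rate as actual / target, which suits KPIs where higher values are better. Both sample objectives (complaint count and defect rate) are lower-is-better, so a value above the target yields a rate above 100%. The sketch below shows one possible direction-aware variant. `achievement_rate` and `lower_is_better` are hypothetical names that do not exist in `QualityKPISystem`, and capping the rate at 100% once the target is met is an assumption, not something the original example specifies.

```python
def achievement_rate(actual: float, target: float,
                     lower_is_better: bool = False) -> float:
    """Achievement rate in percent, where 100 means the target is met.

    Assumption: for lower-is-better KPIs the rate is capped at 100 once the
    actual value is at or below the target.
    """
    if lower_is_better:
        if actual <= target:
            return 100.0
        # Actual is above (worse than) the target: rate falls below 100
        return target / actual * 100

    if target == 0:
        raise ValueError("target must be non-zero for higher-is-better KPIs")
    return actual / target * 100


# Month 6 of OBJ-2025-002 in Example 7: defect rate 0.52% against a 0.5% target
print(round(achievement_rate(0.52, 0.5, lower_is_better=True), 1))

# Month 6 of OBJ-2025-001 in Example 7: 9 complaints against a target of 10
print(achievement_rate(9, 10, lower_is_better=True))

# A hypothetical higher-is-better KPI: 95 against a target of 100
print(achievement_rate(95, 100))
```

With this variant, the month 6 defect rate of 0.52% scores just under 100% instead of above it, which matches the fact that the 0.5% target has not yet been reached.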

4.3.4 Management Review Report Generation

This system addresses ISO 9001:2015 clause 9.3, "Management review". It collects the review inputs, records management decisions and action items, and exports the result as a JSON report.
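
A review is only as complete as its inputs, so it can help to check that every input category has been recorded before the report is generated. The sketch below is a hypothetical helper, not part of the class in Example 8. It assumes the input keys that the class's `add_*` methods write into `review_data["inputs"]`, and the sample values are made up for illustration.

```python
from typing import Dict, List

# Input keys written by the add_* methods of ManagementReviewSystem (Example 8)
EXPECTED_INPUTS = [
    "customer_feedback",
    "conformity_status",
    "process_performance",
    "resource_adequacy",
    "improvement_actions",
    "changes_impact",
]


def missing_review_inputs(inputs: Dict[str, dict]) -> List[str]:
    """Return the expected input keys that are absent or empty."""
    return [key for key in EXPECTED_INPUTS if not inputs.get(key)]


# Hypothetical, partially prepared review inputs
inputs = {
    "customer_feedback": {"satisfaction_score": 85.0, "complaint_count": 12},
    "resource_adequacy": {"training_completion": 92.0},
}

missing = missing_review_inputs(inputs)
if missing:
    print("Inputs still to be prepared:", ", ".join(missing))
else:
    print("All review inputs recorded")
```

In practice, `review_data["inputs"]` from a `ManagementReviewSystem` instance could be passed to such a check before calling `generate_report`.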

Example 8: Management Review Report Generation System
from datetime import datetime
import pandas as pd
from typing import Dict, List
import json

class ManagementReviewSystem:
    """Management review report generation system

    Generates reports for top management to review QMS performance and
    improvement opportunities based on ISO 9001 requirement 9.3.
    """

    def __init__(self, review_date: datetime, review_period: str):
        """
        Args:
            review_date: Review execution date
            review_period: Review period (e.g., "FY2024 Second Half")
        """
        self.review_date = review_date
        self.review_period = review_period
        self.review_data = {
            "basic_info": {
                "review_date": review_date.strftime("%Y-%m-%d"),
                "review_period": review_period,
                "attendees": []
            },
            "inputs": {},
            "decisions": [],
            "action_items": []
        }

    def add_attendee(self, name: str, position: str):
        """Add attendee

        Args:
            name: Name
            position: Position
        """
        self.review_data["basic_info"]["attendees"].append({
            "name": name,
            "position": position
        })

    def add_customer_feedback(self, satisfaction_score: float,
                             complaint_count: int,
                             positive_feedback: str,
                             improvement_areas: str):
        """Add customer feedback (ISO 9001 9.3.2 c, item 1)

        Args:
            satisfaction_score: Customer satisfaction score (0-100)
            complaint_count: Complaint count
            positive_feedback: Positive feedback
            improvement_areas: Areas requiring improvement
        """
        self.review_data["inputs"]["customer_feedback"] = {
            "satisfaction_score": satisfaction_score,
            "complaint_count": complaint_count,
            "positive_feedback": positive_feedback,
            "improvement_areas": improvement_areas
        }

    def add_conformity_status(self, audit_results: str,
                             nc_count: int,
                             capa_completion_rate: float):
        """Add conformity and performance status (ISO 9001 9.3.2 c, items 4 and 6)

        Args:
            audit_results: Audit results summary
            nc_count: Non-conformance count
            capa_completion_rate: CAPA completion rate (%)
        """
        self.review_data["inputs"]["conformity_status"] = {
            "audit_results": audit_results,
            "nc_count": nc_count,
            "capa_completion_rate": capa_completion_rate
        }

    def add_process_performance(self, kpi_summary: Dict[str, float],
                               process_efficiency: float):
        """Add process performance (ISO 9001 9.3.2 c, items 2 and 3)

        Args:
            kpi_summary: KPI summary (KPI name: achievement rate)
            process_efficiency: Process efficiency (%)
        """
        self.review_data["inputs"]["process_performance"] = {
            "kpi_summary": kpi_summary,
            "process_efficiency": process_efficiency
        }

    def add_resource_adequacy(self, resource_status: str,
                             training_completion: float,
                             infrastructure_issues: str):
        """Add resource adequacy (ISO 9001 9.3.2 d)

        Args:
            resource_status: Resource status evaluation
            training_completion: Training completion rate (%)
            infrastructure_issues: Infrastructure issues
        """
        self.review_data["inputs"]["resource_adequacy"] = {
            "resource_status": resource_status,
            "training_completion": training_completion,
            "infrastructure_issues": infrastructure_issues
        }

    def add_improvement_actions(self, actions_planned: int,
                               actions_completed: int,
                               effectiveness_rate: float):
        """Add improvement action effectiveness (ISO 9001 9.3.2 f)

        Args:
            actions_planned: Number of planned improvement actions
            actions_completed: Number of completed improvement actions
            effectiveness_rate: Effectiveness verification rate (%)
        """
        self.review_data["inputs"]["improvement_actions"] = {
            "actions_planned": actions_planned,
            "actions_completed": actions_completed,
            "completion_rate": (actions_completed / actions_planned * 100)
                               if actions_planned > 0 else 0,
            "effectiveness_rate": effectiveness_rate
        }

    def add_changes_impact(self, internal_changes: str,
                          external_changes: str,
                          impact_assessment: str):
        """Add changes impact (ISO 9001 9.3.2 b)

        Args:
            internal_changes: Internal changes
            external_changes: External changes
            impact_assessment: Impact assessment
        """
        self.review_data["inputs"]["changes_impact"] = {
            "internal_changes": internal_changes,
            "external_changes": external_changes,
            "impact_assessment": impact_assessment
        }

    def add_decision(self, decision_type: str, decision: str,
                    responsible: str, due_date: str):
        """Add management decision (ISO 9001 9.3.3)

        Args:
            decision_type: "QMS Improvement", "Product/Service Improvement",
                          "Resource Needs"
            decision: Decision content
            responsible: Responsible person
            due_date: Due date
        """
        self.review_data["decisions"].append({
            "type": decision_type,
            "decision": decision,
            "responsible": responsible,
            "due_date": due_date
        })

    def add_action_item(self, action: str, responsible: str,
                       due_date: str, priority: str):
        """Add action item

        Args:
            action: Action content
            responsible: Responsible person
            due_date: Due date
            priority: Priority ("High", "Medium", "Low")
        """
        self.review_data["action_items"].append({
            "action": action,
            "responsible": responsible,
            "due_date": due_date,
            "priority": priority,
            "status": "Open"
        })

    def generate_report(self, filename: str):
        """Generate management review report

        Args:
            filename: Output filename (JSON format)
        """
        # Add overall assessment
        self.review_data["overall_assessment"] = self._calculate_overall_assessment()

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.review_data, f, indent=2, ensure_ascii=False)

        print(f"Management review report exported to {filename}")

        # Display summary
        self._print_summary()

    def _calculate_overall_assessment(self) -> Dict:
        """Calculate overall assessment (internal use)"""
        inputs = self.review_data["inputs"]

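        # Collect whichever indicators were provided; each one is assumed
        # to be a percentage on a 0-100 scale so they can be averaged directly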
        scores = []

        if "customer_feedback" in inputs:
            scores.append(inputs["customer_feedback"]["satisfaction_score"])

        if "conformity_status" in inputs:
            scores.append(inputs["conformity_status"]["capa_completion_rate"])

        if "process_performance" in inputs:
            scores.append(inputs["process_performance"]["process_efficiency"])

        if "resource_adequacy" in inputs:
            scores.append(inputs["resource_adequacy"]["training_completion"])

        if "improvement_actions" in inputs:
            scores.append(inputs["improvement_actions"]["completion_rate"])

        avg_score = sum(scores) / len(scores) if scores else 0

        # Evaluation rank
        if avg_score >= 90:
            rank = "Excellent"
        elif avg_score >= 80:
            rank = "Good"
        elif avg_score >= 70:
            rank = "Acceptable"
        else:
            rank = "Needs Improvement"

        return {
            "average_score": round(avg_score, 2),
            "rank": rank,
            "total_decisions": len(self.review_data["decisions"]),
            "total_actions": len(self.review_data["action_items"])
        }

    def _print_summary(self):
        """Display summary (internal use)"""
        assessment = self.review_data["overall_assessment"]

        print("\n" + "="*60)
        print("Management Review Summary")
        print("="*60)
        print(f"Review Period: {self.review_period}")
        print(f"Date: {self.review_date.strftime('%Y-%m-%d')}")
        print(f"Attendees: {len(self.review_data['basic_info']['attendees'])} persons")
        print(f"\nOverall Assessment: {assessment['rank']} ({assessment['average_score']} points)")
        print(f"Decisions: {assessment['total_decisions']}")
        print(f"Action Items: {assessment['total_actions']}")
        print("="*60)


# Usage example
mr_system = ManagementReviewSystem(
    review_date=datetime(2025, 3, 15),
    review_period="FY2024 Second Half"
)

# Add attendees
mr_system.add_attendee("Taro Yamada", "President & CEO")
mr_system.add_attendee("Hanako Sato", "Quality Manager")
mr_system.add_attendee("Ichiro Suzuki", "Manufacturing Manager")

# Add input information
mr_system.add_customer_feedback(
    satisfaction_score=85.5,
    complaint_count=12,
    positive_feedback="Improved delivery compliance. Stable product quality.",
    improvement_areas="Faster technical support response"
)

mr_system.add_conformity_status(
    audit_results="5 internal audits conducted. 0 Major NCs, 3 Minor NCs.",
    nc_count=3,
    capa_completion_rate=92.0
)

mr_system.add_process_performance(
    kpi_summary={
        "Customer Complaint Reduction": 95.0,
        "In-Process Defect Rate Reduction": 88.0,
        "Delivery Compliance": 98.0
    },
    process_efficiency=91.5
)

mr_system.add_resource_adequacy(
    resource_status="Generally adequate. Some equipment aging issues.",
    training_completion=87.0,
    infrastructure_issues="Calibration management system update needed for measuring equipment"
)

mr_system.add_improvement_actions(
    actions_planned=15,
    actions_completed=13,
    effectiveness_rate=85.0
)

mr_system.add_changes_impact(
    internal_changes="Introduction of new production line, digitalization of quality management system",
    external_changes="Preparation for transition to the upcoming revision of ISO 9001:2015",
    impact_assessment="Positive impact. Additional resources required."
)

# Add decisions
mr_system.add_decision(
    decision_type="QMS Improvement",
    decision="Complete full digitalization of quality management system by FY2025 Q2",
    responsible="Quality Manager",
    due_date="2025-09-30"
)

mr_system.add_decision(
    decision_type="Resource Needs",
    decision="Secure budget for measuring equipment update (5 million yen)",
    responsible="Corporate Planning Manager",
    due_date="2025-06-30"
)

# Add action items
mr_system.add_action_item(
    action="Establish process to reduce technical support response time to within 24 hours",
    responsible="Sales Manager",
    due_date="2025-05-31",
    priority="High"
)

mr_system.add_action_item(
    action="Conduct internal auditor competence improvement training",
    responsible="Quality Manager",
    due_date="2025-04-30",
    priority="Medium"
)

# Generate report
mr_system.generate_report("management_review_2024H2.json")
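
To make the "Overall Assessment" line in the summary easier to verify by hand, the averaging and ranking rule can be sketched on its own. The helper name `rank_from_scores` is hypothetical (it is not part of `ManagementReviewSystem`); the thresholds and the five input values are copied from the listing above, and every score is assumed to be a percentage on a 0-100 scale.

```python
from typing import List, Tuple


def rank_from_scores(scores: List[float]) -> Tuple[float, str]:
    """Average percentage scores and map the result to a rank.

    Hypothetical standalone helper that mirrors the thresholds used in
    _calculate_overall_assessment (90 / 80 / 70).
    """
    avg = sum(scores) / len(scores) if scores else 0.0
    if avg >= 90:
        rank = "Excellent"
    elif avg >= 80:
        rank = "Good"
    elif avg >= 70:
        rank = "Acceptable"
    else:
        rank = "Needs Improvement"
    return round(avg, 2), rank


# Inputs taken from the usage example above
scores = [
    85.5,           # customer satisfaction score
    92.0,           # CAPA completion rate
    91.5,           # process efficiency
    87.0,           # training completion
    13 / 15 * 100,  # improvement action completion rate (13 of 15 completed)
]

average, rank = rank_from_scores(scores)
print(f"{rank} ({average} points)")  # Good (88.53 points)
```

Two points are worth noting. The rule is an unweighted mean, so a weak indicator can be hidden by strong ones, and `effectiveness_rate` is stored in the review data but does not enter the average. If an input such as the satisfaction score were recorded on a different scale (for example 1-5), it would need to be converted to a percentage before being averaged.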

Summary

In this chapter, we learned about ISO 9001 Quality Management System requirements and practical implementation methods using Python.

📚 Key Points

Disclaimer