Understanding International Quality Management Standards and Practical Python Implementation
ISO 9001 is an international standard for Quality Management Systems (QMS) established by the International Organization for Standardization (ISO). It defines requirements for organizations to improve customer satisfaction and continuously enhance quality.
ISO 9001:2015 is based on the following seven quality management principles:
| Principle | Description | Practice Examples |
|---|---|---|
| 1. Customer Focus | Understanding customer requirements and exceeding expectations | Customer satisfaction surveys, VOC analysis |
| 2. Leadership | Top management establishes direction and environment | Quality policy formulation, goal setting |
| 3. Engagement of People | Everyone demonstrates their capabilities and contributes | Training and education, suggestion systems |
| 4. Process Approach | Managing activities as processes | Creating flow diagrams, setting KPIs |
| 5. Improvement | Pursuing continuous improvement | PDCA, Kaizen activities |
| 6. Evidence-based Decision Making | Decisions based on data and analysis | Statistical analysis, dashboards |
| 7. Relationship Management | Mutually beneficial relationships with stakeholders | Supplier evaluation, partnerships |
The following code implements a checklist system for managing compliance with ISO 9001 requirements.
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import json
class ISO9001ComplianceChecker:
    """Check and manage compliance with ISO 9001:2015 requirements.

    Holds a catalogue of clauses 4-10 with their subclauses, collects
    compliance records, summarises them per clause and exports a JSON
    audit report.
    """

    def __init__(self):
        """Set up the clause catalogue and an empty record list."""
        # ISO 9001:2015 requirements definition
        self.requirements = {
            "4": {"title": "Context of the Organization", "subclauses": [
                "4.1 Understanding the organization and its context",
                "4.2 Understanding the needs and expectations of interested parties",
                "4.3 Determining the scope of the quality management system",
                "4.4 Quality management system and its processes"
            ]},
            "5": {"title": "Leadership", "subclauses": [
                "5.1 Leadership and commitment",
                "5.2 Policy",
                "5.3 Organizational roles, responsibilities and authorities"
            ]},
            "6": {"title": "Planning", "subclauses": [
                "6.1 Actions to address risks and opportunities",
                "6.2 Quality objectives and planning to achieve them",
                "6.3 Planning of changes"
            ]},
            "7": {"title": "Support", "subclauses": [
                "7.1 Resources",
                "7.2 Competence",
                "7.3 Awareness",
                "7.4 Communication",
                "7.5 Documented information"
            ]},
            "8": {"title": "Operation", "subclauses": [
                "8.1 Operational planning and control",
                "8.2 Requirements for products and services",
                "8.3 Design and development of products and services",
                "8.4 Control of externally provided processes, products and services",
                "8.5 Production and service provision",
                "8.6 Release of products and services",
                "8.7 Control of nonconforming outputs"
            ]},
            "9": {"title": "Performance Evaluation", "subclauses": [
                "9.1 Monitoring, measurement, analysis and evaluation",
                "9.2 Internal audit",
                "9.3 Management review"
            ]},
            "10": {"title": "Improvement", "subclauses": [
                "10.1 General",
                "10.2 Nonconformity and corrective action",
                "10.3 Continual improvement"
            ]}
        }
        # Compliance records in insertion order, one dict per assessment.
        self.compliance_data = []

    @staticmethod
    def _clause_sort_key(clause):
        """Return a key that orders clause labels numerically ("4" < "10")."""
        key = []
        for piece in str(clause).split("."):
            # Numeric pieces sort by value and ahead of non-numeric pieces.
            if piece.isdecimal():
                key.append((0, int(piece), ""))
            else:
                key.append((1, 0, piece))
        return key

    def add_compliance_record(self, clause: str, subclause: str,
                              status: str, evidence: str,
                              auditor: str, notes: str = "") -> Dict:
        """Add a compliance record stamped with today's date.

        Args:
            clause: Clause number (e.g., "4", "5")
            subclause: Detailed requirement
            status: Compliance status ("Compliant", "Partial",
                "Non-Compliant", "Not Applicable")
            evidence: Evidence (document number, reference)
            auditor: Auditor name
            notes: Notes

        Returns:
            The record that was appended; ``action_required`` is True for
            "Partial" and "Non-Compliant" statuses.
        """
        record = {
            "date": datetime.now().strftime("%Y-%m-%d"),
            "clause": clause,
            "subclause": subclause,
            "status": status,
            "evidence": evidence,
            "auditor": auditor,
            "notes": notes,
            "action_required": status in ["Partial", "Non-Compliant"]
        }
        self.compliance_data.append(record)
        return record

    def get_compliance_summary(self) -> pd.DataFrame:
        """Summarise compliance per clause.

        The rate is the share of records whose status is exactly
        "Compliant"; every other status, including "Not Applicable",
        counts in the denominator.

        Returns:
            DataFrame indexed by clause (in numeric clause order) with the
            compliance rate, record count and number of items requiring
            action. Empty DataFrame when no records exist.
        """
        if not self.compliance_data:
            return pd.DataFrame()

        df = pd.DataFrame(self.compliance_data)
        # Aggregate by clause
        summary = df.groupby('clause').agg({
            'status': lambda x: (x == 'Compliant').sum() / len(x) * 100,
            'subclause': 'count',
            'action_required': 'sum'
        }).round(2)
        summary.columns = ['Compliance Rate (%)', 'Number of Requirements', 'Items Requiring Action']
        # groupby sorts labels as strings ("10" before "4"); restore the
        # natural clause order so the report reads 4, 5, ..., 10.
        summary = summary.loc[sorted(summary.index, key=self._clause_sort_key)]
        summary.index.name = 'Clause'
        return summary

    def get_non_compliant_items(self) -> List[Dict]:
        """Get the records that need follow-up.

        Returns:
            List of non-compliant and partially compliant records
        """
        return [
            record for record in self.compliance_data
            if record['status'] in ['Partial', 'Non-Compliant']
        ]

    def export_audit_report(self, filename: str):
        """Write the audit report as JSON and print a short confirmation.

        Args:
            filename: Output filename (JSON format)
        """
        total = len(self.compliance_data)
        compliant = sum(1 for r in self.compliance_data if r['status'] == 'Compliant')
        report = {
            "report_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "total_requirements": total,
            "summary": self.get_compliance_summary().to_dict(),
            "non_compliant_items": self.get_non_compliant_items(),
            # Guard against division by zero when nothing was recorded.
            "overall_compliance": compliant / total * 100 if total else 0
        }
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"Audit report exported to {filename}")
        print(f"Overall compliance rate: {report['overall_compliance']:.2f}%")
# Usage example
checker = ISO9001ComplianceChecker()

# Compliance records to register: one dict of keyword arguments per
# assessed subclause (a missing "notes" entry falls back to the default).
sample_records = [
    {
        "clause": "4",
        "subclause": "4.1 Understanding the organization and its context",
        "status": "Compliant",
        "evidence": "DOC-001: SWOT analysis document",
        "auditor": "Taro Tanaka",
    },
    {
        "clause": "6",
        "subclause": "6.1 Actions to address risks and opportunities",
        "status": "Partial",
        "evidence": "DOC-015: Risk register (needs update)",
        "auditor": "Hanako Sato",
        "notes": "Risk assessment has not been updated for over 6 months",
    },
    {
        "clause": "9",
        "subclause": "9.2 Internal audit",
        "status": "Non-Compliant",
        "evidence": "None",
        "auditor": "Ichiro Suzuki",
        "notes": "Annual internal audit has not been conducted",
    },
]
for entry in sample_records:
    checker.add_compliance_record(**entry)

# Per-clause summary table
print("\n=== ISO 9001 Compliance Summary ===")
compliance_table = checker.get_compliance_summary()
print(compliance_table)

# Records whose status is Partial or Non-Compliant
print("\n=== Items Requiring Action ===")
for item in checker.get_non_compliant_items():
    print(f"[{item['clause']}] {item['subclause']}")
    print(f" Status: {item['status']}")
    print(f" Notes: {item['notes']}\n")

# Write the JSON audit report to disk
checker.export_audit_report("iso9001_audit_report.json")
The next example is a document management system that addresses ISO 9001 requirement 7.5 "Documented information".
import hashlib
from datetime import datetime
from typing import List, Dict, Optional
import os
class DocumentControlSystem:
    """ISO 9001 compliant document management system.

    Manages document version control, approval workflows, revision history,
    and review-date tracking. Documents and history entries are plain dicts
    held in in-memory lists.
    """

    def __init__(self):
        """Start with no documents and an empty revision history."""
        self.documents = []
        self.revision_history = []

    def create_document(self, doc_id: str, title: str, doc_type: str,
                        content: str, owner: str,
                        review_period_days: int = 365) -> Dict:
        """Create a new document in Draft status at version 1.0.

        Args:
            doc_id: Document number (e.g., "QP-001")
            title: Document title
            doc_type: Document type ("Procedure", "Form", "Record")
            content: Document content
            owner: Document owner
            review_period_days: Review period (days)

        Returns:
            Created document information
        """
        # Only the SHA-256 digest of the content is stored (for tamper detection).
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        document = {
            "doc_id": doc_id,
            "title": title,
            "type": doc_type,
            "version": "1.0",
            "status": "Draft",  # Draft, Under Review, Approved, Obsolete
            "content_hash": content_hash,
            "owner": owner,
            "created_date": datetime.now(),
            "approved_date": None,
            "approved_by": None,
            "next_review_date": None,
            "review_period_days": review_period_days,
            "access_level": "Internal"  # Public, Internal, Confidential
        }
        self.documents.append(document)
        # Record in revision history
        self._add_revision_history(
            doc_id, "1.0", "Draft", "New creation", owner
        )
        return document

    def approve_document(self, doc_id: str, approver: str) -> bool:
        """Approve a document that is in Draft or Under Review status.

        On success the next review date is set to the approval time plus
        the document's review period.

        Args:
            doc_id: Document number
            approver: Approver name

        Returns:
            True when the document was approved, False when it was not
            found or its status does not allow approval.
        """
        doc = self._find_document(doc_id)
        if not doc:
            print(f"Document {doc_id} not found")
            return False
        if doc['status'] not in ('Draft', 'Under Review'):
            print(f"Document {doc_id} cannot be approved in current status: {doc['status']}")
            return False

        # One timestamp for both fields; stdlib timedelta keeps
        # next_review_date a plain datetime like the other date fields.
        approved_at = datetime.now()
        doc['status'] = 'Approved'
        doc['approved_date'] = approved_at
        doc['approved_by'] = approver
        doc['next_review_date'] = approved_at + timedelta(days=doc['review_period_days'])
        self._add_revision_history(
            doc_id, doc['version'], "Approved",
            f"Approved by {approver}", approver
        )
        print(f"Document {doc_id} v{doc['version']} approved")
        return True

    def revise_document(self, doc_id: str, new_content: str,
                        revision_note: str, revised_by: str) -> Dict:
        """Revise a document: bump the minor version and return it to Draft.

        Args:
            doc_id: Document number
            new_content: New document content
            revision_note: Revision reason
            revised_by: Revisor

        Returns:
            Revised document information

        Raises:
            ValueError: If no document with ``doc_id`` exists.
        """
        doc = self._find_document(doc_id)
        if not doc:
            raise ValueError(f"Document {doc_id} not found")

        # Update version number ("1.0" -> "1.1")
        major, minor = map(int, doc['version'].split('.'))
        doc['version'] = f"{major}.{minor + 1}"
        # Return document to Draft status and clear all approval data,
        # including the review date that belonged to the previous approval.
        doc['status'] = 'Draft'
        doc['content_hash'] = hashlib.sha256(new_content.encode()).hexdigest()
        doc['approved_date'] = None
        doc['approved_by'] = None
        doc['next_review_date'] = None
        # Record in revision history
        self._add_revision_history(
            doc_id, doc['version'], "Draft", revision_note, revised_by
        )
        print(f"Document {doc_id} revised to v{doc['version']}")
        return doc

    def get_documents_for_review(self) -> List[Dict]:
        """Get the approved documents whose review date has been reached.

        Returns:
            Copies of the overdue documents, each extended with
            ``days_overdue`` and ``priority`` ("High" when more than 30
            days overdue, otherwise "Medium"), most overdue first.
        """
        now = datetime.now()
        review_needed = []
        for doc in self.documents:
            if doc['status'] != 'Approved':
                continue
            if doc['next_review_date'] and doc['next_review_date'] <= now:
                days_overdue = (now - doc['next_review_date']).days
                review_needed.append({
                    **doc,
                    "days_overdue": days_overdue,
                    "priority": "High" if days_overdue > 30 else "Medium"
                })
        return sorted(review_needed, key=lambda x: x['days_overdue'], reverse=True)

    def get_revision_history(self, doc_id: str) -> pd.DataFrame:
        """Get a document's revision history.

        Args:
            doc_id: Document number

        Returns:
            Revision history DataFrame (empty when nothing is recorded)
        """
        history = [
            h for h in self.revision_history if h['doc_id'] == doc_id
        ]
        if not history:
            return pd.DataFrame()
        return pd.DataFrame(history)

    def _find_document(self, doc_id: str) -> Optional[Dict]:
        """Return the first document with this ID, or None (internal use)."""
        for doc in self.documents:
            if doc['doc_id'] == doc_id:
                return doc
        return None

    def _add_revision_history(self, doc_id: str, version: str,
                              status: str, note: str, user: str):
        """Append a timestamped revision history entry (internal use)."""
        self.revision_history.append({
            "doc_id": doc_id,
            "version": version,
            "status": status,
            "note": note,
            "user": user,
            "timestamp": datetime.now()
        })
# Usage example
import pandas as pd

dcs = DocumentControlSystem()

# Register the quality procedure as a new draft
doc = dcs.create_document(
    "QP-001",
    "Internal Audit Procedure",
    "Procedure",
    "1. Purpose\n2. Scope\n3. Procedure...",
    "Quality Manager",
    365,
)
print(f"Document created: {doc['doc_id']} - {doc['title']}")

# First approval
dcs.approve_document("QP-001", "Factory Manager")

# Revision sends the document back to Draft at the next minor version
revised_body = "1. Purpose\n2. Scope (updated)\n3. Procedure (added)..."
dcs.revise_document(
    "QP-001",
    revised_body,
    "Expanded scope, detailed procedures",
    "Quality Manager",
)

# The revised draft needs to be approved again
dcs.approve_document("QP-001", "Factory Manager")

# Show everything recorded for the document
print("\n=== Revision History ===")
history_table = dcs.get_revision_history("QP-001")
print(history_table)
The next example is an audit management system that addresses ISO 9001 requirement 9.2 "Internal audit".
from datetime import datetime, timedelta
import pandas as pd
from typing import List, Dict
class InternalAuditSystem:
    """Manage internal audit planning, execution, and follow-up.

    Creates annual audit plans, records audit execution and findings, and
    reports plan progress. All data is held in in-memory lists of dicts.
    """

    def __init__(self):
        """Start with an empty plan, no findings and no corrective actions."""
        self.audit_plan = []
        self.audit_findings = []
        self.corrective_actions = []

    def create_annual_audit_plan(self, year: int,
                                 departments: List[str],
                                 frequency: str = "Annual") -> List[Dict]:
        """Create the audits for one year and add them to the plan.

        Audits are spread evenly over the year and scheduled on the 15th of
        the month. The audit ID is built from the first three letters of
        the department name, so departments sharing that prefix receive
        identical IDs.

        Args:
            year: Target year
            departments: List of departments to audit
            frequency: Audit frequency ("Annual", "Semi-Annual",
                "Quarterly"); any other value is treated as once a year

        Returns:
            List of the audits created by this call
        """
        freq_map = {
            "Annual": 1,
            "Semi-Annual": 2,
            "Quarterly": 4
        }
        audits_per_year = freq_map.get(frequency, 1)

        created = []
        for dept in departments:
            for i in range(audits_per_year):
                # Distribute audit timing evenly
                month = (i * 12 // audits_per_year) + 1
                planned_date = datetime(year, month, 15)
                created.append({
                    "audit_id": f"IA-{year}-{dept[:3].upper()}-{i+1:02d}",
                    "department": dept,
                    "audit_type": "Internal",
                    "scope": f"Quality management system of {dept} department",
                    "planned_date": planned_date,
                    "status": "Planned",  # Planned, In Progress, Completed
                    "lead_auditor": None,
                    "audit_team": [],
                    "completion_date": None
                })
        self.audit_plan.extend(created)
        # Report only this call's audits; the plan may already hold entries
        # from earlier calls (e.g. other years).
        print(f"Created audit plan for fiscal year {year}: {len(created)} audits")
        return created

    def assign_auditors(self, audit_id: str, lead_auditor: str,
                        team_members: List[str]):
        """Assign the audit team.

        Args:
            audit_id: Audit ID
            lead_auditor: Lead auditor
            team_members: Audit team members
        """
        audit = self._find_audit(audit_id)
        if not audit:
            print(f"Audit {audit_id} not found")
            return
        audit['lead_auditor'] = lead_auditor
        audit['audit_team'] = team_members
        print(f"Team assigned to audit {audit_id}")

    def record_finding(self, audit_id: str, finding_type: str,
                       clause: str, description: str,
                       severity: str) -> Dict:
        """Record an audit finding with a sequential ID (F-0001, F-0002, ...).

        Args:
            audit_id: Audit ID
            finding_type: "Non-Conformance", "Observation", "Opportunity"
            clause: Related ISO clause
            description: Finding details
            severity: Severity ("Major", "Minor")

        Returns:
            Recorded finding; ``ca_required`` is True only for
            "Non-Conformance" findings.
        """
        finding = {
            "finding_id": f"F-{len(self.audit_findings) + 1:04d}",
            "audit_id": audit_id,
            "type": finding_type,
            "clause": clause,
            "description": description,
            "severity": severity,
            "recorded_date": datetime.now(),
            "status": "Open",  # Open, Under Review, Closed
            "ca_required": finding_type == "Non-Conformance"
        }
        self.audit_findings.append(finding)
        return finding

    def complete_audit(self, audit_id: str):
        """Mark an audit as completed and print a summary of its findings.

        Args:
            audit_id: Audit ID
        """
        audit = self._find_audit(audit_id)
        if not audit:
            print(f"Audit {audit_id} not found")
            return
        audit['status'] = 'Completed'
        audit['completion_date'] = datetime.now()
        # Count related findings
        findings = [f for f in self.audit_findings
                    if f['audit_id'] == audit_id]
        print(f"Audit {audit_id} completed")
        print(f" Findings: {len(findings)}")
        print(f" Non-conformances: {sum(1 for f in findings if f['type'] == 'Non-Conformance')}")

    def get_audit_dashboard(self) -> pd.DataFrame:
        """Get audit dashboard data.

        Returns:
            DataFrame with one Metric/Value row each for planned, completed
            and in-progress audits, non-conformances and open findings.
            Empty DataFrame when no audits are planned.
        """
        if not self.audit_plan:
            return pd.DataFrame()

        df = pd.DataFrame(self.audit_plan)
        # Findings statistics
        findings_df = pd.DataFrame(self.audit_findings)
        if not findings_df.empty:
            nc_count = (findings_df['type'] == 'Non-Conformance').sum()
            open_findings = (findings_df['status'] == 'Open').sum()
        else:
            nc_count = 0
            open_findings = 0

        summary = pd.DataFrame({
            "Metric": ["Planned", "Completed", "In Progress", "Non-Conformances", "Open Findings"],
            "Value": [
                (df['status'] == 'Planned').sum(),
                (df['status'] == 'Completed').sum(),
                (df['status'] == 'In Progress').sum(),
                nc_count,
                open_findings
            ]
        })
        return summary

    def get_overdue_audits(self) -> List[Dict]:
        """Get audits that are still Planned although their date has passed.

        Returns:
            Copies of the overdue audits with ``days_overdue`` added,
            most overdue first
        """
        now = datetime.now()
        overdue = []
        for audit in self.audit_plan:
            if (audit['status'] == 'Planned' and
                    audit['planned_date'] < now):
                days_overdue = (now - audit['planned_date']).days
                overdue.append({
                    **audit,
                    "days_overdue": days_overdue
                })
        return sorted(overdue, key=lambda x: x['days_overdue'], reverse=True)

    def _find_audit(self, audit_id: str) -> Optional[Dict]:
        """Return the first audit with this ID, or None (internal use)."""
        for audit in self.audit_plan:
            if audit['audit_id'] == audit_id:
                return audit
        return None
# Usage example
ias = InternalAuditSystem()

# One audit per department for 2025
departments = ["Manufacturing", "Quality Control", "Purchasing", "Design", "Sales"]
ias.create_annual_audit_plan(2025, departments, frequency="Annual")

# All following steps work on the Manufacturing audit
target_audit_id = "IA-2025-MAN-01"

# Staff the audit
ias.assign_auditors(
    target_audit_id,
    "Taro Tanaka (Lead Auditor)",
    ["Hanako Sato", "Ichiro Suzuki"],
)

# Log one major non-conformance and one minor observation
finding1 = ias.record_finding(
    target_audit_id,
    "Non-Conformance",
    "8.5 Production and service provision",
    "Work procedures not updated to latest version (still 3 years old)",
    "Major",
)
finding2 = ias.record_finding(
    target_audit_id,
    "Observation",
    "7.2 Competence",
    "Training records partially incomplete",
    "Minor",
)
print(f"\nFindings recorded: {finding1['finding_id']}, {finding2['finding_id']}")

# Close the audit
ias.complete_audit(target_audit_id)

# Show plan progress
print("\n=== Audit Dashboard ===")
dashboard_table = ias.get_audit_dashboard()
print(dashboard_table)
The next example is a corrective-action management system that addresses ISO 9001 requirement 10.2 "Nonconformity and corrective action".
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd
class CAPASystem:
    """Corrective Action and Preventive Action (CAPA) management system.

    Manages the lifecycle of nonconformance recording, root cause analysis,
    corrective action planning and implementation, and effectiveness
    verification. CAPAs are plain dicts held in an in-memory list.
    """

    def __init__(self):
        """Start with no CAPAs and no root cause analyses."""
        self.capas = []
        self.root_cause_analyses = []

    def create_capa(self, source: str, description: str,
                    category: str, severity: str,
                    reported_by: str) -> Dict:
        """Create a CAPA (Corrective Action/Preventive Action).

        The due date is 7, 30 or 60 days after creation for "Critical",
        "Major" and "Minor" severity respectively.

        Args:
            source: Source ("Internal Audit", "Customer Complaint",
                "Process Monitoring", "Supplier Issue")
            description: Nonconformance details
            category: Classification ("Product", "Process",
                "Documentation", "Training")
            severity: Severity ("Critical", "Major", "Minor")
            reported_by: Reporter

        Returns:
            Created CAPA information

        Raises:
            KeyError: If ``severity`` is not one of the three known levels.
        """
        # Set response deadline based on severity
        due_days = {"Critical": 7, "Major": 30, "Minor": 60}
        # One timestamp so that the ID, reported date and due date agree.
        now = datetime.now()
        capa_id = f"CAPA-{now.strftime('%Y%m%d')}-{len(self.capas) + 1:03d}"
        capa = {
            "capa_id": capa_id,
            "source": source,
            "description": description,
            "category": category,
            "severity": severity,
            "reported_by": reported_by,
            "reported_date": now,
            "status": "Open",  # Open, Investigation, Action, Verification, Closed
            "assigned_to": None,
            "due_date": now + timedelta(days=due_days[severity]),
            "root_cause": None,
            "corrective_action": None,
            "preventive_action": None,
            "effectiveness_verified": False,
            "closed_date": None,
            # Filled in by verify_effectiveness
            "verified_by": None,
            "verified_date": None,
            "verification_note": None
        }
        self.capas.append(capa)
        print(f"CAPA created: {capa_id} (due date: {capa['due_date'].strftime('%Y-%m-%d')})")
        return capa

    def assign_capa(self, capa_id: str, assignee: str):
        """Assign a CAPA to a responsible person and move it to Investigation.

        Args:
            capa_id: CAPA number
            assignee: Assignee name
        """
        capa = self._find_capa(capa_id)
        if not capa:
            print(f"CAPA {capa_id} not found")
            return
        capa['assigned_to'] = assignee
        capa['status'] = 'Investigation'
        print(f"CAPA {capa_id} assigned to {assignee}")

    def record_root_cause_analysis(self, capa_id: str,
                                   method: str, findings: str,
                                   root_cause: str, analyst: str) -> Dict:
        """Record a root cause analysis.

        The analysis is stored even when no CAPA with ``capa_id`` exists;
        the root cause is copied onto the CAPA only when it is found.

        Args:
            capa_id: CAPA number
            method: Analysis method ("5 Whys", "Fishbone", "FTA", "FMEA")
            findings: Analysis results
            root_cause: Identified root cause
            analyst: Analyst

        Returns:
            Recorded analysis information
        """
        analysis = {
            "capa_id": capa_id,
            "method": method,
            "findings": findings,
            "root_cause": root_cause,
            "analyst": analyst,
            "analysis_date": datetime.now()
        }
        self.root_cause_analyses.append(analysis)
        # Record root cause in CAPA
        capa = self._find_capa(capa_id)
        if capa:
            capa['root_cause'] = root_cause
        return analysis

    def define_corrective_action(self, capa_id: str,
                                 corrective_action: str,
                                 preventive_action: str = ""):
        """Define corrective and preventive actions and move to Action.

        Args:
            capa_id: CAPA number
            corrective_action: Corrective action (addressing the problem)
            preventive_action: Preventive action (preventing recurrence)
        """
        capa = self._find_capa(capa_id)
        if not capa:
            print(f"CAPA {capa_id} not found")
            return
        capa['corrective_action'] = corrective_action
        capa['preventive_action'] = preventive_action
        capa['status'] = 'Action'
        print(f"Actions defined for CAPA {capa_id}")

    def verify_effectiveness(self, capa_id: str,
                             verified_by: str,
                             is_effective: bool,
                             verification_note: str):
        """Verify the effectiveness of the corrective action.

        An effective result closes the CAPA; an ineffective one returns it
        to the Action status. The verifier, time and note of the latest
        verification are kept on the CAPA in both cases.

        Args:
            capa_id: CAPA number
            verified_by: Verifier
            is_effective: Effectiveness verification result
            verification_note: Verification notes
        """
        capa = self._find_capa(capa_id)
        if not capa:
            print(f"CAPA {capa_id} not found")
            return

        verified_at = datetime.now()
        # Keep who verified, when, and why: part of the audit trail.
        capa['verified_by'] = verified_by
        capa['verified_date'] = verified_at
        capa['verification_note'] = verification_note
        capa['effectiveness_verified'] = is_effective
        capa['status'] = 'Closed' if is_effective else 'Action'
        if is_effective:
            capa['closed_date'] = verified_at
            print(f"CAPA {capa_id} completed (effectiveness verified)")
        else:
            print(f"Effectiveness of CAPA {capa_id} is insufficient. Additional actions required")
            print(f" Reason: {verification_note}")

    def get_open_capas(self) -> pd.DataFrame:
        """Get the list of CAPAs that are not Closed.

        Returns:
            DataFrame of open CAPAs with ``days_open`` and ``overdue``
            columns added; empty DataFrame when none are open
        """
        open_capas = [c for c in self.capas if c['status'] != 'Closed']
        if not open_capas:
            return pd.DataFrame()

        now = datetime.now()
        df = pd.DataFrame(open_capas)
        df['days_open'] = df['reported_date'].apply(
            lambda x: (now - x).days
        )
        df['overdue'] = df['due_date'] < now
        return df[['capa_id', 'severity', 'category', 'status',
                   'assigned_to', 'due_date', 'days_open', 'overdue']]

    def get_capa_metrics(self) -> Dict:
        """Get key CAPA metrics as plain Python numbers.

        Returns:
            Dict with total, open, closed and overdue counts, the average
            resolution time in days (0 when nothing is closed) and the
            closure rate in percent. Empty dict when no CAPAs exist.
        """
        if not self.capas:
            return {}

        df = pd.DataFrame(self.capas)
        # Average resolution days (closed only)
        closed_capas = df[df['status'] == 'Closed'].copy()
        if not closed_capas.empty:
            closed_capas['resolution_days'] = (
                closed_capas['closed_date'] - closed_capas['reported_date']
            ).dt.days
            avg_resolution_days = float(closed_capas['resolution_days'].mean())
        else:
            avg_resolution_days = 0

        # Overdue count: still open past the due date
        now = datetime.now()
        overdue_count = sum(1 for c in self.capas
                            if c['status'] != 'Closed' and c['due_date'] < now)
        # int() converts the numpy counts so the dict is JSON-serialisable.
        closed_count = int((df['status'] == 'Closed').sum())
        total = len(self.capas)
        metrics = {
            "Total CAPAs": total,
            "Open": total - closed_count,
            "Closed": closed_count,
            "Overdue": overdue_count,
            "Average Resolution Days": round(avg_resolution_days, 1),
            "Closure Rate (%)": round(closed_count / total * 100, 1)
        }
        return metrics

    def _find_capa(self, capa_id: str) -> Optional[Dict]:
        """Return the first CAPA with this ID, or None (internal use)."""
        for capa in self.capas:
            if capa['capa_id'] == capa_id:
                return capa
        return None
# Usage example
capa_system = CAPASystem()

# Open a CAPA for the audit finding
capa1 = capa_system.create_capa(
    "Internal Audit",
    "Non-compliance with work procedures found in production line A. Step 3 of procedure is being skipped.",
    "Process",
    "Major",
    "Auditor Tanaka",
)
# Every later step refers to the CAPA through its generated ID
current_capa_id = capa1['capa_id']

# Hand it to the responsible manager
capa_system.assign_capa(current_capa_id, "Manufacturing Manager Sato")

# Root cause analysis: the 5-Whys chain is stored verbatim as the findings text
five_whys_log = """
Why was the procedure skipped? → Because it takes time
Why does it take time? → Equipment setup requires time
Why does setup take time? → Jig preparation is complicated
Why is jig preparation complicated? → Not well organized
Why not well organized? → 5S activities have become formalized
"""
capa_system.record_root_cause_analysis(
    current_capa_id,
    "5 Whys",
    five_whys_log,
    "Formalization of 5S activities led to inadequate jig management. Operators chose to skip procedures.",
    "Quality Control Dept. Suzuki",
)

# Corrective action fixes the problem, preventive action stops recurrence
capa_system.define_corrective_action(
    current_capa_id,
    "Immediately organize jigs. Re-train on procedure compliance.",
    "Revitalize 5S activities. Implement monthly audits. Introduce jig management system.",
)

# A positive verification closes the CAPA
capa_system.verify_effectiveness(
    current_capa_id,
    "Quality Manager",
    True,
    "Re-audit conducted after 1 month. Confirmed 100% procedure compliance. Jig management also improved.",
)

# Print each metric on its own line
print("\n=== CAPA Metrics ===")
metrics = capa_system.get_capa_metrics()
for key, value in metrics.items():
    print(f"{key}: {value}")
ISO 9001:2015 clause 6.1 "Actions to address risks and opportunities" requires risk-based thinking. FMEA (Failure Mode and Effects Analysis) is a method to identify and prioritize potential failure modes in processes or products.
import pandas as pd
from typing import List, Dict
import numpy as np
class FMEASystem:
    """FMEA (Failure Mode and Effects Analysis) implementation system.

    Conducts Process FMEA or Design FMEA and prioritizes risks based on
    the RPN (Risk Priority Number = severity x occurrence x detection).
    """

    def __init__(self, fmea_type: str = "Process"):
        """
        Args:
            fmea_type: FMEA type ("Process" or "Design")
        """
        self.fmea_type = fmea_type
        self.failure_modes = []

    @staticmethod
    def _check_rating(name: str, value: int) -> None:
        """Raise ValueError unless a rating lies on the 1-10 scale."""
        if not 1 <= value <= 10:
            raise ValueError(f"{name} must be between 1 and 10, got {value!r}")

    def add_failure_mode(self, process_step: str,
                         potential_failure: str,
                         effects: str, severity: int,
                         causes: str, occurrence: int,
                         current_controls: str, detection: int,
                         responsible_person: str = "") -> Dict:
        """Add a failure mode and compute its RPN.

        Args:
            process_step: Process step/function
            potential_failure: Potential failure mode
            effects: Effects of failure
            severity: Severity (1-10, 10 is most severe)
            causes: Causes of failure
            occurrence: Occurrence frequency (1-10, 10 is most frequent)
            current_controls: Current control methods
            detection: Detection rating (1-10, 10 is hardest to detect)
            responsible_person: Responsible person

        Returns:
            Added failure mode information

        Raises:
            ValueError: If a rating is outside 1-10. A rating of 0 would
                give an RPN of 0 and break the improvement percentage in
                ``record_actions_taken``.
        """
        self._check_rating("severity", severity)
        self._check_rating("occurrence", occurrence)
        self._check_rating("detection", detection)

        # Calculate RPN (Risk Priority Number)
        rpn = severity * occurrence * detection
        failure_mode = {
            "process_step": process_step,
            "potential_failure": potential_failure,
            "effects": effects,
            "severity": severity,
            "causes": causes,
            "occurrence": occurrence,
            "current_controls": current_controls,
            "detection": detection,
            "rpn": rpn,
            "responsible_person": responsible_person,
            "recommended_actions": "",
            "actions_taken": "",
            "new_severity": None,
            "new_occurrence": None,
            "new_detection": None,
            "new_rpn": None
        }
        self.failure_modes.append(failure_mode)
        return failure_mode

    def prioritize_risks(self, threshold: int = 100) -> pd.DataFrame:
        """Prioritize risks by RPN.

        Risk levels use fixed bands (High >= 200, Medium >= 100, else
        Low); ``threshold`` only drives the ``action_required`` flag. The
        DataFrame index keeps each row's position in ``failure_modes``.

        Args:
            threshold: RPN threshold requiring action

        Returns:
            DataFrame sorted by RPN, highest first (empty when no failure
            modes exist)
        """
        if not self.failure_modes:
            return pd.DataFrame()

        df = pd.DataFrame(self.failure_modes)
        # Sort by RPN
        df = df.sort_values('rpn', ascending=False)
        # Classify risk level
        df['risk_level'] = df['rpn'].apply(
            lambda x: 'High' if x >= 200 else 'Medium' if x >= 100 else 'Low'
        )
        # Flag items requiring action
        df['action_required'] = df['rpn'] >= threshold
        return df

    def recommend_actions(self, index: int, actions: str):
        """Record recommended actions; an out-of-range index is ignored.

        Args:
            index: Failure mode index (position in ``failure_modes``)
            actions: Recommended actions content
        """
        if 0 <= index < len(self.failure_modes):
            self.failure_modes[index]['recommended_actions'] = actions

    def record_actions_taken(self, index: int, actions: str,
                             new_severity: int, new_occurrence: int,
                             new_detection: int):
        """Record actions taken and the post-improvement evaluation.

        An out-of-range index is ignored.

        Args:
            index: Failure mode index (position in ``failure_modes``)
            actions: Actions taken
            new_severity: Post-improvement severity
            new_occurrence: Post-improvement occurrence
            new_detection: Post-improvement detection

        Raises:
            ValueError: If a new rating is outside 1-10.
        """
        if not 0 <= index < len(self.failure_modes):
            return
        # Validate before touching the record so a bad call changes nothing.
        self._check_rating("new_severity", new_severity)
        self._check_rating("new_occurrence", new_occurrence)
        self._check_rating("new_detection", new_detection)

        fm = self.failure_modes[index]
        fm['actions_taken'] = actions
        fm['new_severity'] = new_severity
        fm['new_occurrence'] = new_occurrence
        fm['new_detection'] = new_detection
        fm['new_rpn'] = new_severity * new_occurrence * new_detection
        print(f"RPN improvement: {fm['rpn']} → {fm['new_rpn']} "
              f"({(1 - fm['new_rpn']/fm['rpn'])*100:.1f}% reduction)")

    def generate_fmea_report(self, filename: str):
        """Generate the FMEA report as an Excel workbook.

        Writes the prioritized failure modes to an "FMEA" sheet and key
        figures to a "Summary" sheet. With no failure modes the FMEA sheet
        is empty and every summary figure is 0.

        Args:
            filename: Output filename (Excel format)
        """
        df = self.prioritize_risks()
        # An empty FMEA has no 'rpn' column; use an empty series so the
        # counts below become 0 instead of raising KeyError.
        has_rows = not df.empty
        rpn = df['rpn'] if has_rows else pd.Series(dtype='int64')

        # Excel output
        with pd.ExcelWriter(filename, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='FMEA', index=False)
            # Create summary sheet
            summary = pd.DataFrame({
                'Item': [
                    'Total Failure Modes',
                    'High Risk Items (RPN≥200)',
                    'Medium Risk Items (100≤RPN<200)',
                    'Low Risk Items (RPN<100)',
                    'Average RPN',
                    'Maximum RPN'
                ],
                'Value': [
                    len(df),
                    (rpn >= 200).sum(),
                    ((rpn >= 100) & (rpn < 200)).sum(),
                    (rpn < 100).sum(),
                    round(rpn.mean(), 1) if has_rows else 0,
                    rpn.max() if has_rows else 0
                ]
            })
            summary.to_excel(writer, sheet_name='Summary', index=False)
        print(f"FMEA report exported to {filename}")
# Usage example
fmea = FMEASystem(fmea_type="Process")

# Add failure modes (manufacturing process example)
fmea.add_failure_mode(
    process_step="Raw material receiving inspection",
    potential_failure="Acceptance of non-conforming raw materials",
    effects="Production of defective products, customer complaints, recall",
    severity=9,  # Very severe
    causes="Unclear inspection criteria, insufficient inspector competence",
    occurrence=4,  # Occurs occasionally
    current_controls="Sampling inspection",
    detection=5,  # Somewhat difficult to detect
    responsible_person="Quality Control Dept."
)
fmea.add_failure_mode(
    process_step="Molding process",
    potential_failure="Dimensional defects",
    effects="Assembly defects, performance degradation",
    severity=7,
    causes="Mold wear, poor temperature control",
    occurrence=6,
    current_controls="First article inspection, periodic measurement",
    detection=3,  # Relatively easy to detect
    responsible_person="Manufacturing Dept."
)
fmea.add_failure_mode(
    process_step="Final inspection",
    potential_failure="Inspection oversight",
    effects="Shipment of defective products",
    severity=10,  # Most severe
    causes="Too many inspection items, time pressure",
    occurrence=2,  # Rare
    current_controls="Double check",
    detection=7,  # Difficult to detect
    responsible_person="Quality Assurance Dept."
)

# Prioritize risks (resulting RPNs: 180, 126 and 140 in insertion order)
print("=== FMEA Risk Analysis ===")
risk_df = fmea.prioritize_risks(threshold=100)
print(risk_df[['process_step', 'potential_failure', 'rpn', 'risk_level']])

# Record recommended actions for the final-inspection item: it has the
# highest severity (10), although its RPN of 140 is only the second highest.
# The index is the insertion position, not the rank in the sorted table.
fmea.recommend_actions(
    index=2,  # Final inspection oversight
    actions="Introduce automated inspection equipment, simplify inspection procedures, ensure traceability with checklists"
)

# Post-action evaluation
fmea.record_actions_taken(
    index=2,
    actions="Introduced automated visual inspection equipment. Classified inspection items by importance.",
    new_severity=10,  # Severity unchanged
    new_occurrence=1,  # Occurrence greatly reduced
    new_detection=2  # Detection improved (through automation)
)

# Generate report
fmea.generate_fmea_report("fmea_report.xlsx")
| RPN Range | Risk Level | Response |
|---|---|---|
| 200-1000 | High Risk | Immediate action required |
| 100-199 | Medium Risk | Implement planned actions |
| 1-99 | Low Risk | Continue monitoring |
The next example is a supplier quality management system that addresses ISO 9001 requirement 8.4 "Control of externally provided processes, products and services".
import pandas as pd
from datetime import datetime
from typing import List, Dict
import numpy as np
class SupplierQualitySystem:
"""Supplier quality management system
Manages supplier evaluation, quality performance tracking,
and improvement activities.
"""
def __init__(self):
self.suppliers = []
self.quality_records = []
def register_supplier(self, supplier_id: str, name: str,
                      product_category: str,
                      criticality: str = "Medium") -> Dict:
    """Register a supplier and print a confirmation line.

    New suppliers start in "Approved" status with no scores or rating.

    Args:
        supplier_id: Supplier ID
        name: Supplier name
        product_category: Product category supplied
        criticality: Criticality level ("Critical", "High", "Medium", "Low")

    Returns:
        The supplier dict that was appended to ``self.suppliers``
    """
    # Identity and classification first, then the evaluation fields,
    # which stay empty until the supplier has been rated.
    entry = dict(
        supplier_id=supplier_id,
        name=name,
        product_category=product_category,
        criticality=criticality,
        registration_date=datetime.now(),
        status="Approved",  # Approved, Conditional, Suspended
    )
    for score_field in ("quality_score", "delivery_score", "overall_rating"):
        entry[score_field] = None

    self.suppliers.append(entry)
    print(f"Supplier registered: {name} ({supplier_id})")
    return entry
def record_quality_data(self, supplier_id: str,
delivery_date: datetime,
lot_number: str,
quantity_received: int,
quantity_accepted: int,
defect_count: int,
defect_types: str = "") -> Dict:
"""Record quality data
Args:
supplier_id: Supplier ID
delivery_date: Delivery date
lot_number: Lot number
quantity_received: Quantity received
quantity_accepted: Quantity accepted
defect_count: Defect count
defect_types: Defect types
Returns:
Recorded quality data
"""
# Calculate PPM (Parts Per Million)
ppm = (defect_count / quantity_received * 1_000_000) if quantity_received > 0 else 0
record = {
"supplier_id": supplier_id,
"delivery_date": delivery_date,
"lot_number": lot_number,
"quantity_received": quantity_received,
"quantity_accepted": quantity_accepted,
"defect_count": defect_count,
"defect_types": defect_types,
"ppm": ppm,
"acceptance_rate": (quantity_accepted / quantity_received * 100)
if quantity_received > 0 else 0
}
self.quality_records.append(record)
return record
def calculate_supplier_rating(self, supplier_id: str,
months: int = 6) -> Dict:
"""Calculate supplier rating
Args:
supplier_id: Supplier ID
months: Evaluation period (months)
Returns:
Evaluation results
"""
# Get quality records within period
cutoff_date = datetime.now() - pd.Timedelta(days=months * 30)
records = [
r for r in self.quality_records
if r['supplier_id'] == supplier_id and r['delivery_date'] >= cutoff_date
]
if not records:
return {"error": "No data available for evaluation period"}
df = pd.DataFrame(records)
# Quality score (0-100)
avg_ppm = df['ppm'].mean()
quality_score = max(0, 100 - (avg_ppm / 100)) # 10,000 PPM = 0 points
# Overall rating (A-D)
if quality_score >= 95:
overall_rating = "A"
elif quality_score >= 85:
overall_rating = "B"
elif quality_score >= 70:
overall_rating = "C"
else:
overall_rating = "D"
rating = {
"supplier_id": supplier_id,
"evaluation_period": f"{months} months",
"total_deliveries": len(records),
"total_quantity": df['quantity_received'].sum(),
"total_defects": df['defect_count'].sum(),
"avg_ppm": round(avg_ppm, 2),
"quality_score": round(quality_score, 2),
"overall_rating": overall_rating
}
# Update supplier information
supplier = self._find_supplier(supplier_id)
if supplier:
supplier['quality_score'] = quality_score
supplier['overall_rating'] = overall_rating
return rating
def get_supplier_performance_summary(self) -> pd.DataFrame:
"""Get performance summary for all suppliers
Returns:
DataFrame of performance by supplier
"""
if not self.suppliers:
return pd.DataFrame()
# Calculate rating for each supplier
for supplier in self.suppliers:
self.calculate_supplier_rating(supplier['supplier_id'])
df = pd.DataFrame(self.suppliers)
# Sort by criticality
criticality_order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3}
df['criticality_rank'] = df['criticality'].map(criticality_order)
df = df.sort_values(['criticality_rank', 'quality_score'], ascending=[True, False])
return df[['supplier_id', 'name', 'criticality',
'quality_score', 'overall_rating', 'status']]
def identify_improvement_targets(self, threshold_rating: str = "C") -> List[Dict]:
"""Identify suppliers requiring improvement
Args:
threshold_rating: Rating threshold for improvement targets
Returns:
List of suppliers requiring improvement
"""
rating_map = {"A": 4, "B": 3, "C": 2, "D": 1}
threshold_value = rating_map.get(threshold_rating, 2)
targets = []
for supplier in self.suppliers:
if supplier['overall_rating']:
rating_value = rating_map.get(supplier['overall_rating'], 0)
if rating_value <= threshold_value:
targets.append({
"supplier_id": supplier['supplier_id'],
"name": supplier['name'],
"rating": supplier['overall_rating'],
"quality_score": supplier['quality_score'],
"action": "Quality improvement plan development and implementation required"
})
return targets
def _find_supplier(self, supplier_id: str) -> Optional[Dict]:
"""Search supplier by supplier ID (internal use)"""
for supplier in self.suppliers:
if supplier['supplier_id'] == supplier_id:
return supplier
return None
# Usage example
sqs = SupplierQualitySystem()
# Register suppliers
sqs.register_supplier("SUP-001", "ABC Parts Industry", "Electronic Components", criticality="Critical")
sqs.register_supplier("SUP-002", "XYZ Resin", "Resin Materials", criticality="High")
sqs.register_supplier("SUP-003", "123 Packaging Materials", "Packaging Materials", criticality="Low")
# Record quality data
sqs.record_quality_data(
supplier_id="SUP-001",
delivery_date=datetime(2025, 1, 15),
lot_number="LOT-20250115-001",
quantity_received=10000,
quantity_accepted=9950,
defect_count=50,
defect_types="Dimensional defects"
)
sqs.record_quality_data(
supplier_id="SUP-001",
delivery_date=datetime(2025, 2, 20),
lot_number="LOT-20250220-002",
quantity_received=15000,
quantity_accepted=14800,
defect_count=200,
defect_types="Visual defects, dimensional defects"
)
sqs.record_quality_data(
supplier_id="SUP-002",
delivery_date=datetime(2025, 1, 10),
lot_number="LOT-20250110-001",
quantity_received=5000,
quantity_accepted=4990,
defect_count=10,
defect_types="Color unevenness"
)
# Supplier evaluation
print("=== Supplier Evaluation ===")
rating = sqs.calculate_supplier_rating("SUP-001", months=6)
for key, value in rating.items():
print(f"{key}: {value}")
# Performance summary
print("\n=== Supplier Performance Summary ===")
print(sqs.get_supplier_performance_summary())
# Identify improvement targets
print("\n=== Suppliers Requiring Improvement ===")
targets = sqs.identify_improvement_targets(threshold_rating="B")
for target in targets:
print(f"{target['name']} - Rating: {target['rating']}, Score: {target['quality_score']}")
print(f" → {target['action']}")
The following quality objectives and KPI management system corresponds to ISO 9001:2015 clause 6.2, "Quality objectives and planning to achieve them".
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional
import matplotlib.pyplot as plt
class QualityKPISystem:
    """Quality objectives and KPI management system.

    Manages quality objective setting, KPI tracking, and achievement
    visualization. Each objective declares whether a higher or a lower
    actual value is better, so that "reduce ..." KPIs (complaint counts,
    defect rates) are not reported as achieved when they exceed the target.
    """

    def __init__(self, fiscal_year: int):
        self.fiscal_year = fiscal_year
        self.objectives = []
        self.kpi_data = []

    def define_objective(self, objective_id: str, title: str,
                         kpi_name: str, target_value: float,
                         unit: str, measurement_frequency: str,
                         responsible_dept: str, *,
                         higher_is_better: bool = True) -> Dict:
        """Define quality objective

        Args:
            objective_id: Objective ID
            title: Objective title
            kpi_name: KPI name
            target_value: Target value
            unit: Unit
            measurement_frequency: Measurement frequency ("Daily", "Weekly", "Monthly")
            responsible_dept: Responsible department
            higher_is_better: True (default) when larger actual values are
                better (e.g. yield); False when smaller values are better
                (e.g. complaint count, defect rate)

        Returns:
            Defined objective information
        """
        objective = {
            "objective_id": objective_id,
            "title": title,
            "kpi_name": kpi_name,
            "target_value": target_value,
            "unit": unit,
            "measurement_frequency": measurement_frequency,
            "responsible_dept": responsible_dept,
            "fiscal_year": self.fiscal_year,
            "higher_is_better": higher_is_better,
            # "Active" until evaluated; then Achieved, On Track, At Risk,
            # or Not Achieved
            "status": "Active"
        }
        self.objectives.append(objective)
        print(f"Quality objective set: {title} (target value: {target_value}{unit})")
        return objective

    @staticmethod
    def _achievement_rate(actual_value: float, target_value: float,
                          higher_is_better: bool = True) -> float:
        """Return the achievement rate (%) of an actual value (internal use)

        Higher-is-better KPIs use actual / target; lower-is-better KPIs
        use target / actual, so 100% always means "target exactly met" and
        larger is always better. Assumes non-negative KPI values.
        """
        if higher_is_better:
            if target_value == 0:
                # Ratio undefined: a zero target is met by any value >= 0
                return 100.0 if actual_value >= 0 else 0.0
            return actual_value / target_value * 100
        if actual_value <= 0:
            # Nothing left to reduce: ratio undefined, count as fully met
            return 100.0
        return target_value / actual_value * 100

    def record_kpi_data(self, objective_id: str,
                        measurement_date: datetime,
                        actual_value: float,
                        notes: str = "") -> Dict:
        """Record KPI actual data

        Args:
            objective_id: Objective ID
            measurement_date: Measurement date
            actual_value: Actual value
            notes: Notes

        Returns:
            Recorded KPI data, including the computed "achievement_rate"

        Raises:
            ValueError: If no objective with this ID has been defined
        """
        objective = self._find_objective(objective_id)
        if not objective:
            raise ValueError(f"Objective {objective_id} not found")

        # Calculate achievement rate (direction-aware, safe for zero targets)
        achievement_rate = self._achievement_rate(
            actual_value,
            objective['target_value'],
            objective.get('higher_is_better', True)
        )
        kpi_record = {
            "objective_id": objective_id,
            "measurement_date": measurement_date,
            "actual_value": actual_value,
            "target_value": objective['target_value'],
            "achievement_rate": achievement_rate,
            "notes": notes
        }
        self.kpi_data.append(kpi_record)
        return kpi_record

    def get_kpi_trend(self, objective_id: str) -> pd.DataFrame:
        """Get KPI trend data

        Args:
            objective_id: Objective ID

        Returns:
            Trend data DataFrame sorted by measurement date (empty when
            nothing has been recorded for the objective)
        """
        records = [
            r for r in self.kpi_data if r['objective_id'] == objective_id
        ]
        if not records:
            return pd.DataFrame()
        return pd.DataFrame(records).sort_values('measurement_date')

    def evaluate_objective_status(self, objective_id: str) -> str:
        """Evaluate objective achievement status

        Uses the mean achievement rate of the last 3 measurements and
        stores the result on the objective.

        Args:
            objective_id: Objective ID

        Returns:
            Status ("Achieved", "On Track", "At Risk", "Not Achieved"),
            or "No Data" when nothing has been recorded
        """
        df = self.get_kpi_trend(objective_id)
        if df.empty:
            return "No Data"

        # Evaluate average achievement rate of last 3 measurements
        recent_achievement = df.tail(3)['achievement_rate'].mean()
        if recent_achievement >= 100:
            status = "Achieved"
        elif recent_achievement >= 90:
            status = "On Track"
        elif recent_achievement >= 70:
            status = "At Risk"
        else:
            status = "Not Achieved"

        # Update objective information
        objective = self._find_objective(objective_id)
        if objective:
            objective['status'] = status
        return status

    def get_objectives_dashboard(self) -> pd.DataFrame:
        """Get quality objectives dashboard

        Returns:
            DataFrame showing achievement status of all objectives, with
            the latest actual value and achievement rate (missing when an
            objective has no measurements yet)
        """
        if not self.objectives:
            return pd.DataFrame()

        latest_values = []
        latest_rates = []
        for obj in self.objectives:
            # Refresh the stored status before building the table
            self.evaluate_objective_status(obj['objective_id'])
            trend_df = self.get_kpi_trend(obj['objective_id'])
            if trend_df.empty:
                latest_values.append(None)
                latest_rates.append(None)
            else:
                latest = trend_df.iloc[-1]
                latest_values.append(latest['actual_value'])
                latest_rates.append(latest['achievement_rate'])

        df = pd.DataFrame(self.objectives)
        df['latest_value'] = latest_values
        df['latest_achievement_rate'] = latest_rates
        return df[['objective_id', 'title', 'target_value', 'unit',
                   'latest_value', 'latest_achievement_rate', 'status',
                   'responsible_dept']]

    def generate_kpi_report(self, objective_id: str, filename: str):
        """Generate KPI report (with trend graph)

        Args:
            objective_id: Objective ID
            filename: Output filename (PNG format)
        """
        df = self.get_kpi_trend(objective_id)
        objective = self._find_objective(objective_id)
        if df.empty or not objective:
            print("Insufficient data")
            return

        # Create graph
        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            # Plot actual values
            ax.plot(df['measurement_date'], df['actual_value'],
                    marker='o', label='Actual Value', linewidth=2)
            # Add target value line
            ax.axhline(y=objective['target_value'], color='r',
                       linestyle='--', label='Target Value')
            ax.set_xlabel('Measurement Date')
            ax.set_ylabel(f"{objective['kpi_name']} ({objective['unit']})")
            ax.set_title(f"{objective['title']} - KPI Trend")
            ax.legend()
            ax.grid(True, alpha=0.3)
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig(filename, dpi=300)
        finally:
            # Always release the figure, even if plotting or saving fails
            plt.close(fig)
        print(f"KPI report exported to {filename}")

    def _find_objective(self, objective_id: str) -> Optional[Dict]:
        """Search objective by objective ID (internal use)"""
        return next(
            (obj for obj in self.objectives
             if obj['objective_id'] == objective_id),
            None
        )


# Usage example
kpi_system = QualityKPISystem(fiscal_year=2025)

# Define quality objectives (both are "lower is better" KPIs)
kpi_system.define_objective(
    objective_id="OBJ-2025-001",
    title="Reduce Customer Complaints",
    kpi_name="Customer Complaint Count",
    target_value=10,
    unit="complaints/month",
    measurement_frequency="Monthly",
    responsible_dept="Quality Assurance Dept.",
    higher_is_better=False
)
kpi_system.define_objective(
    objective_id="OBJ-2025-002",
    title="Reduce In-Process Defect Rate",
    kpi_name="In-Process Defect Rate",
    target_value=0.5,
    unit="%",
    measurement_frequency="Monthly",
    responsible_dept="Manufacturing Dept.",
    higher_is_better=False
)

# Record KPI actual values
for month in range(1, 7):
    kpi_system.record_kpi_data(
        objective_id="OBJ-2025-001",
        measurement_date=datetime(2025, month, 1),
        actual_value=15 - month,  # Improvement trend
        notes=f"Month {month} actual"
    )
    kpi_system.record_kpi_data(
        objective_id="OBJ-2025-002",
        measurement_date=datetime(2025, month, 1),
        actual_value=1.0 - (month * 0.08),  # Improvement trend
        notes=f"Month {month} actual"
    )

# Display dashboard
print("=== Quality Objectives Dashboard ===")
print(kpi_system.get_objectives_dashboard())

# Status evaluation
print("\n=== Achievement Status Evaluation ===")
for obj_id in ["OBJ-2025-001", "OBJ-2025-002"]:
    status = kpi_system.evaluate_objective_status(obj_id)
    print(f"{obj_id}: {status}")
The following report generation system corresponds to ISO 9001:2015 clause 9.3, "Management review".
from datetime import datetime
import pandas as pd
from typing import Dict, List
import json
class ManagementReviewSystem:
    """Builds a management review report (ISO 9001 requirement 9.3).

    Collects the review inputs, the decisions taken and the resulting
    action items in ``review_data``, then exports everything as JSON
    together with an overall assessment.
    """

    def __init__(self, review_date: datetime, review_period: str):
        """
        Args:
            review_date: Date on which the review is held
            review_period: Period under review (e.g., "FY2024 Second Half")
        """
        self.review_date = review_date
        self.review_period = review_period
        basic_info = {
            "review_date": review_date.strftime("%Y-%m-%d"),
            "review_period": review_period,
            "attendees": [],
        }
        self.review_data = {
            "basic_info": basic_info,
            "inputs": {},
            "decisions": [],
            "action_items": [],
        }

    def add_attendee(self, name: str, position: str):
        """Register a review attendee.

        Args:
            name: Attendee name
            position: Attendee position
        """
        attendee = {"name": name, "position": position}
        self.review_data["basic_info"]["attendees"].append(attendee)

    def add_customer_feedback(self, satisfaction_score: float,
                              complaint_count: int,
                              positive_feedback: str,
                              improvement_areas: str):
        """Store the customer feedback input (ISO 9001 9.3.2 a).

        Args:
            satisfaction_score: Customer satisfaction score (0-100)
            complaint_count: Number of complaints
            positive_feedback: Positive feedback received
            improvement_areas: Areas that need improvement
        """
        entry = {
            "satisfaction_score": satisfaction_score,
            "complaint_count": complaint_count,
            "positive_feedback": positive_feedback,
            "improvement_areas": improvement_areas,
        }
        self.review_data["inputs"]["customer_feedback"] = entry

    def add_conformity_status(self, audit_results: str,
                              nc_count: int,
                              capa_completion_rate: float):
        """Store the conformity and performance input (ISO 9001 9.3.2 b, c).

        Args:
            audit_results: Summary of audit results
            nc_count: Number of non-conformances
            capa_completion_rate: CAPA completion rate (%)
        """
        entry = {
            "audit_results": audit_results,
            "nc_count": nc_count,
            "capa_completion_rate": capa_completion_rate,
        }
        self.review_data["inputs"]["conformity_status"] = entry

    def add_process_performance(self, kpi_summary: Dict[str, float],
                                process_efficiency: float):
        """Store the process performance input (ISO 9001 9.3.2 d).

        Args:
            kpi_summary: Mapping of KPI name to achievement rate
            process_efficiency: Process efficiency (%)
        """
        entry = {
            "kpi_summary": kpi_summary,
            "process_efficiency": process_efficiency,
        }
        self.review_data["inputs"]["process_performance"] = entry

    def add_resource_adequacy(self, resource_status: str,
                              training_completion: float,
                              infrastructure_issues: str):
        """Store the resource adequacy input (ISO 9001 9.3.2 e).

        Args:
            resource_status: Evaluation of the resource situation
            training_completion: Training completion rate (%)
            infrastructure_issues: Known infrastructure issues
        """
        entry = {
            "resource_status": resource_status,
            "training_completion": training_completion,
            "infrastructure_issues": infrastructure_issues,
        }
        self.review_data["inputs"]["resource_adequacy"] = entry

    def add_improvement_actions(self, actions_planned: int,
                                actions_completed: int,
                                effectiveness_rate: float):
        """Store the improvement action input (ISO 9001 9.3.2 f).

        Args:
            actions_planned: Number of planned improvement actions
            actions_completed: Number of completed improvement actions
            effectiveness_rate: Effectiveness verification rate (%)
        """
        # With nothing planned the completion rate is reported as 0.
        if actions_planned > 0:
            completion_rate = actions_completed / actions_planned * 100
        else:
            completion_rate = 0
        entry = {
            "actions_planned": actions_planned,
            "actions_completed": actions_completed,
            "completion_rate": completion_rate,
            "effectiveness_rate": effectiveness_rate,
        }
        self.review_data["inputs"]["improvement_actions"] = entry

    def add_changes_impact(self, internal_changes: str,
                           external_changes: str,
                           impact_assessment: str):
        """Store the changes-impact input (ISO 9001 9.3.2 g).

        Args:
            internal_changes: Internal changes
            external_changes: External changes
            impact_assessment: Assessment of their impact
        """
        entry = {
            "internal_changes": internal_changes,
            "external_changes": external_changes,
            "impact_assessment": impact_assessment,
        }
        self.review_data["inputs"]["changes_impact"] = entry

    def add_decision(self, decision_type: str, decision: str,
                     responsible: str, due_date: str):
        """Record a management decision (ISO 9001 9.3.3).

        Args:
            decision_type: "QMS Improvement", "Product/Service Improvement",
                or "Resource Needs"
            decision: Content of the decision
            responsible: Person responsible
            due_date: Due date
        """
        entry = {
            "type": decision_type,
            "decision": decision,
            "responsible": responsible,
            "due_date": due_date,
        }
        self.review_data["decisions"].append(entry)

    def add_action_item(self, action: str, responsible: str,
                        due_date: str, priority: str):
        """Record an action item; new items always start as "Open".

        Args:
            action: Content of the action
            responsible: Person responsible
            due_date: Due date
            priority: Priority ("High", "Medium", "Low")
        """
        entry = {
            "action": action,
            "responsible": responsible,
            "due_date": due_date,
            "priority": priority,
            "status": "Open",
        }
        self.review_data["action_items"].append(entry)

    def generate_report(self, filename: str):
        """Write the review report to disk and print a summary.

        Args:
            filename: Output filename (JSON format)
        """
        # The overall assessment is (re)computed on every export.
        assessment = self._calculate_overall_assessment()
        self.review_data["overall_assessment"] = assessment

        with open(filename, 'w', encoding='utf-8') as report_file:
            json.dump(self.review_data, report_file, indent=2, ensure_ascii=False)
        print(f"Management review report exported to {filename}")

        self._print_summary()

    def _calculate_overall_assessment(self) -> Dict:
        """Average the available input scores and rank them (internal use)."""
        inputs = self.review_data["inputs"]

        # (input section, field holding its 0-100 score); absent sections
        # are simply left out of the average.
        score_fields = (
            ("customer_feedback", "satisfaction_score"),
            ("conformity_status", "capa_completion_rate"),
            ("process_performance", "process_efficiency"),
            ("resource_adequacy", "training_completion"),
            ("improvement_actions", "completion_rate"),
        )
        collected = [
            inputs[section][field]
            for section, field in score_fields
            if section in inputs
        ]
        mean_score = sum(collected) / len(collected) if collected else 0

        # Rank bands, checked from best to worst.
        for floor, label in ((90, "Excellent"), (80, "Good"), (70, "Acceptable")):
            if mean_score >= floor:
                grade = label
                break
        else:
            grade = "Needs Improvement"

        return {
            "average_score": round(mean_score, 2),
            "rank": grade,
            "total_decisions": len(self.review_data["decisions"]),
            "total_actions": len(self.review_data["action_items"]),
        }

    def _print_summary(self):
        """Print a console summary of the stored assessment (internal use)."""
        result = self.review_data["overall_assessment"]
        attendee_count = len(self.review_data['basic_info']['attendees'])
        separator = "="*60

        summary_lines = [
            "\n" + separator,
            "Management Review Summary",
            separator,
            f"Review Period: {self.review_period}",
            f"Date: {self.review_date.strftime('%Y-%m-%d')}",
            f"Attendees: {attendee_count} persons",
            f"\nOverall Assessment: {result['rank']} ({result['average_score']} points)",
            f"Decisions: {result['total_decisions']}",
            f"Action Items: {result['total_actions']}",
            separator,
        ]
        for line in summary_lines:
            print(line)
# Usage example: assemble the FY2024 second-half review and export it
mr_system = ManagementReviewSystem(
    review_date=datetime(2025, 3, 15),
    review_period="FY2024 Second Half"
)

# Attendees as (name, position) pairs
for attendee_name, attendee_position in (
    ("Taro Yamada", "President & CEO"),
    ("Hanako Sato", "Quality Manager"),
    ("Ichiro Suzuki", "Manufacturing Manager"),
):
    mr_system.add_attendee(attendee_name, attendee_position)

# Review inputs (ISO 9001 9.3.2 a-g)
mr_system.add_customer_feedback(
    satisfaction_score=85.5,
    complaint_count=12,
    positive_feedback="Improved delivery compliance. Stable product quality.",
    improvement_areas="Faster technical support response"
)
mr_system.add_conformity_status(
    audit_results="5 internal audits conducted. 0 Major NCs, 3 Minor NCs.",
    nc_count=3,
    capa_completion_rate=92.0
)
kpi_achievement = {
    "Customer Complaint Reduction": 95.0,
    "In-Process Defect Rate Reduction": 88.0,
    "Delivery Compliance": 98.0
}
mr_system.add_process_performance(
    kpi_summary=kpi_achievement,
    process_efficiency=91.5
)
mr_system.add_resource_adequacy(
    resource_status="Generally adequate. Some equipment aging issues.",
    training_completion=87.0,
    infrastructure_issues="Calibration management system update needed for measuring equipment"
)
mr_system.add_improvement_actions(
    actions_planned=15,
    actions_completed=13,
    effectiveness_rate=85.0
)
mr_system.add_changes_impact(
    internal_changes="Introduction of new production line, digitalization of quality management system",
    external_changes="Preparation for transition to ISO 9001:2015 revised version",
    impact_assessment="Positive impact. Additional resources required."
)

# Management decisions (ISO 9001 9.3.3)
review_decisions = [
    {
        "decision_type": "QMS Improvement",
        "decision": "Complete full digitalization of quality management system by FY2025 Q2",
        "responsible": "Quality Manager",
        "due_date": "2025-09-30",
    },
    {
        "decision_type": "Resource Needs",
        "decision": "Secure budget for measuring equipment update (5 million yen)",
        "responsible": "Corporate Planning Manager",
        "due_date": "2025-06-30",
    },
]
for decision_kwargs in review_decisions:
    mr_system.add_decision(**decision_kwargs)

# Follow-up action items
review_actions = [
    {
        "action": "Establish process to reduce technical support response time to within 24 hours",
        "responsible": "Sales Manager",
        "due_date": "2025-05-31",
        "priority": "High",
    },
    {
        "action": "Conduct internal auditor competence improvement training",
        "responsible": "Quality Manager",
        "due_date": "2025-04-30",
        "priority": "Medium",
    },
]
for action_kwargs in review_actions:
    mr_system.add_action_item(**action_kwargs)

# Export the JSON report and print the console summary
mr_system.generate_report("management_review_2024H2.json")
In this chapter, we learned about ISO 9001 Quality Management System requirements and practical implementation methods using Python.