Systematic Analysis of System Failure Modes and Reliability Assessment
FMEA (Failure Mode and Effects Analysis) is a methodology that systematically analyzes possible failure modes, their effects, occurrence probability, and detectability for each component of a system. In the process industries, it is widely used for equipment reliability assessment and maintenance planning.
The core of FMEA is the calculation of the Risk Priority Number (RPN). It is calculated as RPN = Severity (S) × Occurrence (O) × Detection (D), where each factor is rated on a scale of 1 to 10, so the RPN ranges from 1 to 1000.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
# Example 1: Basic FMEA Implementation
import pandas as pd
import numpy as np
from typing import List, Dict
class FMEA:
    """Failure Mode and Effects Analysis (FMEA) System

    Systematically analyze failure modes, effects, and risks for each equipment.
    Every registered failure mode is scored with a Risk Priority Number
    (RPN = Severity x Occurrence x Detection).
    """

    # Column layout of the analysis table. It is used to hand back a
    # well-formed (empty) table when no failure mode has been registered yet,
    # so that sorting/filtering by 'RPN' does not fail with a KeyError.
    _COLUMNS = [
        'Component', 'Failure Mode', 'Effects', 'Causes',
        'Severity', 'Occurrence', 'Detection', 'RPN', 'Current Controls',
    ]

    def __init__(self):
        # One dict per registered failure mode, in insertion order
        self.analysis_data = []

    @staticmethod
    def _validate_rating(label: str, value) -> None:
        """Raise ValueError unless *value* lies on the 1-10 rating scale."""
        if not 1 <= value <= 10:
            raise ValueError(
                f"{label} must be in the range 1-10, got {value!r}"
            )

    def add_failure_mode(self, component: str, failure_mode: str,
                         effects: str, causes: str,
                         severity: int, occurrence: int, detection: int,
                         current_controls: str = ""):
        """Add failure mode

        Args:
            component: Target component
            failure_mode: Failure mode
            effects: Effects
            causes: Causes
            severity: Severity (1-10)
            occurrence: Occurrence (1-10)
            detection: Detection (1-10)
            current_controls: Current controls

        Raises:
            ValueError: If severity, occurrence or detection is outside 1-10.
        """
        # Validate everything before storing so a bad entry never ends up
        # in the table with a meaningless RPN.
        self._validate_rating('severity', severity)
        self._validate_rating('occurrence', occurrence)
        self._validate_rating('detection', detection)

        rpn = severity * occurrence * detection
        self.analysis_data.append({
            'Component': component,
            'Failure Mode': failure_mode,
            'Effects': effects,
            'Causes': causes,
            'Severity': severity,
            'Occurrence': occurrence,
            'Detection': detection,
            'RPN': rpn,
            'Current Controls': current_controls,
        })

    def get_analysis(self) -> pd.DataFrame:
        """Get FMEA analysis results

        Returns:
            All registered failure modes sorted by RPN in descending order.
            When nothing has been registered, an empty table that still
            carries the expected columns.
        """
        if not self.analysis_data:
            return pd.DataFrame(columns=self._COLUMNS)
        df = pd.DataFrame(self.analysis_data)
        return df.sort_values('RPN', ascending=False)

    def get_high_risk_items(self, threshold: int = 200) -> pd.DataFrame:
        """Extract high-risk items

        Args:
            threshold: RPN threshold (default: RPN ≥ 200 considered high risk)

        Returns:
            Rows of the analysis table whose RPN is at least *threshold*.
        """
        df = self.get_analysis()
        return df[df['RPN'] >= threshold]
# Practical example: FMEA for chemical reactor
fmea = FMEA()

# Failure modes of the temperature control, cooling and pressure relief
# systems; they are registered in exactly this order.
reactor_failure_modes = [
    dict(
        component="Temperature Sensor",
        failure_mode="Sensor failure (high temperature side)",
        effects="Reaction runaway, pressure rise, explosion risk",
        causes="Sensor degradation, wiring disconnection",
        severity=9,
        occurrence=3,
        detection=4,
        current_controls="Redundant sensors, periodic calibration",
    ),
    dict(
        component="Cooling Water Pump",
        failure_mode="Pump stoppage",
        effects="Reactor overheating, product quality degradation",
        causes="Motor failure, bearing wear",
        severity=8,
        occurrence=4,
        detection=2,
        current_controls="Flow meter alarm, backup pump",
    ),
    dict(
        component="Pressure Safety Valve",
        failure_mode="Malfunction (does not open)",
        effects="Overpressure vessel rupture",
        causes="Valve seat seizure, spring degradation",
        severity=10,
        occurrence=2,
        detection=8,
        current_controls="Annual inspection, pressure test",
    ),
]
for failure_mode_entry in reactor_failure_modes:
    fmea.add_failure_mode(**failure_mode_entry)

# Show the ranked table together with summary statistics
analysis_df = fmea.get_analysis()
print("=== FMEA Analysis Results (RPN order) ===")
ranking_view = analysis_df[['Component', 'Failure Mode', 'RPN']]
print(ranking_view.to_string(index=False))
average_rpn = analysis_df['RPN'].mean()
maximum_rpn = analysis_df['RPN'].max()
print(f"\nAverage RPN: {average_rpn:.1f}")
print(f"Maximum RPN: {maximum_rpn}")

# Items at or above the chosen RPN threshold
high_risk = fmea.get_high_risk_items(threshold=150)
high_risk_count = len(high_risk)
print(f"\nHigh-risk items (RPN≥150): {high_risk_count} items")
print(high_risk[['Component', 'RPN']].to_string(index=False))
# Output example:
# === FMEA Analysis Results (RPN order) ===
# Component Failure Mode RPN
# Pressure Safety Valve Malfunction (does not open) 160
# Temperature Sensor Sensor failure (high temperature side) 108
# Cooling Water Pump Pump stoppage 64
#
# Average RPN: 110.7
# Maximum RPN: 160
#
# High-risk items (RPN≥150): 1 items
# Component RPN
# Pressure Safety Valve 160
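Criticality analysis extends FMEA to quantitatively evaluate the impact of failure modes. In its basic form, the criticality of each failure mode is calculated as Occurrence probability × Severity; the implementation below additionally multiplies by a detection-difficulty weight (Detection/10), and the failure modes are then ranked by the resulting value.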
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# Example 2: Criticality Analysis
import matplotlib.pyplot as plt
class CriticalityAnalysis:
    """Criticality Analysis

    Scores every failure mode of an FMEA table with a criticality value,
    assigns a rank label to it and offers a bubble-chart visualisation.
    """

    def __init__(self, fmea_data: pd.DataFrame):
        # Work on a private copy so the caller's table is left untouched
        self.data = fmea_data.copy()
        self._calculate_criticality()

    def _calculate_criticality(self):
        """Add the 'Criticality' and 'Criticality Rank' columns

        Criticality = (Occurrence/10) * Severity * (Detection/10)
        A larger Detection rating (harder to detect) gives a larger weight.
        """
        table = self.data

        occurrence_fraction = table['Occurrence'] / 10
        detection_weight = table['Detection'] / 10
        table['Criticality'] = (
            occurrence_fraction * table['Severity'] * detection_weight
        )

        # Map the score onto four labelled bands
        rank_edges = [0, 2, 4, 6, 10]
        rank_labels = ['Low', 'Medium', 'High', 'Critical']
        table['Criticality Rank'] = pd.cut(
            table['Criticality'], bins=rank_edges, labels=rank_labels
        )

    def get_critical_items(self, rank: str = 'Critical') -> pd.DataFrame:
        """Return the rows whose criticality rank equals *rank*"""
        has_rank = self.data['Criticality Rank'] == rank
        return self.data[has_rank]

    def plot_criticality_matrix(self):
        """Draw the criticality matrix (Severity vs Occurrence)

        Bubble size follows the Detection rating and bubble colour follows
        the criticality value. Returns the created figure.
        """
        table = self.data
        fig, ax = plt.subplots(figsize=(10, 8))

        bubble_sizes = table['Detection'] * 50
        scatter = ax.scatter(
            table['Occurrence'],
            table['Severity'],
            s=bubble_sizes,
            c=table['Criticality'],
            cmap='YlOrRd',
            alpha=0.6,
            edgecolors='black'
        )

        # Put the component name on top of each bubble
        labelled_points = zip(
            table['Component'], table['Occurrence'], table['Severity']
        )
        for component_name, x_pos, y_pos in labelled_points:
            ax.annotate(
                component_name,
                (x_pos, y_pos),
                fontsize=8,
                ha='center'
            )

        ax.set_xlabel('Occurrence', fontsize=12)
        ax.set_ylabel('Severity', fontsize=12)
        ax.set_title('Criticality Matrix', fontsize=14)
        ax.grid(True, alpha=0.3)

        # Colour scale for the criticality value
        cbar = plt.colorbar(scatter)
        cbar.set_label('Criticality', fontsize=10)
        return fig
# Practical example
ca = CriticalityAnalysis(analysis_df)

print("=== Criticality Analysis Results ===")
result = ca.data[['Component', 'Criticality', 'Criticality Rank']].sort_values(
    'Criticality', ascending=False
)
print(result.to_string(index=False))

# Number of failure modes per rank, listed in rank order
print("\n=== Statistics by Criticality Rank ===")
rank_counts = ca.data['Criticality Rank'].value_counts()
rank_stats = rank_counts.sort_index()
print(rank_stats)

# Only report the 'Critical' rank when it actually contains entries
critical_items = ca.get_critical_items('Critical')
critical_count = len(critical_items)
if critical_count > 0:
    print(f"\n⚠️ Critical items (highest priority countermeasures): {critical_count} items")
    critical_view = critical_items[['Component', 'Failure Mode', 'Criticality']]
    print(critical_view.to_string(index=False))
# Output example:
# === Criticality Analysis Results ===
# Component Criticality Criticality Rank
# Pressure Safety Valve 1.60 Low
# Temperature Sensor 1.08 Low
# Cooling Water Pump 0.64 Low
#
# === Statistics by Criticality Rank ===
# Low 3
# Medium 0
# High 0
# Critical 0
Fault Tree Analysis (FTA) decomposes an undesired event (the top event) into the basic events that cause it, connecting them through logic gates (AND/OR) in a tree structure. This allows quantitative evaluation of overall system reliability.
# Example 3: Fault Tree Construction
from dataclasses import dataclass
from typing import Optional, List
from enum import Enum
class GateType(Enum):
    """Types of logic gates"""
    AND = "AND"  # True when all inputs are true
    OR = "OR"  # True when at least one input is true


@dataclass
class FaultTreeNode:
    """Node of fault tree

    A node without children is a basic event whose ``probability`` is given
    by the user. A node with children is an intermediate/top event whose
    ``probability`` is filled in by ``FaultTree`` from its ``gate_type``.
    """
    name: str
    probability: float = 0.0  # Failure probability
    gate_type: Optional[GateType] = None
    # None is replaced by a fresh list in __post_init__, so instances never
    # share one children list.
    children: Optional[List['FaultTreeNode']] = None

    def __post_init__(self):
        if self.children is None:
            self.children = []


class FaultTree:
    """Fault Tree Analysis (FTA) System"""

    def __init__(self, top_event: str):
        self.root = FaultTreeNode(name=top_event)

    def add_gate(self, parent_name: str, gate_type: GateType,
                 children: List[tuple]):
        """Add logic gate

        Args:
            parent_name: Parent node name
            gate_type: Gate type (AND/OR)
            children: [(child node name, failure probability), ...]

        Raises:
            ValueError: If the parent node does not exist, or if any child
                probability lies outside the range 0-1.
        """
        parent = self._find_node(self.root, parent_name)
        if parent is None:
            raise ValueError(f"Node '{parent_name}' not found")

        # Check every probability before touching the tree so that a bad
        # entry cannot leave a half-added gate behind.
        children = list(children)
        for child_name, prob in children:
            if not 0 <= prob <= 1:
                raise ValueError(
                    f"Probability of '{child_name}' must be in the range 0-1, "
                    f"got {prob!r}"
                )

        parent.gate_type = gate_type
        for child_name, prob in children:
            child_node = FaultTreeNode(name=child_name, probability=prob)
            parent.children.append(child_node)

    def _find_node(self, node: FaultTreeNode, name: str) -> Optional[FaultTreeNode]:
        """Search node by name (depth-first; the first match is returned)"""
        if node.name == name:
            return node
        for child in node.children:
            result = self._find_node(child, name)
            if result is not None:
                return result
        return None

    def calculate_top_event_probability(self) -> float:
        """Calculate top event occurrence probability"""
        return self._calculate_node_probability(self.root)

    def _calculate_node_probability(self, node: FaultTreeNode) -> float:
        """Recursively calculate node failure probability

        The computed value is also stored on each intermediate node. The
        gate formulas multiply the child probabilities, i.e. the children
        are treated as independent events.

        Raises:
            ValueError: If a node has children but no gate type, because
                its probability cannot be derived in that case.
        """
        # For basic events (leaf nodes)
        if not node.children:
            return node.probability

        # Calculate child node probabilities
        child_probs = [self._calculate_node_probability(child)
                       for child in node.children]

        # Calculate based on gate type
        if node.gate_type == GateType.AND:
            # AND gate: probability that all fail
            node.probability = float(np.prod(child_probs))
        elif node.gate_type == GateType.OR:
            # OR gate: probability that at least one fails
            # P(A∪B) = 1 - P(A'∩B') = 1 - (1-P(A))(1-P(B))
            node.probability = 1 - float(np.prod([1 - p for p in child_probs]))
        else:
            # Returning the stored value here would silently report a
            # probability that does not reflect the children.
            raise ValueError(
                f"Node '{node.name}' has children but no gate type"
            )
        return node.probability

    def print_tree(self, node: Optional[FaultTreeNode] = None,
                   level: int = 0):
        """Display fault tree

        Prints one line per node, indented by depth. The probability is
        shown only when it is greater than zero, so intermediate events
        show a value only after the probabilities have been calculated.
        """
        if node is None:
            node = self.root
        indent = " " * level
        gate_str = f" [{node.gate_type.value}]" if node.gate_type else ""
        prob_str = f" (P={node.probability:.4f})" if node.probability > 0 else ""
        print(f"{indent}{node.name}{gate_str}{prob_str}")
        for child in node.children:
            self.print_tree(child, level + 1)
# Practical example: Fault tree for reactor overpressure accident
ft = FaultTree(top_event="Reactor Overpressure Accident")

# Gates are listed from the top event downwards; every parent is created
# by an earlier entry before it receives its own gate.
gate_definitions = [
    # Level 1: Direct causes of overpressure (OR gate)
    (
        "Reactor Overpressure Accident",
        GateType.OR,
        [
            ("Reaction Runaway", 0.0),
            ("Pressure Control System Failure", 0.0),
            ("Safety Valve Malfunction", 0.002),  # Annual failure probability 0.2%
        ],
    ),
    # Level 2: Causes of reaction runaway (OR gate)
    (
        "Reaction Runaway",
        GateType.OR,
        [
            ("Temperature Control Failure", 0.0),
            ("Excessive Raw Material Input", 0.001),
        ],
    ),
    # Level 3: Causes of temperature control failure (AND gate - all occur simultaneously)
    (
        "Temperature Control Failure",
        GateType.AND,
        [
            ("Cooling System Failure", 0.01),  # Annual 1%
            ("Temperature Sensor Failure", 0.005),  # Annual 0.5%
        ],
    ),
    # Level 3: Causes of pressure control system failure (OR gate)
    (
        "Pressure Control System Failure",
        GateType.OR,
        [
            ("Pressure Sensor Failure", 0.003),
            ("Control Valve Failure", 0.004),
        ],
    ),
]
for gate_parent, gate_kind, gate_children in gate_definitions:
    ft.add_gate(
        parent_name=gate_parent,
        gate_type=gate_kind,
        children=gate_children,
    )

# Evaluate the tree first so the printed tree shows the derived probabilities
top_prob = ft.calculate_top_event_probability()
print("=== Fault Tree Analysis Results ===\n")
ft.print_tree()
print(f"\n【Top Event Occurrence Probability】")
annual_percent = top_prob * 100
print(f"Reactor Overpressure Accident: {top_prob:.6f} (Annual {annual_percent:.4f}%)")
# A zero probability has no finite recurrence interval; print an empty line then
if top_prob > 0:
    frequency_line = f"Expected frequency: Once every {1/top_prob:.1f} years"
else:
    frequency_line = ""
print(frequency_line)
# Output example:
# === Fault Tree Analysis Results ===
#
# Reactor Overpressure Accident [OR] (P=0.0100)
# Reaction Runaway [OR] (P=0.0010)
# Temperature Control Failure [AND] (P=0.0001)
# Cooling System Failure (P=0.0100)
# Temperature Sensor Failure (P=0.0050)
# Excessive Raw Material Input (P=0.0010)
# Pressure Control System Failure [OR] (P=0.0070)
# Pressure Sensor Failure (P=0.0030)
# Control Valve Failure (P=0.0040)
# Safety Valve Malfunction (P=0.0020)
#
# 【Top Event Occurrence Probability】
# Reactor Overpressure Accident: 0.010015 (Annual 1.0015%)
# Expected frequency: Once every 99.9 years
Minimal Cut Sets are the minimum combinations of basic events that, when occurring simultaneously, cause the top event to occur. Identifying these allows understanding of the most important failure paths.
# Example 4: Minimal Cut Set Analysis
from itertools import combinations
class MinimalCutSetAnalyzer:
    """Minimal Cut Set Analysis

    Derive minimum combinations of basic events that cause the top event,
    and rank cut sets and basic events by their probability contribution.
    """

    def __init__(self, fault_tree: FaultTree):
        self.fault_tree = fault_tree
        self.minimal_cut_sets = []

    def find_minimal_cut_sets(self) -> List[List[str]]:
        """Derive minimal cut sets

        Returns:
            List of cut sets (each a list of basic event names), ordered by
            increasing size. The result is also stored in
            ``self.minimal_cut_sets``.
        """
        candidates = self._expand_cut_sets(self.fault_tree.root)
        # Check minimality (exclude supersets of other cut sets)
        self.minimal_cut_sets = self._remove_supersets(candidates)
        return self.minimal_cut_sets

    def _expand_cut_sets(self, node: FaultTreeNode) -> List[List[str]]:
        """Return all (not yet minimised) cut sets of the subtree at *node*

        - basic event: one cut set holding just that event
        - OR gate: every cut set of every child is a cut set of the gate
        - AND gate: one cut set must be taken from *each* child, so the
          result is the Cartesian product of the children's cut sets

        Raises:
            ValueError: If a node has children but no gate type.
        """
        # Local import: the file's import section only provides
        # itertools.combinations.
        from itertools import product

        if not node.children:
            return [[node.name]]

        per_child = [self._expand_cut_sets(child) for child in node.children]

        if node.gate_type == GateType.OR:
            return [cut_set for child_sets in per_child for cut_set in child_sets]

        if node.gate_type == GateType.AND:
            combined = []
            for combination in product(*per_child):
                # dict.fromkeys drops repeated events but keeps first-seen order
                events = list(dict.fromkeys(
                    event for part in combination for event in part
                ))
                combined.append(events)
            return combined

        raise ValueError(f"Node '{node.name}' has children but no gate type")

    def _traverse_tree(self, node: FaultTreeNode, current_path: List[str]):
        """Append the cut sets below *node*, prefixed by *current_path*

        Kept for callers of the previous traversal entry point; the
        expansion itself is done by ``_expand_cut_sets``.
        """
        for cut_set in self._expand_cut_sets(node):
            self.minimal_cut_sets.append(
                list(dict.fromkeys(list(current_path) + cut_set))
            )

    def _remove_supersets(self, cut_sets: List[List[str]]) -> List[List[str]]:
        """Exclude non-minimal cut sets (supersets and duplicates)"""
        minimal = []
        # Smallest first, so every possible subset is seen before its supersets
        for candidate in sorted(cut_sets, key=len):
            candidate_events = set(candidate)
            if any(set(kept).issubset(candidate_events) for kept in minimal):
                continue
            minimal.append(candidate)
        return minimal

    def _cut_set_probability(self, cut_set: List[str]) -> float:
        """Probability that all events of *cut_set* occur together

        Multiplies the probabilities stored on the named nodes. Names that
        cannot be found in the tree are skipped; if none is found the
        result is 0.
        """
        probabilities = []
        for event_name in cut_set:
            node = self.fault_tree._find_node(self.fault_tree.root, event_name)
            if node is not None:
                probabilities.append(node.probability)
        return float(np.prod(probabilities)) if probabilities else 0.0

    def calculate_cut_set_probabilities(self) -> pd.DataFrame:
        """Calculate occurrence probability for each cut set

        Returns:
            Table with columns 'Cut Set ID', 'Events', 'Order' and
            'Probability', sorted by probability in descending order.
            The table is empty (but has these columns) when no cut sets
            have been derived yet.
        """
        columns = ['Cut Set ID', 'Events', 'Order', 'Probability']
        rows = [
            {
                'Cut Set ID': f"CS{i}",
                'Events': ' AND '.join(cut_set),
                'Order': len(cut_set),
                # Cut set occurrence probability = all events occur simultaneously
                'Probability': self._cut_set_probability(cut_set),
            }
            for i, cut_set in enumerate(self.minimal_cut_sets, 1)
        ]
        df = pd.DataFrame(rows, columns=columns)
        # A stable sort keeps equal probabilities in cut-set order
        return df.sort_values('Probability', ascending=False, kind='mergesort')

    def get_importance_measures(self) -> pd.DataFrame:
        """Calculate importance measures for each basic event

        Fussell-Vesely importance: contribution of cut sets containing that
        event, i.e. the sum of their probabilities divided by the top event
        probability (0 when the top event probability is 0).

        Returns:
            Table with columns 'Event', 'FV Importance' and 'Cut Sets',
            sorted by importance in descending order.
        """
        columns = ['Event', 'FV Importance', 'Cut Sets']

        # Basic events in first-seen order, so equal scores are always
        # reported in the same order.
        all_events = list(dict.fromkeys(
            event for cut_set in self.minimal_cut_sets for event in cut_set
        ))

        # Top event probability
        top_prob = self.fault_tree.calculate_top_event_probability()

        # Each cut set probability is needed once per contained event
        cut_set_probs = [
            self._cut_set_probability(cut_set)
            for cut_set in self.minimal_cut_sets
        ]

        results = []
        for event in all_events:
            containing = [
                prob
                for cut_set, prob in zip(self.minimal_cut_sets, cut_set_probs)
                if event in cut_set
            ]
            containing_prob = sum(containing)
            fv_importance = containing_prob / top_prob if top_prob > 0 else 0
            results.append({
                'Event': event,
                'FV Importance': fv_importance,
                'Cut Sets': len(containing),
            })

        df = pd.DataFrame(results, columns=columns)
        return df.sort_values('FV Importance', ascending=False, kind='mergesort')
# Practical example
mcs_analyzer = MinimalCutSetAnalyzer(ft)
minimal_cuts = mcs_analyzer.find_minimal_cut_sets()

# List the cut sets, numbered from 1
print("\n=== Minimal Cut Sets ===")
cut_set_lines = [
    f"CS{number}: " + " AND ".join(events)
    for number, events in enumerate(minimal_cuts, start=1)
]
for cut_set_line in cut_set_lines:
    print(cut_set_line)

# Occurrence probability per cut set
cut_set_probs = mcs_analyzer.calculate_cut_set_probabilities()
print("\n=== Cut Set Occurrence Probabilities ===")
print(cut_set_probs.to_string(index=False))

# Contribution of each basic event to the top event
importance = mcs_analyzer.get_importance_measures()
print("\n=== Importance of Basic Events ===")
print(importance.to_string(index=False))
# Output example:
# === Minimal Cut Sets ===
# CS1: Excessive Raw Material Input
# CS2: Pressure Sensor Failure
# CS3: Control Valve Failure
# CS4: Safety Valve Malfunction
# CS5: Cooling System Failure AND Temperature Sensor Failure
#
# === Cut Set Occurrence Probabilities ===
# Cut Set ID Events Order Probability
# CS3 Control Valve Failure 1 0.00400
# CS2 Pressure Sensor Failure 1 0.00300
# CS4 Safety Valve Malfunction 1 0.00200
# CS1 Excessive Raw Material Input 1 0.00100
# CS5 Cooling System Failure AND Temperature Sensor Failure 2 0.00005
# Requirements:
# - Python 3.9+
# - scipy>=1.11.0
# Example 5: Advanced Reliability Calculation
import scipy.stats as stats
class ReliabilityCalculator:
    """System Reliability Calculation

    Calculate failure rate, MTBF, and availability, plus Weibull reliability
    and series/parallel system reliability. All methods are static.
    """

    @staticmethod
    def failure_rate_to_probability(lambda_rate: float, time: float) -> float:
        """Calculate failure probability from failure rate

        Args:
            lambda_rate: Failure rate [1/hour]
            time: Time [hour]

        Returns:
            Failure probability P(t) = 1 - exp(-λt)
        """
        return 1 - np.exp(-lambda_rate * time)

    @staticmethod
    def calculate_mtbf(failure_rate: float) -> float:
        """Calculate Mean Time Between Failures (MTBF)

        Args:
            failure_rate: Failure rate [1/hour]

        Returns:
            MTBF [hour]; infinity when the failure rate is not positive
        """
        return 1 / failure_rate if failure_rate > 0 else float('inf')

    @staticmethod
    def calculate_availability(mtbf: float, mttr: float) -> float:
        """Calculate availability (uptime rate)

        Args:
            mtbf: Mean time between failures [hour]
            mttr: Mean time to repair [hour]

        Returns:
            Availability = MTBF / (MTBF + MTTR). An infinite MTBF (as
            returned by ``calculate_mtbf`` for a zero failure rate) yields
            1.0.
        """
        # inf / (inf + mttr) would evaluate to NaN; a component that never
        # fails is always available.
        if np.isinf(mtbf) and mtbf > 0:
            return 1.0
        return mtbf / (mtbf + mttr)

    @staticmethod
    def weibull_reliability(t: float, beta: float, eta: float) -> float:
        """Reliability function using Weibull distribution

        Args:
            t: Time
            beta: Shape parameter (β<1: early failure, β=1: random failure, β>1: wear-out failure)
            eta: Scale parameter (characteristic life)

        Returns:
            R(t) = exp(-(t/η)^β)
        """
        return np.exp(-((t / eta) ** beta))

    @staticmethod
    def series_system_reliability(reliabilities: List[float]) -> float:
        """Reliability of series system

        R_sys = R1 × R2 × ... × Rn
        """
        return np.prod(reliabilities)

    @staticmethod
    def parallel_system_reliability(reliabilities: List[float]) -> float:
        """Reliability of parallel system (redundancy)

        R_sys = 1 - (1-R1) × (1-R2) × ... × (1-Rn)
        """
        return 1 - np.prod([1 - r for r in reliabilities])
# Practical example
rc = ReliabilityCalculator()

# Equipment specifications: failure rate [1/h] and MTTR [h] per component
equipment = {
    'Temperature Sensor': {'lambda': 5.7e-6, 'mttr': 4},
    'Pressure Sensor': {'lambda': 6.2e-6, 'mttr': 4},
    'Control Valve': {'lambda': 3.8e-6, 'mttr': 8},
    'Cooling Pump': {'lambda': 1.2e-5, 'mttr': 12}
}

print("=== Equipment Reliability Analysis (1 year operation) ===\n")
operation_time = 8760  # 1 year = 8760 hours

for name, spec in equipment.items():
    lambda_rate, mttr = spec['lambda'], spec['mttr']

    # Per-component metrics over one year of operation
    failure_prob = rc.failure_rate_to_probability(lambda_rate, operation_time)
    reliability = 1 - failure_prob
    mtbf = rc.calculate_mtbf(lambda_rate)
    availability = rc.calculate_availability(mtbf, mttr)

    component_report = [
        f"【{name}】",
        f" Failure rate: {lambda_rate:.2e} [1/h]",
        f" MTBF: {mtbf:.0f} h ({mtbf/8760:.1f} years)",
        f" MTTR: {mttr} h",
        f" Annual failure probability: {failure_prob*100:.2f}%",
        f" Annual reliability: {reliability*100:.2f}%",
        f" Availability: {availability*100:.4f}%\n",
    ]
    print("\n".join(component_report))

# Series system (system cannot operate unless all are normal)
series_reliabilities = []
for spec in equipment.values():
    component_failure = rc.failure_rate_to_probability(spec['lambda'], operation_time)
    series_reliabilities.append(1 - component_failure)
series_reliability = rc.series_system_reliability(series_reliabilities)
series_failure = 1 - series_reliability
print(f"【Overall Series System】")
print(f" System annual reliability: {series_reliability*100:.2f}%")
print(f" System annual failure probability: {series_failure*100:.2f}%")

# Parallel system (when sensors are duplicated)
sensor_reliability = 1 - rc.failure_rate_to_probability(5.7e-6, operation_time)
duplicated_sensors = [sensor_reliability, sensor_reliability]
redundant_reliability = rc.parallel_system_reliability(duplicated_sensors)
improvement = redundant_reliability - sensor_reliability
print(f"\n【Effect of Temperature Sensor Duplication】")
print(f" Single: {sensor_reliability*100:.2f}%")
print(f" Duplicated: {redundant_reliability*100:.4f}%")
print(f" Reliability improvement: {improvement*100:.4f}%")
# Output example:
# === Equipment Reliability Analysis (1 year operation) ===
#
# 【Temperature Sensor】
# Failure rate: 5.70e-06 [1/h]
# MTBF: 175439 h (20.0 years)
# MTTR: 4 h
# Annual failure probability: 4.87%
# Annual reliability: 95.13%
# Availability: 99.9977%
# Example 6: Common Cause Failure Analysis
class CommonCauseFailureAnalysis:
    """Common Cause Failure (CCF) Analysis

    Estimates how much a shared failure cause reduces the benefit of
    redundancy, using a per-component β factor.
    """

    def __init__(self):
        # Maps component name -> β factor (CCF occurrence ratio)
        self.beta_factors = {}

    def set_beta_factor(self, component: str, beta: float):
        """Register the β factor of a component

        Args:
            component: Target component
            beta: β factor (0-1, typically 0.05-0.15)

        Raises:
            ValueError: If beta is not within 0-1.
        """
        within_range = 0 <= beta <= 1
        if not within_range:
            raise ValueError("β factor must be specified in the range 0-1")
        self.beta_factors[component] = beta

    def calculate_redundant_reliability_with_ccf(
        self,
        component: str,
        base_reliability: float,
        n_redundant: int = 2
    ) -> float:
        """Reliability of redundant system considering CCF

        The single-unit failure probability is split by β into an
        independent share and a common-cause share. The redundant units
        only protect against the independent share.

        Args:
            component: Component name (a component without a registered
                β factor is treated as β = 0)
            base_reliability: Single unit reliability
            n_redundant: Number of redundant units

        Returns:
            System reliability after considering CCF
        """
        beta = self.beta_factors.get(component, 0)
        unit_failure = 1 - base_reliability

        # Split of the unit failure probability
        independent_failure = unit_failure * (1 - beta)
        ccf_failure = unit_failure * beta

        # R_sys = (redundancy considering independent failures) × (no CCF)
        survives_independent = 1 - (independent_failure ** n_redundant)
        survives_ccf = 1 - ccf_failure
        return survives_independent * survives_ccf

    def compare_ccf_impact(
        self,
        component: str,
        base_reliability: float,
        n_redundant: int = 2
    ) -> Dict:
        """Compare redundant-system reliability with and without CCF

        Returns:
            Dict with the inputs, both reliabilities, their difference
            ('Degradation') and that difference as a percentage of the
            CCF-free reliability ('Degradation %').
        """
        # Ideal redundancy: all units fail independently
        without_ccf = 1 - ((1 - base_reliability) ** n_redundant)

        with_ccf = self.calculate_redundant_reliability_with_ccf(
            component, base_reliability, n_redundant
        )

        degradation = without_ccf - with_ccf
        if without_ccf > 0:
            degradation_percent = degradation / without_ccf * 100
        else:
            degradation_percent = 0

        return {
            'Component': component,
            'Base Reliability': base_reliability,
            'Redundancy': n_redundant,
            'Beta Factor': self.beta_factors.get(component, 0),
            'Without CCF': without_ccf,
            'With CCF': with_ccf,
            'Degradation': degradation,
            'Degradation %': degradation_percent
        }
# Practical example
ccf = CommonCauseFailureAnalysis()

# Set β factor for each component (typical values based on industrial data)
example_beta_factors = {
    'Temperature Sensor': 0.10,  # 10% is CCF
    'Pressure Sensor': 0.12,
    'Control Valve': 0.08,
}
for beta_component, beta_value in example_beta_factors.items():
    ccf.set_beta_factor(beta_component, beta=beta_value)

# Single unit reliability (1 year)
base_reliabilities = {
    'Temperature Sensor': 0.95,
    'Pressure Sensor': 0.95,
    'Control Valve': 0.97
}

print("=== Impact Assessment of Common Cause Failure (CCF) ===\n")
results = []
for component, base_rel in base_reliabilities.items():
    comparison = ccf.compare_ccf_impact(component, base_rel, n_redundant=2)
    results.append(comparison)

    # Convert the fractions to percentages once, then format
    base_pct = comparison['Base Reliability'] * 100
    without_pct = comparison['Without CCF'] * 100
    with_pct = comparison['With CCF'] * 100
    degradation_pct = comparison['Degradation'] * 100
    relative_decrease = comparison['Degradation %']
    beta_shown = comparison['Beta Factor']

    print(f"【{component}】(Duplicated)")
    print(f" Single unit reliability: {base_pct:.2f}%")
    print(f" β factor: {beta_shown:.2f}")
    print(f" Without CCF: {without_pct:.4f}%")
    print(f" With CCF: {with_pct:.4f}%")
    print(
        f" Reliability degradation: {degradation_pct:.4f}% "
        f"({relative_decrease:.2f}% decrease)\n"
    )

# Compare results in DataFrame
df_ccf = pd.DataFrame(results)
print("=== CCF Impact Summary ===")
summary_columns = ['Component', 'Beta Factor', 'Without CCF', 'With CCF', 'Degradation %']
print(df_ccf[summary_columns])
# Output example:
# === Impact Assessment of Common Cause Failure (CCF) ===
#
# 【Temperature Sensor】(Duplicated)
# Single unit reliability: 95.00%
# β factor: 0.10
# Without CCF: 99.7500%
# With CCF: 99.2985%
# Reliability degradation: 0.4515% (0.45% decrease)
By integrating FMEA and FTA, comprehensive safety assessment from component level to system level becomes possible. High-risk items identified by FMEA are quantitatively evaluated with FTA.
# Example 7: Integrated FMEA/FTA Analysis System
class IntegratedSafetyAnalysis:
    """Integrated FMEA/FTA Safety Assessment System

    Quantitatively evaluate risks identified by FMEA with FTA and propose optimal countermeasures
    """

    # Column layout of the mitigation strategy table; lets an empty
    # analysis produce an empty table instead of a KeyError when sorting.
    _STRATEGY_COLUMNS = [
        'Component', 'Failure Mode', 'Current RPN',
        'Priority', 'Recommended Action', 'Specific Measures',
    ]

    def __init__(self):
        self.fmea = FMEA()
        self.fault_trees = {}
        self.mitigation_strategies = []

    def conduct_fmea(self, components: List[Dict]):
        """Conduct FMEA

        Args:
            components: [{component, failure_mode, ...}, ...] - each dict is
                passed as keyword arguments to ``FMEA.add_failure_mode``
        """
        for comp in components:
            self.fmea.add_failure_mode(**comp)

    def build_fault_tree_for_high_risk(self, threshold: int = 200):
        """Build fault tree for high-risk items

        Simplified: stores one summary record per high-risk failure mode in
        ``self.fault_trees``, keyed by "<component>_<failure mode>".
        """
        high_risk_items = self.fmea.get_high_risk_items(threshold)
        for _, item in high_risk_items.iterrows():
            component = item['Component']
            failure_mode = item['Failure Mode']
            ft_name = f"{component}_{failure_mode}"
            self.fault_trees[ft_name] = {
                'component': component,
                'failure_mode': failure_mode,
                'rpn': item['RPN'],
                'severity': item['Severity'],
                'occurrence': item['Occurrence'],
                'detection': item['Detection']
            }

    @staticmethod
    def _classify_priority(severity, rpn):
        """Return (priority, recommended action) for one failure mode

        Severity is checked first: a severity of 8 or more is 'Critical'
        regardless of the RPN. Otherwise the RPN decides.
        """
        if severity >= 8:
            return 'Critical', 'Immediate design change/redundancy'
        if rpn >= 200:
            return 'High', 'Improve detectability/strengthen preventive maintenance'
        if rpn >= 100:
            return 'Medium', 'Periodic monitoring/procedure improvement'
        return 'Low', 'Maintain status quo/periodic review'

    @staticmethod
    def _specific_measures(severity, occurrence, detection) -> List[str]:
        """List the concrete measures triggered by the individual ratings"""
        measures = []
        if occurrence >= 5:
            measures.append('Reduce occurrence: preventive maintenance, shorten part replacement cycle')
        if detection >= 7:
            measures.append('Improve detectability: add sensors, set alarms')
        if severity >= 8:
            measures.append('Reduce severity: redundancy, fail-safe design')
        return measures

    def generate_mitigation_strategies(self) -> pd.DataFrame:
        """Generate risk mitigation strategies

        Propose prioritized countermeasures based on severity and RPN values.

        Returns:
            One row per failure mode, sorted by current RPN in descending
            order. Empty (with the usual columns) when the FMEA is empty.
        """
        analysis_df = self.fmea.get_analysis()
        strategies = []
        for _, row in analysis_df.iterrows():
            rpn = row['RPN']
            severity = row['Severity']
            priority, action = self._classify_priority(severity, rpn)
            specific_actions = self._specific_measures(
                severity, row['Occurrence'], row['Detection']
            )
            strategies.append({
                'Component': row['Component'],
                'Failure Mode': row['Failure Mode'],
                'Current RPN': rpn,
                'Priority': priority,
                'Recommended Action': action,
                'Specific Measures': '; '.join(specific_actions)
            })
        df = pd.DataFrame(strategies, columns=self._STRATEGY_COLUMNS)
        return df.sort_values('Current RPN', ascending=False)

    def calculate_risk_reduction(
        self,
        component: str,
        new_occurrence: Optional[int] = None,
        new_detection: Optional[int] = None
    ) -> Dict:
        """Calculate risk reduction effect after implementing countermeasures

        If a component has several failure modes, the one with the highest
        RPN (first row of the analysis table) is used.

        Args:
            component: Target component
            new_occurrence: Occurrence after countermeasures (None = unchanged)
            new_detection: Detection after countermeasures (None = unchanged)

        Returns:
            Analysis results of risk reduction effect. 'Status' is
            'Improved', 'Worsened' or 'No Change' depending on how the new
            RPN compares with the current one.

        Raises:
            IndexError: If the component is not part of the FMEA. The
                exception type is the one the bare positional lookup
                raised before; only the message is now descriptive.
        """
        # Get current values
        analysis_df = self.fmea.get_analysis()
        matches = analysis_df[analysis_df['Component'] == component]
        if matches.empty:
            raise IndexError(f"Component '{component}' not found in FMEA data")
        current = matches.iloc[0]
        current_rpn = current['RPN']
        severity = current['Severity']
        current_occ = current['Occurrence']
        current_det = current['Detection']

        # Calculate new RPN
        new_occ = new_occurrence if new_occurrence is not None else current_occ
        new_det = new_detection if new_detection is not None else current_det
        new_rpn = severity * new_occ * new_det

        # Risk reduction rate (negative when the risk increases)
        reduction = ((current_rpn - new_rpn) / current_rpn * 100) if current_rpn > 0 else 0

        if new_rpn < current_rpn:
            status = 'Improved'
        elif new_rpn > current_rpn:
            # A higher RPN must not be reported as 'No Change'
            status = 'Worsened'
        else:
            status = 'No Change'

        return {
            'Component': component,
            'Current RPN': current_rpn,
            'New RPN': new_rpn,
            'Reduction': reduction,
            'Current O-D': f"{current_occ}-{current_det}",
            'New O-D': f"{new_occ}-{new_det}",
            'Status': status
        }

    def generate_safety_report(self) -> str:
        """Generate comprehensive safety assessment report

        Returns:
            Multi-line text with overall statistics, item counts per
            priority and the details of all 'Critical' items. Average and
            maximum RPN are shown as "n/a" when there are no items.
        """
        analysis_df = self.fmea.get_analysis()
        strategies_df = self.generate_mitigation_strategies()

        if analysis_df.empty:
            average_rpn = "n/a"
            maximum_rpn = "n/a"
        else:
            average_rpn = f"{analysis_df['RPN'].mean():.1f}"
            maximum_rpn = f"{analysis_df['RPN'].max()}"
        high_risk_count = len(analysis_df[analysis_df['RPN'] >= 200])

        report = []
        report.append("=" * 60)
        report.append("Comprehensive Safety Assessment Report")
        report.append("=" * 60)
        report.append("")

        # Summary statistics
        report.append("【Overall Statistics】")
        report.append(f" Number of assessment items: {len(analysis_df)}")
        report.append(f" Average RPN: {average_rpn}")
        report.append(f" Maximum RPN: {maximum_rpn}")
        report.append(f" High-risk items (RPN≥200): {high_risk_count}")
        report.append("")

        # Number of countermeasures by priority
        report.append("【Number of Countermeasures by Priority】")
        priority_counts = strategies_df['Priority'].value_counts()
        for priority in ['Critical', 'High', 'Medium', 'Low']:
            count = priority_counts.get(priority, 0)
            report.append(f" {priority}: {count} items")
        report.append("")

        # Details of Critical items
        critical = strategies_df[strategies_df['Priority'] == 'Critical']
        if len(critical) > 0:
            report.append("【Critical Items (Immediate Response Required)】")
            for _, item in critical.iterrows():
                report.append(f" ⚠️ {item['Component']} - {item['Failure Mode']}")
                report.append(f" RPN: {item['Current RPN']}")
                report.append(f" Countermeasure: {item['Recommended Action']}")
                report.append("")
        return "\n".join(report)
# Practical example: Integrated analysis of reactor system
isa = IntegratedSafetyAnalysis()

# FMEA input: one entry per component failure mode
components_data = [
    dict(
        component='Temperature Sensor',
        failure_mode='Sensor failure (high temperature side)',
        effects='Reaction runaway, pressure rise',
        causes='Sensor degradation, wiring disconnection',
        severity=9,
        occurrence=3,
        detection=4,
        current_controls='Redundant sensors, periodic calibration',
    ),
    dict(
        component='Cooling Water Pump',
        failure_mode='Pump stoppage',
        effects='Reactor overheating',
        causes='Motor failure, bearing wear',
        severity=8,
        occurrence=4,
        detection=2,
        current_controls='Flow meter, backup pump',
    ),
    dict(
        component='Pressure Safety Valve',
        failure_mode='Malfunction',
        effects='Overpressure rupture',
        causes='Valve seat seizure',
        severity=10,
        occurrence=2,
        detection=8,
        current_controls='Annual inspection',
    ),
    dict(
        component='Raw Material Supply Valve',
        failure_mode='Stuck fully open',
        effects='Excessive raw material input',
        causes='Actuator failure',
        severity=7,
        occurrence=3,
        detection=5,
        current_controls='Flow control',
    ),
]
isa.conduct_fmea(components_data)

# Generate mitigation strategies
strategies = isa.generate_mitigation_strategies()
print("=== Risk Mitigation Strategies ===")
strategy_view = strategies[['Component', 'Current RPN', 'Priority', 'Recommended Action']]
print(strategy_view.to_string(index=False))

# Predict countermeasure effects
print("\n=== Countermeasure Effect Simulation ===")

# Temperature sensor: Improve detection from 4→2 with triplication
temp_sensor_effect = isa.calculate_risk_reduction(
    'Temperature Sensor',
    new_detection=2
)
# Cooling water pump: Improve occurrence from 4→2 with preventive maintenance
pump_effect = isa.calculate_risk_reduction(
    'Cooling Water Pump',
    new_occurrence=2
)

# Both scenarios are reported in the same three-line layout
countermeasure_scenarios = [
    (f"【Temperature Sensor Triplication】", temp_sensor_effect),
    (f"\n【Cooling Water Pump Preventive Maintenance Enhancement】", pump_effect),
]
for scenario_heading, effect in countermeasure_scenarios:
    rpn_before = effect['Current RPN']
    rpn_after = effect['New RPN']
    reduction_rate = effect['Reduction']
    print(scenario_heading)
    print(f" RPN before improvement: {rpn_before}")
    print(f" RPN after improvement: {rpn_after}")
    print(f" Risk reduction: {reduction_rate:.1f}%")

# Comprehensive report
safety_report = isa.generate_safety_report()
print("\n" + safety_report)
# Output example:
# === Risk Mitigation Strategies ===
# Component Current RPN Priority Recommended Action
# Pressure Safety Valve 160 Critical Immediate design change/redundancy
# Temperature Sensor 108 Critical Immediate design change/redundancy
# Raw Material Supply Valve 105 Medium Periodic monitoring/procedure improvement
# Cooling Water Pump 64 Critical Immediate design change/redundancy
Upon completing this chapter, you will be able to: