
Chapter 3: FMEA and Fault Tree Analysis

Systematic Analysis of System Failure Modes and Reliability Assessment

📚 Introduction to Process Safety Assessment ⏱️ Reading Time: 30-35 minutes 🎯 Difficulty: Intermediate

What You'll Learn in This Chapter

This chapter covers FMEA-based risk prioritization with the Risk Priority Number (RPN), criticality analysis, fault tree analysis (FTA) with minimal cut sets, reliability and availability calculations, common cause failure (CCF) analysis, and an integrated FMEA/FTA assessment workflow.

3.1 FMEA (Failure Mode and Effects Analysis)

FMEA (Failure Mode and Effects Analysis) is a methodology that systematically analyzes possible failure modes, their effects, occurrence probability, and detectability for each component of a system. In the process industries, it is widely used for equipment reliability assessment and maintenance planning.

3.1.1 Basic FMEA Implementation

The core of FMEA is the Risk Priority Number (RPN): RPN = Severity (S) × Occurrence (O) × Detection (D), where each factor is rated on a 1-10 scale, so the RPN ranges from 1 to 1000. For example, S = 9, O = 3, D = 4 gives RPN = 108.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

# Example 1: Basic FMEA Implementation
import pandas as pd
import numpy as np
from typing import List, Dict

class FMEA:
    """Failure Mode and Effects Analysis (FMEA) System

    Systematically analyze failure modes, effects, and risks for each equipment
    """

    def __init__(self):
        self.analysis_data = []

    def add_failure_mode(self, component: str, failure_mode: str,
                        effects: str, causes: str,
                        severity: int, occurrence: int, detection: int,
                        current_controls: str = ""):
        """Add failure mode

        Args:
            component: Target component
            failure_mode: Failure mode
            effects: Effects
            causes: Causes
            severity: Severity (1-10)
            occurrence: Occurrence (1-10)
            detection: Detection (1-10)
            current_controls: Current controls
        """
        rpn = severity * occurrence * detection

        self.analysis_data.append({
            'Component': component,
            'Failure Mode': failure_mode,
            'Effects': effects,
            'Causes': causes,
            'Severity': severity,
            'Occurrence': occurrence,
            'Detection': detection,
            'RPN': rpn,
            'Current Controls': current_controls
        })

    def get_analysis(self) -> pd.DataFrame:
        """Get FMEA analysis results"""
        df = pd.DataFrame(self.analysis_data)
        return df.sort_values('RPN', ascending=False)

    def get_high_risk_items(self, threshold: int = 200) -> pd.DataFrame:
        """Extract high-risk items

        Args:
            threshold: RPN threshold (default: RPN ≥ 200 considered high risk)
        """
        df = self.get_analysis()
        return df[df['RPN'] >= threshold]

# Practical example: FMEA for chemical reactor
fmea = FMEA()

# Temperature control system failure modes
fmea.add_failure_mode(
    component="Temperature Sensor",
    failure_mode="Sensor failure (high temperature side)",
    effects="Reaction runaway, pressure rise, explosion risk",
    causes="Sensor degradation, wiring disconnection",
    severity=9,
    occurrence=3,
    detection=4,
    current_controls="Redundant sensors, periodic calibration"
)

fmea.add_failure_mode(
    component="Cooling Water Pump",
    failure_mode="Pump stoppage",
    effects="Reactor overheating, product quality degradation",
    causes="Motor failure, bearing wear",
    severity=8,
    occurrence=4,
    detection=2,
    current_controls="Flow meter alarm, backup pump"
)

fmea.add_failure_mode(
    component="Pressure Safety Valve",
    failure_mode="Malfunction (does not open)",
    effects="Overpressure vessel rupture",
    causes="Valve seat seizure, spring degradation",
    severity=10,
    occurrence=2,
    detection=8,
    current_controls="Annual inspection, pressure test"
)

# Display analysis results
analysis_df = fmea.get_analysis()
print("=== FMEA Analysis Results (RPN order) ===")
print(analysis_df[['Component', 'Failure Mode', 'RPN']].to_string(index=False))
print(f"\nAverage RPN: {analysis_df['RPN'].mean():.1f}")
print(f"Maximum RPN: {analysis_df['RPN'].max()}")

# Extract high-risk items
high_risk = fmea.get_high_risk_items(threshold=150)
print(f"\nHigh-risk items (RPN≥150): {len(high_risk)} items")
print(high_risk[['Component', 'RPN']].to_string(index=False))

# Output example:
# === FMEA Analysis Results (RPN order) ===
# Component              Failure Mode                                RPN
# Pressure Safety Valve  Malfunction (does not open)                 160
# Temperature Sensor     Sensor failure (high temperature side)      108
# Cooling Water Pump     Pump stoppage                                64
#
# Average RPN: 110.7
# Maximum RPN: 160
#
# High-risk items (RPN≥150): 1 item
# Component              RPN
# Pressure Safety Valve  160

3.1.2 Criticality Analysis

Criticality analysis extends FMEA to rank failure modes quantitatively. Here, the criticality of each failure mode is computed as the normalized occurrence rating times severity, weighted by detection difficulty: Criticality = (Occurrence/10) × Severity × (Detection/10). For example, O = 2, S = 10, D = 8 gives a criticality of 1.6.

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0

# Example 2: Criticality Analysis
import matplotlib.pyplot as plt

class CriticalityAnalysis:
    """Criticality Analysis

    Quantitatively evaluate the importance of failure modes and determine priorities
    """

    def __init__(self, fmea_data: pd.DataFrame):
        self.data = fmea_data.copy()
        self._calculate_criticality()

    def _calculate_criticality(self):
        """Calculate criticality

        Criticality = (Occurrence/10) * Severity * α
        α: Weighting factor based on detectability (larger for harder to detect)
        """
        # Calculate weighting factor from detection (difficult to detect = larger factor)
        detection_weight = self.data['Detection'] / 10

        # Criticality = Occurrence probability × Severity × Detection difficulty
        self.data['Criticality'] = (
            (self.data['Occurrence'] / 10) *
            self.data['Severity'] *
            detection_weight
        )

        # Criticality rank (four levels: Low / Medium / High / Critical)
        criticality_values = self.data['Criticality']
        self.data['Criticality Rank'] = pd.cut(
            criticality_values,
            bins=[0, 2, 4, 6, 10],
            labels=['Low', 'Medium', 'High', 'Critical']
        )

    def get_critical_items(self, rank: str = 'Critical') -> pd.DataFrame:
        """Get items by criticality rank"""
        return self.data[self.data['Criticality Rank'] == rank]

    def plot_criticality_matrix(self):
        """Plot criticality matrix (Severity vs Occurrence)"""
        fig, ax = plt.subplots(figsize=(10, 8))

        # Bubble chart (bubble size = detection)
        scatter = ax.scatter(
            self.data['Occurrence'],
            self.data['Severity'],
            s=self.data['Detection'] * 50,  # Bubble size
            c=self.data['Criticality'],
            cmap='YlOrRd',
            alpha=0.6,
            edgecolors='black'
        )

        # Label each point
        for idx, row in self.data.iterrows():
            ax.annotate(
                row['Component'],
                (row['Occurrence'], row['Severity']),
                fontsize=8,
                ha='center'
            )

        ax.set_xlabel('Occurrence', fontsize=12)
        ax.set_ylabel('Severity', fontsize=12)
        ax.set_title('Criticality Matrix', fontsize=14)
        ax.grid(True, alpha=0.3)

        # Colorbar
        cbar = plt.colorbar(scatter)
        cbar.set_label('Criticality', fontsize=10)

        return fig

# Practical example
ca = CriticalityAnalysis(analysis_df)

print("=== Criticality Analysis Results ===")
result = ca.data[['Component', 'Criticality', 'Criticality Rank']]
result = result.sort_values('Criticality', ascending=False)
print(result.to_string(index=False))

# Statistics by rank
print("\n=== Statistics by Criticality Rank ===")
rank_stats = ca.data['Criticality Rank'].value_counts().sort_index()
print(rank_stats)

# Details of Critical items
critical_items = ca.get_critical_items('Critical')
if len(critical_items) > 0:
    print(f"\n⚠️  Critical items (highest priority countermeasures): {len(critical_items)} items")
    print(critical_items[['Component', 'Failure Mode', 'Criticality']].to_string(index=False))

# Output example:
# === Criticality Analysis Results ===
# Component              Criticality  Criticality Rank
# Pressure Safety Valve        1.60            Low
# Temperature Sensor           1.08            Low
# Cooling Water Pump           0.64            Low
#
# === Statistics by Criticality Rank ===
# Low         3
# Medium      0
# High        0
# Critical    0
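
The plot_criticality_matrix method defined above is not exercised in the console example. A minimal usage sketch is shown below; the output filename criticality_matrix.png is an assumption for illustration:

# Render and save the criticality matrix (bubble size reflects the detection rating)
fig = ca.plot_criticality_matrix()
fig.savefig('criticality_matrix.png', dpi=150, bbox_inches='tight')
plt.close(fig)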

3.2 Fault Tree Analysis (FTA)

Fault Tree Analysis (FTA) represents an undesired event (the top event) and traces it down to the basic events that can cause it, with the events connected by logic gates (AND/OR) in a tree structure. An AND gate's output probability is the product of its input probabilities, while an OR gate's is one minus the product of their complements; propagating these from the basic events upward yields a quantitative estimate of the top event probability and hence of overall system reliability.

3.2.1 Fault Tree Construction

# Example 3: Fault Tree Construction
from dataclasses import dataclass
from typing import Optional, List
from enum import Enum

class GateType(Enum):
    """Types of logic gates"""
    AND = "AND"  # True when all inputs are true
    OR = "OR"    # True when at least one input is true

@dataclass
class FaultTreeNode:
    """Node of fault tree"""
    name: str
    probability: float = 0.0  # Failure probability
    gate_type: Optional[GateType] = None
    children: List['FaultTreeNode'] = None

    def __post_init__(self):
        if self.children is None:
            self.children = []

class FaultTree:
    """Fault Tree Analysis (FTA) System"""

    def __init__(self, top_event: str):
        self.root = FaultTreeNode(name=top_event)

    def add_gate(self, parent_name: str, gate_type: GateType,
                 children: List[tuple]):
        """Add logic gate

        Args:
            parent_name: Parent node name
            gate_type: Gate type (AND/OR)
            children: [(child node name, failure probability), ...]
        """
        parent = self._find_node(self.root, parent_name)
        if parent is None:
            raise ValueError(f"Node '{parent_name}' not found")

        parent.gate_type = gate_type
        for child_name, prob in children:
            child_node = FaultTreeNode(name=child_name, probability=prob)
            parent.children.append(child_node)

    def _find_node(self, node: FaultTreeNode, name: str) -> Optional[FaultTreeNode]:
        """Search node by name"""
        if node.name == name:
            return node
        for child in node.children:
            result = self._find_node(child, name)
            if result:
                return result
        return None

    def calculate_top_event_probability(self) -> float:
        """Calculate top event occurrence probability"""
        return self._calculate_node_probability(self.root)

    def _calculate_node_probability(self, node: FaultTreeNode) -> float:
        """Recursively calculate node failure probability"""
        # For basic events (leaf nodes)
        if not node.children:
            return node.probability

        # Calculate child node probabilities
        child_probs = [self._calculate_node_probability(child)
                      for child in node.children]

        # Calculate based on gate type
        if node.gate_type == GateType.AND:
            # AND gate: probability that all fail
            node.probability = np.prod(child_probs)
        elif node.gate_type == GateType.OR:
            # OR gate: probability that at least one fails
            # P(A∪B) = 1 - P(A'∩B') = 1 - (1-P(A))(1-P(B))
            node.probability = 1 - np.prod([1 - p for p in child_probs])

        return node.probability

    def print_tree(self, node: Optional[FaultTreeNode] = None,
                   level: int = 0):
        """Display fault tree"""
        if node is None:
            node = self.root

        indent = "  " * level
        gate_str = f" [{node.gate_type.value}]" if node.gate_type else ""
        prob_str = f" (P={node.probability:.4f})" if node.probability > 0 else ""

        print(f"{indent}{node.name}{gate_str}{prob_str}")

        for child in node.children:
            self.print_tree(child, level + 1)

# Practical example: Fault tree for reactor overpressure accident
ft = FaultTree(top_event="Reactor Overpressure Accident")

# Level 1: Direct causes of overpressure (OR gate)
ft.add_gate(
    parent_name="Reactor Overpressure Accident",
    gate_type=GateType.OR,
    children=[
        ("Reaction Runaway", 0.0),
        ("Pressure Control System Failure", 0.0),
        ("Safety Valve Malfunction", 0.002)  # Annual failure probability 0.2%
    ]
)

# Level 2: Causes of reaction runaway (OR gate)
ft.add_gate(
    parent_name="Reaction Runaway",
    gate_type=GateType.OR,
    children=[
        ("Temperature Control Failure", 0.0),
        ("Excessive Raw Material Input", 0.001)
    ]
)

# Level 3: Causes of temperature control failure (AND gate - all occur simultaneously)
ft.add_gate(
    parent_name="Temperature Control Failure",
    gate_type=GateType.AND,
    children=[
        ("Cooling System Failure", 0.01),      # Annual 1%
        ("Temperature Sensor Failure", 0.005)  # Annual 0.5%
    ]
)

# Level 3: Causes of pressure control system failure (OR gate)
ft.add_gate(
    parent_name="Pressure Control System Failure",
    gate_type=GateType.OR,
    children=[
        ("Pressure Sensor Failure", 0.003),
        ("Control Valve Failure", 0.004)
    ]
)

# Calculate top event probability
top_prob = ft.calculate_top_event_probability()

print("=== Fault Tree Analysis Results ===\n")
ft.print_tree()
print(f"\n【Top Event Occurrence Probability】")
print(f"Reactor Overpressure Accident: {top_prob:.6f} (Annual {top_prob*100:.4f}%)")
print(f"Expected frequency: Once every {1/top_prob:.1f} years" if top_prob > 0 else "")

# Output example:
# === Fault Tree Analysis Results ===
#
# Reactor Overpressure Accident [OR] (P=0.0100)
#   Reaction Runaway [OR] (P=0.0010)
#     Temperature Control Failure [AND] (P=0.0001)
#       Cooling System Failure (P=0.0100)
#       Temperature Sensor Failure (P=0.0050)
#     Excessive Raw Material Input (P=0.0010)
#   Pressure Control System Failure [OR] (P=0.0070)
#     Pressure Sensor Failure (P=0.0030)
#     Control Valve Failure (P=0.0040)
#   Safety Valve Malfunction (P=0.0020)
#
# 【Top Event Occurrence Probability】
# Reactor Overpressure Accident: 0.010015 (Annual 1.0015%)
# Expected frequency: Once every 99.9 years
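
As a cross-check on the figures above, the top-event probability can be reproduced directly from the basic-event probabilities using the same gate formulas. This is a minimal standalone sketch; the probabilities are the ones assigned in Example 3:

# Supplementary check: propagate the gate formulas by hand
p_temp_ctrl = 0.01 * 0.005                                  # AND: Cooling System Failure x Temperature Sensor Failure
p_runaway = 1 - (1 - p_temp_ctrl) * (1 - 0.001)             # OR: Temperature Control Failure, Excessive Raw Material Input
p_pressure_ctrl = 1 - (1 - 0.003) * (1 - 0.004)             # OR: Pressure Sensor Failure, Control Valve Failure
p_top = 1 - (1 - p_runaway) * (1 - p_pressure_ctrl) * (1 - 0.002)  # OR at the top event
print(f"Cross-check of top event probability: {p_top:.6f}")  # ≈ 0.010015, matching the FTA result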

3.2.2 Minimal Cut Set Analysis

A minimal cut set is a smallest combination of basic events whose simultaneous occurrence causes the top event; removing any event from the set would no longer do so. Identifying the minimal cut sets reveals the most important failure paths through the system.

# Example 4: Minimal Cut Set Analysis
from itertools import combinations

class MinimalCutSetAnalyzer:
    """Minimal Cut Set Analysis

    Derive minimum combinations of basic events that cause the top event
    """

    def __init__(self, fault_tree: FaultTree):
        self.fault_tree = fault_tree
        self.minimal_cut_sets = []

    def find_minimal_cut_sets(self) -> List[List[str]]:
        """Derive minimal cut sets"""
        cut_sets = self._expand_node(self.fault_tree.root)

        # Check minimality (exclude supersets of other cut sets)
        self.minimal_cut_sets = self._remove_supersets(cut_sets)

        return self.minimal_cut_sets

    def _expand_node(self, node: FaultTreeNode) -> List[List[str]]:
        """Recursively expand a node into the cut sets that cause it"""
        # A basic event (leaf node) is a cut set on its own
        if not node.children:
            return [[node.name]]

        child_cut_sets = [self._expand_node(child) for child in node.children]

        if node.gate_type == GateType.OR:
            # OR gate: any single child's cut set causes this node (union of the children's cut sets)
            return [cs for child_sets in child_cut_sets for cs in child_sets]

        # AND gate: one cut set from every child must occur together (cartesian combination)
        combined: List[List[str]] = [[]]
        for child_sets in child_cut_sets:
            combined = [existing + cs for existing in combined for cs in child_sets]
        return combined

    def _remove_supersets(self, cut_sets: List[List[str]]) -> List[List[str]]:
        """Exclude non-minimal cut sets (supersets)"""
        minimal = []
        sorted_sets = sorted(cut_sets, key=len)

        for cs in sorted_sets:
            cs_set = set(cs)
            is_minimal = True

            for existing in minimal:
                if set(existing).issubset(cs_set):
                    is_minimal = False
                    break

            if is_minimal:
                minimal.append(cs)

        return minimal

    def calculate_cut_set_probabilities(self) -> pd.DataFrame:
        """Calculate occurrence probability for each cut set"""
        results = []

        for i, cut_set in enumerate(self.minimal_cut_sets, 1):
            # Get probabilities of basic events in cut set
            probabilities = []
            for event_name in cut_set:
                node = self.fault_tree._find_node(
                    self.fault_tree.root, event_name
                )
                if node:
                    probabilities.append(node.probability)

            # Cut set occurrence probability = all events occur simultaneously
            cut_set_prob = np.prod(probabilities) if probabilities else 0

            results.append({
                'Cut Set ID': f"CS{i}",
                'Events': ' AND '.join(cut_set),
                'Order': len(cut_set),
                'Probability': cut_set_prob
            })

        df = pd.DataFrame(results)
        return df.sort_values('Probability', ascending=False)

    def get_importance_measures(self) -> pd.DataFrame:
        """Calculate importance measures for each basic event

        Fussell-Vesely importance: contribution of cut sets containing that event
        """
        # Extract all basic events
        all_events = set()
        for cut_set in self.minimal_cut_sets:
            all_events.update(cut_set)

        # Top event probability
        top_prob = self.fault_tree.calculate_top_event_probability()

        results = []
        for event in all_events:
            # Sum of probabilities of cut sets containing this event
            containing_prob = 0
            for cut_set in self.minimal_cut_sets:
                if event in cut_set:
                    probs = []
                    for e in cut_set:
                        node = self.fault_tree._find_node(
                            self.fault_tree.root, e
                        )
                        if node:
                            probs.append(node.probability)
                    containing_prob += np.prod(probs)

            # Fussell-Vesely importance
            fv_importance = containing_prob / top_prob if top_prob > 0 else 0

            results.append({
                'Event': event,
                'FV Importance': fv_importance,
                'Cut Sets': sum(1 for cs in self.minimal_cut_sets if event in cs)
            })

        df = pd.DataFrame(results)
        return df.sort_values('FV Importance', ascending=False)

# Practical example
mcs_analyzer = MinimalCutSetAnalyzer(ft)
minimal_cuts = mcs_analyzer.find_minimal_cut_sets()

print("\n=== Minimal Cut Sets ===")
for i, cut_set in enumerate(minimal_cuts, 1):
    print(f"CS{i}: {' AND '.join(cut_set)}")

# Cut set probabilities
cut_set_probs = mcs_analyzer.calculate_cut_set_probabilities()
print("\n=== Cut Set Occurrence Probabilities ===")
print(cut_set_probs.to_string(index=False))

# Importance analysis
importance = mcs_analyzer.get_importance_measures()
print("\n=== Importance of Basic Events ===")
print(importance.to_string(index=False))

# Output example:
# === Minimal Cut Sets ===
# CS1: Cooling System Failure AND Temperature Sensor Failure
# CS2: Excessive Raw Material Input
# CS3: Pressure Sensor Failure
# CS4: Control Valve Failure
# CS5: Safety Valve Malfunction
#
# === Cut Set Occurrence Probabilities ===
# Cut Set ID                                        Events  Order  Probability
#       CS2                      Excessive Raw Material Input      1       0.0010
#       CS4                                Control Valve Failure      1       0.0004
#       CS3                            Pressure Sensor Failure      1       0.0003
#       CS5                            Safety Valve Malfunction      1       0.0002
#       CS1  Cooling System Failure AND Temperature Sensor Failure      2       0.0001
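
Once the minimal cut sets and their probabilities are available, the top-event probability can also be approximated by the rare-event (upper-bound) sum of the cut set probabilities, which is accurate when the basic events are rare. A short sketch using the cut_set_probs DataFrame computed above:

# Rare-event approximation: P(top) ≈ sum of the minimal cut set probabilities
approx_top = cut_set_probs['Probability'].sum()
exact_top = ft.calculate_top_event_probability()
print(f"Rare-event approximation: {approx_top:.6f}")    # ≈ 0.010050
print(f"Exact top event probability: {exact_top:.6f}")  # ≈ 0.010015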

3.2.3 Reliability Calculation Implementation

# Example 5: Advanced Reliability Calculation

class ReliabilityCalculator:
    """System Reliability Calculation

    Calculate failure rate, MTBF, and availability
    """

    @staticmethod
    def failure_rate_to_probability(lambda_rate: float, time: float) -> float:
        """Calculate failure probability from failure rate

        Args:
            lambda_rate: Failure rate [1/hour]
            time: Time [hour]

        Returns:
            Failure probability P(t) = 1 - exp(-λt)
        """
        return 1 - np.exp(-lambda_rate * time)

    @staticmethod
    def calculate_mtbf(failure_rate: float) -> float:
        """Calculate Mean Time Between Failures (MTBF)

        Args:
            failure_rate: Failure rate [1/hour]

        Returns:
            MTBF [hour]
        """
        return 1 / failure_rate if failure_rate > 0 else float('inf')

    @staticmethod
    def calculate_availability(mtbf: float, mttr: float) -> float:
        """Calculate availability (uptime rate)

        Args:
            mtbf: Mean time between failures [hour]
            mttr: Mean time to repair [hour]

        Returns:
            Availability = MTBF / (MTBF + MTTR)
        """
        return mtbf / (mtbf + mttr)

    @staticmethod
    def weibull_reliability(t: float, beta: float, eta: float) -> float:
        """Reliability function using Weibull distribution

        Args:
            t: Time
            beta: Shape parameter (β<1: early failure, β=1: random failure, β>1: wear-out failure)
            eta: Scale parameter (characteristic life)

        Returns:
            R(t) = exp(-(t/η)^β)
        """
        return np.exp(-((t / eta) ** beta))

    @staticmethod
    def series_system_reliability(reliabilities: List[float]) -> float:
        """Reliability of series system

        R_sys = R1 × R2 × ... × Rn
        """
        return np.prod(reliabilities)

    @staticmethod
    def parallel_system_reliability(reliabilities: List[float]) -> float:
        """Reliability of parallel system (redundancy)

        R_sys = 1 - (1-R1) × (1-R2) × ... × (1-Rn)
        """
        return 1 - np.prod([1 - r for r in reliabilities])

# Practical example
rc = ReliabilityCalculator()

# Equipment specifications
equipment = {
    'Temperature Sensor': {'lambda': 5.7e-6, 'mttr': 4},    # Failure rate [1/h], MTTR [h]
    'Pressure Sensor': {'lambda': 6.2e-6, 'mttr': 4},
    'Control Valve': {'lambda': 3.8e-6, 'mttr': 8},
    'Cooling Pump': {'lambda': 1.2e-5, 'mttr': 12}
}

print("=== Equipment Reliability Analysis (1 year operation) ===\n")

operation_time = 8760  # 1 year = 8760 hours

for name, spec in equipment.items():
    lambda_rate = spec['lambda']
    mttr = spec['mttr']

    # Calculate various metrics
    failure_prob = rc.failure_rate_to_probability(lambda_rate, operation_time)
    reliability = 1 - failure_prob
    mtbf = rc.calculate_mtbf(lambda_rate)
    availability = rc.calculate_availability(mtbf, mttr)

    print(f"【{name}】")
    print(f"  Failure rate: {lambda_rate:.2e} [1/h]")
    print(f"  MTBF: {mtbf:.0f} h ({mtbf/8760:.1f} years)")
    print(f"  MTTR: {mttr} h")
    print(f"  Annual failure probability: {failure_prob*100:.2f}%")
    print(f"  Annual reliability: {reliability*100:.2f}%")
    print(f"  Availability: {availability*100:.4f}%\n")

# Series system (system cannot operate unless all are normal)
series_reliabilities = [
    1 - rc.failure_rate_to_probability(spec['lambda'], operation_time)
    for spec in equipment.values()
]
series_reliability = rc.series_system_reliability(series_reliabilities)

print(f"【Overall Series System】")
print(f"  System annual reliability: {series_reliability*100:.2f}%")
print(f"  System annual failure probability: {(1-series_reliability)*100:.2f}%")

# Parallel system (when sensors are duplicated)
sensor_reliability = 1 - rc.failure_rate_to_probability(5.7e-6, operation_time)
redundant_reliability = rc.parallel_system_reliability([sensor_reliability, sensor_reliability])

print(f"\n【Effect of Temperature Sensor Duplication】")
print(f"  Single: {sensor_reliability*100:.2f}%")
print(f"  Duplicated: {redundant_reliability*100:.4f}%")
print(f"  Reliability improvement: {(redundant_reliability-sensor_reliability)*100:.4f}%")

# Output example:
# === Equipment Reliability Analysis (1 year operation) ===
#
# 【Temperature Sensor】
#   Failure rate: 5.70e-06 [1/h]
#   MTBF: 175439 h (20.0 years)
#   MTTR: 4 h
#   Annual failure probability: 4.87%
#   Annual reliability: 95.13%
#   Availability: 99.9977%
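
The weibull_reliability helper defined above is not exercised in the worked example. The sketch below shows how the shape parameter β changes one-year reliability; the β and η values are illustrative assumptions, not equipment data:

# Weibull reliability at t = 1 year for different failure patterns (η = 20,000 h assumed)
t = 8760
eta = 20000
for beta, label in [(0.8, 'early failure'), (1.0, 'random failure'), (2.5, 'wear-out failure')]:
    r = rc.weibull_reliability(t, beta=beta, eta=eta)
    print(f"beta={beta:.1f} ({label}): R(1 year) = {r*100:.1f}%")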

3.2.4 Common Cause Failure Analysis

# Example 6: Common Cause Failure Analysis
class CommonCauseFailureAnalysis:
    """Common Cause Failure (CCF) Analysis

    Evaluate simultaneous failures in redundant systems due to common causes
    """

    def __init__(self):
        self.beta_factors = {}  # β factor (CCF occurrence ratio)

    def set_beta_factor(self, component: str, beta: float):
        """Set β factor

        Args:
            component: Target component
            beta: β factor (0-1, typically 0.05-0.15)
        """
        if not 0 <= beta <= 1:
            raise ValueError("β factor must be specified in the range 0-1")
        self.beta_factors[component] = beta

    def calculate_redundant_reliability_with_ccf(
        self,
        component: str,
        base_reliability: float,
        n_redundant: int = 2
    ) -> float:
        """Reliability of redundant system considering CCF

        Args:
            component: Component name
            base_reliability: Single unit reliability
            n_redundant: Number of redundant units

        Returns:
            System reliability after considering CCF
        """
        beta = self.beta_factors.get(component, 0)

        # Independent failure probability
        independent_failure = (1 - base_reliability) * (1 - beta)

        # CCF failure probability
        ccf_failure = (1 - base_reliability) * beta

        # Redundant system reliability
        # R_sys = (redundancy considering independent failures) × (no CCF)
        independent_reliability = 1 - (independent_failure ** n_redundant)
        ccf_reliability = 1 - ccf_failure

        total_reliability = independent_reliability * ccf_reliability

        return total_reliability

    def compare_ccf_impact(
        self,
        component: str,
        base_reliability: float,
        n_redundant: int = 2
    ) -> Dict:
        """Compare impact of CCF

        Returns:
            Comparison of reliability before and after considering CCF
        """
        # Redundancy effect when CCF is ignored
        without_ccf = 1 - ((1 - base_reliability) ** n_redundant)

        # When CCF is considered
        with_ccf = self.calculate_redundant_reliability_with_ccf(
            component, base_reliability, n_redundant
        )

        # Reliability degradation
        degradation = without_ccf - with_ccf

        return {
            'Component': component,
            'Base Reliability': base_reliability,
            'Redundancy': n_redundant,
            'Beta Factor': self.beta_factors.get(component, 0),
            'Without CCF': without_ccf,
            'With CCF': with_ccf,
            'Degradation': degradation,
            'Degradation %': (degradation / without_ccf * 100) if without_ccf > 0 else 0
        }

# Practical example
ccf = CommonCauseFailureAnalysis()

# Set β factor for each component (typical values based on industrial data)
ccf.set_beta_factor('Temperature Sensor', beta=0.10)  # 10% is CCF
ccf.set_beta_factor('Pressure Sensor', beta=0.12)
ccf.set_beta_factor('Control Valve', beta=0.08)

# Single unit reliability (1 year)
base_reliabilities = {
    'Temperature Sensor': 0.95,
    'Pressure Sensor': 0.95,
    'Control Valve': 0.97
}

print("=== Impact Assessment of Common Cause Failure (CCF) ===\n")

results = []
for component, base_rel in base_reliabilities.items():
    comparison = ccf.compare_ccf_impact(component, base_rel, n_redundant=2)
    results.append(comparison)

    print(f"【{component}】(Duplicated)")
    print(f"  Single unit reliability: {comparison['Base Reliability']*100:.2f}%")
    print(f"  β factor: {comparison['Beta Factor']:.2f}")
    print(f"  Without CCF: {comparison['Without CCF']*100:.4f}%")
    print(f"  With CCF: {comparison['With CCF']*100:.4f}%")
    print(f"  Reliability degradation: {comparison['Degradation']*100:.4f}% "
          f"({comparison['Degradation %']:.2f}% decrease)\n")

# Compare results in DataFrame
df_ccf = pd.DataFrame(results)
print("=== CCF Impact Summary ===")
print(df_ccf[['Component', 'Beta Factor', 'Without CCF', 'With CCF', 'Degradation %']])

# Output example:
# === Impact Assessment of Common Cause Failure (CCF) ===
#
# 【Temperature Sensor】(Duplicated)
#   Single unit reliability: 95.00%
#   β factor: 0.10
#   Without CCF: 99.7500%
#   With CCF: 99.2985%
#   Reliability degradation: 0.4515% (0.45% decrease)
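
Because the assumed β factor dominates the result for a redundant pair, it is worth scanning a range of values. A minimal sensitivity sketch using the class above; the component name 'Generic Sensor' and the β values are illustrative:

# Sensitivity of a duplicated component (single-unit reliability 0.95) to the assumed β factor
ccf_scan = CommonCauseFailureAnalysis()
for beta in [0.02, 0.05, 0.10, 0.15, 0.20]:
    ccf_scan.set_beta_factor('Generic Sensor', beta=beta)
    r_sys = ccf_scan.calculate_redundant_reliability_with_ccf(
        'Generic Sensor', base_reliability=0.95, n_redundant=2
    )
    print(f"beta={beta:.2f}: redundant system reliability = {r_sys*100:.4f}%")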

3.3 Integrated FMEA/FTA Analysis

Integrating FMEA and FTA enables comprehensive safety assessment from the component level up to the system level: high-risk items identified by FMEA are evaluated quantitatively with FTA.

# Example 7: Integrated FMEA/FTA Analysis System
class IntegratedSafetyAnalysis:
    """Integrated FMEA/FTA Safety Assessment System

    Quantitatively evaluate risks identified by FMEA with FTA and propose optimal countermeasures
    """

    def __init__(self):
        self.fmea = FMEA()
        self.fault_trees = {}
        self.mitigation_strategies = []

    def conduct_fmea(self, components: List[Dict]):
        """Conduct FMEA

        Args:
            components: [{component, failure_mode, ...}, ...]
        """
        for comp in components:
            self.fmea.add_failure_mode(**comp)

    def build_fault_tree_for_high_risk(self, threshold: int = 200):
        """Build fault tree for high-risk items"""
        high_risk_items = self.fmea.get_high_risk_items(threshold)

        for _, item in high_risk_items.iterrows():
            component = item['Component']
            failure_mode = item['Failure Mode']

            # Build fault tree (simplified)
            ft_name = f"{component}_{failure_mode}"
            self.fault_trees[ft_name] = {
                'component': component,
                'failure_mode': failure_mode,
                'rpn': item['RPN'],
                'severity': item['Severity'],
                'occurrence': item['Occurrence'],
                'detection': item['Detection']
            }

    def generate_mitigation_strategies(self) -> pd.DataFrame:
        """Generate risk mitigation strategies

        Propose prioritized countermeasures based on severity and RPN values
        """
        analysis_df = self.fmea.get_analysis()

        strategies = []
        for _, row in analysis_df.iterrows():
            rpn = row['RPN']
            severity = row['Severity']
            occurrence = row['Occurrence']
            detection = row['Detection']

            # Determine countermeasure priority
            if severity >= 8:
                priority = 'Critical'
                action = 'Immediate design change/redundancy'
            elif rpn >= 200:
                priority = 'High'
                action = 'Improve detectability/strengthen preventive maintenance'
            elif rpn >= 100:
                priority = 'Medium'
                action = 'Periodic monitoring/procedure improvement'
            else:
                priority = 'Low'
                action = 'Maintain status quo/periodic review'

            # Specific countermeasure proposals
            specific_actions = []
            if occurrence >= 5:
                specific_actions.append('Reduce occurrence: preventive maintenance, shorten part replacement cycle')
            if detection >= 7:
                specific_actions.append('Improve detectability: add sensors, set alarms')
            if severity >= 8:
                specific_actions.append('Reduce severity: redundancy, fail-safe design')

            strategies.append({
                'Component': row['Component'],
                'Failure Mode': row['Failure Mode'],
                'Current RPN': rpn,
                'Priority': priority,
                'Recommended Action': action,
                'Specific Measures': '; '.join(specific_actions)
            })

        df = pd.DataFrame(strategies)
        return df.sort_values('Current RPN', ascending=False)

    def calculate_risk_reduction(
        self,
        component: str,
        new_occurrence: int = None,
        new_detection: int = None
    ) -> Dict:
        """Calculate risk reduction effect after implementing countermeasures

        Args:
            component: Target component
            new_occurrence: Occurrence after countermeasures
            new_detection: Detection after countermeasures

        Returns:
            Analysis results of risk reduction effect
        """
        # Get current values
        analysis_df = self.fmea.get_analysis()
        current = analysis_df[analysis_df['Component'] == component].iloc[0]

        current_rpn = current['RPN']
        severity = current['Severity']
        current_occ = current['Occurrence']
        current_det = current['Detection']

        # Calculate new RPN
        new_occ = new_occurrence if new_occurrence is not None else current_occ
        new_det = new_detection if new_detection is not None else current_det
        new_rpn = severity * new_occ * new_det

        # Risk reduction rate
        reduction = ((current_rpn - new_rpn) / current_rpn * 100) if current_rpn > 0 else 0

        return {
            'Component': component,
            'Current RPN': current_rpn,
            'New RPN': new_rpn,
            'Reduction': reduction,
            'Current O-D': f"{current_occ}-{current_det}",
            'New O-D': f"{new_occ}-{new_det}",
            'Status': 'Improved' if new_rpn < current_rpn else 'No Change'
        }

    def generate_safety_report(self) -> str:
        """Generate comprehensive safety assessment report"""
        analysis_df = self.fmea.get_analysis()
        strategies_df = self.generate_mitigation_strategies()

        report = []
        report.append("=" * 60)
        report.append("Comprehensive Safety Assessment Report")
        report.append("=" * 60)
        report.append("")

        # Summary statistics
        report.append("【Overall Statistics】")
        report.append(f"  Number of assessment items: {len(analysis_df)}")
        report.append(f"  Average RPN: {analysis_df['RPN'].mean():.1f}")
        report.append(f"  Maximum RPN: {analysis_df['RPN'].max()}")
        report.append(f"  High-risk items (RPN≥200): {len(analysis_df[analysis_df['RPN'] >= 200])}")
        report.append("")

        # Number of countermeasures by priority
        report.append("【Number of Countermeasures by Priority】")
        priority_counts = strategies_df['Priority'].value_counts()
        for priority in ['Critical', 'High', 'Medium', 'Low']:
            count = priority_counts.get(priority, 0)
            report.append(f"  {priority}: {count} items")
        report.append("")

        # Details of Critical items
        critical = strategies_df[strategies_df['Priority'] == 'Critical']
        if len(critical) > 0:
            report.append("【Critical Items (Immediate Response Required)】")
            for _, item in critical.iterrows():
                report.append(f"  ⚠️  {item['Component']} - {item['Failure Mode']}")
                report.append(f"      RPN: {item['Current RPN']}")
                report.append(f"      Countermeasure: {item['Recommended Action']}")
            report.append("")

        return "\n".join(report)

# Practical example: Integrated analysis of reactor system
isa = IntegratedSafetyAnalysis()

# Conduct FMEA (multiple components)
components_data = [
    {
        'component': 'Temperature Sensor',
        'failure_mode': 'Sensor failure (high temperature side)',
        'effects': 'Reaction runaway, pressure rise',
        'causes': 'Sensor degradation, wiring disconnection',
        'severity': 9,
        'occurrence': 3,
        'detection': 4,
        'current_controls': 'Redundant sensors, periodic calibration'
    },
    {
        'component': 'Cooling Water Pump',
        'failure_mode': 'Pump stoppage',
        'effects': 'Reactor overheating',
        'causes': 'Motor failure, bearing wear',
        'severity': 8,
        'occurrence': 4,
        'detection': 2,
        'current_controls': 'Flow meter, backup pump'
    },
    {
        'component': 'Pressure Safety Valve',
        'failure_mode': 'Malfunction',
        'effects': 'Overpressure rupture',
        'causes': 'Valve seat seizure',
        'severity': 10,
        'occurrence': 2,
        'detection': 8,
        'current_controls': 'Annual inspection'
    },
    {
        'component': 'Raw Material Supply Valve',
        'failure_mode': 'Stuck fully open',
        'effects': 'Excessive raw material input',
        'causes': 'Actuator failure',
        'severity': 7,
        'occurrence': 3,
        'detection': 5,
        'current_controls': 'Flow control'
    }
]

isa.conduct_fmea(components_data)

# Generate mitigation strategies
strategies = isa.generate_mitigation_strategies()
print("=== Risk Mitigation Strategies ===")
print(strategies[['Component', 'Current RPN', 'Priority', 'Recommended Action']].to_string(index=False))

# Predict countermeasure effects
print("\n=== Countermeasure Effect Simulation ===")

# Temperature sensor: Improve detection from 4→2 with triplication
temp_sensor_effect = isa.calculate_risk_reduction(
    'Temperature Sensor',
    new_detection=2
)
print(f"【Temperature Sensor Triplication】")
print(f"  RPN before improvement: {temp_sensor_effect['Current RPN']}")
print(f"  RPN after improvement: {temp_sensor_effect['New RPN']}")
print(f"  Risk reduction: {temp_sensor_effect['Reduction']:.1f}%")

# Cooling water pump: Improve occurrence from 4→2 with preventive maintenance
pump_effect = isa.calculate_risk_reduction(
    'Cooling Water Pump',
    new_occurrence=2
)
print(f"\n【Cooling Water Pump Preventive Maintenance Enhancement】")
print(f"  RPN before improvement: {pump_effect['Current RPN']}")
print(f"  RPN after improvement: {pump_effect['New RPN']}")
print(f"  Risk reduction: {pump_effect['Reduction']:.1f}%")

# Comprehensive report
print("\n" + isa.generate_safety_report())

# Output example:
# === Risk Mitigation Strategies ===
# Component                   Current RPN  Priority                          Recommended Action
# Pressure Safety Valve               160  Critical          Immediate design change/redundancy
# Temperature Sensor                  108  Critical          Immediate design change/redundancy
# Raw Material Supply Valve           105    Medium   Periodic monitoring/procedure improvement
# Cooling Water Pump                   64  Critical          Immediate design change/redundancy
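
Example 7 records high-risk items in build_fault_tree_for_high_risk but does not actually construct fault trees for them. One illustrative way to close that loop is to map the FMEA occurrence rating onto an assumed annual basic-event probability and feed it into the FaultTree class from Example 3. The mapping used below (occurrence rating n → n/1000 per year) and the top event name are didactic assumptions, not calibrated values:

# Illustrative FMEA-to-FTA bridge using an assumed occurrence-to-probability conversion
fmea_results = isa.fmea.get_analysis()

ft_bridge = FaultTree(top_event="Loss of Reactor Safety Function")
ft_bridge.add_gate(
    parent_name="Loss of Reactor Safety Function",
    gate_type=GateType.OR,
    children=[
        (row['Failure Mode'], row['Occurrence'] / 1000)  # assumed conversion of rating to probability
        for _, row in fmea_results.iterrows()
    ]
)

bridge_prob = ft_bridge.calculate_top_event_probability()
print(f"Assumed top event probability derived from FMEA ratings: {bridge_prob:.4f}")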

Checking Learning Objectives

Upon completing this chapter, you will be able to:

- Conduct an FMEA, compute Risk Priority Numbers (RPN = S × O × D), and extract high-risk items
- Perform criticality analysis and visualize the results in a criticality matrix
- Construct fault trees with AND/OR gates and calculate top event probabilities
- Derive minimal cut sets and evaluate the importance of basic events
- Calculate failure probability, MTBF, availability, and series/parallel system reliability
- Assess common cause failures in redundant systems with the β-factor model
- Integrate FMEA and FTA results to prioritize and quantify risk mitigation measures


