🌐 EN | šŸ‡ÆšŸ‡µ JP | Last sync: 2025-11-16

Chapter 2: HAZOP and Risk Assessment

Complete Implementation of Guide Word Application, Deviation Analysis, and Quantitative Risk Assessment (QRA)

šŸ“– Reading Time: 30-35 minutes šŸ“Š Difficulty: Intermediate šŸ’» Code Examples: 8

This chapter covers HAZOP and Risk Assessment. You will learn Applying Guide Words (No, Conducting Deviation Analysis, and Implementing Quantitative Risk Assessment (QRA).

Learning Objectives

By completing this chapter, you will master:


2.1 Fundamentals of HAZOP Study

What is HAZOP?

HAZOP (Hazard and Operability Study) is a methodology for systematically identifying hazards and operational problems at the process design stage. Developed by ICI in the 1960s, it is now a standard for chemical plant design.

HAZOP Principles

HAZOP applies Guide Words to Process Parameters to generate Deviations, then analyzes their causes, consequences, and countermeasures.

$$ \text{Deviation} = \text{Guide Word} + \text{Process Parameter} $$

Key Guide Words

Guide Word Meaning Application Example
No/Not/None Complete negation No flow, No pressure
More Quantitative increase More flow, More temperature
Less Quantitative decrease Less flow, Less pressure
As Well As Addition Impurity as well as product
Part Of Partial absence Part of composition (missing component)
Reverse Opposite direction Reverse flow
Other Than Completely different Other than normal operation

Example 1: HAZOP Guide Word Engine

# ===================================
# Example 1: HAZOP Guide Word Engine
# ===================================

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class GuideWord(Enum):
    """HAZOP Guide Words"""
    NO = "No/Not/None"
    MORE = "More"
    LESS = "Less"
    AS_WELL_AS = "As Well As"
    PART_OF = "Part Of"
    REVERSE = "Reverse"
    OTHER_THAN = "Other Than"

class ProcessParameter(Enum):
    """Process Parameters"""
    FLOW = "Flow"
    PRESSURE = "Pressure"
    TEMPERATURE = "Temperature"
    LEVEL = "Level"
    COMPOSITION = "Composition"
    VISCOSITY = "Viscosity"
    TIME = "Time"
    MIXING = "Mixing"

@dataclass
class Deviation:
    """Deviation"""
    guide_word: GuideWord
    parameter: ProcessParameter
    description: str
    potential_causes: List[str]
    potential_consequences: List[str]
    existing_safeguards: List[str]
    recommendations: List[str]
    risk_rank: str  # High/Medium/Low

class HAZOPGuideWordEngine:
    """HAZOP Guide Word Application Engine"""

    def __init__(self):
        # Guide word and parameter combination rules
        self.valid_combinations = {
            GuideWord.NO: [ProcessParameter.FLOW, ProcessParameter.PRESSURE,
                           ProcessParameter.LEVEL, ProcessParameter.MIXING],
            GuideWord.MORE: [ProcessParameter.FLOW, ProcessParameter.PRESSURE,
                             ProcessParameter.TEMPERATURE, ProcessParameter.LEVEL],
            GuideWord.LESS: [ProcessParameter.FLOW, ProcessParameter.PRESSURE,
                             ProcessParameter.TEMPERATURE, ProcessParameter.LEVEL],
            GuideWord.REVERSE: [ProcessParameter.FLOW],
            GuideWord.AS_WELL_AS: [ProcessParameter.COMPOSITION],
            GuideWord.PART_OF: [ProcessParameter.COMPOSITION],
            GuideWord.OTHER_THAN: [ProcessParameter.COMPOSITION, ProcessParameter.TIME]
        }

    def is_valid_combination(self, guide_word: GuideWord,
                             parameter: ProcessParameter) -> bool:
        """Check if guide word and parameter combination is valid"""
        if guide_word in self.valid_combinations:
            return parameter in self.valid_combinations[guide_word]
        return False

    def generate_deviation_description(self, guide_word: GuideWord,
                                        parameter: ProcessParameter,
                                        node_name: str = "Node") -> str:
        """Generate deviation description"""
        return f"{guide_word.value} {parameter.value} at {node_name}"

    def apply_guide_words(self, node_name: str,
                          parameters: List[ProcessParameter]) -> List[str]:
        """Apply guide words to a node"""
        deviations = []

        for param in parameters:
            for guide_word in GuideWord:
                if self.is_valid_combination(guide_word, param):
                    deviation = self.generate_deviation_description(
                        guide_word, param, node_name
                    )
                    deviations.append(deviation)

        return deviations


# Usage example
hazop_engine = HAZOPGuideWordEngine()

# Node: Reactor feed line
node = "Reactor Feed Line (P&ID: R-101)"
parameters = [
    ProcessParameter.FLOW,
    ProcessParameter.PRESSURE,
    ProcessParameter.TEMPERATURE,
    ProcessParameter.COMPOSITION
]

# Apply guide words
deviations = hazop_engine.apply_guide_words(node, parameters)

print(f"=== HAZOP Node: {node} ===\n")
print(f"Total Deviations Generated: {len(deviations)}\n")
print("Deviations:")
for i, dev in enumerate(deviations, 1):
    print(f"{i:2d}. {dev}")

# Expected output:
# === HAZOP Node: Reactor Feed Line (P&ID: R-101) ===
#
# Total Deviations Generated: 13
#
# Deviations:
#  1. No/Not/None Flow at Reactor Feed Line (P&ID: R-101)
#  2. More Flow at Reactor Feed Line (P&ID: R-101)
#  3. Less Flow at Reactor Feed Line (P&ID: R-101)
#  4. Reverse Flow at Reactor Feed Line (P&ID: R-101)
#  5. No/Not/None Pressure at Reactor Feed Line (P&ID: R-101)
#  6. More Pressure at Reactor Feed Line (P&ID: R-101)
#  7. Less Pressure at Reactor Feed Line (P&ID: R-101)
#  8. More Temperature at Reactor Feed Line (P&ID: R-101)
#  9. Less Temperature at Reactor Feed Line (P&ID: R-101)
# 10. As Well As Composition at Reactor Feed Line (P&ID: R-101)
# 11. Part Of Composition at Reactor Feed Line (P&ID: R-101)
# 12. Other Than Composition at Reactor Feed Line (P&ID: R-101)
# 13. Other Than Time at Reactor Feed Line (P&ID: R-101)

2.2 Deviation Analysis

Example 2: Deviation Analysis System Implementation

Analyze causes, consequences, existing safeguards, and recommendations for each deviation.

# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0

# ===================================
# Example 2: Deviation Analysis System
# ===================================

import pandas as pd

class DeviationAnalyzer:
    """Deviation Analysis System"""

    def __init__(self, node_name: str, equipment_type: str):
        self.node_name = node_name
        self.equipment_type = equipment_type
        self.deviations: List[Deviation] = []
        self._load_knowledge_base()

    def _load_knowledge_base(self):
        """Load knowledge base (typical causes and consequences)"""
        # In actual systems, learn from past HAZOP databases

        self.knowledge_base = {
            ("NO", "FLOW"): {
                "causes": [
                    "Pump failure",
                    "Valve closed (inadvertent or maintenance)",
                    "Line blockage (solidification, fouling)",
                    "Suction strainer plugged",
                    "Upstream vessel empty"
                ],
                "consequences": [
                    "Reactor starvation (loss of cooling/reactant)",
                    "Pump cavitation/damage",
                    "Product off-spec",
                    "Temperature excursion (if flow is coolant)"
                ],
                "safeguards": [
                    "Low flow alarm",
                    "Pump status indication",
                    "Level indication on upstream vessel"
                ]
            },
            ("MORE", "FLOW"): {
                "causes": [
                    "Control valve fails open",
                    "Pump speed increase (VFD malfunction)",
                    "Upstream pressure increase",
                    "Parallel line opened unintentionally"
                ],
                "consequences": [
                    "Reactor flooding",
                    "Downstream vessel overflow",
                    "Product off-spec (dilution)",
                    "Increased pressure drop"
                ],
                "safeguards": [
                    "High flow alarm",
                    "Level high alarm on receiving vessel",
                    "Flow control with override"
                ]
            },
            ("MORE", "TEMPERATURE"): {
                "causes": [
                    "Cooling system failure",
                    "Heat exchanger fouling (reduced heat transfer)",
                    "Exothermic reaction runaway",
                    "External fire",
                    "Temperature transmitter failure"
                ],
                "consequences": [
                    "Thermal decomposition",
                    "Reactor overpressure",
                    "Product degradation",
                    "Equipment damage (metallurgy limits)"
                ],
                "safeguards": [
                    "High temperature alarm",
                    "Independent high temperature interlock",
                    "Pressure relief valve",
                    "Emergency cooling system"
                ]
            },
            ("REVERSE", "FLOW"): {
                "causes": [
                    "Pump stopped with downstream pressure higher",
                    "Check valve failure",
                    "Incorrect valve lineup",
                    "Siphon effect"
                ],
                "consequences": [
                    "Contamination of upstream system",
                    "Pump damage (reverse rotation)",
                    "Loss of containment",
                    "Mixing of incompatible materials"
                ],
                "safeguards": [
                    "Check valve",
                    "Backflow prevention valve",
                    "Pump discharge valve"
                ]
            }
        }

    def analyze_deviation(self, guide_word: GuideWord,
                          parameter: ProcessParameter) -> Deviation:
        """Analyze deviation"""

        # Retrieve information from knowledge base
        key = (guide_word.name, parameter.name)
        kb_entry = self.knowledge_base.get(key, {
            "causes": ["To be determined in HAZOP meeting"],
            "consequences": ["To be determined in HAZOP meeting"],
            "safeguards": ["To be determined in HAZOP meeting"]
        })

        # Risk ranking (simplified)
        high_risk_combos = [
            ("NO", "FLOW"), ("MORE", "TEMPERATURE"), ("MORE", "PRESSURE")
        ]
        risk_rank = "High" if key in high_risk_combos else "Medium"

        # Generate recommendations
        recommendations = self._generate_recommendations(
            guide_word, parameter, kb_entry["safeguards"]
        )

        deviation = Deviation(
            guide_word=guide_word,
            parameter=parameter,
            description=f"{guide_word.value} {parameter.value} at {self.node_name}",
            potential_causes=kb_entry["causes"],
            potential_consequences=kb_entry["consequences"],
            existing_safeguards=kb_entry["safeguards"],
            recommendations=recommendations,
            risk_rank=risk_rank
        )

        self.deviations.append(deviation)
        return deviation

    def _generate_recommendations(self, guide_word: GuideWord,
                                   parameter: ProcessParameter,
                                   existing_safeguards: List[str]) -> List[str]:
        """Generate recommendations"""
        recommendations = []

        # Check for missing alarms
        if "alarm" not in ' '.join(existing_safeguards).lower():
            recommendations.append(
                f"Install {parameter.value.lower()} alarm"
            )

        # Check for missing interlocks
        if guide_word in [GuideWord.MORE, GuideWord.NO] and \
           "interlock" not in ' '.join(existing_safeguards).lower():
            recommendations.append(
                f"Consider {parameter.value.lower()} interlock (SIL assessment required)"
            )

        # Redundancy check
        if len(existing_safeguards) < 2:
            recommendations.append(
                "Review adequacy of protection layers (LOPA recommended)"
            )

        if not recommendations:
            recommendations.append("Existing safeguards adequate - continue routine maintenance")

        return recommendations

    def generate_hazop_worksheet(self) -> pd.DataFrame:
        """Generate HAZOP worksheet"""
        data = []

        for dev in self.deviations:
            data.append({
                'Node': self.node_name,
                'Deviation': dev.description,
                'Causes': '; '.join(dev.potential_causes[:2]) + '...',  # First 2
                'Consequences': '; '.join(dev.potential_consequences[:2]) + '...',
                'Safeguards': '; '.join(dev.existing_safeguards[:2]) + '...',
                'Risk': dev.risk_rank,
                'Actions': '; '.join(dev.recommendations[:1])  # Top priority recommendation
            })

        return pd.DataFrame(data)


# Usage example
analyzer = DeviationAnalyzer(
    node_name="Reactor Feed Line (R-101)",
    equipment_type="Piping"
)

# Analyze critical deviations
critical_deviations = [
    (GuideWord.NO, ProcessParameter.FLOW),
    (GuideWord.MORE, ProcessParameter.FLOW),
    (GuideWord.MORE, ProcessParameter.TEMPERATURE),
    (GuideWord.REVERSE, ProcessParameter.FLOW)
]

for gw, param in critical_deviations:
    analyzer.analyze_deviation(gw, param)

# Generate HAZOP worksheet
worksheet = analyzer.generate_hazop_worksheet()

print("=== HAZOP Worksheet ===\n")
print(worksheet.to_string(index=False))

print("\n=== Detailed Analysis: No Flow ===")
no_flow_dev = analyzer.deviations[0]
print(f"\nDeviation: {no_flow_dev.description}")
print(f"\nPotential Causes:")
for cause in no_flow_dev.potential_causes:
    print(f"  - {cause}")
print(f"\nPotential Consequences:")
for cons in no_flow_dev.potential_consequences:
    print(f"  - {cons}")
print(f"\nRecommendations:")
for rec in no_flow_dev.recommendations:
    print(f"  - {rec}")

# Expected output:
# === HAZOP Worksheet ===
#
#                     Node                           Deviation  Risk ...
#  Reactor Feed Line (R-101)    No/Not/None Flow at Reactor... High ...
#  Reactor Feed Line (R-101)          More Flow at Reactor... Medium ...
#  Reactor Feed Line (R-101)   More Temperature at Reactor... High ...
#  Reactor Feed Line (R-101)       Reverse Flow at Reactor... Medium ...

2.3 HAZOP Automation System

Example 3: P&ID Analysis and HAZOP Auto-generation

Automatically generate HAZOP nodes and deviations from P&ID (Piping and Instrumentation Diagram) information.

# ===================================
# Example 3: P&ID Analysis and HAZOP Auto-generation
# ===================================

from dataclasses import dataclass
from typing import List, Dict, Set

@dataclass
class PIDElement:
    """P&ID Element"""
    id: str
    element_type: str  # Vessel, Pump, HeatExchanger, Valve, Line
    fluid: str
    design_pressure: float  # MPa
    design_temperature: float  # °C
    connected_to: List[str]  # Connected element IDs

@dataclass
class HAZOPNode:
    """HAZOP Node (study unit)"""
    id: str
    name: str
    elements: List[PIDElement]
    intent: str  # Design Intent
    parameters_of_interest: List[ProcessParameter]

class PIDHAZOPAutomation:
    """P&ID-based HAZOP Automation System"""

    def __init__(self):
        self.pid_elements: Dict[str, PIDElement] = {}
        self.hazop_nodes: List[HAZOPNode] = []

    def add_pid_element(self, element: PIDElement):
        """Add P&ID element"""
        self.pid_elements[element.id] = element

    def identify_hazop_nodes(self) -> List[HAZOPNode]:
        """Automatically identify HAZOP nodes

        Node division criteria:
          - Functional units (reaction, separation, heat exchange)
          - Process condition change points (phase change, temperature/pressure change)
          - Control loop boundaries
        """
        nodes = []

        # Simplified: Treat each major equipment as a node
        for elem_id, elem in self.pid_elements.items():
            if elem.element_type in ['Vessel', 'Pump', 'HeatExchanger']:
                # Infer design intent
                intent = self._infer_design_intent(elem)

                # Determine relevant parameters
                parameters = self._determine_parameters(elem)

                node = HAZOPNode(
                    id=f"NODE-{elem_id}",
                    name=f"{elem.element_type}: {elem_id}",
                    elements=[elem],
                    intent=intent,
                    parameters_of_interest=parameters
                )

                nodes.append(node)

        self.hazop_nodes = nodes
        return nodes

    def _infer_design_intent(self, element: PIDElement) -> str:
        """Infer design intent"""
        intents = {
            'Vessel': f"Contain {element.fluid} at {element.design_pressure} MPa, {element.design_temperature}°C",
            'Pump': f"Transfer {element.fluid} at design flow rate",
            'HeatExchanger': f"Heat/cool {element.fluid} to target temperature"
        }
        return intents.get(element.element_type, "To be determined")

    def _determine_parameters(self, element: PIDElement) -> List[ProcessParameter]:
        """Determine parameters to study"""
        param_map = {
            'Vessel': [ProcessParameter.LEVEL, ProcessParameter.PRESSURE,
                       ProcessParameter.TEMPERATURE, ProcessParameter.COMPOSITION],
            'Pump': [ProcessParameter.FLOW, ProcessParameter.PRESSURE],
            'HeatExchanger': [ProcessParameter.FLOW, ProcessParameter.TEMPERATURE,
                              ProcessParameter.PRESSURE]
        }
        return param_map.get(element.element_type,
                             [ProcessParameter.FLOW, ProcessParameter.PRESSURE])

    def generate_hazop_study(self) -> Dict:
        """Generate complete HAZOP study"""

        # Identify nodes
        if not self.hazop_nodes:
            self.identify_hazop_nodes()

        # Conduct HAZOP for each node
        study_results = []

        for node in self.hazop_nodes:
            analyzer = DeviationAnalyzer(node.name, node.elements[0].element_type)

            # Apply guide words
            for param in node.parameters_of_interest:
                for guide_word in GuideWord:
                    if analyzer._load_knowledge_base() or True:  # simplified
                        # Analyze only valid combinations
                        engine = HAZOPGuideWordEngine()
                        if engine.is_valid_combination(guide_word, param):
                            try:
                                deviation = analyzer.analyze_deviation(guide_word, param)
                                study_results.append({
                                    'Node': node.name,
                                    'Intent': node.intent,
                                    'Deviation': deviation.description,
                                    'Risk': deviation.risk_rank
                                })
                            except:
                                pass

        return {
            'total_nodes': len(self.hazop_nodes),
            'total_deviations': len(study_results),
            'results': pd.DataFrame(study_results)
        }


# Usage example: Build P&ID for simple process
pid_system = PIDHAZOPAutomation()

# Add P&ID elements
pid_system.add_pid_element(PIDElement(
    id="R-101",
    element_type="Vessel",
    fluid="Ethylene/Catalyst",
    design_pressure=3.0,
    design_temperature=150,
    connected_to=["P-101", "E-101"]
))

pid_system.add_pid_element(PIDElement(
    id="P-101",
    element_type="Pump",
    fluid="Product mixture",
    design_pressure=5.0,
    design_temperature=80,
    connected_to=["R-101", "T-201"]
))

pid_system.add_pid_element(PIDElement(
    id="E-101",
    element_type="HeatExchanger",
    fluid="Reactor coolant",
    design_pressure=1.5,
    design_temperature=100,
    connected_to=["R-101"]
))

# Automatically identify HAZOP nodes
nodes = pid_system.identify_hazop_nodes()
print(f"=== HAZOP Nodes Identified: {len(nodes)} ===\n")
for node in nodes:
    print(f"Node: {node.name}")
    print(f"  Intent: {node.intent}")
    print(f"  Parameters: {[p.value for p in node.parameters_of_interest]}\n")

# Auto-generate HAZOP study
study = pid_system.generate_hazop_study()
print(f"\n=== HAZOP Study Summary ===")
print(f"Total Nodes: {study['total_nodes']}")
print(f"Total Deviations: {study['total_deviations']}\n")
print(study['results'].head(10).to_string(index=False))

# Expected output:
# === HAZOP Nodes Identified: 3 ===
#
# Node: Vessel: R-101
#   Intent: Contain Ethylene/Catalyst at 3.0 MPa, 150°C
#   Parameters: ['Level', 'Pressure', 'Temperature', 'Composition']
#
# Node: Pump: P-101
#   Intent: Transfer Product mixture at design flow rate
#   Parameters: ['Flow', 'Pressure']
# ...

2.4 Quantitative Risk Assessment (QRA)

Example 4: Event Tree Analysis (ETA)

Probabilistically evaluate the event sequence from initiating event to final outcome.

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0

# ===================================
# Example 4: Event Tree Analysis (ETA)
# ===================================

from dataclasses import dataclass
from typing import List, Optional
import matplotlib.pyplot as plt
import numpy as np

@dataclass
class EventNode:
    """Event tree node"""
    name: str
    success_probability: float
    failure_probability: float

    def __post_init__(self):
        assert abs(self.success_probability + self.failure_probability - 1.0) < 1e-6, \
            "Success + Failure probability must equal 1.0"

@dataclass
class Outcome:
    """Final outcome"""
    path: List[str]
    probability: float
    consequence: str
    severity: str  # Catastrophic, Critical, Marginal, Negligible

class EventTreeAnalysis:
    """Event Tree Analysis System"""

    def __init__(self, initiating_event: str, frequency: float):
        """
        Args:
            initiating_event: Initiating event name
            frequency: Initiating event frequency (events/year)
        """
        self.initiating_event = initiating_event
        self.frequency = frequency
        self.safety_functions: List[EventNode] = []
        self.outcomes: List[Outcome] = []

    def add_safety_function(self, node: EventNode):
        """Add safety function"""
        self.safety_functions.append(node)

    def calculate_outcomes(self):
        """Calculate all outcome paths and probabilities"""
        self.outcomes = []

        # Generate all possible combinations (2^n paths)
        n_functions = len(self.safety_functions)
        for i in range(2 ** n_functions):
            path = []
            probability = self.frequency

            # Determine Success/Failure using binary representation
            binary = format(i, f'0{n_functions}b')

            for j, bit in enumerate(binary):
                function = self.safety_functions[j]
                if bit == '0':  # Success
                    path.append(f"{function.name}: Success")
                    probability *= function.success_probability
                else:  # Failure
                    path.append(f"{function.name}: Failure")
                    probability *= function.failure_probability

            # Determine consequence severity
            failure_count = binary.count('1')
            consequence, severity = self._determine_consequence(failure_count)

            self.outcomes.append(Outcome(
                path=path,
                probability=probability,
                consequence=consequence,
                severity=severity
            ))

    def _determine_consequence(self, failure_count: int) -> tuple:
        """Determine consequence based on failure count"""
        n = len(self.safety_functions)

        if failure_count == 0:
            return "Safe shutdown", "Negligible"
        elif failure_count == 1:
            return "Minor release (contained by secondary barrier)", "Marginal"
        elif failure_count == 2:
            return "Significant release (requires emergency response)", "Critical"
        else:
            return "Major release (offsite consequences)", "Catastrophic"

    def generate_eta_report(self) -> str:
        """Generate event tree analysis report"""
        if not self.outcomes:
            self.calculate_outcomes()

        report = f"""
{'='*80}
EVENT TREE ANALYSIS (ETA) REPORT
{'='*80}

Initiating Event: {self.initiating_event}
Frequency: {self.frequency:.2e} events/year

Safety Functions:
"""

        for i, sf in enumerate(self.safety_functions, 1):
            report += f"{i}. {sf.name}\n"
            report += f"   Success Prob: {sf.success_probability:.4f} ({sf.success_probability*100:.2f}%)\n"
            report += f"   Failure Prob: {sf.failure_probability:.4f} ({sf.failure_probability*100:.2f}%)\n\n"

        report += f"""
{'='*80}
OUTCOMES
{'='*80}
"""

        # Sort by severity
        severity_order = {"Catastrophic": 0, "Critical": 1, "Marginal": 2, "Negligible": 3}
        sorted_outcomes = sorted(self.outcomes,
                                 key=lambda x: (severity_order[x.severity], -x.probability))

        for i, outcome in enumerate(sorted_outcomes, 1):
            report += f"\nOutcome {i}: {outcome.consequence}\n"
            report += f"  Severity: {outcome.severity}\n"
            report += f"  Frequency: {outcome.probability:.2e} events/year\n"
            report += f"  Path:\n"
            for step in outcome.path:
                report += f"    → {step}\n"

        # Calculate total risk
        total_risk = sum(o.probability for o in self.outcomes)
        catastrophic_risk = sum(o.probability for o in self.outcomes
                                if o.severity == "Catastrophic")

        report += f"""
{'='*80}
RISK SUMMARY
{'='*80}
Total Event Frequency (all paths): {total_risk:.2e} events/year
Catastrophic Outcome Frequency: {catastrophic_risk:.2e} events/year

Risk Acceptance Criteria (example):
  Catastrophic: < 1e-6 events/year  {'āœ… ACCEPTABLE' if catastrophic_risk < 1e-6 else 'āŒ UNACCEPTABLE'}
  Critical:     < 1e-4 events/year
"""

        return report


# Usage example: ETA for flammable gas leak scenario
eta = EventTreeAnalysis(
    initiating_event="Large flange gasket failure causing H2 release",
    frequency=1e-3  # 0.001 events/year (once per 1000 years)
)

# Add safety functions (multiple protection layers)
eta.add_safety_function(EventNode(
    name="Gas detection system activates",
    success_probability=0.95,
    failure_probability=0.05
))

eta.add_safety_function(EventNode(
    name="Automatic isolation valve closes",
    success_probability=0.98,
    failure_probability=0.02
))

eta.add_safety_function(EventNode(
    name="Ignition prevention (no ignition sources)",
    success_probability=0.90,
    failure_probability=0.10
))

# Execute event tree analysis
report = eta.generate_eta_report()
print(report)

# Detailed analysis: Worst case scenario
worst_case = max(eta.outcomes, key=lambda x: x.probability
                 if x.severity == "Catastrophic" else 0)

print("\n=== WORST CASE SCENARIO ===")
print(f"Consequence: {worst_case.consequence}")
print(f"Frequency: {worst_case.probability:.2e} events/year")
print(f"Return Period: {1/worst_case.probability:.0f} years")

# Expected output:
# ============================================================================
# EVENT TREE ANALYSIS (ETA) REPORT
# ============================================================================
#
# Initiating Event: Large flange gasket failure causing H2 release
# Frequency: 1.00e-03 events/year
#
# Safety Functions:
# 1. Gas detection system activates
#    Success Prob: 0.9500 (95.00%)
#    Failure Prob: 0.0500 (5.00%)
#
# 2. Automatic isolation valve closes
#    Success Prob: 0.9800 (98.00%)
#    Failure Prob: 0.0200 (2.00%)
# ...
#
# Outcome 1: Major release (offsite consequences)
#   Severity: Catastrophic
#   Frequency: 1.00e-07 events/year  (All 3 safety functions fail)
#   Path:
#     → Gas detection system activates: Failure
#     → Automatic isolation valve closes: Failure
#     → Ignition prevention (no ignition sources): Failure

2.5 Failure Frequency Data and Reliability

Example 5: Generic Failure Rate Database

# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0

# ===================================
# Example 5: Generic Failure Rate Database
# ===================================

import pandas as pd
from dataclasses import dataclass
from typing import Dict

@dataclass
class FailureRateData:
    """Failure rate data"""
    equipment_type: str
    failure_mode: str
    failure_rate: float  # failures per million hours (10^-6/hr)
    source: str  # Data source (OREDA, PDS, etc.)
    confidence_factor: float  # 1.0 = nominal, >1.0 = conservative

class GenericFailureRateDatabase:
    """Generic failure rate database"""

    def __init__(self):
        self.database: Dict[str, FailureRateData] = {}
        self._initialize_database()

    def _initialize_database(self):
        """Initialize database (based on OREDA, PDS, etc. data)"""

        failure_rates = [
            # Valves
            ("Isolation Valve", "Fail to close on demand", 5.0, "OREDA 2015", 1.0),
            ("Control Valve", "Fail open", 10.0, "PDS 2010", 1.0),
            ("Check Valve", "Reverse flow (fail to close)", 50.0, "API 581", 1.5),
            ("Pressure Relief Valve", "Fail to open on demand", 10.0, "API 581", 1.0),

            # Pumps
            ("Centrifugal Pump", "Fail to start", 5.0, "OREDA 2015", 1.0),
            ("Centrifugal Pump", "Fail to run (shutdown)", 50.0, "OREDA 2015", 1.0),

            # Instrumentation
            ("Pressure Transmitter", "Out of calibration", 2.0, "EXIDA", 1.0),
            ("Temperature Transmitter", "Fail dangerous", 1.5, "EXIDA", 1.0),
            ("Level Transmitter", "Fail dangerous", 3.0, "EXIDA", 1.0),
            ("Gas Detector", "Fail to detect", 100.0, "Industry avg", 2.0),

            # Safety Systems
            ("SIS Logic Solver", "Fail dangerous", 0.5, "IEC 61508", 1.0),
            ("Emergency Shutdown Valve", "Fail to close", 3.0, "API 581", 1.0),

            # Vessels & Piping
            ("Pressure Vessel", "Catastrophic rupture", 0.01, "API 579", 1.0),
            ("Piping (per km)", "Major leak (>10% cross-section)", 0.5, "UKOPA", 1.0),
            ("Flange Gasket", "Leak", 10.0, "Industry avg", 1.5),

            # Heat Exchangers
            ("Shell & Tube HX", "Tube leak", 20.0, "OREDA 2015", 1.0)
        ]

        for eq_type, mode, rate, source, cf in failure_rates:
            key = f"{eq_type}_{mode}"
            self.database[key] = FailureRateData(eq_type, mode, rate, source, cf)

    def get_failure_rate(self, equipment_type: str, failure_mode: str,
                         apply_confidence_factor: bool = True) -> float:
        """Get failure rate

        Returns:
            Failure rate in failures per year
        """
        key = f"{equipment_type}_{failure_mode}"
        data = self.database.get(key)

        if not data:
            raise ValueError(f"No data for {equipment_type} - {failure_mode}")

        rate_per_year = data.failure_rate * 8760 / 1e6  # 10^-6/hr → /year

        if apply_confidence_factor:
            rate_per_year *= data.confidence_factor

        return rate_per_year

    def get_pfd_from_test_interval(self, equipment_type: str, failure_mode: str,
                                    test_interval_months: int = 12) -> float:
        """Calculate Probability of Failure on Demand (PFD)

        Simplified formula: PFD ā‰ˆ Ī» * T / 2
        Ī»: failure rate (/year)
        T: test interval (year)
        """
        failure_rate = self.get_failure_rate(equipment_type, failure_mode)
        test_interval_years = test_interval_months / 12.0

        pfd = (failure_rate * test_interval_years) / 2.0

        return min(pfd, 1.0)  # PFD max 1.0

    def generate_reliability_datasheet(self, equipment_list: List[tuple]) -> pd.DataFrame:
        """Generate reliability datasheet

        Args:
            equipment_list: [(equipment_type, failure_mode, test_interval_months), ...]
        """
        data = []

        for eq_type, mode, test_interval in equipment_list:
            failure_rate = self.get_failure_rate(eq_type, mode)
            pfd = self.get_pfd_from_test_interval(eq_type, mode, test_interval)

            # SIL capability determination
            if pfd < 1e-4:
                sil_capability = "SIL 3"
            elif pfd < 1e-3:
                sil_capability = "SIL 2"
            elif pfd < 1e-2:
                sil_capability = "SIL 1"
            else:
                sil_capability = "No SIL"

            data.append({
                'Equipment': eq_type,
                'Failure Mode': mode,
                'Failure Rate': f"{failure_rate:.2e} /yr",
                'Test Interval': f"{test_interval} months",
                'PFD': f"{pfd:.2e}",
                'SIL Capability': sil_capability,
                'MTTF': f"{1/failure_rate:.1f} years" if failure_rate > 0 else "N/A"
            })

        return pd.DataFrame(data)


# Usage example
failure_db = GenericFailureRateDatabase()

# Create reliability datasheet
equipment_list = [
    ("Isolation Valve", "Fail to close on demand", 12),
    ("Pressure Relief Valve", "Fail to open on demand", 24),
    ("Gas Detector", "Fail to detect", 3),
    ("SIS Logic Solver", "Fail dangerous", 12),
    ("Emergency Shutdown Valve", "Fail to close", 6)
]

reliability_sheet = failure_db.generate_reliability_datasheet(equipment_list)

print("=== Reliability Datasheet ===\n")
print(reliability_sheet.to_string(index=False))

# Get individual failure rate
prv_failure_rate = failure_db.get_failure_rate(
    "Pressure Relief Valve",
    "Fail to open on demand"
)
print(f"\n=== PRV Reliability ===")
print(f"Failure Rate: {prv_failure_rate:.2e} /year")
print(f"MTTF: {1/prv_failure_rate:.1f} years")

# Expected output:
# === Reliability Datasheet ===
#
#                   Equipment              Failure Mode Failure Rate Test Interval      PFD SIL Capability  MTTF
#          Isolation Valve  Fail to close on demand   4.38e-05 /yr     12 months 2.19e-05        SIL 3  22831.1 years
#  Pressure Relief Valve  Fail to open on demand   8.76e-05 /yr     24 months 8.76e-05        SIL 3  11415.5 years
#            Gas Detector            Fail to detect   8.76e-04 /yr      3 months 1.09e-04        SIL 3  1141.6 years
#       SIS Logic Solver           Fail dangerous   4.38e-06 /yr     12 months 2.19e-06        SIL 4  228310.5 years
# Emergency Shutdown Valve            Fail to close   2.63e-05 /yr      6 months 6.57e-06        SIL 4  38041.8 years

2.6 F-N Curve and Risk Criteria

Example 6: F-N Curve (Frequency-Number curve)

Create F-N curves to assess societal risk.

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0

# ===================================
# Example 6: F-N Curve and Risk Criteria
# ===================================

import matplotlib.pyplot as plt
import numpy as np
from dataclasses import dataclass
from typing import List

@dataclass
class RiskScenario:
    """Risk scenario"""
    name: str
    frequency: float  # events/year
    fatalities: int   # Expected number of fatalities

class FNCurveAnalysis:
    """F-N Curve Analysis System"""

    def __init__(self, facility_name: str):
        self.facility_name = facility_name
        self.scenarios: List[RiskScenario] = []

        # Risk acceptance criteria lines (Dutch criteria example)
        self.alarp_upper_limit = lambda N: 1e-3 / (N**2)  # Upper ALARP
        self.alarp_lower_limit = lambda N: 1e-5 / (N**2)  # Lower ALARP
        self.negligible_limit = lambda N: 1e-7 / (N**2)   # Negligible

    def add_scenario(self, scenario: RiskScenario):
        """Add scenario"""
        self.scenarios.append(scenario)

    def calculate_fn_curve_data(self) -> tuple:
        """Calculate F-N curve data

        Returns:
            (N_values, F_values): Number of fatalities and cumulative frequency
        """
        # Sort by fatalities
        sorted_scenarios = sorted(self.scenarios, key=lambda x: x.fatalities)

        N_values = []
        F_values = []

        for scenario in sorted_scenarios:
            N = scenario.fatalities
            # Cumulative frequency of events with N or more fatalities
            F = sum(s.frequency for s in self.scenarios if s.fatalities >= N)

            N_values.append(N)
            F_values.append(F)

        return np.array(N_values), np.array(F_values)

    def plot_fn_curve(self, save_path: str = None):
        """Plot F-N curve"""
        N, F = self.calculate_fn_curve_data()

        fig, ax = plt.subplots(figsize=(10, 8))

        # Plot F-N curve
        ax.loglog(N, F, 'bo-', linewidth=2, markersize=8,
                  label='Facility Risk Profile', zorder=5)

        # Plot individual scenarios
        for scenario in self.scenarios:
            ax.plot(scenario.fatalities, scenario.frequency,
                    'rx', markersize=10, markeredgewidth=2)
            ax.annotate(scenario.name,
                        xy=(scenario.fatalities, scenario.frequency),
                        xytext=(10, 10), textcoords='offset points',
                        fontsize=8, alpha=0.7)

        # Risk acceptance criteria lines
        N_range = np.logspace(0, 3, 100)  # 1-1000 people

        ax.loglog(N_range, [self.alarp_upper_limit(n) for n in N_range],
                  'r--', linewidth=2, label='ALARP Upper Limit (Unacceptable)')
        ax.loglog(N_range, [self.alarp_lower_limit(n) for n in N_range],
                  'y--', linewidth=2, label='ALARP Lower Limit')
        ax.loglog(N_range, [self.negligible_limit(n) for n in N_range],
                  'g--', linewidth=2, label='Negligible Risk')

        # Fill ALARP region
        ax.fill_between(N_range,
                        [self.alarp_lower_limit(n) for n in N_range],
                        [self.alarp_upper_limit(n) for n in N_range],
                        alpha=0.2, color='yellow', label='ALARP Region')

        ax.set_xlabel('Number of Fatalities (N)', fontsize=12, fontweight='bold')
        ax.set_ylabel('Cumulative Frequency (F) [events/year]', fontsize=12, fontweight='bold')
        ax.set_title(f'F-N Curve: {self.facility_name}\nSocietal Risk Assessment',
                     fontsize=14, fontweight='bold')
        ax.grid(True, which='both', alpha=0.3)
        ax.legend(loc='upper right')

        ax.set_xlim([1, 1000])
        ax.set_ylim([1e-8, 1e-2])

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')

        return fig

    def assess_alarp_status(self) -> Dict:
        """Assess ALARP status"""
        results = {
            'unacceptable': [],
            'alarp': [],
            'broadly_acceptable': []
        }

        for scenario in self.scenarios:
            N = scenario.fatalities
            F = scenario.frequency

            if F > self.alarp_upper_limit(N):
                results['unacceptable'].append(scenario)
            elif F > self.alarp_lower_limit(N):
                results['alarp'].append(scenario)
            else:
                results['broadly_acceptable'].append(scenario)

        return results

    def generate_alarp_report(self) -> str:
        """Generate ALARP report"""
        status = self.assess_alarp_status()

        report = f"""
{'='*80}
F-N CURVE ANALYSIS - ALARP ASSESSMENT
{'='*80}

Facility: {self.facility_name}
Total Scenarios Analyzed: {len(self.scenarios)}

{'='*80}
RISK CLASSIFICATION
{'='*80}

UNACCEPTABLE REGION (Above ALARP Upper Limit):
  Scenarios: {len(status['unacceptable'])}
"""

        if status['unacceptable']:
            for s in status['unacceptable']:
                report += f"  āŒ {s.name}: F={s.frequency:.2e}/yr, N={s.fatalities}\n"
            report += "  ACTION: Immediate risk reduction REQUIRED\n"
        else:
            report += "  āœ… No scenarios in unacceptable region\n"

        report += f"""
ALARP REGION (Risk Reduction As Low As Reasonably Practicable):
  Scenarios: {len(status['alarp'])}
"""

        if status['alarp']:
            for s in status['alarp']:
                report += f"  āš ļø  {s.name}: F={s.frequency:.2e}/yr, N={s.fatalities}\n"
            report += "  ACTION: Demonstrate ALARP (cost-benefit analysis)\n"
        else:
            report += "  āœ… No scenarios in ALARP region\n"

        report += f"""
BROADLY ACCEPTABLE REGION (Below ALARP Lower Limit):
  Scenarios: {len(status['broadly_acceptable'])}
"""

        if status['broadly_acceptable']:
            for s in status['broadly_acceptable']:
                report += f"  āœ… {s.name}: F={s.frequency:.2e}/yr, N={s.fatalities}\n"
            report += "  ACTION: Maintain current safety measures\n"

        report += f"\n{'='*80}\n"

        return report


# Usage example: F-N curve analysis for LNG receiving terminal
fn_analysis = FNCurveAnalysis(facility_name="LNG Import Terminal")

# Add risk scenarios
fn_analysis.add_scenario(RiskScenario(
    name="Small LNG leak (no ignition)",
    frequency=1e-2,
    fatalities=0
))

fn_analysis.add_scenario(RiskScenario(
    name="Medium LNG leak → jet fire",
    frequency=5e-4,
    fatalities=2
))

fn_analysis.add_scenario(RiskScenario(
    name="Large LNG leak → pool fire",
    frequency=1e-4,
    fatalities=10
))

fn_analysis.add_scenario(RiskScenario(
    name="Catastrophic tank failure → VCE",
    frequency=1e-6,
    fatalities=100
))

fn_analysis.add_scenario(RiskScenario(
    name="LNG carrier collision → BLEVE",
    frequency=5e-7,
    fatalities=300
))

# Generate ALARP report
alarp_report = fn_analysis.generate_alarp_report()
print(alarp_report)

# Plot F-N curve
fig = fn_analysis.plot_fn_curve()
# plt.show()

# Expected output:
# ============================================================================
# F-N CURVE ANALYSIS - ALARP ASSESSMENT
# ============================================================================
#
# Facility: LNG Import Terminal
# Total Scenarios Analyzed: 5
#
# ============================================================================
# RISK CLASSIFICATION
# ============================================================================
#
# UNACCEPTABLE REGION (Above ALARP Upper Limit):
#   Scenarios: 0
#   āœ… No scenarios in unacceptable region
#
# ALARP REGION (Risk Reduction As Low As Reasonably Practicable):
#   Scenarios: 2
#   āš ļø  Medium LNG leak → jet fire: F=5.00e-04/yr, N=2
#   āš ļø  Large LNG leak → pool fire: F=1.00e-04/yr, N=10
#   ACTION: Demonstrate ALARP (cost-benefit analysis)
#
# BROADLY ACCEPTABLE REGION (Below ALARP Lower Limit):
#   Scenarios: 2
#   āœ… Catastrophic tank failure → VCE: F=1.00e-06/yr, N=100
#   āœ… LNG carrier collision → BLEVE: F=5.00e-07/yr, N=300
#   ACTION: Maintain current safety measures

2.7 HAZOP Report Auto-generation

Example 7: Comprehensive HAZOP Report Generator

# ===================================
# Example 7: HAZOP Report Auto-generation
# ===================================

from datetime import datetime
import json

class HAZOPReportGenerator:
    """HAZOP study complete report generation system"""

    def __init__(self, project_info: Dict):
        self.project_info = project_info
        self.hazop_nodes = []
        self.team_members = []

    def add_team_member(self, name: str, role: str):
        """Add team member"""
        self.team_members.append({'name': name, 'role': role})

    def generate_full_report(self, deviations_by_node: Dict[str, List[Deviation]]) -> str:
        """Generate complete HAZOP report

        Args:
            deviations_by_node: {node_name: [Deviation, ...], ...}
        """

        report = self._generate_cover_page()
        report += self._generate_executive_summary(deviations_by_node)
        report += self._generate_methodology_section()
        report += self._generate_detailed_worksheets(deviations_by_node)
        report += self._generate_action_items(deviations_by_node)
        report += self._generate_appendices()

        return report

    def _generate_cover_page(self) -> str:
        """Cover page"""
        return f"""
{'='*80}
HAZARD AND OPERABILITY STUDY (HAZOP)
FINAL REPORT
{'='*80}

Project:     {self.project_info.get('name', 'N/A')}
Facility:    {self.project_info.get('facility', 'N/A')}
Location:    {self.project_info.get('location', 'N/A')}

Study Date:  {self.project_info.get('study_date', datetime.now().strftime('%Y-%m-%d'))}
Report Date: {datetime.now().strftime('%Y-%m-%d')}

Document No: {self.project_info.get('doc_number', 'HAZOP-001')}
Revision:    {self.project_info.get('revision', '0')}

{'='*80}
STUDY TEAM
{'='*80}

"""

    def _generate_executive_summary(self, deviations_by_node: Dict) -> str:
        """Executive summary"""

        total_deviations = sum(len(devs) for devs in deviations_by_node.values())
        high_risk = sum(1 for devs in deviations_by_node.values()
                        for dev in devs if dev.risk_rank == "High")

        summary = f"""
EXECUTIVE SUMMARY
{'='*80}

Study Scope:
  - Total HAZOP Nodes:     {len(deviations_by_node)}
  - Total Deviations:      {total_deviations}
  - High Risk Scenarios:   {high_risk}

Key Findings:
  1. {high_risk} high-risk scenarios identified requiring immediate attention
  2. All scenarios have been assessed with existing safeguards documented
  3. Recommendations provided for risk reduction where needed

Overall Assessment:
"""

        if high_risk == 0:
            summary += "  āœ… No high-risk scenarios. Facility design is robust.\n"
        elif high_risk <= 3:
            summary += f"  āš ļø  {high_risk} high-risk scenarios require mitigation.\n"
        else:
            summary += f"  āŒ {high_risk} high-risk scenarios - major design review recommended.\n"

        return summary + "\n"

    def _generate_methodology_section(self) -> str:
        """Methodology section"""
        return f"""
METHODOLOGY
{'='*80}

1. HAZOP Technique:
   - Guide Words applied to process parameters
   - Systematic deviation analysis
   - Cause-Consequence-Safeguard approach

2. Risk Ranking:
   - High:   Immediate action required
   - Medium: Risk reduction should be considered
   - Low:    Acceptable with existing safeguards

3. Documentation:
   - P&ID reviewed: {self.project_info.get('pid_revision', 'Rev 0')}
   - Process conditions: {self.project_info.get('process_basis', 'Normal operation')}

"""

    def _generate_detailed_worksheets(self, deviations_by_node: Dict) -> str:
        """Detailed worksheets"""
        worksheets = f"""
DETAILED HAZOP WORKSHEETS
{'='*80}

"""

        for node_name, deviations in deviations_by_node.items():
            worksheets += f"\nNode: {node_name}\n"
            worksheets += f"{'-'*80}\n\n"

            for i, dev in enumerate(deviations, 1):
                worksheets += f"Deviation {i}: {dev.description}\n"
                worksheets += f"Risk Rank: {dev.risk_rank}\n\n"

                worksheets += "Potential Causes:\n"
                for cause in dev.potential_causes:
                    worksheets += f"  - {cause}\n"

                worksheets += "\nPotential Consequences:\n"
                for cons in dev.potential_consequences:
                    worksheets += f"  - {cons}\n"

                worksheets += "\nExisting Safeguards:\n"
                for sg in dev.existing_safeguards:
                    worksheets += f"  - {sg}\n"

                worksheets += "\nRecommendations:\n"
                for rec in dev.recommendations:
                    worksheets += f"  - {rec}\n"

                worksheets += "\n" + "-"*80 + "\n\n"

        return worksheets

    def _generate_action_items(self, deviations_by_node: Dict) -> str:
        """Action items"""
        actions = f"""
ACTION ITEM REGISTER
{'='*80}

"""

        action_id = 1
        for node_name, deviations in deviations_by_node.items():
            for dev in deviations:
                for rec in dev.recommendations:
                    if "adequate" not in rec.lower():  # Skip no-action recommendations
                        priority = "HIGH" if dev.risk_rank == "High" else "MEDIUM"
                        actions += f"Action {action_id:03d}: [{priority}] {rec}\n"
                        actions += f"  Node: {node_name}\n"
                        actions += f"  Deviation: {dev.description}\n"
                        actions += f"  Responsible: TBD\n"
                        actions += f"  Target Date: TBD\n\n"
                        action_id += 1

        return actions

    def _generate_appendices(self) -> str:
        """Appendices"""
        return f"""
APPENDICES
{'='*80}

Appendix A: P&ID Drawings
Appendix B: Study Team Attendance Records
Appendix C: Risk Matrix Definition
Appendix D: Abbreviations and Definitions

"""

    def export_to_json(self, deviations_by_node: Dict, filepath: str):
        """Export to JSON format"""
        export_data = {
            'project_info': self.project_info,
            'study_date': datetime.now().isoformat(),
            'nodes': {}
        }

        for node_name, deviations in deviations_by_node.items():
            export_data['nodes'][node_name] = [
                {
                    'description': dev.description,
                    'guide_word': dev.guide_word.value,
                    'parameter': dev.parameter.value,
                    'causes': dev.potential_causes,
                    'consequences': dev.potential_consequences,
                    'safeguards': dev.existing_safeguards,
                    'recommendations': dev.recommendations,
                    'risk_rank': dev.risk_rank
                }
                for dev in deviations
            ]

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)


# Usage example
project_info = {
    'name': 'Ethylene Polymerization Plant',
    'facility': 'Reactor Section',
    'location': 'Yokkaichi, Japan',
    'study_date': '2025-10-15',
    'doc_number': 'HAZOP-EPP-001',
    'revision': '0',
    'pid_revision': 'Rev 2',
    'process_basis': 'Normal operation at 85% capacity'
}

report_gen = HAZOPReportGenerator(project_info)

# Add team members
report_gen.add_team_member("Dr. Smith", "HAZOP Leader")
report_gen.add_team_member("Eng. Tanaka", "Process Engineer")
report_gen.add_team_member("Eng. Kim", "Instrument Engineer")

# Deviation data (generated from Example 2)
analyzer = DeviationAnalyzer("Reactor R-101", "Vessel")
analyzer.analyze_deviation(GuideWord.NO, ProcessParameter.FLOW)
analyzer.analyze_deviation(GuideWord.MORE, ProcessParameter.TEMPERATURE)

deviations_data = {
    "Reactor R-101": analyzer.deviations
}

# Generate report
full_report = report_gen.generate_full_report(deviations_data)
print(full_report[:2000])  # Display first 2000 characters

# JSON export
# report_gen.export_to_json(deviations_data, "hazop_study_results.json")
# print("\nāœ… HAZOP study exported to hazop_study_results.json")

# Expected output:
# ============================================================================
# HAZARD AND OPERABILITY STUDY (HAZOP)
# FINAL REPORT
# ============================================================================
#
# Project:     Ethylene Polymerization Plant
# Facility:    Reactor Section
# Location:    Yokkaichi, Japan
# ...

2.8 Risk Ranking and Prioritization

Example 8: Multi-Criteria Decision Analysis (MCDA)

Comprehensively evaluate multiple risk scenarios and determine countermeasure priorities.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

# ===================================
# Example 8: Multi-Criteria Decision Analysis (MCDA)
# ===================================

import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class RiskCriteria:
    """Risk assessment criteria"""
    likelihood: float       # 1-5
    severity_people: float  # 1-5 (people impact)
    severity_env: float     # 1-5 (environmental impact)
    severity_asset: float   # 1-5 (asset loss)
    detectability: float    # 1-5 (detectability, lower is better)

class MultiCriteriaRiskRanking:
    """Multi-criteria risk ranking system"""

    def __init__(self):
        # Criteria weights (sum to 1.0)
        self.weights = {
            'likelihood': 0.25,
            'severity_people': 0.35,  # Highest weight on people impact
            'severity_env': 0.20,
            'severity_asset': 0.15,
            'detectability': 0.05
        }

    def calculate_risk_priority_number(self, criteria: RiskCriteria) -> float:
        """Calculate Risk Priority Number (RPN)"""

        # Weighted average score
        weighted_score = (
            criteria.likelihood * self.weights['likelihood'] +
            criteria.severity_people * self.weights['severity_people'] +
            criteria.severity_env * self.weights['severity_env'] +
            criteria.severity_asset * self.weights['severity_asset'] +
            criteria.detectability * self.weights['detectability']
        )

        # Normalize to 0-100 scale
        rpn = weighted_score * 20

        return rpn

    def rank_scenarios(self, scenarios: Dict[str, RiskCriteria]) -> pd.DataFrame:
        """Rank scenarios"""

        data = []

        for scenario_name, criteria in scenarios.items():
            rpn = self.calculate_risk_priority_number(criteria)

            # Determine priority level
            if rpn >= 80:
                priority = "Critical (P1)"
            elif rpn >= 60:
                priority = "High (P2)"
            elif rpn >= 40:
                priority = "Medium (P3)"
            else:
                priority = "Low (P4)"

            data.append({
                'Scenario': scenario_name,
                'Likelihood': criteria.likelihood,
                'Sev_People': criteria.severity_people,
                'Sev_Env': criteria.severity_env,
                'Sev_Asset': criteria.severity_asset,
                'Detectability': criteria.detectability,
                'RPN': f"{rpn:.1f}",
                'Priority': priority
            })

        df = pd.DataFrame(data)
        df = df.sort_values('RPN', ascending=False, key=lambda x: x.astype(float))

        return df

    def sensitivity_analysis(self, criteria: RiskCriteria, scenario_name: str):
        """Sensitivity analysis: Impact of each criterion change on RPN"""

        baseline_rpn = self.calculate_risk_priority_number(criteria)

        print(f"\n=== Sensitivity Analysis: {scenario_name} ===")
        print(f"Baseline RPN: {baseline_rpn:.1f}\n")

        print("Impact of +1 change in each criterion:")

        # Likelihood +1
        temp_criteria = RiskCriteria(
            criteria.likelihood + 1,
            criteria.severity_people,
            criteria.severity_env,
            criteria.severity_asset,
            criteria.detectability
        )
        new_rpn = self.calculate_risk_priority_number(temp_criteria)
        print(f"  Likelihood +1:        {baseline_rpn:.1f} → {new_rpn:.1f} (Ī”{new_rpn-baseline_rpn:+.1f})")

        # Severity_People +1
        temp_criteria = RiskCriteria(
            criteria.likelihood,
            min(criteria.severity_people + 1, 5),
            criteria.severity_env,
            criteria.severity_asset,
            criteria.detectability
        )
        new_rpn = self.calculate_risk_priority_number(temp_criteria)
        print(f"  Severity_People +1:   {baseline_rpn:.1f} → {new_rpn:.1f} (Ī”{new_rpn-baseline_rpn:+.1f})")

        # Detectability -1 (lower is better)
        temp_criteria = RiskCriteria(
            criteria.likelihood,
            criteria.severity_people,
            criteria.severity_env,
            criteria.severity_asset,
            max(criteria.detectability - 1, 1)
        )
        new_rpn = self.calculate_risk_priority_number(temp_criteria)
        print(f"  Detectability -1:     {baseline_rpn:.1f} → {new_rpn:.1f} (Ī”{new_rpn-baseline_rpn:+.1f})")


# Usage example
mcda = MultiCriteriaRiskRanking()

# Evaluate multiple risk scenarios
scenarios = {
    "Reactor overpressure (no relief)": RiskCriteria(
        likelihood=2,         # Occasional
        severity_people=5,    # Catastrophic
        severity_env=4,       # Critical
        severity_asset=5,     # Catastrophic
        detectability=2       # Good detection (pressure transmitter)
    ),

    "Toxic gas release (H2S)": RiskCriteria(
        likelihood=3,         # Probable
        severity_people=4,    # Critical
        severity_env=3,       # Marginal
        severity_asset=2,     # Negligible
        detectability=3       # Moderate detection
    ),

    "Loss of cooling water": RiskCriteria(
        likelihood=4,         # Frequent
        severity_people=2,    # Negligible
        severity_env=1,       # Minimal
        severity_asset=3,     # Marginal
        detectability=1       # Excellent detection (flow transmitter)
    ),

    "Runaway polymerization": RiskCriteria(
        likelihood=2,         # Occasional
        severity_people=4,    # Critical
        severity_env=3,       # Marginal
        severity_asset=4,     # Critical
        detectability=3       # Moderate detection
    ),

    "Flange gasket leak": RiskCriteria(
        likelihood=3,         # Probable
        severity_people=2,    # Negligible
        severity_env=2,       # Negligible
        severity_asset=1,     # Minimal
        detectability=4       # Poor detection (visual inspection)
    )
}

# Perform ranking
ranking = mcda.rank_scenarios(scenarios)

print("=== Multi-Criteria Risk Ranking ===\n")
print(ranking.to_string(index=False))

# Sensitivity analysis for highest risk scenario
top_scenario_name = ranking.iloc[0]['Scenario']
top_scenario_criteria = scenarios[top_scenario_name]

mcda.sensitivity_analysis(top_scenario_criteria, top_scenario_name)

# Expected output:
# === Multi-Criteria Risk Ranking ===
#
#                        Scenario  Likelihood  Sev_People  Sev_Env  Sev_Asset  Detectability    RPN      Priority
#  Reactor overpressure (no relief)           2           5        4          5              2   80.0  Critical (P1)
#         Runaway polymerization           2           4        3          4              3   66.0      High (P2)
#          Toxic gas release (H2S)           3           4        3          2              3   66.0      High (P2)
#            Loss of cooling water           4           2        1          3              1   48.0    Medium (P3)
#                Flange gasket leak           3           2        2          1              4   44.0    Medium (P3)
#
# === Sensitivity Analysis: Reactor overpressure (no relief) ===
# Baseline RPN: 80.0
#
# Impact of +1 change in each criterion:
#   Likelihood +1:        80.0 → 85.0 (Ī”+5.0)
#   Severity_People +1:   80.0 → 80.0 (Ī”+0.0)  # Already at max (5)
#   Detectability -1:     80.0 → 79.0 (Ī”-1.0)

Learning Objectives Review

Upon completing this chapter, you should be able to explain:

Fundamental Understanding

Practical Skills

Application Skills


Next Steps

In Chapter 2, we learned about HAZOP, QRA, and F-N curves.

In Chapter 3:

will be covered.

References

  1. Montgomery, D. C. (2019). Design and Analysis of Experiments (9th ed.). Wiley.
  2. Box, G. E. P., Hunter, J. S., & Hunter, W. G. (2005). Statistics for Experimenters: Design, Innovation, and Discovery (2nd ed.). Wiley.
  3. Seborg, D. E., Edgar, T. F., Mellichamp, D. A., & Doyle III, F. J. (2016). Process Dynamics and Control (4th ed.). Wiley.
  4. McKay, M. D., Beckman, R. J., & Conover, W. J. (2000). "A Comparison of Three Methods for Selecting Values of Input Variables in the Analysis of Output from a Computer Code." Technometrics, 42(1), 55-61.

Disclaimer