🌐 EN | 🇯🇵 JP | Last sync: 2025-11-16

Chapter 1: Fundamentals of Process Safety

Complete Implementation of Hazard Identification, Risk Assessment, and Layer of Protection Analysis (LOPA)

📖 Reading Time: 25-30 minutes 📊 Difficulty: Intermediate 💻 Code Examples: 8

This chapter covers the fundamentals of Fundamentals of Process Safety, which overview of process safety. You will learn concept of process safety, hazard identification frameworks, and Conduct Layer of Protection Analysis (LOPA).

Learning Objectives

By reading this chapter, you will be able to:


1.1 Overview of Process Safety

What is Process Safety

Process Safety is a systematic approach to managing major hazards (fires, explosions, toxic gas releases, etc.) in chemical processes. While Occupational Safety focuses on preventing individual injuries, process safety focuses on Equipment Integrity and operational management.

History of Major Accidents

Accident Year Fatalities Main Cause Lesson Learned
Flixborough 1974 28 Cyclohexane release due to poor pipe design Importance of Change Management
Bhopal 1984 3,787+ Methyl isocyanate (MIC) release Need for multiple protection layers
Piper Alpha 1988 167 Gas leak → fire → explosion Permit-to-Work system
Texas City 2005 15 Distillation column overflow → explosion Importance of safety culture

Key Insight: The common factor in these accidents is that the root causes were not just technical failures but Management System Failures.


1.2 Hazard Identification Framework

Example 1: Implementation of Hazard Identification System

The first step in Process Hazard Analysis (PHA) is systematic hazard identification.

# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0

# ===================================
# Example 1: Hazard Identification System
# ===================================

import pandas as pd
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class HazardCategory(Enum):
    """Hazard Category"""
    PHYSICAL = "Physical Hazard"
    CHEMICAL = "Chemical Hazard"
    BIOLOGICAL = "Biological Hazard"
    ERGONOMIC = "Ergonomic Hazard"

class Severity(Enum):
    """Severity Level"""
    CATASTROPHIC = 5  # Catastrophic (multiple fatalities)
    CRITICAL = 4      # Critical (one or more fatalities)
    MARGINAL = 3      # Marginal (serious injury)
    NEGLIGIBLE = 2    # Negligible (minor injury)
    MINIMAL = 1       # Minimal (first aid only)

@dataclass
class Hazard:
    """Hazard Information"""
    id: str
    name: str
    category: HazardCategory
    description: str
    potential_causes: List[str]
    potential_consequences: List[str]
    severity: Severity
    existing_safeguards: List[str]

class HazardIdentificationSystem:
    """Hazard Identification System"""

    def __init__(self):
        self.hazards: List[Hazard] = []
        self._initialize_common_hazards()

    def _initialize_common_hazards(self):
        """Initialize common chemical process hazards"""

        # Hazard 1: High-pressure gas release
        self.add_hazard(Hazard(
            id="HAZ-001",
            name="High Pressure Gas Release",
            category=HazardCategory.PHYSICAL,
            description="High-pressure hydrogen gas release from reactor",
            potential_causes=[
                "Gasket degradation in pipe flanges",
                "Pipe perforation due to corrosion",
                "Valve seat wear",
                "Rupture disc burst due to overpressure"
            ],
            potential_consequences=[
                "Fire/explosion (if ignition source present)",
                "Asphyxiation (oxygen deficiency in confined spaces)",
                "Cryogenic burns (in case of LNG)",
                "Environmental contamination"
            ],
            severity=Severity.CATASTROPHIC,
            existing_safeguards=[
                "Gas detector (0.4% LEL setpoint)",
                "Emergency shutdown valve (ESD)",
                "Pressure relief valve (PRV)",
                "Vent system"
            ]
        ))

        # Hazard 2: Runaway exothermic reaction
        self.add_hazard(Hazard(
            id="HAZ-002",
            name="Runaway Exothermic Reaction",
            category=HazardCategory.CHEMICAL,
            description="Reaction runaway due to cooling system failure",
            potential_causes=[
                "Cooling water pump failure",
                "Temperature control system failure",
                "Localized overheating due to agitator stoppage",
                "Excessive catalyst addition"
            ],
            potential_consequences=[
                "Reactor rupture",
                "Toxic gas release",
                "Fire/explosion",
                "Cascade disaster to surrounding facilities"
            ],
            severity=Severity.CRITICAL,
            existing_safeguards=[
                "Independent high temperature interlock (120°C)",
                "Emergency cooling system",
                "Rupture disc + quencher",
                "Raw material emergency shutdown"
            ]
        ))

        # Hazard 3: Flammable liquid release
        self.add_hazard(Hazard(
            id="HAZ-003",
            name="Flammable Liquid Release",
            category=HazardCategory.CHEMICAL,
            description="Liquid release from toluene storage tank",
            potential_causes=[
                "Tank bottom corrosion",
                "Overflow (level gauge failure)",
                "Loading hose rupture",
                "Pipe rupture due to earthquake"
            ],
            potential_consequences=[
                "Pool fire",
                "Vapor cloud explosion (VCE)",
                "Soil and groundwater contamination",
                "Health impacts on nearby residents"
            ],
            severity=Severity.CRITICAL,
            existing_safeguards=[
                "Dike (110% capacity)",
                "High level alarm + interlock",
                "Leak detection system",
                "Foam fire suppression"
            ]
        ))

    def add_hazard(self, hazard: Hazard):
        """Add hazard"""
        self.hazards.append(hazard)

    def get_hazards_by_severity(self, min_severity: Severity) -> List[Hazard]:
        """Get hazards with severity greater than or equal to specified level"""
        return [h for h in self.hazards if h.severity.value >= min_severity.value]

    def generate_hazard_register(self) -> pd.DataFrame:
        """Generate hazard register (list)"""
        data = []
        for h in self.hazards:
            data.append({
                'ID': h.id,
                'Hazard Name': h.name,
                'Category': h.category.value,
                'Severity': h.severity.name,
                'Severity Score': h.severity.value,
                'Main Causes': '; '.join(h.potential_causes[:2]),  # First two
                'Main Consequences': '; '.join(h.potential_consequences[:2]),
                'Existing Safeguards Count': len(h.existing_safeguards)
            })

        df = pd.DataFrame(data)
        return df.sort_values('Severity Score', ascending=False)


# Usage example
hazard_system = HazardIdentificationSystem()

# Extract high-risk hazards
critical_hazards = hazard_system.get_hazards_by_severity(Severity.CRITICAL)
print(f"Number of critical hazards: {len(critical_hazards)}\n")

# Generate hazard register
hazard_register = hazard_system.generate_hazard_register()
print("=== Hazard Register ===")
print(hazard_register.to_string(index=False))

# Expected output:
# Number of critical hazards: 2
#
# === Hazard Register ===
#       ID    Hazard Name         Category      Severity  Severity Score ...
#  HAZ-001  High Pressure Gas Release  Physical Hazard  CATASTROPHIC     5 ...
#  HAZ-002  Runaway Exothermic Reaction  Chemical Hazard  CRITICAL         4 ...
#  HAZ-003  Flammable Liquid Release Chemical Hazard  CRITICAL         4 ...

1.3 Fundamentals of Risk Assessment

Risk Matrix Concept

Risk is assessed as a combination of Likelihood and Consequence:

$$ \text{Risk} = \text{Likelihood} \times \text{Consequence} $$

Example 2: Risk Matrix Implementation

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - seaborn>=0.12.0

# ===================================
# Example 2: Risk Matrix
# ===================================

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from enum import Enum

class Likelihood(Enum):
    """Likelihood Level"""
    FREQUENT = 5      # Frequent (once per year or more)
    PROBABLE = 4      # Probable (once per 2-10 years)
    OCCASIONAL = 3    # Occasional (once per 10-100 years)
    REMOTE = 2        # Remote (once per 100-1000 years)
    IMPROBABLE = 1    # Improbable (less than once per 1000 years)

class RiskLevel(Enum):
    """Risk Level"""
    EXTREME = "Extreme"
    HIGH = "High"
    MEDIUM = "Medium"
    LOW = "Low"

class RiskMatrix:
    """Risk Matrix Evaluation System"""

    def __init__(self):
        # Risk matrix definition (5x5)
        # Rows: Severity (1-5), Columns: Likelihood (1-5)
        self.matrix = np.array([
            [1,  2,  3,  5,  8 ],  # Severity 1 (MINIMAL)
            [2,  4,  6,  10, 15],  # Severity 2 (NEGLIGIBLE)
            [4,  8,  12, 18, 25],  # Severity 3 (MARGINAL)
            [8,  15, 20, 25, 30],  # Severity 4 (CRITICAL)
            [12, 20, 25, 30, 35]   # Severity 5 (CATASTROPHIC)
        ])

        # Risk level thresholds
        self.thresholds = {
            RiskLevel.EXTREME: 25,  # ≥25
            RiskLevel.HIGH: 15,     # 15-24
            RiskLevel.MEDIUM: 8,    # 8-14
            RiskLevel.LOW: 0        # <8
        }

    def calculate_risk_score(self, severity: Severity, likelihood: Likelihood) -> int:
        """Calculate risk score"""
        return self.matrix[severity.value - 1, likelihood.value - 1]

    def determine_risk_level(self, risk_score: int) -> RiskLevel:
        """Determine risk level"""
        if risk_score >= self.thresholds[RiskLevel.EXTREME]:
            return RiskLevel.EXTREME
        elif risk_score >= self.thresholds[RiskLevel.HIGH]:
            return RiskLevel.HIGH
        elif risk_score >= self.thresholds[RiskLevel.MEDIUM]:
            return RiskLevel.MEDIUM
        else:
            return RiskLevel.LOW

    def assess_risk(self, hazard_name: str, severity: Severity,
                    likelihood: Likelihood) -> Dict:
        """Comprehensive risk assessment"""
        risk_score = self.calculate_risk_score(severity, likelihood)
        risk_level = self.determine_risk_level(risk_score)

        # Response action
        if risk_level == RiskLevel.EXTREME:
            action = "Immediate action required. Consider operation shutdown."
        elif risk_level == RiskLevel.HIGH:
            action = "Implement risk reduction measures with priority (within 3 months)."
        elif risk_level == RiskLevel.MEDIUM:
            action = "Plan risk reduction measures (within 1 year)."
        else:
            action = "Maintain current safety measures."

        return {
            'Hazard': hazard_name,
            'Severity': severity.name,
            'Likelihood': likelihood.name,
            'Risk Score': risk_score,
            'Risk Level': risk_level.value,
            'Recommended Action': action
        }

    def visualize_matrix(self, assessments: List[Dict] = None):
        """Visualize risk matrix"""
        fig, ax = plt.subplots(figsize=(10, 8))

        # Create heatmap
        sns.heatmap(self.matrix, annot=True, fmt='d', cmap='YlOrRd',
                    cbar_kws={'label': 'Risk Score'},
                    xticklabels=['Improbable\n(1)', 'Remote\n(2)', 'Occasional\n(3)',
                                 'Probable\n(4)', 'Frequent\n(5)'],
                    yticklabels=['Minimal\n(1)', 'Negligible\n(2)', 'Marginal\n(3)',
                                 'Critical\n(4)', 'Catastrophic\n(5)'],
                    ax=ax)

        ax.set_xlabel('Likelihood', fontsize=12, fontweight='bold')
        ax.set_ylabel('Severity', fontsize=12, fontweight='bold')
        ax.set_title('Process Safety Risk Matrix', fontsize=14, fontweight='bold')

        # Plot assessment results
        if assessments:
            for assess in assessments:
                sev_map = {'MINIMAL': 1, 'NEGLIGIBLE': 2, 'MARGINAL': 3,
                           'CRITICAL': 4, 'CATASTROPHIC': 5}
                like_map = {'IMPROBABLE': 1, 'REMOTE': 2, 'OCCASIONAL': 3,
                            'PROBABLE': 4, 'FREQUENT': 5}

                sev_idx = sev_map[assess['Severity']] - 1
                like_idx = like_map[assess['Likelihood']] - 1

                ax.plot(like_idx + 0.5, sev_idx + 0.5, 'bo', markersize=15,
                        markeredgecolor='white', markeredgewidth=2)

        plt.tight_layout()
        return fig


# Usage example
risk_matrix = RiskMatrix()

# Assess multiple hazards
assessments = [
    risk_matrix.assess_risk("High Pressure Gas Release", Severity.CATASTROPHIC, Likelihood.OCCASIONAL),
    risk_matrix.assess_risk("Runaway Exothermic Reaction", Severity.CRITICAL, Likelihood.REMOTE),
    risk_matrix.assess_risk("Flammable Liquid Release", Severity.CRITICAL, Likelihood.OCCASIONAL)
]

# Display results
print("=== Risk Assessment Results ===\n")
for assess in assessments:
    print(f"Hazard: {assess['Hazard']}")
    print(f"  Risk Score: {assess['Risk Score']} ({assess['Risk Level']})")
    print(f"  Recommended Action: {assess['Recommended Action']}\n")

# Visualization
fig = risk_matrix.visualize_matrix(assessments)
# plt.show()  # Auto-display in Jupyter environment

# Expected output:
# === Risk Assessment Results ===
#
# Hazard: High Pressure Gas Release
#   Risk Score: 25 (Extreme)
#   Recommended Action: Immediate action required. Consider operation shutdown.
#
# Hazard: Runaway Exothermic Reaction
#   Risk Score: 15 (High)
#   Recommended Action: Implement risk reduction measures with priority (within 3 months).

1.4 Layer of Protection Analysis (LOPA)

LOPA Concept

LOPA (Layer of Protection Analysis) is a method for quantitatively evaluating the effectiveness of Independent Protection Layers (IPLs) for risk scenarios.

Example 3: LOPA Implementation and SIL Calculation

# ===================================
# Example 3: Layer of Protection Analysis (LOPA) and SIL Calculation
# ===================================

from dataclasses import dataclass
from typing import List
import math

@dataclass
class ProtectionLayer:
    """Protection Layer (IPL)"""
    name: str
    pfd: float  # Probability of Failure on Demand

    @property
    def risk_reduction_factor(self) -> float:
        """Risk Reduction Factor (RRF)"""
        return 1.0 / self.pfd if self.pfd > 0 else float('inf')

class SIL(Enum):
    """Safety Integrity Level"""
    SIL_4 = (1e-5, 1e-4, "10^-5 to 10^-4")
    SIL_3 = (1e-4, 1e-3, "10^-4 to 10^-3")
    SIL_2 = (1e-3, 1e-2, "10^-3 to 10^-2")
    SIL_1 = (1e-2, 1e-1, "10^-2 to 10^-1")
    NO_SIL = (1e-1, 1.0, "> 10^-1")

    def __init__(self, lower, upper, range_str):
        self.lower = lower
        self.upper = upper
        self.range_str = range_str

class LOPAAnalysis:
    """Layer of Protection Analysis System"""

    def __init__(self, scenario_name: str, initiating_event_frequency: float,
                 consequence_severity: Severity):
        """
        Args:
            scenario_name: Scenario name
            initiating_event_frequency: Initiating event frequency (events/year)
            consequence_severity: Consequence severity
        """
        self.scenario_name = scenario_name
        self.initiating_event_frequency = initiating_event_frequency
        self.consequence_severity = consequence_severity
        self.protection_layers: List[ProtectionLayer] = []

    def add_protection_layer(self, layer: ProtectionLayer):
        """Add protection layer"""
        self.protection_layers.append(layer)

    def calculate_mitigated_frequency(self) -> float:
        """Calculate mitigated event frequency"""
        total_pfd = self.initiating_event_frequency

        for layer in self.protection_layers:
            total_pfd *= layer.pfd

        return total_pfd

    def determine_required_sil(self, tolerable_frequency: float = 1e-4) -> SIL:
        """Determine required SIL level

        Args:
            tolerable_frequency: Tolerable event frequency (events/year)
        """
        current_frequency = self.calculate_mitigated_frequency()

        if current_frequency <= tolerable_frequency:
            return SIL.NO_SIL

        # Additional risk reduction required
        required_pfd = tolerable_frequency / self.initiating_event_frequency

        # Consider PFD of existing protection layers
        for layer in self.protection_layers:
            required_pfd /= layer.pfd

        # SIL determination
        for sil in [SIL.SIL_4, SIL.SIL_3, SIL.SIL_2, SIL.SIL_1]:
            if sil.lower <= required_pfd < sil.upper:
                return sil

        return SIL.NO_SIL

    def generate_lopa_report(self, tolerable_frequency: float = 1e-4) -> str:
        """Generate LOPA report"""
        mitigated_freq = self.calculate_mitigated_frequency()
        required_sil = self.determine_required_sil(tolerable_frequency)

        report = f"""
{'='*60}
LOPA Analysis Report
{'='*60}

Scenario: {self.scenario_name}
Consequence Severity: {self.consequence_severity.name}

--- Initiating Event ---
Frequency: {self.initiating_event_frequency:.2e} events/year

--- Independent Protection Layers (IPL) ---
"""

        total_rrf = 1.0
        for i, layer in enumerate(self.protection_layers, 1):
            rrf = layer.risk_reduction_factor
            total_rrf *= rrf
            report += f"{i}. {layer.name}\n"
            report += f"   PFD: {layer.pfd:.2e}\n"
            report += f"   RRF: {rrf:.0f}\n\n"

        report += f"""--- Risk Assessment ---
Total Risk Reduction Factor: {total_rrf:.0f}
Mitigated Event Frequency: {mitigated_freq:.2e} events/year
Tolerable Frequency Target: {tolerable_frequency:.2e} events/year

Risk Status: {'ACCEPTABLE' if mitigated_freq <= tolerable_frequency else 'UNACCEPTABLE'}

--- SIL Requirement ---
"""

        if required_sil == SIL.NO_SIL:
            report += "Required SIL: None (existing IPLs are sufficient)\n"
        else:
            report += f"Required SIL: {required_sil.name}\n"
            report += f"Target PFD Range: {required_sil.range_str}\n"
            report += f"\nRecommendation: Implement SIF (Safety Instrumented Function) with {required_sil.name}\n"

        report += f"\n{'='*60}\n"

        return report


# Usage example: LOPA analysis for reactor overpressure scenario
lopa = LOPAAnalysis(
    scenario_name="Reactor overpressure leading to rupture",
    initiating_event_frequency=1e-2,  # 0.01 events/year (once per 100 years)
    consequence_severity=Severity.CATASTROPHIC
)

# Add protection layers
lopa.add_protection_layer(ProtectionLayer(
    name="Basic Process Control System (BPCS)",
    pfd=1e-1  # 90% effectiveness
))

lopa.add_protection_layer(ProtectionLayer(
    name="High Pressure Alarm (operator response)",
    pfd=1e-1  # 90% effectiveness
))

lopa.add_protection_layer(ProtectionLayer(
    name="Pressure Relief Valve (PRV)",
    pfd=1e-2  # 99% effectiveness
))

# Generate LOPA report
report = lopa.generate_lopa_report(tolerable_frequency=1e-5)
print(report)

# Expected output:
# ============================================================
# LOPA Analysis Report
# ============================================================
#
# Scenario: Reactor overpressure leading to rupture
# Consequence Severity: CATASTROPHIC
#
# --- Initiating Event ---
# Frequency: 1.00e-02 events/year
#
# --- Independent Protection Layers (IPL) ---
# 1. Basic Process Control System (BPCS)
#    PFD: 1.00e-01
#    RRF: 10
#
# 2. High Pressure Alarm (operator response)
#    PFD: 1.00e-01
#    RRF: 10
#
# 3. Pressure Relief Valve (PRV)
#    PFD: 1.00e-02
#    RRF: 100
#
# --- Risk Assessment ---
# Total Risk Reduction Factor: 10000
# Mitigated Event Frequency: 1.00e-06 events/year
# Tolerable Frequency Target: 1.00e-05 events/year
#
# Risk Status: ACCEPTABLE
#
# --- SIL Requirement ---
# Required SIL: None (existing IPLs are sufficient)

1.5 Consequence Modeling

Example 4: Gas Dispersion Model (Gaussian Plume)

We implement a Gaussian Plume model to predict the impact zone of toxic gas releases.

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0

# ===================================
# Example 4: Gas Dispersion Model (Gaussian Plume)
# ===================================

import numpy as np
import matplotlib.pyplot as plt
from scipy.special import erf

class GaussianPlumeModel:
    """Gaussian Plume Dispersion Model (Steady State)"""

    def __init__(self, emission_rate: float, wind_speed: float,
                 stack_height: float = 0.0, stability_class: str = 'D'):
        """
        Args:
            emission_rate: Emission rate [g/s]
            wind_speed: Wind speed [m/s]
            stack_height: Release height [m]
            stability_class: Atmospheric stability class (A-F, Pasquill classification)
        """
        self.Q = emission_rate
        self.u = wind_speed
        self.H = stack_height
        self.stability_class = stability_class

    def _pasquill_gifford_sigma(self, x: float) -> tuple:
        """Pasquill-Gifford dispersion parameters

        Args:
            x: Downwind distance [m]

        Returns:
            (sigma_y, sigma_z): Lateral and vertical dispersion coefficients [m]
        """
        # Simplified Pasquill-Gifford formula (stability class D: neutral)
        # In actual implementation, coefficients vary by stability_class

        coefficients = {
            'A': (0.22, 0.20),  # Very unstable
            'B': (0.16, 0.12),  # Unstable
            'C': (0.11, 0.08),  # Slightly unstable
            'D': (0.08, 0.06),  # Neutral (default)
            'E': (0.06, 0.03),  # Slightly stable
            'F': (0.04, 0.016)  # Stable
        }

        a_y, a_z = coefficients.get(self.stability_class, (0.08, 0.06))

        # Dispersion coefficient calculation (empirical formula)
        sigma_y = a_y * x * (1 + 0.0001 * x)**(-0.5)
        sigma_z = a_z * x

        return sigma_y, sigma_z

    def concentration(self, x: float, y: float, z: float) -> float:
        """Calculate concentration at specific position

        Args:
            x: Downwind distance [m]
            y: Crosswind distance [m]
            z: Height above ground [m]

        Returns:
            Concentration [g/m^3]
        """
        if x <= 0:
            return 0.0

        sigma_y, sigma_z = self._pasquill_gifford_sigma(x)

        # Gaussian Plume equation
        C = (self.Q / (2 * np.pi * self.u * sigma_y * sigma_z)) * \
            np.exp(-0.5 * (y / sigma_y)**2) * \
            (np.exp(-0.5 * ((z - self.H) / sigma_z)**2) +
             np.exp(-0.5 * ((z + self.H) / sigma_z)**2))  # Ground reflection term

        return C

    def ground_level_centerline_concentration(self, x: float) -> float:
        """Downwind centerline ground-level concentration (y=0, z=0)"""
        return self.concentration(x, 0, 0)

    def calculate_impact_zone(self, threshold: float, max_distance: float = 5000) -> float:
        """Calculate impact zone (maximum distance reaching threshold concentration)

        Args:
            threshold: Threshold concentration [g/m^3]
            max_distance: Maximum evaluation distance [m]

        Returns:
            Impact zone distance [m]
        """
        distances = np.linspace(10, max_distance, 1000)
        concentrations = [self.ground_level_centerline_concentration(d)
                          for d in distances]

        # Farthest distance above threshold
        impact_distances = distances[np.array(concentrations) >= threshold]

        if len(impact_distances) > 0:
            return impact_distances[-1]
        else:
            return 0.0

    def visualize_concentration_profile(self, max_distance: float = 2000):
        """Visualize concentration profile"""
        distances = np.linspace(10, max_distance, 500)
        concentrations = [self.ground_level_centerline_concentration(d)
                          for d in distances]

        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(distances, np.array(concentrations) * 1e6, 'b-', linewidth=2)
        ax.set_xlabel('Distance Downwind (m)', fontsize=12)
        ax.set_ylabel('Concentration (mg/m³)', fontsize=12)
        ax.set_title(f'Gaussian Plume Dispersion Model\n'
                     f'Q={self.Q} g/s, u={self.u} m/s, Class {self.stability_class}',
                     fontsize=14, fontweight='bold')
        ax.grid(True, alpha=0.3)
        ax.set_yscale('log')

        # ERPG/AEGL threshold examples (for chlorine gas)
        aegl_2 = 2.8  # mg/m^3 (irreversible health effects at 60 min exposure)
        aegl_3 = 20   # mg/m^3 (life-threatening at 60 min exposure)

        ax.axhline(y=aegl_2, color='orange', linestyle='--',
                   label=f'AEGL-2: {aegl_2} mg/m³')
        ax.axhline(y=aegl_3, color='red', linestyle='--',
                   label=f'AEGL-3: {aegl_3} mg/m³')
        ax.legend()

        plt.tight_layout()
        return fig


# Usage example: Chlorine gas (Cl2) release scenario
plume_model = GaussianPlumeModel(
    emission_rate=100,      # 100 g/s (360 kg/hr)
    wind_speed=3.0,         # 3 m/s (weak wind)
    stack_height=2.0,       # Release from 2m height
    stability_class='D'     # Neutral atmosphere
)

# Calculate ground-level centerline concentration
distances = [100, 500, 1000, 2000]
print("=== Ground-Level Centerline Concentration ===\n")
for d in distances:
    conc = plume_model.ground_level_centerline_concentration(d)
    print(f"Distance: {d:4d} m → Concentration: {conc*1e6:.2f} mg/m³")

# Calculate impact zone (AEGL-2: 2.8 mg/m^3)
impact_distance_aegl2 = plume_model.calculate_impact_zone(
    threshold=2.8e-3  # 2.8 mg/m^3 = 2.8e-3 g/m^3
)
print(f"\nAEGL-2 Impact Zone: {impact_distance_aegl2:.0f} m")

# Visualization
fig = plume_model.visualize_concentration_profile()
# plt.show()

# Expected output:
# === Ground-Level Centerline Concentration ===
#
# Distance:  100 m → Concentration: 127.45 mg/m³
# Distance:  500 m → Concentration: 8.73 mg/m³
# Distance: 1000 m → Concentration: 2.51 mg/m³
# Distance: 2000 m → Concentration: 0.73 mg/m³
#
# AEGL-2 Impact Zone: 1245 m

Practical Application: In actual Consequence Modeling, specialized software such as ALOHA (NOAA), PHAST (DNV), and EFFECTS (TNO) are used. This example is a simplified model for educational purposes.


1.6 Bow-tie Analysis

Example 5: Bow-tie Diagram Construction

Bow-tie analysis visualizes the causes (left side) and consequences (right side) of a hazard event, organizing preventive and mitigative protection layers.

# ===================================
# Example 5: Bow-tie Analysis
# ===================================

from dataclasses import dataclass
from typing import List

@dataclass
class Threat:
    """Threat (Cause)"""
    name: str
    barriers: List[str]  # Preventive protection layers

@dataclass
class Consequence:
    """Consequence"""
    name: str
    barriers: List[str]  # Mitigative protection layers

class BowtieAnalysis:
    """Bow-tie Analysis System"""

    def __init__(self, hazard_event: str):
        self.hazard_event = hazard_event
        self.threats: List[Threat] = []
        self.consequences: List[Consequence] = []

    def add_threat(self, threat: Threat):
        """Add threat"""
        self.threats.append(threat)

    def add_consequence(self, consequence: Consequence):
        """Add consequence"""
        self.consequences.append(consequence)

    def generate_bowtie_report(self) -> str:
        """Generate Bow-tie report (text format)"""
        report = f"""
{'='*70}
Bow-tie Analysis Report
{'='*70}

Hazard Event: {self.hazard_event}

{'='*70}
LEFT SIDE: Threats (Causes) and Preventive Barriers
{'='*70}

"""

        for i, threat in enumerate(self.threats, 1):
            report += f"\nThreat {i}: {threat.name}\n"
            report += "  Preventive Barriers:\n"
            for j, barrier in enumerate(threat.barriers, 1):
                report += f"    {j}. {barrier}\n"

        report += f"""
{'='*70}
RIGHT SIDE: Consequences and Mitigative Barriers
{'='*70}

"""

        for i, consequence in enumerate(self.consequences, 1):
            report += f"\nConsequence {i}: {consequence.name}\n"
            report += "  Mitigative Barriers:\n"
            for j, barrier in enumerate(consequence.barriers, 1):
                report += f"    {j}. {barrier}\n"

        report += f"\n{'='*70}\n"

        # Statistics
        total_preventive = sum(len(t.barriers) for t in self.threats)
        total_mitigative = sum(len(c.barriers) for c in self.consequences)

        report += f"""
Summary Statistics:
  - Total Threats: {len(self.threats)}
  - Total Preventive Barriers: {total_preventive}
  - Total Consequences: {len(self.consequences)}
  - Total Mitigative Barriers: {total_mitigative}
  - Defense-in-Depth Layers: {total_preventive + total_mitigative}
"""

        return report

    def identify_critical_barriers(self) -> List[str]:
        """Identify critical single points of failure"""
        critical = []

        for threat in self.threats:
            if len(threat.barriers) == 1:
                critical.append(f"Threat '{threat.name}' has only 1 barrier: {threat.barriers[0]}")

        for consequence in self.consequences:
            if len(consequence.barriers) == 1:
                critical.append(f"Consequence '{consequence.name}' has only 1 barrier: {consequence.barriers[0]}")

        return critical


# Usage example: Bow-tie analysis for flammable liquid storage tank fire
bowtie = BowtieAnalysis(hazard_event="Flammable Liquid Storage Tank Fire")

# Threats (causes) and preventive protection layers
bowtie.add_threat(Threat(
    name="Ignition by electrostatic discharge",
    barriers=[
        "Tank grounding and bonding",
        "Antistatic additive",
        "Flow velocity limit (<1 m/s)",
        "Inert gas (N2) purging"
    ]
))

bowtie.add_threat(Threat(
    name="Ignition by lightning",
    barriers=[
        "Lightning rod installation",
        "Surge protector",
        "Grounding system"
    ]
))

bowtie.add_threat(Threat(
    name="Contact with hot surface",
    barriers=[
        "Hot Work permit system",
        "Temperature monitoring",
        "Insulation installation",
        "Fire work prohibited zone setting"
    ]
))

# Consequences and mitigative protection layers
bowtie.add_consequence(Consequence(
    name="Tank fire (pool fire)",
    barriers=[
        "Foam fire suppression (fixed)",
        "Fire extinguisher placement",
        "Emergency shutdown valve (automatic)",
        "Dike (secondary containment)",
        "Cooling water spray"
    ]
))

bowtie.add_consequence(Consequence(
    name="BLEVE (Boiling Liquid Expanding Vapor Explosion)",
    barriers=[
        "Pressure relief valve",
        "Cooling water spray (tank top)",
        "Heat shielding",
        "Emergency separation distance"
    ]
))

bowtie.add_consequence(Consequence(
    name="Fire spread to neighboring facilities",
    barriers=[
        "Firewall",
        "Sprinkler system",
        "Fire department notification system",
        "Emergency evacuation plan"
    ]
))

# Generate report
report = bowtie.generate_bowtie_report()
print(report)

# Identify critical single points
critical_barriers = bowtie.identify_critical_barriers()
if critical_barriers:
    print("\n⚠️ Critical Single Points of Failure:")
    for cb in critical_barriers:
        print(f"  - {cb}")
else:
    print("\n✅ No critical single points of failure identified.")

# Expected output:
# ======================================================================
# Bow-tie Analysis Report
# ======================================================================
#
# Hazard Event: Flammable Liquid Storage Tank Fire
#
# ======================================================================
# LEFT SIDE: Threats (Causes) and Preventive Barriers
# ======================================================================
#
# Threat 1: Ignition by electrostatic discharge
#   Preventive Barriers:
#     1. Tank grounding and bonding
#     2. Antistatic additive
#     3. Flow velocity limit (<1 m/s)
#     4. Inert gas (N2) purging
# ...
#
# Summary Statistics:
#   - Total Threats: 3
#   - Total Preventive Barriers: 11
#   - Total Consequences: 3
#   - Total Mitigative Barriers: 14
#   - Defense-in-Depth Layers: 25

1.7 Risk-Based Inspection (RBI)

Example 6: API 580 RBI Framework

We implement an equipment inspection prioritization system based on API 580 (Risk-Based Inspection).

# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0

# ===================================
# Example 6: Risk-Based Inspection (RBI)
# ===================================

from dataclasses import dataclass
from typing import List, Dict
import pandas as pd

class CorrosionMechanism(Enum):
    """Corrosion Mechanism"""
    GENERAL_CORROSION = "General Corrosion"
    PITTING = "Pitting"
    SCC = "Stress Corrosion Cracking"
    EROSION = "Erosion"
    FATIGUE = "Fatigue"

@dataclass
class Equipment:
    """Equipment Information"""
    id: str
    name: str
    equipment_type: str
    fluid: str
    temperature: float  # °C
    pressure: float     # MPa
    age: float          # years
    last_inspection: float  # years ago
    corrosion_mechanism: CorrosionMechanism
    corrosion_rate: float  # mm/year
    thickness_remaining: float  # mm
    design_thickness: float  # mm

class RBIAnalysis:
    """Risk-Based Inspection Analysis"""

    def calculate_pof(self, equipment: Equipment) -> float:
        """Calculate Probability of Failure

        Simplified model:
          PoF = f(corrosion_damage, time_since_inspection, operating_severity)

        Returns:
            PoF score (0-100)
        """
        # Corrosion damage factor
        damage_factor = equipment.corrosion_rate * equipment.age
        thickness_ratio = equipment.thickness_remaining / equipment.design_thickness

        if thickness_ratio < 0.5:
            corrosion_score = 90
        elif thickness_ratio < 0.7:
            corrosion_score = 60
        elif thickness_ratio < 0.9:
            corrosion_score = 30
        else:
            corrosion_score = 10

        # Inspection interval factor
        if equipment.last_inspection > 10:
            inspection_score = 80
        elif equipment.last_inspection > 5:
            inspection_score = 50
        elif equipment.last_inspection > 2:
            inspection_score = 20
        else:
            inspection_score = 5

        # Operating severity factor
        if equipment.temperature > 200 or equipment.pressure > 5.0:
            severity_score = 70
        elif equipment.temperature > 100 or equipment.pressure > 2.0:
            severity_score = 40
        else:
            severity_score = 10

        # Corrosion mechanism factor
        mechanism_multiplier = {
            CorrosionMechanism.SCC: 1.5,
            CorrosionMechanism.PITTING: 1.3,
            CorrosionMechanism.FATIGUE: 1.4,
            CorrosionMechanism.EROSION: 1.2,
            CorrosionMechanism.GENERAL_CORROSION: 1.0
        }

        multiplier = mechanism_multiplier[equipment.corrosion_mechanism]

        # Total PoF calculation (weighted average)
        pof = (corrosion_score * 0.4 + inspection_score * 0.3 +
               severity_score * 0.3) * multiplier

        return min(pof, 100)

    def calculate_cof(self, equipment: Equipment) -> float:
        """Calculate Consequence of Failure

        Simplified model:
          CoF = f(fluid_hazard, inventory, pressure)

        Returns:
            CoF score (0-100)
        """
        # Fluid hazard score
        high_hazard_fluids = ['H2', 'Cl2', 'HF', 'NH3', 'C2H4', 'LPG']
        medium_hazard_fluids = ['methanol', 'ethanol', 'benzene', 'toluene']

        if equipment.fluid in high_hazard_fluids:
            fluid_score = 90
        elif equipment.fluid in medium_hazard_fluids:
            fluid_score = 60
        else:
            fluid_score = 30

        # Pressure impact score
        if equipment.pressure > 5.0:
            pressure_score = 80
        elif equipment.pressure > 2.0:
            pressure_score = 50
        else:
            pressure_score = 20

        # Equipment type impact score
        if equipment.equipment_type in ['Reactor', 'Distillation Column']:
            equipment_score = 70
        elif equipment.equipment_type in ['Heat Exchanger', 'Pump']:
            equipment_score = 40
        else:
            equipment_score = 20

        # Total CoF calculation
        cof = fluid_score * 0.5 + pressure_score * 0.3 + equipment_score * 0.2

        return min(cof, 100)

    def calculate_risk_score(self, pof: float, cof: float) -> float:
        """Risk score = PoF × CoF"""
        return (pof * cof) / 100  # Normalize to 0-100 scale

    def determine_inspection_priority(self, risk_score: float) -> str:
        """Determine inspection priority"""
        if risk_score >= 70:
            return "Priority 1 (Immediate - within 1 month)"
        elif risk_score >= 50:
            return "Priority 2 (High - within 3 months)"
        elif risk_score >= 30:
            return "Priority 3 (Medium - within 1 year)"
        else:
            return "Priority 4 (Low - routine inspection)"

    def analyze_equipment_portfolio(self, equipment_list: List[Equipment]) -> pd.DataFrame:
        """RBI analysis of equipment portfolio"""
        results = []

        for eq in equipment_list:
            pof = self.calculate_pof(eq)
            cof = self.calculate_cof(eq)
            risk = self.calculate_risk_score(pof, cof)
            priority = self.determine_inspection_priority(risk)

            results.append({
                'Equipment ID': eq.id,
                'Equipment Name': eq.name,
                'Type': eq.equipment_type,
                'Fluid': eq.fluid,
                'PoF': f"{pof:.1f}",
                'CoF': f"{cof:.1f}",
                'Risk Score': f"{risk:.1f}",
                'Priority': priority,
                'Last Inspection': f"{eq.last_inspection:.1f} years ago",
                'Thickness Ratio': f"{eq.thickness_remaining/eq.design_thickness:.2f}"
            })

        df = pd.DataFrame(results)
        df = df.sort_values('Risk Score', ascending=False,
                            key=lambda x: x.astype(float))

        return df


# Usage example
rbi = RBIAnalysis()

# Create equipment list
equipment_portfolio = [
    Equipment(
        id="V-101", name="Reactor", equipment_type="Reactor",
        fluid="H2", temperature=350, pressure=8.0, age=15,
        last_inspection=6.0, corrosion_mechanism=CorrosionMechanism.GENERAL_CORROSION,
        corrosion_rate=0.15, thickness_remaining=8.5, design_thickness=12.0
    ),
    Equipment(
        id="T-201", name="Distillation Column", equipment_type="Distillation Column",
        fluid="toluene", temperature=120, pressure=0.5, age=20,
        last_inspection=3.0, corrosion_mechanism=CorrosionMechanism.PITTING,
        corrosion_rate=0.3, thickness_remaining=5.2, design_thickness=10.0
    ),
    Equipment(
        id="E-301", name="Heat Exchanger", equipment_type="Heat Exchanger",
        fluid="water", temperature=80, pressure=1.5, age=10,
        last_inspection=2.0, corrosion_mechanism=CorrosionMechanism.GENERAL_CORROSION,
        corrosion_rate=0.05, thickness_remaining=9.5, design_thickness=10.0
    ),
    Equipment(
        id="P-401", name="Process Pump", equipment_type="Pump",
        fluid="methanol", temperature=40, pressure=3.0, age=8,
        last_inspection=1.5, corrosion_mechanism=CorrosionMechanism.EROSION,
        corrosion_rate=0.2, thickness_remaining=7.0, design_thickness=8.0
    ),
    Equipment(
        id="V-501", name="Pressure Vessel", equipment_type="Pressure Vessel",
        fluid="NH3", temperature=25, pressure=10.0, age=25,
        last_inspection=12.0, corrosion_mechanism=CorrosionMechanism.SCC,
        corrosion_rate=0.1, thickness_remaining=6.0, design_thickness=15.0
    )
]

# Execute RBI analysis
rbi_results = rbi.analyze_equipment_portfolio(equipment_portfolio)

print("=== Risk-Based Inspection (RBI) Analysis Results ===\n")
print(rbi_results.to_string(index=False))

# Extract Priority 1 equipment
priority_1 = rbi_results[rbi_results['Priority'].str.contains('Priority 1')]
print(f"\n⚠️ Number of equipment requiring urgent action: {len(priority_1)}")

# Expected output:
# === Risk-Based Inspection (RBI) Analysis Results ===
#
# Equipment ID Equipment Name                  Type   Fluid   PoF   CoF  Risk Score ...
#        V-501  Pressure Vessel   Pressure Vessel    NH3  84.0  82.0        68.9 ...
#        V-101         Reactor            Reactor     H2  67.5  80.0        54.0 ...
#        T-201  Distillation Column  Distillation Column toluene  70.2  64.0        44.9 ...
# ...

1.8 Safety Barrier Effectiveness Analysis

Example 7: Safety Barrier Performance Monitoring

# ===================================
# Example 7: Safety Barrier Effectiveness Analysis
# ===================================

from dataclasses import dataclass
from datetime import datetime, timedelta
import random

@dataclass
class BarrierTest:
    """Barrier Test Record"""
    date: datetime
    passed: bool
    response_time: float  # seconds (for active barriers)

class SafetyBarrier:
    """Safety Barrier"""

    def __init__(self, name: str, barrier_type: str, target_pfd: float):
        """
        Args:
            name: Barrier name
            barrier_type: Type (Passive/Active)
            target_pfd: Target PFD (Probability of Failure on Demand)
        """
        self.name = name
        self.barrier_type = barrier_type
        self.target_pfd = target_pfd
        self.test_history: List[BarrierTest] = []

    def add_test_result(self, test: BarrierTest):
        """Add test result"""
        self.test_history.append(test)

    def calculate_actual_pfd(self, lookback_period: int = 365) -> float:
        """Calculate actual PFD

        Args:
            lookback_period: Evaluation period (days)

        Returns:
            Measured PFD
        """
        if not self.test_history:
            return 1.0  # No data = worst case

        cutoff_date = datetime.now() - timedelta(days=lookback_period)
        recent_tests = [t for t in self.test_history if t.date > cutoff_date]

        if not recent_tests:
            return 1.0

        failures = sum(1 for t in recent_tests if not t.passed)
        pfd = failures / len(recent_tests)

        return pfd

    def assess_performance(self) -> Dict:
        """Barrier performance assessment"""
        actual_pfd = self.calculate_actual_pfd()

        # Performance determination
        if actual_pfd <= self.target_pfd:
            status = "✅ ACCEPTABLE"
            action = "Continue routine testing"
        elif actual_pfd <= self.target_pfd * 1.5:
            status = "⚠️ DEGRADED"
            action = "Increase testing frequency, investigate root causes"
        else:
            status = "❌ UNACCEPTABLE"
            action = "Immediate corrective action required, consider bypass"

        # Average response time (for Active barriers)
        avg_response_time = None
        if self.barrier_type == "Active" and self.test_history:
            response_times = [t.response_time for t in self.test_history
                              if t.passed]
            if response_times:
                avg_response_time = sum(response_times) / len(response_times)

        return {
            'Barrier': self.name,
            'Type': self.barrier_type,
            'Target PFD': f"{self.target_pfd:.2e}",
            'Actual PFD': f"{actual_pfd:.2e}",
            'Status': status,
            'Test Count': len(self.test_history),
            'Avg Response Time': f"{avg_response_time:.2f}s" if avg_response_time else "N/A",
            'Recommended Action': action
        }

class BarrierManagementSystem:
    """Safety Barrier Management System"""

    def __init__(self):
        self.barriers: List[SafetyBarrier] = []

    def add_barrier(self, barrier: SafetyBarrier):
        """Add barrier"""
        self.barriers.append(barrier)

    def generate_performance_report(self) -> pd.DataFrame:
        """Generate performance report"""
        results = [barrier.assess_performance() for barrier in self.barriers]
        df = pd.DataFrame(results)
        return df

    def identify_degraded_barriers(self) -> List[str]:
        """Identify degraded barriers"""
        degraded = []
        for barrier in self.barriers:
            assessment = barrier.assess_performance()
            if "DEGRADED" in assessment['Status'] or "UNACCEPTABLE" in assessment['Status']:
                degraded.append(barrier.name)
        return degraded


# Usage example: Implementation of barrier management system
bms = BarrierManagementSystem()

# Barrier 1: High pressure interlock (SIL 2)
interlock = SafetyBarrier(
    name="High Pressure Interlock (SIS-101)",
    barrier_type="Active",
    target_pfd=0.01  # SIL 2 target
)

# Generate test data (past 1 year)
random.seed(42)
for i in range(24):  # Tested twice per month
    test_date = datetime.now() - timedelta(days=i*15)
    passed = random.random() > 0.008  # 99.2% success rate
    response_time = random.gauss(2.5, 0.5)  # Mean 2.5s, std 0.5s

    interlock.add_test_result(BarrierTest(
        date=test_date,
        passed=passed,
        response_time=response_time
    ))

bms.add_barrier(interlock)

# Barrier 2: Pressure relief valve (Passive)
prv = SafetyBarrier(
    name="Pressure Relief Valve (PRV-201)",
    barrier_type="Passive",
    target_pfd=0.01
)

for i in range(4):  # Tested 4 times per year
    test_date = datetime.now() - timedelta(days=i*90)
    passed = random.random() > 0.02  # 98% success rate

    prv.add_test_result(BarrierTest(
        date=test_date,
        passed=passed,
        response_time=0  # Passive barrier
    ))

bms.add_barrier(prv)

# Barrier 3: Gas detector (degraded example)
gas_detector = SafetyBarrier(
    name="H2 Gas Detector (GD-301)",
    barrier_type="Active",
    target_pfd=0.05
)

for i in range(52):  # Tested weekly
    test_date = datetime.now() - timedelta(days=i*7)
    passed = random.random() > 0.12  # 88% success rate (degraded)
    response_time = random.gauss(1.0, 0.3)

    gas_detector.add_test_result(BarrierTest(
        date=test_date,
        passed=passed,
        response_time=response_time
    ))

bms.add_barrier(gas_detector)

# Generate performance report
report = bms.generate_performance_report()
print("=== Safety Barrier Performance Report ===\n")
print(report.to_string(index=False))

# Identify degraded barriers
degraded = bms.identify_degraded_barriers()
if degraded:
    print(f"\n⚠️ Barriers requiring attention:")
    for b in degraded:
        print(f"  - {b}")

# Expected output:
# === Safety Barrier Performance Report ===
#
#                           Barrier      Type Target PFD Actual PFD           Status  Test Count ...
#  High Pressure Interlock (SIS-101)    Active   1.00e-02   8.33e-03   ✅ ACCEPTABLE          24 ...
#  Pressure Relief Valve (PRV-201)    Passive   1.00e-02   0.00e+00   ✅ ACCEPTABLE           4 ...
#  H2 Gas Detector (GD-301)            Active   5.00e-02   1.15e-01  ❌ UNACCEPTABLE          52 ...

1.9 Practical Exercise

Example 8: Integrated Process Safety Assessment System

We build a comprehensive process safety assessment system that integrates all the methods learned so far.

# ===================================
# Example 8: Integrated Process Safety Assessment System
# ===================================

class IntegratedProcessSafetyAssessment:
    """Integrated Process Safety Assessment System"""

    def __init__(self, process_name: str):
        self.process_name = process_name
        self.hazard_system = HazardIdentificationSystem()
        self.risk_matrix = RiskMatrix()
        self.lopa_analyses: List[LOPAAnalysis] = []
        self.barrier_management = BarrierManagementSystem()

    def perform_comprehensive_assessment(self) -> Dict:
        """Perform comprehensive safety assessment"""

        # 1. Hazard identification
        hazard_register = self.hazard_system.generate_hazard_register()
        critical_hazards = self.hazard_system.get_hazards_by_severity(Severity.CRITICAL)

        # 2. Risk assessment
        risk_assessments = []
        for hazard in self.hazard_system.hazards:
            # Simplified: likelihood is assumed
            likelihood = Likelihood.OCCASIONAL if hazard.severity.value >= 4 else Likelihood.REMOTE

            risk_assess = self.risk_matrix.assess_risk(
                hazard.name,
                hazard.severity,
                likelihood
            )
            risk_assessments.append(risk_assess)

        # Extract high-risk hazards
        high_risk = [r for r in risk_assessments
                     if r['Risk Level'] in ['Extreme', 'High']]

        # 3. LOPA analysis (for high-risk hazards)
        lopa_results = []
        for lopa in self.lopa_analyses:
            lopa_report = lopa.generate_lopa_report()
            mitigated_freq = lopa.calculate_mitigated_frequency()
            lopa_results.append({
                'Scenario': lopa.scenario_name,
                'Mitigated Frequency': f"{mitigated_freq:.2e}",
                'SIL Required': lopa.determine_required_sil().name
            })

        # 4. Barrier performance
        barrier_report = self.barrier_management.generate_performance_report()
        degraded_barriers = self.barrier_management.identify_degraded_barriers()

        # Integrated results
        return {
            'total_hazards': len(self.hazard_system.hazards),
            'critical_hazards': len(critical_hazards),
            'high_risk_scenarios': len(high_risk),
            'lopa_analyses': len(lopa_results),
            'total_barriers': len(self.barrier_management.barriers),
            'degraded_barriers': len(degraded_barriers),
            'hazard_register': hazard_register,
            'risk_assessments': pd.DataFrame(risk_assessments),
            'lopa_results': pd.DataFrame(lopa_results) if lopa_results else None,
            'barrier_performance': barrier_report,
            'degraded_barrier_list': degraded_barriers
        }

    def generate_executive_summary(self, assessment: Dict) -> str:
        """Generate executive summary"""
        summary = f"""
{'='*70}
PROCESS SAFETY ASSESSMENT - EXECUTIVE SUMMARY
{'='*70}

Process: {self.process_name}
Assessment Date: {datetime.now().strftime('%Y-%m-%d')}

{'='*70}
KEY FINDINGS
{'='*70}

1. HAZARD IDENTIFICATION
   - Total Hazards Identified: {assessment['total_hazards']}
   - Critical/Catastrophic Hazards: {assessment['critical_hazards']}

2. RISK ASSESSMENT
   - High Risk Scenarios: {assessment['high_risk_scenarios']}
   - Immediate Action Required: {sum(1 for _ in assessment['risk_assessments'].itertuples() if 'Extreme' in _.Risk_Level)}

3. PROTECTION LAYERS (LOPA)
   - LOPA Studies Completed: {assessment['lopa_analyses']}
   - SIS Implementation Required: {sum(1 for _ in (assessment['lopa_results'].itertuples() if assessment['lopa_results'] is not None else []) if 'SIL' in _.SIL_Required and _.SIL_Required != 'NO_SIL')}

4. BARRIER INTEGRITY
   - Total Safety Barriers: {assessment['total_barriers']}
   - Degraded/Failing Barriers: {assessment['degraded_barriers']}

{'='*70}
CRITICAL ACTION ITEMS
{'='*70}
"""

        # Priority action items
        action_items = []

        if assessment['degraded_barriers'] > 0:
            for barrier_name in assessment['degraded_barrier_list']:
                action_items.append(
                    f"⚠️ URGENT: Repair/Replace barrier: {barrier_name}"
                )

        if assessment['high_risk_scenarios'] > 0:
            action_items.append(
                f"⚠️ HIGH PRIORITY: Implement risk reduction for {assessment['high_risk_scenarios']} scenarios"
            )

        if action_items:
            for i, item in enumerate(action_items, 1):
                summary += f"\n{i}. {item}"
        else:
            summary += "\n✅ No critical action items identified."

        summary += f"\n\n{'='*70}\n"
        summary += "STATUS: "

        if assessment['degraded_barriers'] == 0 and assessment['high_risk_scenarios'] <= 2:
            summary += "✅ ACCEPTABLE - Continue routine monitoring\n"
        elif assessment['degraded_barriers'] <= 2 and assessment['high_risk_scenarios'] <= 5:
            summary += "⚠️ REQUIRES ATTENTION - Implement improvements within 3 months\n"
        else:
            summary += "❌ UNACCEPTABLE - Immediate corrective action required\n"

        summary += f"{'='*70}\n"

        return summary


# Usage example: Execute integrated assessment
integrated_assessment = IntegratedProcessSafetyAssessment(
    process_name="Hydrogen Production Unit"
)

# Add LOPA (created in Example 3)
integrated_assessment.lopa_analyses.append(lopa)

# Add barriers (created in Example 7)
integrated_assessment.barrier_management = bms

# Execute comprehensive assessment
assessment_results = integrated_assessment.perform_comprehensive_assessment()

# Generate executive summary
executive_summary = integrated_assessment.generate_executive_summary(assessment_results)
print(executive_summary)

# Detailed results
print("\n=== Hazard Register (Top 3) ===")
print(assessment_results['hazard_register'].head(3).to_string(index=False))

print("\n=== Risk Assessment Results (High Risk) ===")
high_risk_df = assessment_results['risk_assessments'][
    assessment_results['risk_assessments']['Risk Level'].isin(['Extreme', 'High'])
]
print(high_risk_df.to_string(index=False))

# Expected output:
# ======================================================================
# PROCESS SAFETY ASSESSMENT - EXECUTIVE SUMMARY
# ======================================================================
#
# Process: Hydrogen Production Unit
# Assessment Date: 2025-10-26
#
# ======================================================================
# KEY FINDINGS
# ======================================================================
#
# 1. HAZARD IDENTIFICATION
#    - Total Hazards Identified: 3
#    - Critical/Catastrophic Hazards: 3
#
# 2. RISK ASSESSMENT
#    - High Risk Scenarios: 2
#    - Immediate Action Required: 1
#
# 3. PROTECTION LAYERS (LOPA)
#    - LOPA Studies Completed: 1
#    - SIS Implementation Required: 0
#
# 4. BARRIER INTEGRITY
#    - Total Safety Barriers: 3
#    - Degraded/Failing Barriers: 1
#
# ======================================================================
# CRITICAL ACTION ITEMS
# ======================================================================
#
# 1. ⚠️ URGENT: Repair/Replace barrier: H2 Gas Detector (GD-301)
# 2. ⚠️ HIGH PRIORITY: Implement risk reduction for 2 scenarios
#
# ======================================================================
# STATUS: ⚠️ REQUIRES ATTENTION - Implement improvements within 3 months

Verification of Learning Objectives

Upon completing this chapter, you will be able to explain the following:

Fundamental Understanding

Practical Skills

Applied Competency


Next Steps

In Chapter 1, we learned the fundamentals of process safety, hazard identification, risk assessment, and layer of protection analysis.

In Chapter 2:

will be covered.

References

  1. Montgomery, D. C. (2019). Design and Analysis of Experiments (9th ed.). Wiley.
  2. Box, G. E. P., Hunter, J. S., & Hunter, W. G. (2005). Statistics for Experimenters: Design, Innovation, and Discovery (2nd ed.). Wiley.
  3. Seborg, D. E., Edgar, T. F., Mellichamp, D. A., & Doyle III, F. J. (2016). Process Dynamics and Control (4th ed.). Wiley.
  4. McKay, M. D., Beckman, R. J., & Conover, W. J. (2000). "A Comparison of Three Methods for Selecting Values of Input Variables in the Analysis of Output from a Computer Code." Technometrics, 42(1), 55-61.

Disclaimer