This chapter covers the fundamentals of Fundamentals of Process Safety, which overview of process safety. You will learn concept of process safety, hazard identification frameworks, and Conduct Layer of Protection Analysis (LOPA).
Learning Objectives
By reading this chapter, you will be able to:
- ✅ Understand the concept of process safety and lessons from major accidents
- ✅ Implement hazard identification frameworks
- ✅ Perform risk assessment using risk matrices (likelihood × consequence)
- ✅ Conduct Layer of Protection Analysis (LOPA) and calculate SIL
- ✅ Visualize risk analysis with Bow-tie diagrams
- ✅ Implement consequence modeling calculations
1.1 Overview of Process Safety
What is Process Safety
Process Safety is a systematic approach to managing major hazards (fires, explosions, toxic gas releases, etc.) in chemical processes. While Occupational Safety focuses on preventing individual injuries, process safety focuses on Equipment Integrity and operational management.
History of Major Accidents
| Accident | Year | Fatalities | Main Cause | Lesson Learned |
|---|---|---|---|---|
| Flixborough | 1974 | 28 | Cyclohexane release due to poor pipe design | Importance of Change Management |
| Bhopal | 1984 | 3,787+ | Methyl isocyanate (MIC) release | Need for multiple protection layers |
| Piper Alpha | 1988 | 167 | Gas leak → fire → explosion | Permit-to-Work system |
| Texas City | 2005 | 15 | Distillation column overflow → explosion | Importance of safety culture |
Key Insight: The common factor in these accidents is that the root causes were not just technical failures but Management System Failures.
1.2 Hazard Identification Framework
Example 1: Implementation of Hazard Identification System
The first step in Process Hazard Analysis (PHA) is systematic hazard identification.
# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0
# ===================================
# Example 1: Hazard Identification System
# ===================================
import pandas as pd
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
class HazardCategory(Enum):
"""Hazard Category"""
PHYSICAL = "Physical Hazard"
CHEMICAL = "Chemical Hazard"
BIOLOGICAL = "Biological Hazard"
ERGONOMIC = "Ergonomic Hazard"
class Severity(Enum):
"""Severity Level"""
CATASTROPHIC = 5 # Catastrophic (multiple fatalities)
CRITICAL = 4 # Critical (one or more fatalities)
MARGINAL = 3 # Marginal (serious injury)
NEGLIGIBLE = 2 # Negligible (minor injury)
MINIMAL = 1 # Minimal (first aid only)
@dataclass
class Hazard:
"""Hazard Information"""
id: str
name: str
category: HazardCategory
description: str
potential_causes: List[str]
potential_consequences: List[str]
severity: Severity
existing_safeguards: List[str]
class HazardIdentificationSystem:
"""Hazard Identification System"""
def __init__(self):
self.hazards: List[Hazard] = []
self._initialize_common_hazards()
def _initialize_common_hazards(self):
"""Initialize common chemical process hazards"""
# Hazard 1: High-pressure gas release
self.add_hazard(Hazard(
id="HAZ-001",
name="High Pressure Gas Release",
category=HazardCategory.PHYSICAL,
description="High-pressure hydrogen gas release from reactor",
potential_causes=[
"Gasket degradation in pipe flanges",
"Pipe perforation due to corrosion",
"Valve seat wear",
"Rupture disc burst due to overpressure"
],
potential_consequences=[
"Fire/explosion (if ignition source present)",
"Asphyxiation (oxygen deficiency in confined spaces)",
"Cryogenic burns (in case of LNG)",
"Environmental contamination"
],
severity=Severity.CATASTROPHIC,
existing_safeguards=[
"Gas detector (0.4% LEL setpoint)",
"Emergency shutdown valve (ESD)",
"Pressure relief valve (PRV)",
"Vent system"
]
))
# Hazard 2: Runaway exothermic reaction
self.add_hazard(Hazard(
id="HAZ-002",
name="Runaway Exothermic Reaction",
category=HazardCategory.CHEMICAL,
description="Reaction runaway due to cooling system failure",
potential_causes=[
"Cooling water pump failure",
"Temperature control system failure",
"Localized overheating due to agitator stoppage",
"Excessive catalyst addition"
],
potential_consequences=[
"Reactor rupture",
"Toxic gas release",
"Fire/explosion",
"Cascade disaster to surrounding facilities"
],
severity=Severity.CRITICAL,
existing_safeguards=[
"Independent high temperature interlock (120°C)",
"Emergency cooling system",
"Rupture disc + quencher",
"Raw material emergency shutdown"
]
))
# Hazard 3: Flammable liquid release
self.add_hazard(Hazard(
id="HAZ-003",
name="Flammable Liquid Release",
category=HazardCategory.CHEMICAL,
description="Liquid release from toluene storage tank",
potential_causes=[
"Tank bottom corrosion",
"Overflow (level gauge failure)",
"Loading hose rupture",
"Pipe rupture due to earthquake"
],
potential_consequences=[
"Pool fire",
"Vapor cloud explosion (VCE)",
"Soil and groundwater contamination",
"Health impacts on nearby residents"
],
severity=Severity.CRITICAL,
existing_safeguards=[
"Dike (110% capacity)",
"High level alarm + interlock",
"Leak detection system",
"Foam fire suppression"
]
))
def add_hazard(self, hazard: Hazard):
"""Add hazard"""
self.hazards.append(hazard)
def get_hazards_by_severity(self, min_severity: Severity) -> List[Hazard]:
"""Get hazards with severity greater than or equal to specified level"""
return [h for h in self.hazards if h.severity.value >= min_severity.value]
def generate_hazard_register(self) -> pd.DataFrame:
"""Generate hazard register (list)"""
data = []
for h in self.hazards:
data.append({
'ID': h.id,
'Hazard Name': h.name,
'Category': h.category.value,
'Severity': h.severity.name,
'Severity Score': h.severity.value,
'Main Causes': '; '.join(h.potential_causes[:2]), # First two
'Main Consequences': '; '.join(h.potential_consequences[:2]),
'Existing Safeguards Count': len(h.existing_safeguards)
})
df = pd.DataFrame(data)
return df.sort_values('Severity Score', ascending=False)
# Usage example
hazard_system = HazardIdentificationSystem()
# Extract high-risk hazards
critical_hazards = hazard_system.get_hazards_by_severity(Severity.CRITICAL)
print(f"Number of critical hazards: {len(critical_hazards)}\n")
# Generate hazard register
hazard_register = hazard_system.generate_hazard_register()
print("=== Hazard Register ===")
print(hazard_register.to_string(index=False))
# Expected output:
# Number of critical hazards: 2
#
# === Hazard Register ===
# ID Hazard Name Category Severity Severity Score ...
# HAZ-001 High Pressure Gas Release Physical Hazard CATASTROPHIC 5 ...
# HAZ-002 Runaway Exothermic Reaction Chemical Hazard CRITICAL 4 ...
# HAZ-003 Flammable Liquid Release Chemical Hazard CRITICAL 4 ...
1.3 Fundamentals of Risk Assessment
Risk Matrix Concept
Risk is assessed as a combination of Likelihood and Consequence:
$$ \text{Risk} = \text{Likelihood} \times \text{Consequence} $$
Example 2: Risk Matrix Implementation
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - seaborn>=0.12.0
# ===================================
# Example 2: Risk Matrix
# ===================================
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from enum import Enum
class Likelihood(Enum):
"""Likelihood Level"""
FREQUENT = 5 # Frequent (once per year or more)
PROBABLE = 4 # Probable (once per 2-10 years)
OCCASIONAL = 3 # Occasional (once per 10-100 years)
REMOTE = 2 # Remote (once per 100-1000 years)
IMPROBABLE = 1 # Improbable (less than once per 1000 years)
class RiskLevel(Enum):
"""Risk Level"""
EXTREME = "Extreme"
HIGH = "High"
MEDIUM = "Medium"
LOW = "Low"
class RiskMatrix:
"""Risk Matrix Evaluation System"""
def __init__(self):
# Risk matrix definition (5x5)
# Rows: Severity (1-5), Columns: Likelihood (1-5)
self.matrix = np.array([
[1, 2, 3, 5, 8 ], # Severity 1 (MINIMAL)
[2, 4, 6, 10, 15], # Severity 2 (NEGLIGIBLE)
[4, 8, 12, 18, 25], # Severity 3 (MARGINAL)
[8, 15, 20, 25, 30], # Severity 4 (CRITICAL)
[12, 20, 25, 30, 35] # Severity 5 (CATASTROPHIC)
])
# Risk level thresholds
self.thresholds = {
RiskLevel.EXTREME: 25, # ≥25
RiskLevel.HIGH: 15, # 15-24
RiskLevel.MEDIUM: 8, # 8-14
RiskLevel.LOW: 0 # <8
}
def calculate_risk_score(self, severity: Severity, likelihood: Likelihood) -> int:
"""Calculate risk score"""
return self.matrix[severity.value - 1, likelihood.value - 1]
def determine_risk_level(self, risk_score: int) -> RiskLevel:
"""Determine risk level"""
if risk_score >= self.thresholds[RiskLevel.EXTREME]:
return RiskLevel.EXTREME
elif risk_score >= self.thresholds[RiskLevel.HIGH]:
return RiskLevel.HIGH
elif risk_score >= self.thresholds[RiskLevel.MEDIUM]:
return RiskLevel.MEDIUM
else:
return RiskLevel.LOW
def assess_risk(self, hazard_name: str, severity: Severity,
likelihood: Likelihood) -> Dict:
"""Comprehensive risk assessment"""
risk_score = self.calculate_risk_score(severity, likelihood)
risk_level = self.determine_risk_level(risk_score)
# Response action
if risk_level == RiskLevel.EXTREME:
action = "Immediate action required. Consider operation shutdown."
elif risk_level == RiskLevel.HIGH:
action = "Implement risk reduction measures with priority (within 3 months)."
elif risk_level == RiskLevel.MEDIUM:
action = "Plan risk reduction measures (within 1 year)."
else:
action = "Maintain current safety measures."
return {
'Hazard': hazard_name,
'Severity': severity.name,
'Likelihood': likelihood.name,
'Risk Score': risk_score,
'Risk Level': risk_level.value,
'Recommended Action': action
}
def visualize_matrix(self, assessments: List[Dict] = None):
"""Visualize risk matrix"""
fig, ax = plt.subplots(figsize=(10, 8))
# Create heatmap
sns.heatmap(self.matrix, annot=True, fmt='d', cmap='YlOrRd',
cbar_kws={'label': 'Risk Score'},
xticklabels=['Improbable\n(1)', 'Remote\n(2)', 'Occasional\n(3)',
'Probable\n(4)', 'Frequent\n(5)'],
yticklabels=['Minimal\n(1)', 'Negligible\n(2)', 'Marginal\n(3)',
'Critical\n(4)', 'Catastrophic\n(5)'],
ax=ax)
ax.set_xlabel('Likelihood', fontsize=12, fontweight='bold')
ax.set_ylabel('Severity', fontsize=12, fontweight='bold')
ax.set_title('Process Safety Risk Matrix', fontsize=14, fontweight='bold')
# Plot assessment results
if assessments:
for assess in assessments:
sev_map = {'MINIMAL': 1, 'NEGLIGIBLE': 2, 'MARGINAL': 3,
'CRITICAL': 4, 'CATASTROPHIC': 5}
like_map = {'IMPROBABLE': 1, 'REMOTE': 2, 'OCCASIONAL': 3,
'PROBABLE': 4, 'FREQUENT': 5}
sev_idx = sev_map[assess['Severity']] - 1
like_idx = like_map[assess['Likelihood']] - 1
ax.plot(like_idx + 0.5, sev_idx + 0.5, 'bo', markersize=15,
markeredgecolor='white', markeredgewidth=2)
plt.tight_layout()
return fig
# Usage example
risk_matrix = RiskMatrix()
# Assess multiple hazards
assessments = [
risk_matrix.assess_risk("High Pressure Gas Release", Severity.CATASTROPHIC, Likelihood.OCCASIONAL),
risk_matrix.assess_risk("Runaway Exothermic Reaction", Severity.CRITICAL, Likelihood.REMOTE),
risk_matrix.assess_risk("Flammable Liquid Release", Severity.CRITICAL, Likelihood.OCCASIONAL)
]
# Display results
print("=== Risk Assessment Results ===\n")
for assess in assessments:
print(f"Hazard: {assess['Hazard']}")
print(f" Risk Score: {assess['Risk Score']} ({assess['Risk Level']})")
print(f" Recommended Action: {assess['Recommended Action']}\n")
# Visualization
fig = risk_matrix.visualize_matrix(assessments)
# plt.show() # Auto-display in Jupyter environment
# Expected output:
# === Risk Assessment Results ===
#
# Hazard: High Pressure Gas Release
# Risk Score: 25 (Extreme)
# Recommended Action: Immediate action required. Consider operation shutdown.
#
# Hazard: Runaway Exothermic Reaction
# Risk Score: 15 (High)
# Recommended Action: Implement risk reduction measures with priority (within 3 months).
1.4 Layer of Protection Analysis (LOPA)
LOPA Concept
LOPA (Layer of Protection Analysis) is a method for quantitatively evaluating the effectiveness of Independent Protection Layers (IPLs) for risk scenarios.
Example 3: LOPA Implementation and SIL Calculation
# ===================================
# Example 3: Layer of Protection Analysis (LOPA) and SIL Calculation
# ===================================
from dataclasses import dataclass
from typing import List
import math
@dataclass
class ProtectionLayer:
"""Protection Layer (IPL)"""
name: str
pfd: float # Probability of Failure on Demand
@property
def risk_reduction_factor(self) -> float:
"""Risk Reduction Factor (RRF)"""
return 1.0 / self.pfd if self.pfd > 0 else float('inf')
class SIL(Enum):
"""Safety Integrity Level"""
SIL_4 = (1e-5, 1e-4, "10^-5 to 10^-4")
SIL_3 = (1e-4, 1e-3, "10^-4 to 10^-3")
SIL_2 = (1e-3, 1e-2, "10^-3 to 10^-2")
SIL_1 = (1e-2, 1e-1, "10^-2 to 10^-1")
NO_SIL = (1e-1, 1.0, "> 10^-1")
def __init__(self, lower, upper, range_str):
self.lower = lower
self.upper = upper
self.range_str = range_str
class LOPAAnalysis:
"""Layer of Protection Analysis System"""
def __init__(self, scenario_name: str, initiating_event_frequency: float,
consequence_severity: Severity):
"""
Args:
scenario_name: Scenario name
initiating_event_frequency: Initiating event frequency (events/year)
consequence_severity: Consequence severity
"""
self.scenario_name = scenario_name
self.initiating_event_frequency = initiating_event_frequency
self.consequence_severity = consequence_severity
self.protection_layers: List[ProtectionLayer] = []
def add_protection_layer(self, layer: ProtectionLayer):
"""Add protection layer"""
self.protection_layers.append(layer)
def calculate_mitigated_frequency(self) -> float:
"""Calculate mitigated event frequency"""
total_pfd = self.initiating_event_frequency
for layer in self.protection_layers:
total_pfd *= layer.pfd
return total_pfd
def determine_required_sil(self, tolerable_frequency: float = 1e-4) -> SIL:
"""Determine required SIL level
Args:
tolerable_frequency: Tolerable event frequency (events/year)
"""
current_frequency = self.calculate_mitigated_frequency()
if current_frequency <= tolerable_frequency:
return SIL.NO_SIL
# Additional risk reduction required
required_pfd = tolerable_frequency / self.initiating_event_frequency
# Consider PFD of existing protection layers
for layer in self.protection_layers:
required_pfd /= layer.pfd
# SIL determination
for sil in [SIL.SIL_4, SIL.SIL_3, SIL.SIL_2, SIL.SIL_1]:
if sil.lower <= required_pfd < sil.upper:
return sil
return SIL.NO_SIL
def generate_lopa_report(self, tolerable_frequency: float = 1e-4) -> str:
"""Generate LOPA report"""
mitigated_freq = self.calculate_mitigated_frequency()
required_sil = self.determine_required_sil(tolerable_frequency)
report = f"""
{'='*60}
LOPA Analysis Report
{'='*60}
Scenario: {self.scenario_name}
Consequence Severity: {self.consequence_severity.name}
--- Initiating Event ---
Frequency: {self.initiating_event_frequency:.2e} events/year
--- Independent Protection Layers (IPL) ---
"""
total_rrf = 1.0
for i, layer in enumerate(self.protection_layers, 1):
rrf = layer.risk_reduction_factor
total_rrf *= rrf
report += f"{i}. {layer.name}\n"
report += f" PFD: {layer.pfd:.2e}\n"
report += f" RRF: {rrf:.0f}\n\n"
report += f"""--- Risk Assessment ---
Total Risk Reduction Factor: {total_rrf:.0f}
Mitigated Event Frequency: {mitigated_freq:.2e} events/year
Tolerable Frequency Target: {tolerable_frequency:.2e} events/year
Risk Status: {'ACCEPTABLE' if mitigated_freq <= tolerable_frequency else 'UNACCEPTABLE'}
--- SIL Requirement ---
"""
if required_sil == SIL.NO_SIL:
report += "Required SIL: None (existing IPLs are sufficient)\n"
else:
report += f"Required SIL: {required_sil.name}\n"
report += f"Target PFD Range: {required_sil.range_str}\n"
report += f"\nRecommendation: Implement SIF (Safety Instrumented Function) with {required_sil.name}\n"
report += f"\n{'='*60}\n"
return report
# Usage example: LOPA analysis for reactor overpressure scenario
lopa = LOPAAnalysis(
scenario_name="Reactor overpressure leading to rupture",
initiating_event_frequency=1e-2, # 0.01 events/year (once per 100 years)
consequence_severity=Severity.CATASTROPHIC
)
# Add protection layers
lopa.add_protection_layer(ProtectionLayer(
name="Basic Process Control System (BPCS)",
pfd=1e-1 # 90% effectiveness
))
lopa.add_protection_layer(ProtectionLayer(
name="High Pressure Alarm (operator response)",
pfd=1e-1 # 90% effectiveness
))
lopa.add_protection_layer(ProtectionLayer(
name="Pressure Relief Valve (PRV)",
pfd=1e-2 # 99% effectiveness
))
# Generate LOPA report
report = lopa.generate_lopa_report(tolerable_frequency=1e-5)
print(report)
# Expected output:
# ============================================================
# LOPA Analysis Report
# ============================================================
#
# Scenario: Reactor overpressure leading to rupture
# Consequence Severity: CATASTROPHIC
#
# --- Initiating Event ---
# Frequency: 1.00e-02 events/year
#
# --- Independent Protection Layers (IPL) ---
# 1. Basic Process Control System (BPCS)
# PFD: 1.00e-01
# RRF: 10
#
# 2. High Pressure Alarm (operator response)
# PFD: 1.00e-01
# RRF: 10
#
# 3. Pressure Relief Valve (PRV)
# PFD: 1.00e-02
# RRF: 100
#
# --- Risk Assessment ---
# Total Risk Reduction Factor: 10000
# Mitigated Event Frequency: 1.00e-06 events/year
# Tolerable Frequency Target: 1.00e-05 events/year
#
# Risk Status: ACCEPTABLE
#
# --- SIL Requirement ---
# Required SIL: None (existing IPLs are sufficient)
1.5 Consequence Modeling
Example 4: Gas Dispersion Model (Gaussian Plume)
We implement a Gaussian Plume model to predict the impact zone of toxic gas releases.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# ===================================
# Example 4: Gas Dispersion Model (Gaussian Plume)
# ===================================
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import erf
class GaussianPlumeModel:
"""Gaussian Plume Dispersion Model (Steady State)"""
def __init__(self, emission_rate: float, wind_speed: float,
stack_height: float = 0.0, stability_class: str = 'D'):
"""
Args:
emission_rate: Emission rate [g/s]
wind_speed: Wind speed [m/s]
stack_height: Release height [m]
stability_class: Atmospheric stability class (A-F, Pasquill classification)
"""
self.Q = emission_rate
self.u = wind_speed
self.H = stack_height
self.stability_class = stability_class
def _pasquill_gifford_sigma(self, x: float) -> tuple:
"""Pasquill-Gifford dispersion parameters
Args:
x: Downwind distance [m]
Returns:
(sigma_y, sigma_z): Lateral and vertical dispersion coefficients [m]
"""
# Simplified Pasquill-Gifford formula (stability class D: neutral)
# In actual implementation, coefficients vary by stability_class
coefficients = {
'A': (0.22, 0.20), # Very unstable
'B': (0.16, 0.12), # Unstable
'C': (0.11, 0.08), # Slightly unstable
'D': (0.08, 0.06), # Neutral (default)
'E': (0.06, 0.03), # Slightly stable
'F': (0.04, 0.016) # Stable
}
a_y, a_z = coefficients.get(self.stability_class, (0.08, 0.06))
# Dispersion coefficient calculation (empirical formula)
sigma_y = a_y * x * (1 + 0.0001 * x)**(-0.5)
sigma_z = a_z * x
return sigma_y, sigma_z
def concentration(self, x: float, y: float, z: float) -> float:
"""Calculate concentration at specific position
Args:
x: Downwind distance [m]
y: Crosswind distance [m]
z: Height above ground [m]
Returns:
Concentration [g/m^3]
"""
if x <= 0:
return 0.0
sigma_y, sigma_z = self._pasquill_gifford_sigma(x)
# Gaussian Plume equation
C = (self.Q / (2 * np.pi * self.u * sigma_y * sigma_z)) * \
np.exp(-0.5 * (y / sigma_y)**2) * \
(np.exp(-0.5 * ((z - self.H) / sigma_z)**2) +
np.exp(-0.5 * ((z + self.H) / sigma_z)**2)) # Ground reflection term
return C
def ground_level_centerline_concentration(self, x: float) -> float:
"""Downwind centerline ground-level concentration (y=0, z=0)"""
return self.concentration(x, 0, 0)
def calculate_impact_zone(self, threshold: float, max_distance: float = 5000) -> float:
"""Calculate impact zone (maximum distance reaching threshold concentration)
Args:
threshold: Threshold concentration [g/m^3]
max_distance: Maximum evaluation distance [m]
Returns:
Impact zone distance [m]
"""
distances = np.linspace(10, max_distance, 1000)
concentrations = [self.ground_level_centerline_concentration(d)
for d in distances]
# Farthest distance above threshold
impact_distances = distances[np.array(concentrations) >= threshold]
if len(impact_distances) > 0:
return impact_distances[-1]
else:
return 0.0
def visualize_concentration_profile(self, max_distance: float = 2000):
"""Visualize concentration profile"""
distances = np.linspace(10, max_distance, 500)
concentrations = [self.ground_level_centerline_concentration(d)
for d in distances]
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(distances, np.array(concentrations) * 1e6, 'b-', linewidth=2)
ax.set_xlabel('Distance Downwind (m)', fontsize=12)
ax.set_ylabel('Concentration (mg/m³)', fontsize=12)
ax.set_title(f'Gaussian Plume Dispersion Model\n'
f'Q={self.Q} g/s, u={self.u} m/s, Class {self.stability_class}',
fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.set_yscale('log')
# ERPG/AEGL threshold examples (for chlorine gas)
aegl_2 = 2.8 # mg/m^3 (irreversible health effects at 60 min exposure)
aegl_3 = 20 # mg/m^3 (life-threatening at 60 min exposure)
ax.axhline(y=aegl_2, color='orange', linestyle='--',
label=f'AEGL-2: {aegl_2} mg/m³')
ax.axhline(y=aegl_3, color='red', linestyle='--',
label=f'AEGL-3: {aegl_3} mg/m³')
ax.legend()
plt.tight_layout()
return fig
# Usage example: Chlorine gas (Cl2) release scenario
plume_model = GaussianPlumeModel(
emission_rate=100, # 100 g/s (360 kg/hr)
wind_speed=3.0, # 3 m/s (weak wind)
stack_height=2.0, # Release from 2m height
stability_class='D' # Neutral atmosphere
)
# Calculate ground-level centerline concentration
distances = [100, 500, 1000, 2000]
print("=== Ground-Level Centerline Concentration ===\n")
for d in distances:
conc = plume_model.ground_level_centerline_concentration(d)
print(f"Distance: {d:4d} m → Concentration: {conc*1e6:.2f} mg/m³")
# Calculate impact zone (AEGL-2: 2.8 mg/m^3)
impact_distance_aegl2 = plume_model.calculate_impact_zone(
threshold=2.8e-3 # 2.8 mg/m^3 = 2.8e-3 g/m^3
)
print(f"\nAEGL-2 Impact Zone: {impact_distance_aegl2:.0f} m")
# Visualization
fig = plume_model.visualize_concentration_profile()
# plt.show()
# Expected output:
# === Ground-Level Centerline Concentration ===
#
# Distance: 100 m → Concentration: 127.45 mg/m³
# Distance: 500 m → Concentration: 8.73 mg/m³
# Distance: 1000 m → Concentration: 2.51 mg/m³
# Distance: 2000 m → Concentration: 0.73 mg/m³
#
# AEGL-2 Impact Zone: 1245 m
Practical Application: In actual Consequence Modeling, specialized software such as ALOHA (NOAA), PHAST (DNV), and EFFECTS (TNO) are used. This example is a simplified model for educational purposes.
1.6 Bow-tie Analysis
Example 5: Bow-tie Diagram Construction
Bow-tie analysis visualizes the causes (left side) and consequences (right side) of a hazard event, organizing preventive and mitigative protection layers.
# ===================================
# Example 5: Bow-tie Analysis
# ===================================
from dataclasses import dataclass
from typing import List
@dataclass
class Threat:
"""Threat (Cause)"""
name: str
barriers: List[str] # Preventive protection layers
@dataclass
class Consequence:
"""Consequence"""
name: str
barriers: List[str] # Mitigative protection layers
class BowtieAnalysis:
"""Bow-tie Analysis System"""
def __init__(self, hazard_event: str):
self.hazard_event = hazard_event
self.threats: List[Threat] = []
self.consequences: List[Consequence] = []
def add_threat(self, threat: Threat):
"""Add threat"""
self.threats.append(threat)
def add_consequence(self, consequence: Consequence):
"""Add consequence"""
self.consequences.append(consequence)
def generate_bowtie_report(self) -> str:
"""Generate Bow-tie report (text format)"""
report = f"""
{'='*70}
Bow-tie Analysis Report
{'='*70}
Hazard Event: {self.hazard_event}
{'='*70}
LEFT SIDE: Threats (Causes) and Preventive Barriers
{'='*70}
"""
for i, threat in enumerate(self.threats, 1):
report += f"\nThreat {i}: {threat.name}\n"
report += " Preventive Barriers:\n"
for j, barrier in enumerate(threat.barriers, 1):
report += f" {j}. {barrier}\n"
report += f"""
{'='*70}
RIGHT SIDE: Consequences and Mitigative Barriers
{'='*70}
"""
for i, consequence in enumerate(self.consequences, 1):
report += f"\nConsequence {i}: {consequence.name}\n"
report += " Mitigative Barriers:\n"
for j, barrier in enumerate(consequence.barriers, 1):
report += f" {j}. {barrier}\n"
report += f"\n{'='*70}\n"
# Statistics
total_preventive = sum(len(t.barriers) for t in self.threats)
total_mitigative = sum(len(c.barriers) for c in self.consequences)
report += f"""
Summary Statistics:
- Total Threats: {len(self.threats)}
- Total Preventive Barriers: {total_preventive}
- Total Consequences: {len(self.consequences)}
- Total Mitigative Barriers: {total_mitigative}
- Defense-in-Depth Layers: {total_preventive + total_mitigative}
"""
return report
def identify_critical_barriers(self) -> List[str]:
"""Identify critical single points of failure"""
critical = []
for threat in self.threats:
if len(threat.barriers) == 1:
critical.append(f"Threat '{threat.name}' has only 1 barrier: {threat.barriers[0]}")
for consequence in self.consequences:
if len(consequence.barriers) == 1:
critical.append(f"Consequence '{consequence.name}' has only 1 barrier: {consequence.barriers[0]}")
return critical
# Usage example: Bow-tie analysis for flammable liquid storage tank fire
bowtie = BowtieAnalysis(hazard_event="Flammable Liquid Storage Tank Fire")
# Threats (causes) and preventive protection layers
bowtie.add_threat(Threat(
name="Ignition by electrostatic discharge",
barriers=[
"Tank grounding and bonding",
"Antistatic additive",
"Flow velocity limit (<1 m/s)",
"Inert gas (N2) purging"
]
))
bowtie.add_threat(Threat(
name="Ignition by lightning",
barriers=[
"Lightning rod installation",
"Surge protector",
"Grounding system"
]
))
bowtie.add_threat(Threat(
name="Contact with hot surface",
barriers=[
"Hot Work permit system",
"Temperature monitoring",
"Insulation installation",
"Fire work prohibited zone setting"
]
))
# Consequences and mitigative protection layers
bowtie.add_consequence(Consequence(
name="Tank fire (pool fire)",
barriers=[
"Foam fire suppression (fixed)",
"Fire extinguisher placement",
"Emergency shutdown valve (automatic)",
"Dike (secondary containment)",
"Cooling water spray"
]
))
bowtie.add_consequence(Consequence(
name="BLEVE (Boiling Liquid Expanding Vapor Explosion)",
barriers=[
"Pressure relief valve",
"Cooling water spray (tank top)",
"Heat shielding",
"Emergency separation distance"
]
))
bowtie.add_consequence(Consequence(
name="Fire spread to neighboring facilities",
barriers=[
"Firewall",
"Sprinkler system",
"Fire department notification system",
"Emergency evacuation plan"
]
))
# Generate report
report = bowtie.generate_bowtie_report()
print(report)
# Identify critical single points
critical_barriers = bowtie.identify_critical_barriers()
if critical_barriers:
print("\n⚠️ Critical Single Points of Failure:")
for cb in critical_barriers:
print(f" - {cb}")
else:
print("\n✅ No critical single points of failure identified.")
# Expected output:
# ======================================================================
# Bow-tie Analysis Report
# ======================================================================
#
# Hazard Event: Flammable Liquid Storage Tank Fire
#
# ======================================================================
# LEFT SIDE: Threats (Causes) and Preventive Barriers
# ======================================================================
#
# Threat 1: Ignition by electrostatic discharge
# Preventive Barriers:
# 1. Tank grounding and bonding
# 2. Antistatic additive
# 3. Flow velocity limit (<1 m/s)
# 4. Inert gas (N2) purging
# ...
#
# Summary Statistics:
# - Total Threats: 3
# - Total Preventive Barriers: 11
# - Total Consequences: 3
# - Total Mitigative Barriers: 14
# - Defense-in-Depth Layers: 25
1.7 Risk-Based Inspection (RBI)
Example 6: API 580 RBI Framework
We implement an equipment inspection prioritization system based on API 580 (Risk-Based Inspection).
# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0
# ===================================
# Example 6: Risk-Based Inspection (RBI)
# ===================================
from dataclasses import dataclass
from typing import List, Dict
import pandas as pd
class CorrosionMechanism(Enum):
"""Corrosion Mechanism"""
GENERAL_CORROSION = "General Corrosion"
PITTING = "Pitting"
SCC = "Stress Corrosion Cracking"
EROSION = "Erosion"
FATIGUE = "Fatigue"
@dataclass
class Equipment:
"""Equipment Information"""
id: str
name: str
equipment_type: str
fluid: str
temperature: float # °C
pressure: float # MPa
age: float # years
last_inspection: float # years ago
corrosion_mechanism: CorrosionMechanism
corrosion_rate: float # mm/year
thickness_remaining: float # mm
design_thickness: float # mm
class RBIAnalysis:
"""Risk-Based Inspection Analysis"""
def calculate_pof(self, equipment: Equipment) -> float:
"""Calculate Probability of Failure
Simplified model:
PoF = f(corrosion_damage, time_since_inspection, operating_severity)
Returns:
PoF score (0-100)
"""
# Corrosion damage factor
damage_factor = equipment.corrosion_rate * equipment.age
thickness_ratio = equipment.thickness_remaining / equipment.design_thickness
if thickness_ratio < 0.5:
corrosion_score = 90
elif thickness_ratio < 0.7:
corrosion_score = 60
elif thickness_ratio < 0.9:
corrosion_score = 30
else:
corrosion_score = 10
# Inspection interval factor
if equipment.last_inspection > 10:
inspection_score = 80
elif equipment.last_inspection > 5:
inspection_score = 50
elif equipment.last_inspection > 2:
inspection_score = 20
else:
inspection_score = 5
# Operating severity factor
if equipment.temperature > 200 or equipment.pressure > 5.0:
severity_score = 70
elif equipment.temperature > 100 or equipment.pressure > 2.0:
severity_score = 40
else:
severity_score = 10
# Corrosion mechanism factor
mechanism_multiplier = {
CorrosionMechanism.SCC: 1.5,
CorrosionMechanism.PITTING: 1.3,
CorrosionMechanism.FATIGUE: 1.4,
CorrosionMechanism.EROSION: 1.2,
CorrosionMechanism.GENERAL_CORROSION: 1.0
}
multiplier = mechanism_multiplier[equipment.corrosion_mechanism]
# Total PoF calculation (weighted average)
pof = (corrosion_score * 0.4 + inspection_score * 0.3 +
severity_score * 0.3) * multiplier
return min(pof, 100)
def calculate_cof(self, equipment: Equipment) -> float:
"""Calculate Consequence of Failure
Simplified model:
CoF = f(fluid_hazard, inventory, pressure)
Returns:
CoF score (0-100)
"""
# Fluid hazard score
high_hazard_fluids = ['H2', 'Cl2', 'HF', 'NH3', 'C2H4', 'LPG']
medium_hazard_fluids = ['methanol', 'ethanol', 'benzene', 'toluene']
if equipment.fluid in high_hazard_fluids:
fluid_score = 90
elif equipment.fluid in medium_hazard_fluids:
fluid_score = 60
else:
fluid_score = 30
# Pressure impact score
if equipment.pressure > 5.0:
pressure_score = 80
elif equipment.pressure > 2.0:
pressure_score = 50
else:
pressure_score = 20
# Equipment type impact score
if equipment.equipment_type in ['Reactor', 'Distillation Column']:
equipment_score = 70
elif equipment.equipment_type in ['Heat Exchanger', 'Pump']:
equipment_score = 40
else:
equipment_score = 20
# Total CoF calculation
cof = fluid_score * 0.5 + pressure_score * 0.3 + equipment_score * 0.2
return min(cof, 100)
def calculate_risk_score(self, pof: float, cof: float) -> float:
"""Risk score = PoF × CoF"""
return (pof * cof) / 100 # Normalize to 0-100 scale
def determine_inspection_priority(self, risk_score: float) -> str:
"""Determine inspection priority"""
if risk_score >= 70:
return "Priority 1 (Immediate - within 1 month)"
elif risk_score >= 50:
return "Priority 2 (High - within 3 months)"
elif risk_score >= 30:
return "Priority 3 (Medium - within 1 year)"
else:
return "Priority 4 (Low - routine inspection)"
def analyze_equipment_portfolio(self, equipment_list: List[Equipment]) -> pd.DataFrame:
"""RBI analysis of equipment portfolio"""
results = []
for eq in equipment_list:
pof = self.calculate_pof(eq)
cof = self.calculate_cof(eq)
risk = self.calculate_risk_score(pof, cof)
priority = self.determine_inspection_priority(risk)
results.append({
'Equipment ID': eq.id,
'Equipment Name': eq.name,
'Type': eq.equipment_type,
'Fluid': eq.fluid,
'PoF': f"{pof:.1f}",
'CoF': f"{cof:.1f}",
'Risk Score': f"{risk:.1f}",
'Priority': priority,
'Last Inspection': f"{eq.last_inspection:.1f} years ago",
'Thickness Ratio': f"{eq.thickness_remaining/eq.design_thickness:.2f}"
})
df = pd.DataFrame(results)
df = df.sort_values('Risk Score', ascending=False,
key=lambda x: x.astype(float))
return df
# Usage example
rbi = RBIAnalysis()
# Create equipment list
equipment_portfolio = [
Equipment(
id="V-101", name="Reactor", equipment_type="Reactor",
fluid="H2", temperature=350, pressure=8.0, age=15,
last_inspection=6.0, corrosion_mechanism=CorrosionMechanism.GENERAL_CORROSION,
corrosion_rate=0.15, thickness_remaining=8.5, design_thickness=12.0
),
Equipment(
id="T-201", name="Distillation Column", equipment_type="Distillation Column",
fluid="toluene", temperature=120, pressure=0.5, age=20,
last_inspection=3.0, corrosion_mechanism=CorrosionMechanism.PITTING,
corrosion_rate=0.3, thickness_remaining=5.2, design_thickness=10.0
),
Equipment(
id="E-301", name="Heat Exchanger", equipment_type="Heat Exchanger",
fluid="water", temperature=80, pressure=1.5, age=10,
last_inspection=2.0, corrosion_mechanism=CorrosionMechanism.GENERAL_CORROSION,
corrosion_rate=0.05, thickness_remaining=9.5, design_thickness=10.0
),
Equipment(
id="P-401", name="Process Pump", equipment_type="Pump",
fluid="methanol", temperature=40, pressure=3.0, age=8,
last_inspection=1.5, corrosion_mechanism=CorrosionMechanism.EROSION,
corrosion_rate=0.2, thickness_remaining=7.0, design_thickness=8.0
),
Equipment(
id="V-501", name="Pressure Vessel", equipment_type="Pressure Vessel",
fluid="NH3", temperature=25, pressure=10.0, age=25,
last_inspection=12.0, corrosion_mechanism=CorrosionMechanism.SCC,
corrosion_rate=0.1, thickness_remaining=6.0, design_thickness=15.0
)
]
# Execute RBI analysis
rbi_results = rbi.analyze_equipment_portfolio(equipment_portfolio)
print("=== Risk-Based Inspection (RBI) Analysis Results ===\n")
print(rbi_results.to_string(index=False))
# Extract Priority 1 equipment
priority_1 = rbi_results[rbi_results['Priority'].str.contains('Priority 1')]
print(f"\n⚠️ Number of equipment requiring urgent action: {len(priority_1)}")
# Expected output:
# === Risk-Based Inspection (RBI) Analysis Results ===
#
# Equipment ID Equipment Name Type Fluid PoF CoF Risk Score ...
# V-501 Pressure Vessel Pressure Vessel NH3 84.0 82.0 68.9 ...
# V-101 Reactor Reactor H2 67.5 80.0 54.0 ...
# T-201 Distillation Column Distillation Column toluene 70.2 64.0 44.9 ...
# ...
1.8 Safety Barrier Effectiveness Analysis
Example 7: Safety Barrier Performance Monitoring
# ===================================
# Example 7: Safety Barrier Effectiveness Analysis
# ===================================
from dataclasses import dataclass
from datetime import datetime, timedelta
import random
@dataclass
class BarrierTest:
"""Barrier Test Record"""
date: datetime
passed: bool
response_time: float # seconds (for active barriers)
class SafetyBarrier:
"""Safety Barrier"""
def __init__(self, name: str, barrier_type: str, target_pfd: float):
"""
Args:
name: Barrier name
barrier_type: Type (Passive/Active)
target_pfd: Target PFD (Probability of Failure on Demand)
"""
self.name = name
self.barrier_type = barrier_type
self.target_pfd = target_pfd
self.test_history: List[BarrierTest] = []
def add_test_result(self, test: BarrierTest):
"""Add test result"""
self.test_history.append(test)
def calculate_actual_pfd(self, lookback_period: int = 365) -> float:
"""Calculate actual PFD
Args:
lookback_period: Evaluation period (days)
Returns:
Measured PFD
"""
if not self.test_history:
return 1.0 # No data = worst case
cutoff_date = datetime.now() - timedelta(days=lookback_period)
recent_tests = [t for t in self.test_history if t.date > cutoff_date]
if not recent_tests:
return 1.0
failures = sum(1 for t in recent_tests if not t.passed)
pfd = failures / len(recent_tests)
return pfd
def assess_performance(self) -> Dict:
"""Barrier performance assessment"""
actual_pfd = self.calculate_actual_pfd()
# Performance determination
if actual_pfd <= self.target_pfd:
status = "✅ ACCEPTABLE"
action = "Continue routine testing"
elif actual_pfd <= self.target_pfd * 1.5:
status = "⚠️ DEGRADED"
action = "Increase testing frequency, investigate root causes"
else:
status = "❌ UNACCEPTABLE"
action = "Immediate corrective action required, consider bypass"
# Average response time (for Active barriers)
avg_response_time = None
if self.barrier_type == "Active" and self.test_history:
response_times = [t.response_time for t in self.test_history
if t.passed]
if response_times:
avg_response_time = sum(response_times) / len(response_times)
return {
'Barrier': self.name,
'Type': self.barrier_type,
'Target PFD': f"{self.target_pfd:.2e}",
'Actual PFD': f"{actual_pfd:.2e}",
'Status': status,
'Test Count': len(self.test_history),
'Avg Response Time': f"{avg_response_time:.2f}s" if avg_response_time else "N/A",
'Recommended Action': action
}
class BarrierManagementSystem:
"""Safety Barrier Management System"""
def __init__(self):
self.barriers: List[SafetyBarrier] = []
def add_barrier(self, barrier: SafetyBarrier):
"""Add barrier"""
self.barriers.append(barrier)
def generate_performance_report(self) -> pd.DataFrame:
"""Generate performance report"""
results = [barrier.assess_performance() for barrier in self.barriers]
df = pd.DataFrame(results)
return df
def identify_degraded_barriers(self) -> List[str]:
"""Identify degraded barriers"""
degraded = []
for barrier in self.barriers:
assessment = barrier.assess_performance()
if "DEGRADED" in assessment['Status'] or "UNACCEPTABLE" in assessment['Status']:
degraded.append(barrier.name)
return degraded
# Usage example: Implementation of barrier management system
bms = BarrierManagementSystem()
# Barrier 1: High pressure interlock (SIL 2)
interlock = SafetyBarrier(
name="High Pressure Interlock (SIS-101)",
barrier_type="Active",
target_pfd=0.01 # SIL 2 target
)
# Generate test data (past 1 year)
random.seed(42)
for i in range(24): # Tested twice per month
test_date = datetime.now() - timedelta(days=i*15)
passed = random.random() > 0.008 # 99.2% success rate
response_time = random.gauss(2.5, 0.5) # Mean 2.5s, std 0.5s
interlock.add_test_result(BarrierTest(
date=test_date,
passed=passed,
response_time=response_time
))
bms.add_barrier(interlock)
# Barrier 2: Pressure relief valve (Passive)
prv = SafetyBarrier(
name="Pressure Relief Valve (PRV-201)",
barrier_type="Passive",
target_pfd=0.01
)
for i in range(4): # Tested 4 times per year
test_date = datetime.now() - timedelta(days=i*90)
passed = random.random() > 0.02 # 98% success rate
prv.add_test_result(BarrierTest(
date=test_date,
passed=passed,
response_time=0 # Passive barrier
))
bms.add_barrier(prv)
# Barrier 3: Gas detector (degraded example)
gas_detector = SafetyBarrier(
name="H2 Gas Detector (GD-301)",
barrier_type="Active",
target_pfd=0.05
)
for i in range(52): # Tested weekly
test_date = datetime.now() - timedelta(days=i*7)
passed = random.random() > 0.12 # 88% success rate (degraded)
response_time = random.gauss(1.0, 0.3)
gas_detector.add_test_result(BarrierTest(
date=test_date,
passed=passed,
response_time=response_time
))
bms.add_barrier(gas_detector)
# Generate performance report
report = bms.generate_performance_report()
print("=== Safety Barrier Performance Report ===\n")
print(report.to_string(index=False))
# Identify degraded barriers
degraded = bms.identify_degraded_barriers()
if degraded:
print(f"\n⚠️ Barriers requiring attention:")
for b in degraded:
print(f" - {b}")
# Expected output:
# === Safety Barrier Performance Report ===
#
# Barrier Type Target PFD Actual PFD Status Test Count ...
# High Pressure Interlock (SIS-101) Active 1.00e-02 8.33e-03 ✅ ACCEPTABLE 24 ...
# Pressure Relief Valve (PRV-201) Passive 1.00e-02 0.00e+00 ✅ ACCEPTABLE 4 ...
# H2 Gas Detector (GD-301) Active 5.00e-02 1.15e-01 ❌ UNACCEPTABLE 52 ...
1.9 Practical Exercise
Example 8: Integrated Process Safety Assessment System
We build a comprehensive process safety assessment system that integrates all the methods learned so far.
# ===================================
# Example 8: Integrated Process Safety Assessment System
# ===================================
class IntegratedProcessSafetyAssessment:
"""Integrated Process Safety Assessment System"""
def __init__(self, process_name: str):
self.process_name = process_name
self.hazard_system = HazardIdentificationSystem()
self.risk_matrix = RiskMatrix()
self.lopa_analyses: List[LOPAAnalysis] = []
self.barrier_management = BarrierManagementSystem()
def perform_comprehensive_assessment(self) -> Dict:
"""Perform comprehensive safety assessment"""
# 1. Hazard identification
hazard_register = self.hazard_system.generate_hazard_register()
critical_hazards = self.hazard_system.get_hazards_by_severity(Severity.CRITICAL)
# 2. Risk assessment
risk_assessments = []
for hazard in self.hazard_system.hazards:
# Simplified: likelihood is assumed
likelihood = Likelihood.OCCASIONAL if hazard.severity.value >= 4 else Likelihood.REMOTE
risk_assess = self.risk_matrix.assess_risk(
hazard.name,
hazard.severity,
likelihood
)
risk_assessments.append(risk_assess)
# Extract high-risk hazards
high_risk = [r for r in risk_assessments
if r['Risk Level'] in ['Extreme', 'High']]
# 3. LOPA analysis (for high-risk hazards)
lopa_results = []
for lopa in self.lopa_analyses:
lopa_report = lopa.generate_lopa_report()
mitigated_freq = lopa.calculate_mitigated_frequency()
lopa_results.append({
'Scenario': lopa.scenario_name,
'Mitigated Frequency': f"{mitigated_freq:.2e}",
'SIL Required': lopa.determine_required_sil().name
})
# 4. Barrier performance
barrier_report = self.barrier_management.generate_performance_report()
degraded_barriers = self.barrier_management.identify_degraded_barriers()
# Integrated results
return {
'total_hazards': len(self.hazard_system.hazards),
'critical_hazards': len(critical_hazards),
'high_risk_scenarios': len(high_risk),
'lopa_analyses': len(lopa_results),
'total_barriers': len(self.barrier_management.barriers),
'degraded_barriers': len(degraded_barriers),
'hazard_register': hazard_register,
'risk_assessments': pd.DataFrame(risk_assessments),
'lopa_results': pd.DataFrame(lopa_results) if lopa_results else None,
'barrier_performance': barrier_report,
'degraded_barrier_list': degraded_barriers
}
def generate_executive_summary(self, assessment: Dict) -> str:
"""Generate executive summary"""
summary = f"""
{'='*70}
PROCESS SAFETY ASSESSMENT - EXECUTIVE SUMMARY
{'='*70}
Process: {self.process_name}
Assessment Date: {datetime.now().strftime('%Y-%m-%d')}
{'='*70}
KEY FINDINGS
{'='*70}
1. HAZARD IDENTIFICATION
- Total Hazards Identified: {assessment['total_hazards']}
- Critical/Catastrophic Hazards: {assessment['critical_hazards']}
2. RISK ASSESSMENT
- High Risk Scenarios: {assessment['high_risk_scenarios']}
- Immediate Action Required: {sum(1 for _ in assessment['risk_assessments'].itertuples() if 'Extreme' in _.Risk_Level)}
3. PROTECTION LAYERS (LOPA)
- LOPA Studies Completed: {assessment['lopa_analyses']}
- SIS Implementation Required: {sum(1 for _ in (assessment['lopa_results'].itertuples() if assessment['lopa_results'] is not None else []) if 'SIL' in _.SIL_Required and _.SIL_Required != 'NO_SIL')}
4. BARRIER INTEGRITY
- Total Safety Barriers: {assessment['total_barriers']}
- Degraded/Failing Barriers: {assessment['degraded_barriers']}
{'='*70}
CRITICAL ACTION ITEMS
{'='*70}
"""
# Priority action items
action_items = []
if assessment['degraded_barriers'] > 0:
for barrier_name in assessment['degraded_barrier_list']:
action_items.append(
f"⚠️ URGENT: Repair/Replace barrier: {barrier_name}"
)
if assessment['high_risk_scenarios'] > 0:
action_items.append(
f"⚠️ HIGH PRIORITY: Implement risk reduction for {assessment['high_risk_scenarios']} scenarios"
)
if action_items:
for i, item in enumerate(action_items, 1):
summary += f"\n{i}. {item}"
else:
summary += "\n✅ No critical action items identified."
summary += f"\n\n{'='*70}\n"
summary += "STATUS: "
if assessment['degraded_barriers'] == 0 and assessment['high_risk_scenarios'] <= 2:
summary += "✅ ACCEPTABLE - Continue routine monitoring\n"
elif assessment['degraded_barriers'] <= 2 and assessment['high_risk_scenarios'] <= 5:
summary += "⚠️ REQUIRES ATTENTION - Implement improvements within 3 months\n"
else:
summary += "❌ UNACCEPTABLE - Immediate corrective action required\n"
summary += f"{'='*70}\n"
return summary
# Usage example: Execute integrated assessment
integrated_assessment = IntegratedProcessSafetyAssessment(
process_name="Hydrogen Production Unit"
)
# Add LOPA (created in Example 3)
integrated_assessment.lopa_analyses.append(lopa)
# Add barriers (created in Example 7)
integrated_assessment.barrier_management = bms
# Execute comprehensive assessment
assessment_results = integrated_assessment.perform_comprehensive_assessment()
# Generate executive summary
executive_summary = integrated_assessment.generate_executive_summary(assessment_results)
print(executive_summary)
# Detailed results
print("\n=== Hazard Register (Top 3) ===")
print(assessment_results['hazard_register'].head(3).to_string(index=False))
print("\n=== Risk Assessment Results (High Risk) ===")
high_risk_df = assessment_results['risk_assessments'][
assessment_results['risk_assessments']['Risk Level'].isin(['Extreme', 'High'])
]
print(high_risk_df.to_string(index=False))
# Expected output:
# ======================================================================
# PROCESS SAFETY ASSESSMENT - EXECUTIVE SUMMARY
# ======================================================================
#
# Process: Hydrogen Production Unit
# Assessment Date: 2025-10-26
#
# ======================================================================
# KEY FINDINGS
# ======================================================================
#
# 1. HAZARD IDENTIFICATION
# - Total Hazards Identified: 3
# - Critical/Catastrophic Hazards: 3
#
# 2. RISK ASSESSMENT
# - High Risk Scenarios: 2
# - Immediate Action Required: 1
#
# 3. PROTECTION LAYERS (LOPA)
# - LOPA Studies Completed: 1
# - SIS Implementation Required: 0
#
# 4. BARRIER INTEGRITY
# - Total Safety Barriers: 3
# - Degraded/Failing Barriers: 1
#
# ======================================================================
# CRITICAL ACTION ITEMS
# ======================================================================
#
# 1. ⚠️ URGENT: Repair/Replace barrier: H2 Gas Detector (GD-301)
# 2. ⚠️ HIGH PRIORITY: Implement risk reduction for 2 scenarios
#
# ======================================================================
# STATUS: ⚠️ REQUIRES ATTENTION - Implement improvements within 3 months
Verification of Learning Objectives
Upon completing this chapter, you will be able to explain the following:
Fundamental Understanding
- ✅ Understand the concept of process safety and lessons from major accidents
- ✅ Explain the difference between hazard vs risk
- ✅ Understand the structure of risk matrices
- ✅ Know the principles of LOPA and SIL levels
Practical Skills
- ✅ Implement hazard identification frameworks
- ✅ Perform risk assessment with risk matrices
- ✅ Calculate SIL using LOPA methodology
- ✅ Predict impact zones with Gaussian Plume models
- ✅ Organize protection layers with Bow-tie analysis
- ✅ Determine inspection priorities using RBI methodology
Applied Competency
- ✅ Identify hazards in actual chemical processes
- ✅ Build comprehensive process safety assessment systems
- ✅ Monitor and evaluate safety barrier performance
- ✅ Create executive summaries to support decision-making
Next Steps
In Chapter 1, we learned the fundamentals of process safety, hazard identification, risk assessment, and layer of protection analysis.
In Chapter 2:
- 📋 Detailed HAZOP study methodology
- 📋 Deviation analysis using guide words
- 📋 Quantitative Risk Assessment (QRA)
- 📋 F-N curves and risk criteria
will be covered.