This chapter covers HAZOP and Risk Assessment. You will learn Applying Guide Words (No, Conducting Deviation Analysis, and Implementing Quantitative Risk Assessment (QRA).
Learning Objectives
By completing this chapter, you will master:
- ā Understanding and practicing HAZOP study methodology
- ā Applying Guide Words (No, More, Less, Reverse, etc.)
- ā Conducting Deviation Analysis
- ā Implementing Quantitative Risk Assessment (QRA)
- ā Creating F-N curves and making risk judgments
- ā Auto-generating HAZOP reports
2.1 Fundamentals of HAZOP Study
What is HAZOP?
HAZOP (Hazard and Operability Study) is a methodology for systematically identifying hazards and operational problems at the process design stage. Developed by ICI in the 1960s, it is now a standard for chemical plant design.
HAZOP Principles
HAZOP applies Guide Words to Process Parameters to generate Deviations, then analyzes their causes, consequences, and countermeasures.
$$ \text{Deviation} = \text{Guide Word} + \text{Process Parameter} $$
Key Guide Words
| Guide Word | Meaning | Application Example |
|---|---|---|
| No/Not/None | Complete negation | No flow, No pressure |
| More | Quantitative increase | More flow, More temperature |
| Less | Quantitative decrease | Less flow, Less pressure |
| As Well As | Addition | Impurity as well as product |
| Part Of | Partial absence | Part of composition (missing component) |
| Reverse | Opposite direction | Reverse flow |
| Other Than | Completely different | Other than normal operation |
Example 1: HAZOP Guide Word Engine
# ===================================
# Example 1: HAZOP Guide Word Engine
# ===================================
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
class GuideWord(Enum):
"""HAZOP Guide Words"""
NO = "No/Not/None"
MORE = "More"
LESS = "Less"
AS_WELL_AS = "As Well As"
PART_OF = "Part Of"
REVERSE = "Reverse"
OTHER_THAN = "Other Than"
class ProcessParameter(Enum):
"""Process Parameters"""
FLOW = "Flow"
PRESSURE = "Pressure"
TEMPERATURE = "Temperature"
LEVEL = "Level"
COMPOSITION = "Composition"
VISCOSITY = "Viscosity"
TIME = "Time"
MIXING = "Mixing"
@dataclass
class Deviation:
"""Deviation"""
guide_word: GuideWord
parameter: ProcessParameter
description: str
potential_causes: List[str]
potential_consequences: List[str]
existing_safeguards: List[str]
recommendations: List[str]
risk_rank: str # High/Medium/Low
class HAZOPGuideWordEngine:
"""HAZOP Guide Word Application Engine"""
def __init__(self):
# Guide word and parameter combination rules
self.valid_combinations = {
GuideWord.NO: [ProcessParameter.FLOW, ProcessParameter.PRESSURE,
ProcessParameter.LEVEL, ProcessParameter.MIXING],
GuideWord.MORE: [ProcessParameter.FLOW, ProcessParameter.PRESSURE,
ProcessParameter.TEMPERATURE, ProcessParameter.LEVEL],
GuideWord.LESS: [ProcessParameter.FLOW, ProcessParameter.PRESSURE,
ProcessParameter.TEMPERATURE, ProcessParameter.LEVEL],
GuideWord.REVERSE: [ProcessParameter.FLOW],
GuideWord.AS_WELL_AS: [ProcessParameter.COMPOSITION],
GuideWord.PART_OF: [ProcessParameter.COMPOSITION],
GuideWord.OTHER_THAN: [ProcessParameter.COMPOSITION, ProcessParameter.TIME]
}
def is_valid_combination(self, guide_word: GuideWord,
parameter: ProcessParameter) -> bool:
"""Check if guide word and parameter combination is valid"""
if guide_word in self.valid_combinations:
return parameter in self.valid_combinations[guide_word]
return False
def generate_deviation_description(self, guide_word: GuideWord,
parameter: ProcessParameter,
node_name: str = "Node") -> str:
"""Generate deviation description"""
return f"{guide_word.value} {parameter.value} at {node_name}"
def apply_guide_words(self, node_name: str,
parameters: List[ProcessParameter]) -> List[str]:
"""Apply guide words to a node"""
deviations = []
for param in parameters:
for guide_word in GuideWord:
if self.is_valid_combination(guide_word, param):
deviation = self.generate_deviation_description(
guide_word, param, node_name
)
deviations.append(deviation)
return deviations
# Usage example
hazop_engine = HAZOPGuideWordEngine()
# Node: Reactor feed line
node = "Reactor Feed Line (P&ID: R-101)"
parameters = [
ProcessParameter.FLOW,
ProcessParameter.PRESSURE,
ProcessParameter.TEMPERATURE,
ProcessParameter.COMPOSITION
]
# Apply guide words
deviations = hazop_engine.apply_guide_words(node, parameters)
print(f"=== HAZOP Node: {node} ===\n")
print(f"Total Deviations Generated: {len(deviations)}\n")
print("Deviations:")
for i, dev in enumerate(deviations, 1):
print(f"{i:2d}. {dev}")
# Expected output:
# === HAZOP Node: Reactor Feed Line (P&ID: R-101) ===
#
# Total Deviations Generated: 13
#
# Deviations:
# 1. No/Not/None Flow at Reactor Feed Line (P&ID: R-101)
# 2. More Flow at Reactor Feed Line (P&ID: R-101)
# 3. Less Flow at Reactor Feed Line (P&ID: R-101)
# 4. Reverse Flow at Reactor Feed Line (P&ID: R-101)
# 5. No/Not/None Pressure at Reactor Feed Line (P&ID: R-101)
# 6. More Pressure at Reactor Feed Line (P&ID: R-101)
# 7. Less Pressure at Reactor Feed Line (P&ID: R-101)
# 8. More Temperature at Reactor Feed Line (P&ID: R-101)
# 9. Less Temperature at Reactor Feed Line (P&ID: R-101)
# 10. As Well As Composition at Reactor Feed Line (P&ID: R-101)
# 11. Part Of Composition at Reactor Feed Line (P&ID: R-101)
# 12. Other Than Composition at Reactor Feed Line (P&ID: R-101)
# 13. Other Than Time at Reactor Feed Line (P&ID: R-101)
2.2 Deviation Analysis
Example 2: Deviation Analysis System Implementation
Analyze causes, consequences, existing safeguards, and recommendations for each deviation.
# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0
# ===================================
# Example 2: Deviation Analysis System
# ===================================
import pandas as pd
class DeviationAnalyzer:
"""Deviation Analysis System"""
def __init__(self, node_name: str, equipment_type: str):
self.node_name = node_name
self.equipment_type = equipment_type
self.deviations: List[Deviation] = []
self._load_knowledge_base()
def _load_knowledge_base(self):
"""Load knowledge base (typical causes and consequences)"""
# In actual systems, learn from past HAZOP databases
self.knowledge_base = {
("NO", "FLOW"): {
"causes": [
"Pump failure",
"Valve closed (inadvertent or maintenance)",
"Line blockage (solidification, fouling)",
"Suction strainer plugged",
"Upstream vessel empty"
],
"consequences": [
"Reactor starvation (loss of cooling/reactant)",
"Pump cavitation/damage",
"Product off-spec",
"Temperature excursion (if flow is coolant)"
],
"safeguards": [
"Low flow alarm",
"Pump status indication",
"Level indication on upstream vessel"
]
},
("MORE", "FLOW"): {
"causes": [
"Control valve fails open",
"Pump speed increase (VFD malfunction)",
"Upstream pressure increase",
"Parallel line opened unintentionally"
],
"consequences": [
"Reactor flooding",
"Downstream vessel overflow",
"Product off-spec (dilution)",
"Increased pressure drop"
],
"safeguards": [
"High flow alarm",
"Level high alarm on receiving vessel",
"Flow control with override"
]
},
("MORE", "TEMPERATURE"): {
"causes": [
"Cooling system failure",
"Heat exchanger fouling (reduced heat transfer)",
"Exothermic reaction runaway",
"External fire",
"Temperature transmitter failure"
],
"consequences": [
"Thermal decomposition",
"Reactor overpressure",
"Product degradation",
"Equipment damage (metallurgy limits)"
],
"safeguards": [
"High temperature alarm",
"Independent high temperature interlock",
"Pressure relief valve",
"Emergency cooling system"
]
},
("REVERSE", "FLOW"): {
"causes": [
"Pump stopped with downstream pressure higher",
"Check valve failure",
"Incorrect valve lineup",
"Siphon effect"
],
"consequences": [
"Contamination of upstream system",
"Pump damage (reverse rotation)",
"Loss of containment",
"Mixing of incompatible materials"
],
"safeguards": [
"Check valve",
"Backflow prevention valve",
"Pump discharge valve"
]
}
}
def analyze_deviation(self, guide_word: GuideWord,
parameter: ProcessParameter) -> Deviation:
"""Analyze deviation"""
# Retrieve information from knowledge base
key = (guide_word.name, parameter.name)
kb_entry = self.knowledge_base.get(key, {
"causes": ["To be determined in HAZOP meeting"],
"consequences": ["To be determined in HAZOP meeting"],
"safeguards": ["To be determined in HAZOP meeting"]
})
# Risk ranking (simplified)
high_risk_combos = [
("NO", "FLOW"), ("MORE", "TEMPERATURE"), ("MORE", "PRESSURE")
]
risk_rank = "High" if key in high_risk_combos else "Medium"
# Generate recommendations
recommendations = self._generate_recommendations(
guide_word, parameter, kb_entry["safeguards"]
)
deviation = Deviation(
guide_word=guide_word,
parameter=parameter,
description=f"{guide_word.value} {parameter.value} at {self.node_name}",
potential_causes=kb_entry["causes"],
potential_consequences=kb_entry["consequences"],
existing_safeguards=kb_entry["safeguards"],
recommendations=recommendations,
risk_rank=risk_rank
)
self.deviations.append(deviation)
return deviation
def _generate_recommendations(self, guide_word: GuideWord,
parameter: ProcessParameter,
existing_safeguards: List[str]) -> List[str]:
"""Generate recommendations"""
recommendations = []
# Check for missing alarms
if "alarm" not in ' '.join(existing_safeguards).lower():
recommendations.append(
f"Install {parameter.value.lower()} alarm"
)
# Check for missing interlocks
if guide_word in [GuideWord.MORE, GuideWord.NO] and \
"interlock" not in ' '.join(existing_safeguards).lower():
recommendations.append(
f"Consider {parameter.value.lower()} interlock (SIL assessment required)"
)
# Redundancy check
if len(existing_safeguards) < 2:
recommendations.append(
"Review adequacy of protection layers (LOPA recommended)"
)
if not recommendations:
recommendations.append("Existing safeguards adequate - continue routine maintenance")
return recommendations
def generate_hazop_worksheet(self) -> pd.DataFrame:
"""Generate HAZOP worksheet"""
data = []
for dev in self.deviations:
data.append({
'Node': self.node_name,
'Deviation': dev.description,
'Causes': '; '.join(dev.potential_causes[:2]) + '...', # First 2
'Consequences': '; '.join(dev.potential_consequences[:2]) + '...',
'Safeguards': '; '.join(dev.existing_safeguards[:2]) + '...',
'Risk': dev.risk_rank,
'Actions': '; '.join(dev.recommendations[:1]) # Top priority recommendation
})
return pd.DataFrame(data)
# Usage example
analyzer = DeviationAnalyzer(
node_name="Reactor Feed Line (R-101)",
equipment_type="Piping"
)
# Analyze critical deviations
critical_deviations = [
(GuideWord.NO, ProcessParameter.FLOW),
(GuideWord.MORE, ProcessParameter.FLOW),
(GuideWord.MORE, ProcessParameter.TEMPERATURE),
(GuideWord.REVERSE, ProcessParameter.FLOW)
]
for gw, param in critical_deviations:
analyzer.analyze_deviation(gw, param)
# Generate HAZOP worksheet
worksheet = analyzer.generate_hazop_worksheet()
print("=== HAZOP Worksheet ===\n")
print(worksheet.to_string(index=False))
print("\n=== Detailed Analysis: No Flow ===")
no_flow_dev = analyzer.deviations[0]
print(f"\nDeviation: {no_flow_dev.description}")
print(f"\nPotential Causes:")
for cause in no_flow_dev.potential_causes:
print(f" - {cause}")
print(f"\nPotential Consequences:")
for cons in no_flow_dev.potential_consequences:
print(f" - {cons}")
print(f"\nRecommendations:")
for rec in no_flow_dev.recommendations:
print(f" - {rec}")
# Expected output:
# === HAZOP Worksheet ===
#
# Node Deviation Risk ...
# Reactor Feed Line (R-101) No/Not/None Flow at Reactor... High ...
# Reactor Feed Line (R-101) More Flow at Reactor... Medium ...
# Reactor Feed Line (R-101) More Temperature at Reactor... High ...
# Reactor Feed Line (R-101) Reverse Flow at Reactor... Medium ...
2.3 HAZOP Automation System
Example 3: P&ID Analysis and HAZOP Auto-generation
Automatically generate HAZOP nodes and deviations from P&ID (Piping and Instrumentation Diagram) information.
# ===================================
# Example 3: P&ID Analysis and HAZOP Auto-generation
# ===================================
from dataclasses import dataclass
from typing import List, Dict, Set
@dataclass
class PIDElement:
"""P&ID Element"""
id: str
element_type: str # Vessel, Pump, HeatExchanger, Valve, Line
fluid: str
design_pressure: float # MPa
design_temperature: float # °C
connected_to: List[str] # Connected element IDs
@dataclass
class HAZOPNode:
"""HAZOP Node (study unit)"""
id: str
name: str
elements: List[PIDElement]
intent: str # Design Intent
parameters_of_interest: List[ProcessParameter]
class PIDHAZOPAutomation:
"""P&ID-based HAZOP Automation System"""
def __init__(self):
self.pid_elements: Dict[str, PIDElement] = {}
self.hazop_nodes: List[HAZOPNode] = []
def add_pid_element(self, element: PIDElement):
"""Add P&ID element"""
self.pid_elements[element.id] = element
def identify_hazop_nodes(self) -> List[HAZOPNode]:
"""Automatically identify HAZOP nodes
Node division criteria:
- Functional units (reaction, separation, heat exchange)
- Process condition change points (phase change, temperature/pressure change)
- Control loop boundaries
"""
nodes = []
# Simplified: Treat each major equipment as a node
for elem_id, elem in self.pid_elements.items():
if elem.element_type in ['Vessel', 'Pump', 'HeatExchanger']:
# Infer design intent
intent = self._infer_design_intent(elem)
# Determine relevant parameters
parameters = self._determine_parameters(elem)
node = HAZOPNode(
id=f"NODE-{elem_id}",
name=f"{elem.element_type}: {elem_id}",
elements=[elem],
intent=intent,
parameters_of_interest=parameters
)
nodes.append(node)
self.hazop_nodes = nodes
return nodes
def _infer_design_intent(self, element: PIDElement) -> str:
"""Infer design intent"""
intents = {
'Vessel': f"Contain {element.fluid} at {element.design_pressure} MPa, {element.design_temperature}°C",
'Pump': f"Transfer {element.fluid} at design flow rate",
'HeatExchanger': f"Heat/cool {element.fluid} to target temperature"
}
return intents.get(element.element_type, "To be determined")
def _determine_parameters(self, element: PIDElement) -> List[ProcessParameter]:
"""Determine parameters to study"""
param_map = {
'Vessel': [ProcessParameter.LEVEL, ProcessParameter.PRESSURE,
ProcessParameter.TEMPERATURE, ProcessParameter.COMPOSITION],
'Pump': [ProcessParameter.FLOW, ProcessParameter.PRESSURE],
'HeatExchanger': [ProcessParameter.FLOW, ProcessParameter.TEMPERATURE,
ProcessParameter.PRESSURE]
}
return param_map.get(element.element_type,
[ProcessParameter.FLOW, ProcessParameter.PRESSURE])
def generate_hazop_study(self) -> Dict:
"""Generate complete HAZOP study"""
# Identify nodes
if not self.hazop_nodes:
self.identify_hazop_nodes()
# Conduct HAZOP for each node
study_results = []
for node in self.hazop_nodes:
analyzer = DeviationAnalyzer(node.name, node.elements[0].element_type)
# Apply guide words
for param in node.parameters_of_interest:
for guide_word in GuideWord:
if analyzer._load_knowledge_base() or True: # simplified
# Analyze only valid combinations
engine = HAZOPGuideWordEngine()
if engine.is_valid_combination(guide_word, param):
try:
deviation = analyzer.analyze_deviation(guide_word, param)
study_results.append({
'Node': node.name,
'Intent': node.intent,
'Deviation': deviation.description,
'Risk': deviation.risk_rank
})
except:
pass
return {
'total_nodes': len(self.hazop_nodes),
'total_deviations': len(study_results),
'results': pd.DataFrame(study_results)
}
# Usage example: Build P&ID for simple process
pid_system = PIDHAZOPAutomation()
# Add P&ID elements
pid_system.add_pid_element(PIDElement(
id="R-101",
element_type="Vessel",
fluid="Ethylene/Catalyst",
design_pressure=3.0,
design_temperature=150,
connected_to=["P-101", "E-101"]
))
pid_system.add_pid_element(PIDElement(
id="P-101",
element_type="Pump",
fluid="Product mixture",
design_pressure=5.0,
design_temperature=80,
connected_to=["R-101", "T-201"]
))
pid_system.add_pid_element(PIDElement(
id="E-101",
element_type="HeatExchanger",
fluid="Reactor coolant",
design_pressure=1.5,
design_temperature=100,
connected_to=["R-101"]
))
# Automatically identify HAZOP nodes
nodes = pid_system.identify_hazop_nodes()
print(f"=== HAZOP Nodes Identified: {len(nodes)} ===\n")
for node in nodes:
print(f"Node: {node.name}")
print(f" Intent: {node.intent}")
print(f" Parameters: {[p.value for p in node.parameters_of_interest]}\n")
# Auto-generate HAZOP study
study = pid_system.generate_hazop_study()
print(f"\n=== HAZOP Study Summary ===")
print(f"Total Nodes: {study['total_nodes']}")
print(f"Total Deviations: {study['total_deviations']}\n")
print(study['results'].head(10).to_string(index=False))
# Expected output:
# === HAZOP Nodes Identified: 3 ===
#
# Node: Vessel: R-101
# Intent: Contain Ethylene/Catalyst at 3.0 MPa, 150°C
# Parameters: ['Level', 'Pressure', 'Temperature', 'Composition']
#
# Node: Pump: P-101
# Intent: Transfer Product mixture at design flow rate
# Parameters: ['Flow', 'Pressure']
# ...
2.4 Quantitative Risk Assessment (QRA)
Example 4: Event Tree Analysis (ETA)
Probabilistically evaluate the event sequence from initiating event to final outcome.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# ===================================
# Example 4: Event Tree Analysis (ETA)
# ===================================
from dataclasses import dataclass
from typing import List, Optional
import matplotlib.pyplot as plt
import numpy as np
@dataclass
class EventNode:
"""Event tree node"""
name: str
success_probability: float
failure_probability: float
def __post_init__(self):
assert abs(self.success_probability + self.failure_probability - 1.0) < 1e-6, \
"Success + Failure probability must equal 1.0"
@dataclass
class Outcome:
"""Final outcome"""
path: List[str]
probability: float
consequence: str
severity: str # Catastrophic, Critical, Marginal, Negligible
class EventTreeAnalysis:
"""Event Tree Analysis System"""
def __init__(self, initiating_event: str, frequency: float):
"""
Args:
initiating_event: Initiating event name
frequency: Initiating event frequency (events/year)
"""
self.initiating_event = initiating_event
self.frequency = frequency
self.safety_functions: List[EventNode] = []
self.outcomes: List[Outcome] = []
def add_safety_function(self, node: EventNode):
"""Add safety function"""
self.safety_functions.append(node)
def calculate_outcomes(self):
"""Calculate all outcome paths and probabilities"""
self.outcomes = []
# Generate all possible combinations (2^n paths)
n_functions = len(self.safety_functions)
for i in range(2 ** n_functions):
path = []
probability = self.frequency
# Determine Success/Failure using binary representation
binary = format(i, f'0{n_functions}b')
for j, bit in enumerate(binary):
function = self.safety_functions[j]
if bit == '0': # Success
path.append(f"{function.name}: Success")
probability *= function.success_probability
else: # Failure
path.append(f"{function.name}: Failure")
probability *= function.failure_probability
# Determine consequence severity
failure_count = binary.count('1')
consequence, severity = self._determine_consequence(failure_count)
self.outcomes.append(Outcome(
path=path,
probability=probability,
consequence=consequence,
severity=severity
))
def _determine_consequence(self, failure_count: int) -> tuple:
"""Determine consequence based on failure count"""
n = len(self.safety_functions)
if failure_count == 0:
return "Safe shutdown", "Negligible"
elif failure_count == 1:
return "Minor release (contained by secondary barrier)", "Marginal"
elif failure_count == 2:
return "Significant release (requires emergency response)", "Critical"
else:
return "Major release (offsite consequences)", "Catastrophic"
def generate_eta_report(self) -> str:
"""Generate event tree analysis report"""
if not self.outcomes:
self.calculate_outcomes()
report = f"""
{'='*80}
EVENT TREE ANALYSIS (ETA) REPORT
{'='*80}
Initiating Event: {self.initiating_event}
Frequency: {self.frequency:.2e} events/year
Safety Functions:
"""
for i, sf in enumerate(self.safety_functions, 1):
report += f"{i}. {sf.name}\n"
report += f" Success Prob: {sf.success_probability:.4f} ({sf.success_probability*100:.2f}%)\n"
report += f" Failure Prob: {sf.failure_probability:.4f} ({sf.failure_probability*100:.2f}%)\n\n"
report += f"""
{'='*80}
OUTCOMES
{'='*80}
"""
# Sort by severity
severity_order = {"Catastrophic": 0, "Critical": 1, "Marginal": 2, "Negligible": 3}
sorted_outcomes = sorted(self.outcomes,
key=lambda x: (severity_order[x.severity], -x.probability))
for i, outcome in enumerate(sorted_outcomes, 1):
report += f"\nOutcome {i}: {outcome.consequence}\n"
report += f" Severity: {outcome.severity}\n"
report += f" Frequency: {outcome.probability:.2e} events/year\n"
report += f" Path:\n"
for step in outcome.path:
report += f" ā {step}\n"
# Calculate total risk
total_risk = sum(o.probability for o in self.outcomes)
catastrophic_risk = sum(o.probability for o in self.outcomes
if o.severity == "Catastrophic")
report += f"""
{'='*80}
RISK SUMMARY
{'='*80}
Total Event Frequency (all paths): {total_risk:.2e} events/year
Catastrophic Outcome Frequency: {catastrophic_risk:.2e} events/year
Risk Acceptance Criteria (example):
Catastrophic: < 1e-6 events/year {'ā
ACCEPTABLE' if catastrophic_risk < 1e-6 else 'ā UNACCEPTABLE'}
Critical: < 1e-4 events/year
"""
return report
# Usage example: ETA for flammable gas leak scenario
eta = EventTreeAnalysis(
initiating_event="Large flange gasket failure causing H2 release",
frequency=1e-3 # 0.001 events/year (once per 1000 years)
)
# Add safety functions (multiple protection layers)
eta.add_safety_function(EventNode(
name="Gas detection system activates",
success_probability=0.95,
failure_probability=0.05
))
eta.add_safety_function(EventNode(
name="Automatic isolation valve closes",
success_probability=0.98,
failure_probability=0.02
))
eta.add_safety_function(EventNode(
name="Ignition prevention (no ignition sources)",
success_probability=0.90,
failure_probability=0.10
))
# Execute event tree analysis
report = eta.generate_eta_report()
print(report)
# Detailed analysis: Worst case scenario
worst_case = max(eta.outcomes, key=lambda x: x.probability
if x.severity == "Catastrophic" else 0)
print("\n=== WORST CASE SCENARIO ===")
print(f"Consequence: {worst_case.consequence}")
print(f"Frequency: {worst_case.probability:.2e} events/year")
print(f"Return Period: {1/worst_case.probability:.0f} years")
# Expected output:
# ============================================================================
# EVENT TREE ANALYSIS (ETA) REPORT
# ============================================================================
#
# Initiating Event: Large flange gasket failure causing H2 release
# Frequency: 1.00e-03 events/year
#
# Safety Functions:
# 1. Gas detection system activates
# Success Prob: 0.9500 (95.00%)
# Failure Prob: 0.0500 (5.00%)
#
# 2. Automatic isolation valve closes
# Success Prob: 0.9800 (98.00%)
# Failure Prob: 0.0200 (2.00%)
# ...
#
# Outcome 1: Major release (offsite consequences)
# Severity: Catastrophic
# Frequency: 1.00e-07 events/year (All 3 safety functions fail)
# Path:
# ā Gas detection system activates: Failure
# ā Automatic isolation valve closes: Failure
# ā Ignition prevention (no ignition sources): Failure
2.5 Failure Frequency Data and Reliability
Example 5: Generic Failure Rate Database
# Requirements:
# - Python 3.9+
# - pandas>=2.0.0, <2.2.0
# ===================================
# Example 5: Generic Failure Rate Database
# ===================================
import pandas as pd
from dataclasses import dataclass
from typing import Dict
@dataclass
class FailureRateData:
"""Failure rate data"""
equipment_type: str
failure_mode: str
failure_rate: float # failures per million hours (10^-6/hr)
source: str # Data source (OREDA, PDS, etc.)
confidence_factor: float # 1.0 = nominal, >1.0 = conservative
class GenericFailureRateDatabase:
"""Generic failure rate database"""
def __init__(self):
self.database: Dict[str, FailureRateData] = {}
self._initialize_database()
def _initialize_database(self):
"""Initialize database (based on OREDA, PDS, etc. data)"""
failure_rates = [
# Valves
("Isolation Valve", "Fail to close on demand", 5.0, "OREDA 2015", 1.0),
("Control Valve", "Fail open", 10.0, "PDS 2010", 1.0),
("Check Valve", "Reverse flow (fail to close)", 50.0, "API 581", 1.5),
("Pressure Relief Valve", "Fail to open on demand", 10.0, "API 581", 1.0),
# Pumps
("Centrifugal Pump", "Fail to start", 5.0, "OREDA 2015", 1.0),
("Centrifugal Pump", "Fail to run (shutdown)", 50.0, "OREDA 2015", 1.0),
# Instrumentation
("Pressure Transmitter", "Out of calibration", 2.0, "EXIDA", 1.0),
("Temperature Transmitter", "Fail dangerous", 1.5, "EXIDA", 1.0),
("Level Transmitter", "Fail dangerous", 3.0, "EXIDA", 1.0),
("Gas Detector", "Fail to detect", 100.0, "Industry avg", 2.0),
# Safety Systems
("SIS Logic Solver", "Fail dangerous", 0.5, "IEC 61508", 1.0),
("Emergency Shutdown Valve", "Fail to close", 3.0, "API 581", 1.0),
# Vessels & Piping
("Pressure Vessel", "Catastrophic rupture", 0.01, "API 579", 1.0),
("Piping (per km)", "Major leak (>10% cross-section)", 0.5, "UKOPA", 1.0),
("Flange Gasket", "Leak", 10.0, "Industry avg", 1.5),
# Heat Exchangers
("Shell & Tube HX", "Tube leak", 20.0, "OREDA 2015", 1.0)
]
for eq_type, mode, rate, source, cf in failure_rates:
key = f"{eq_type}_{mode}"
self.database[key] = FailureRateData(eq_type, mode, rate, source, cf)
def get_failure_rate(self, equipment_type: str, failure_mode: str,
apply_confidence_factor: bool = True) -> float:
"""Get failure rate
Returns:
Failure rate in failures per year
"""
key = f"{equipment_type}_{failure_mode}"
data = self.database.get(key)
if not data:
raise ValueError(f"No data for {equipment_type} - {failure_mode}")
rate_per_year = data.failure_rate * 8760 / 1e6 # 10^-6/hr ā /year
if apply_confidence_factor:
rate_per_year *= data.confidence_factor
return rate_per_year
def get_pfd_from_test_interval(self, equipment_type: str, failure_mode: str,
test_interval_months: int = 12) -> float:
"""Calculate Probability of Failure on Demand (PFD)
Simplified formula: PFD ā Ī» * T / 2
Ī»: failure rate (/year)
T: test interval (year)
"""
failure_rate = self.get_failure_rate(equipment_type, failure_mode)
test_interval_years = test_interval_months / 12.0
pfd = (failure_rate * test_interval_years) / 2.0
return min(pfd, 1.0) # PFD max 1.0
def generate_reliability_datasheet(self, equipment_list: List[tuple]) -> pd.DataFrame:
"""Generate reliability datasheet
Args:
equipment_list: [(equipment_type, failure_mode, test_interval_months), ...]
"""
data = []
for eq_type, mode, test_interval in equipment_list:
failure_rate = self.get_failure_rate(eq_type, mode)
pfd = self.get_pfd_from_test_interval(eq_type, mode, test_interval)
# SIL capability determination
if pfd < 1e-4:
sil_capability = "SIL 3"
elif pfd < 1e-3:
sil_capability = "SIL 2"
elif pfd < 1e-2:
sil_capability = "SIL 1"
else:
sil_capability = "No SIL"
data.append({
'Equipment': eq_type,
'Failure Mode': mode,
'Failure Rate': f"{failure_rate:.2e} /yr",
'Test Interval': f"{test_interval} months",
'PFD': f"{pfd:.2e}",
'SIL Capability': sil_capability,
'MTTF': f"{1/failure_rate:.1f} years" if failure_rate > 0 else "N/A"
})
return pd.DataFrame(data)
# Usage example
failure_db = GenericFailureRateDatabase()
# Create reliability datasheet
equipment_list = [
("Isolation Valve", "Fail to close on demand", 12),
("Pressure Relief Valve", "Fail to open on demand", 24),
("Gas Detector", "Fail to detect", 3),
("SIS Logic Solver", "Fail dangerous", 12),
("Emergency Shutdown Valve", "Fail to close", 6)
]
reliability_sheet = failure_db.generate_reliability_datasheet(equipment_list)
print("=== Reliability Datasheet ===\n")
print(reliability_sheet.to_string(index=False))
# Get individual failure rate
prv_failure_rate = failure_db.get_failure_rate(
"Pressure Relief Valve",
"Fail to open on demand"
)
print(f"\n=== PRV Reliability ===")
print(f"Failure Rate: {prv_failure_rate:.2e} /year")
print(f"MTTF: {1/prv_failure_rate:.1f} years")
# Expected output:
# === Reliability Datasheet ===
#
# Equipment Failure Mode Failure Rate Test Interval PFD SIL Capability MTTF
# Isolation Valve Fail to close on demand 4.38e-05 /yr 12 months 2.19e-05 SIL 3 22831.1 years
# Pressure Relief Valve Fail to open on demand 8.76e-05 /yr 24 months 8.76e-05 SIL 3 11415.5 years
# Gas Detector Fail to detect 8.76e-04 /yr 3 months 1.09e-04 SIL 3 1141.6 years
# SIS Logic Solver Fail dangerous 4.38e-06 /yr 12 months 2.19e-06 SIL 4 228310.5 years
# Emergency Shutdown Valve Fail to close 2.63e-05 /yr 6 months 6.57e-06 SIL 4 38041.8 years
2.6 F-N Curve and Risk Criteria
Example 6: F-N Curve (Frequency-Number curve)
Create F-N curves to assess societal risk.
# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# ===================================
# Example 6: F-N Curve and Risk Criteria
# ===================================
import matplotlib.pyplot as plt
import numpy as np
from dataclasses import dataclass
from typing import List
@dataclass
class RiskScenario:
"""Risk scenario"""
name: str
frequency: float # events/year
fatalities: int # Expected number of fatalities
class FNCurveAnalysis:
"""F-N Curve Analysis System"""
def __init__(self, facility_name: str):
self.facility_name = facility_name
self.scenarios: List[RiskScenario] = []
# Risk acceptance criteria lines (Dutch criteria example)
self.alarp_upper_limit = lambda N: 1e-3 / (N**2) # Upper ALARP
self.alarp_lower_limit = lambda N: 1e-5 / (N**2) # Lower ALARP
self.negligible_limit = lambda N: 1e-7 / (N**2) # Negligible
def add_scenario(self, scenario: RiskScenario):
"""Add scenario"""
self.scenarios.append(scenario)
def calculate_fn_curve_data(self) -> tuple:
"""Calculate F-N curve data
Returns:
(N_values, F_values): Number of fatalities and cumulative frequency
"""
# Sort by fatalities
sorted_scenarios = sorted(self.scenarios, key=lambda x: x.fatalities)
N_values = []
F_values = []
for scenario in sorted_scenarios:
N = scenario.fatalities
# Cumulative frequency of events with N or more fatalities
F = sum(s.frequency for s in self.scenarios if s.fatalities >= N)
N_values.append(N)
F_values.append(F)
return np.array(N_values), np.array(F_values)
def plot_fn_curve(self, save_path: str = None):
"""Plot F-N curve"""
N, F = self.calculate_fn_curve_data()
fig, ax = plt.subplots(figsize=(10, 8))
# Plot F-N curve
ax.loglog(N, F, 'bo-', linewidth=2, markersize=8,
label='Facility Risk Profile', zorder=5)
# Plot individual scenarios
for scenario in self.scenarios:
ax.plot(scenario.fatalities, scenario.frequency,
'rx', markersize=10, markeredgewidth=2)
ax.annotate(scenario.name,
xy=(scenario.fatalities, scenario.frequency),
xytext=(10, 10), textcoords='offset points',
fontsize=8, alpha=0.7)
# Risk acceptance criteria lines
N_range = np.logspace(0, 3, 100) # 1-1000 people
ax.loglog(N_range, [self.alarp_upper_limit(n) for n in N_range],
'r--', linewidth=2, label='ALARP Upper Limit (Unacceptable)')
ax.loglog(N_range, [self.alarp_lower_limit(n) for n in N_range],
'y--', linewidth=2, label='ALARP Lower Limit')
ax.loglog(N_range, [self.negligible_limit(n) for n in N_range],
'g--', linewidth=2, label='Negligible Risk')
# Fill ALARP region
ax.fill_between(N_range,
[self.alarp_lower_limit(n) for n in N_range],
[self.alarp_upper_limit(n) for n in N_range],
alpha=0.2, color='yellow', label='ALARP Region')
ax.set_xlabel('Number of Fatalities (N)', fontsize=12, fontweight='bold')
ax.set_ylabel('Cumulative Frequency (F) [events/year]', fontsize=12, fontweight='bold')
ax.set_title(f'F-N Curve: {self.facility_name}\nSocietal Risk Assessment',
fontsize=14, fontweight='bold')
ax.grid(True, which='both', alpha=0.3)
ax.legend(loc='upper right')
ax.set_xlim([1, 1000])
ax.set_ylim([1e-8, 1e-2])
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
return fig
def assess_alarp_status(self) -> Dict:
"""Assess ALARP status"""
results = {
'unacceptable': [],
'alarp': [],
'broadly_acceptable': []
}
for scenario in self.scenarios:
N = scenario.fatalities
F = scenario.frequency
if F > self.alarp_upper_limit(N):
results['unacceptable'].append(scenario)
elif F > self.alarp_lower_limit(N):
results['alarp'].append(scenario)
else:
results['broadly_acceptable'].append(scenario)
return results
def generate_alarp_report(self) -> str:
"""Generate ALARP report"""
status = self.assess_alarp_status()
report = f"""
{'='*80}
F-N CURVE ANALYSIS - ALARP ASSESSMENT
{'='*80}
Facility: {self.facility_name}
Total Scenarios Analyzed: {len(self.scenarios)}
{'='*80}
RISK CLASSIFICATION
{'='*80}
UNACCEPTABLE REGION (Above ALARP Upper Limit):
Scenarios: {len(status['unacceptable'])}
"""
if status['unacceptable']:
for s in status['unacceptable']:
report += f" ā {s.name}: F={s.frequency:.2e}/yr, N={s.fatalities}\n"
report += " ACTION: Immediate risk reduction REQUIRED\n"
else:
report += " ā
No scenarios in unacceptable region\n"
report += f"""
ALARP REGION (Risk Reduction As Low As Reasonably Practicable):
Scenarios: {len(status['alarp'])}
"""
if status['alarp']:
for s in status['alarp']:
report += f" ā ļø {s.name}: F={s.frequency:.2e}/yr, N={s.fatalities}\n"
report += " ACTION: Demonstrate ALARP (cost-benefit analysis)\n"
else:
report += " ā
No scenarios in ALARP region\n"
report += f"""
BROADLY ACCEPTABLE REGION (Below ALARP Lower Limit):
Scenarios: {len(status['broadly_acceptable'])}
"""
if status['broadly_acceptable']:
for s in status['broadly_acceptable']:
report += f" ā
{s.name}: F={s.frequency:.2e}/yr, N={s.fatalities}\n"
report += " ACTION: Maintain current safety measures\n"
report += f"\n{'='*80}\n"
return report
# Usage example: F-N curve analysis for LNG receiving terminal
fn_analysis = FNCurveAnalysis(facility_name="LNG Import Terminal")
# Add risk scenarios
fn_analysis.add_scenario(RiskScenario(
name="Small LNG leak (no ignition)",
frequency=1e-2,
fatalities=0
))
fn_analysis.add_scenario(RiskScenario(
name="Medium LNG leak ā jet fire",
frequency=5e-4,
fatalities=2
))
fn_analysis.add_scenario(RiskScenario(
name="Large LNG leak ā pool fire",
frequency=1e-4,
fatalities=10
))
fn_analysis.add_scenario(RiskScenario(
name="Catastrophic tank failure ā VCE",
frequency=1e-6,
fatalities=100
))
fn_analysis.add_scenario(RiskScenario(
name="LNG carrier collision ā BLEVE",
frequency=5e-7,
fatalities=300
))
# Generate ALARP report
alarp_report = fn_analysis.generate_alarp_report()
print(alarp_report)
# Plot F-N curve
fig = fn_analysis.plot_fn_curve()
# plt.show()
# Expected output:
# ============================================================================
# F-N CURVE ANALYSIS - ALARP ASSESSMENT
# ============================================================================
#
# Facility: LNG Import Terminal
# Total Scenarios Analyzed: 5
#
# ============================================================================
# RISK CLASSIFICATION
# ============================================================================
#
# UNACCEPTABLE REGION (Above ALARP Upper Limit):
# Scenarios: 0
# ā
No scenarios in unacceptable region
#
# ALARP REGION (Risk Reduction As Low As Reasonably Practicable):
# Scenarios: 2
# ā ļø Medium LNG leak ā jet fire: F=5.00e-04/yr, N=2
# ā ļø Large LNG leak ā pool fire: F=1.00e-04/yr, N=10
# ACTION: Demonstrate ALARP (cost-benefit analysis)
#
# BROADLY ACCEPTABLE REGION (Below ALARP Lower Limit):
# Scenarios: 2
# ā
Catastrophic tank failure ā VCE: F=1.00e-06/yr, N=100
# ā
LNG carrier collision ā BLEVE: F=5.00e-07/yr, N=300
# ACTION: Maintain current safety measures
2.7 HAZOP Report Auto-generation
Example 7: Comprehensive HAZOP Report Generator
# ===================================
# Example 7: HAZOP Report Auto-generation
# ===================================
from datetime import datetime
import json
class HAZOPReportGenerator:
"""HAZOP study complete report generation system"""
def __init__(self, project_info: Dict):
self.project_info = project_info
self.hazop_nodes = []
self.team_members = []
def add_team_member(self, name: str, role: str):
"""Add team member"""
self.team_members.append({'name': name, 'role': role})
def generate_full_report(self, deviations_by_node: Dict[str, List[Deviation]]) -> str:
"""Generate complete HAZOP report
Args:
deviations_by_node: {node_name: [Deviation, ...], ...}
"""
report = self._generate_cover_page()
report += self._generate_executive_summary(deviations_by_node)
report += self._generate_methodology_section()
report += self._generate_detailed_worksheets(deviations_by_node)
report += self._generate_action_items(deviations_by_node)
report += self._generate_appendices()
return report
def _generate_cover_page(self) -> str:
"""Cover page"""
return f"""
{'='*80}
HAZARD AND OPERABILITY STUDY (HAZOP)
FINAL REPORT
{'='*80}
Project: {self.project_info.get('name', 'N/A')}
Facility: {self.project_info.get('facility', 'N/A')}
Location: {self.project_info.get('location', 'N/A')}
Study Date: {self.project_info.get('study_date', datetime.now().strftime('%Y-%m-%d'))}
Report Date: {datetime.now().strftime('%Y-%m-%d')}
Document No: {self.project_info.get('doc_number', 'HAZOP-001')}
Revision: {self.project_info.get('revision', '0')}
{'='*80}
STUDY TEAM
{'='*80}
"""
def _generate_executive_summary(self, deviations_by_node: Dict) -> str:
"""Executive summary"""
total_deviations = sum(len(devs) for devs in deviations_by_node.values())
high_risk = sum(1 for devs in deviations_by_node.values()
for dev in devs if dev.risk_rank == "High")
summary = f"""
EXECUTIVE SUMMARY
{'='*80}
Study Scope:
- Total HAZOP Nodes: {len(deviations_by_node)}
- Total Deviations: {total_deviations}
- High Risk Scenarios: {high_risk}
Key Findings:
1. {high_risk} high-risk scenarios identified requiring immediate attention
2. All scenarios have been assessed with existing safeguards documented
3. Recommendations provided for risk reduction where needed
Overall Assessment:
"""
if high_risk == 0:
summary += " ā
No high-risk scenarios. Facility design is robust.\n"
elif high_risk <= 3:
summary += f" ā ļø {high_risk} high-risk scenarios require mitigation.\n"
else:
summary += f" ā {high_risk} high-risk scenarios - major design review recommended.\n"
return summary + "\n"
def _generate_methodology_section(self) -> str:
"""Methodology section"""
return f"""
METHODOLOGY
{'='*80}
1. HAZOP Technique:
- Guide Words applied to process parameters
- Systematic deviation analysis
- Cause-Consequence-Safeguard approach
2. Risk Ranking:
- High: Immediate action required
- Medium: Risk reduction should be considered
- Low: Acceptable with existing safeguards
3. Documentation:
- P&ID reviewed: {self.project_info.get('pid_revision', 'Rev 0')}
- Process conditions: {self.project_info.get('process_basis', 'Normal operation')}
"""
def _generate_detailed_worksheets(self, deviations_by_node: Dict) -> str:
"""Detailed worksheets"""
worksheets = f"""
DETAILED HAZOP WORKSHEETS
{'='*80}
"""
for node_name, deviations in deviations_by_node.items():
worksheets += f"\nNode: {node_name}\n"
worksheets += f"{'-'*80}\n\n"
for i, dev in enumerate(deviations, 1):
worksheets += f"Deviation {i}: {dev.description}\n"
worksheets += f"Risk Rank: {dev.risk_rank}\n\n"
worksheets += "Potential Causes:\n"
for cause in dev.potential_causes:
worksheets += f" - {cause}\n"
worksheets += "\nPotential Consequences:\n"
for cons in dev.potential_consequences:
worksheets += f" - {cons}\n"
worksheets += "\nExisting Safeguards:\n"
for sg in dev.existing_safeguards:
worksheets += f" - {sg}\n"
worksheets += "\nRecommendations:\n"
for rec in dev.recommendations:
worksheets += f" - {rec}\n"
worksheets += "\n" + "-"*80 + "\n\n"
return worksheets
def _generate_action_items(self, deviations_by_node: Dict) -> str:
"""Action items"""
actions = f"""
ACTION ITEM REGISTER
{'='*80}
"""
action_id = 1
for node_name, deviations in deviations_by_node.items():
for dev in deviations:
for rec in dev.recommendations:
if "adequate" not in rec.lower(): # Skip no-action recommendations
priority = "HIGH" if dev.risk_rank == "High" else "MEDIUM"
actions += f"Action {action_id:03d}: [{priority}] {rec}\n"
actions += f" Node: {node_name}\n"
actions += f" Deviation: {dev.description}\n"
actions += f" Responsible: TBD\n"
actions += f" Target Date: TBD\n\n"
action_id += 1
return actions
def _generate_appendices(self) -> str:
"""Appendices"""
return f"""
APPENDICES
{'='*80}
Appendix A: P&ID Drawings
Appendix B: Study Team Attendance Records
Appendix C: Risk Matrix Definition
Appendix D: Abbreviations and Definitions
"""
def export_to_json(self, deviations_by_node: Dict, filepath: str):
"""Export to JSON format"""
export_data = {
'project_info': self.project_info,
'study_date': datetime.now().isoformat(),
'nodes': {}
}
for node_name, deviations in deviations_by_node.items():
export_data['nodes'][node_name] = [
{
'description': dev.description,
'guide_word': dev.guide_word.value,
'parameter': dev.parameter.value,
'causes': dev.potential_causes,
'consequences': dev.potential_consequences,
'safeguards': dev.existing_safeguards,
'recommendations': dev.recommendations,
'risk_rank': dev.risk_rank
}
for dev in deviations
]
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
# Usage example
project_info = {
'name': 'Ethylene Polymerization Plant',
'facility': 'Reactor Section',
'location': 'Yokkaichi, Japan',
'study_date': '2025-10-15',
'doc_number': 'HAZOP-EPP-001',
'revision': '0',
'pid_revision': 'Rev 2',
'process_basis': 'Normal operation at 85% capacity'
}
report_gen = HAZOPReportGenerator(project_info)
# Add team members
report_gen.add_team_member("Dr. Smith", "HAZOP Leader")
report_gen.add_team_member("Eng. Tanaka", "Process Engineer")
report_gen.add_team_member("Eng. Kim", "Instrument Engineer")
# Deviation data (generated from Example 2)
analyzer = DeviationAnalyzer("Reactor R-101", "Vessel")
analyzer.analyze_deviation(GuideWord.NO, ProcessParameter.FLOW)
analyzer.analyze_deviation(GuideWord.MORE, ProcessParameter.TEMPERATURE)
deviations_data = {
"Reactor R-101": analyzer.deviations
}
# Generate report
full_report = report_gen.generate_full_report(deviations_data)
print(full_report[:2000]) # Display first 2000 characters
# JSON export
# report_gen.export_to_json(deviations_data, "hazop_study_results.json")
# print("\nā
HAZOP study exported to hazop_study_results.json")
# Expected output:
# ============================================================================
# HAZARD AND OPERABILITY STUDY (HAZOP)
# FINAL REPORT
# ============================================================================
#
# Project: Ethylene Polymerization Plant
# Facility: Reactor Section
# Location: Yokkaichi, Japan
# ...
2.8 Risk Ranking and Prioritization
Example 8: Multi-Criteria Decision Analysis (MCDA)
Comprehensively evaluate multiple risk scenarios and determine countermeasure priorities.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
# ===================================
# Example 8: Multi-Criteria Decision Analysis (MCDA)
# ===================================
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class RiskCriteria:
"""Risk assessment criteria"""
likelihood: float # 1-5
severity_people: float # 1-5 (people impact)
severity_env: float # 1-5 (environmental impact)
severity_asset: float # 1-5 (asset loss)
detectability: float # 1-5 (detectability, lower is better)
class MultiCriteriaRiskRanking:
"""Multi-criteria risk ranking system"""
def __init__(self):
# Criteria weights (sum to 1.0)
self.weights = {
'likelihood': 0.25,
'severity_people': 0.35, # Highest weight on people impact
'severity_env': 0.20,
'severity_asset': 0.15,
'detectability': 0.05
}
def calculate_risk_priority_number(self, criteria: RiskCriteria) -> float:
"""Calculate Risk Priority Number (RPN)"""
# Weighted average score
weighted_score = (
criteria.likelihood * self.weights['likelihood'] +
criteria.severity_people * self.weights['severity_people'] +
criteria.severity_env * self.weights['severity_env'] +
criteria.severity_asset * self.weights['severity_asset'] +
criteria.detectability * self.weights['detectability']
)
# Normalize to 0-100 scale
rpn = weighted_score * 20
return rpn
def rank_scenarios(self, scenarios: Dict[str, RiskCriteria]) -> pd.DataFrame:
"""Rank scenarios"""
data = []
for scenario_name, criteria in scenarios.items():
rpn = self.calculate_risk_priority_number(criteria)
# Determine priority level
if rpn >= 80:
priority = "Critical (P1)"
elif rpn >= 60:
priority = "High (P2)"
elif rpn >= 40:
priority = "Medium (P3)"
else:
priority = "Low (P4)"
data.append({
'Scenario': scenario_name,
'Likelihood': criteria.likelihood,
'Sev_People': criteria.severity_people,
'Sev_Env': criteria.severity_env,
'Sev_Asset': criteria.severity_asset,
'Detectability': criteria.detectability,
'RPN': f"{rpn:.1f}",
'Priority': priority
})
df = pd.DataFrame(data)
df = df.sort_values('RPN', ascending=False, key=lambda x: x.astype(float))
return df
def sensitivity_analysis(self, criteria: RiskCriteria, scenario_name: str):
"""Sensitivity analysis: Impact of each criterion change on RPN"""
baseline_rpn = self.calculate_risk_priority_number(criteria)
print(f"\n=== Sensitivity Analysis: {scenario_name} ===")
print(f"Baseline RPN: {baseline_rpn:.1f}\n")
print("Impact of +1 change in each criterion:")
# Likelihood +1
temp_criteria = RiskCriteria(
criteria.likelihood + 1,
criteria.severity_people,
criteria.severity_env,
criteria.severity_asset,
criteria.detectability
)
new_rpn = self.calculate_risk_priority_number(temp_criteria)
print(f" Likelihood +1: {baseline_rpn:.1f} ā {new_rpn:.1f} (Ī{new_rpn-baseline_rpn:+.1f})")
# Severity_People +1
temp_criteria = RiskCriteria(
criteria.likelihood,
min(criteria.severity_people + 1, 5),
criteria.severity_env,
criteria.severity_asset,
criteria.detectability
)
new_rpn = self.calculate_risk_priority_number(temp_criteria)
print(f" Severity_People +1: {baseline_rpn:.1f} ā {new_rpn:.1f} (Ī{new_rpn-baseline_rpn:+.1f})")
# Detectability -1 (lower is better)
temp_criteria = RiskCriteria(
criteria.likelihood,
criteria.severity_people,
criteria.severity_env,
criteria.severity_asset,
max(criteria.detectability - 1, 1)
)
new_rpn = self.calculate_risk_priority_number(temp_criteria)
print(f" Detectability -1: {baseline_rpn:.1f} ā {new_rpn:.1f} (Ī{new_rpn-baseline_rpn:+.1f})")
# Usage example
mcda = MultiCriteriaRiskRanking()
# Evaluate multiple risk scenarios
scenarios = {
"Reactor overpressure (no relief)": RiskCriteria(
likelihood=2, # Occasional
severity_people=5, # Catastrophic
severity_env=4, # Critical
severity_asset=5, # Catastrophic
detectability=2 # Good detection (pressure transmitter)
),
"Toxic gas release (H2S)": RiskCriteria(
likelihood=3, # Probable
severity_people=4, # Critical
severity_env=3, # Marginal
severity_asset=2, # Negligible
detectability=3 # Moderate detection
),
"Loss of cooling water": RiskCriteria(
likelihood=4, # Frequent
severity_people=2, # Negligible
severity_env=1, # Minimal
severity_asset=3, # Marginal
detectability=1 # Excellent detection (flow transmitter)
),
"Runaway polymerization": RiskCriteria(
likelihood=2, # Occasional
severity_people=4, # Critical
severity_env=3, # Marginal
severity_asset=4, # Critical
detectability=3 # Moderate detection
),
"Flange gasket leak": RiskCriteria(
likelihood=3, # Probable
severity_people=2, # Negligible
severity_env=2, # Negligible
severity_asset=1, # Minimal
detectability=4 # Poor detection (visual inspection)
)
}
# Perform ranking
ranking = mcda.rank_scenarios(scenarios)
print("=== Multi-Criteria Risk Ranking ===\n")
print(ranking.to_string(index=False))
# Sensitivity analysis for highest risk scenario
top_scenario_name = ranking.iloc[0]['Scenario']
top_scenario_criteria = scenarios[top_scenario_name]
mcda.sensitivity_analysis(top_scenario_criteria, top_scenario_name)
# Expected output:
# === Multi-Criteria Risk Ranking ===
#
# Scenario Likelihood Sev_People Sev_Env Sev_Asset Detectability RPN Priority
# Reactor overpressure (no relief) 2 5 4 5 2 80.0 Critical (P1)
# Runaway polymerization 2 4 3 4 3 66.0 High (P2)
# Toxic gas release (H2S) 3 4 3 2 3 66.0 High (P2)
# Loss of cooling water 4 2 1 3 1 48.0 Medium (P3)
# Flange gasket leak 3 2 2 1 4 44.0 Medium (P3)
#
# === Sensitivity Analysis: Reactor overpressure (no relief) ===
# Baseline RPN: 80.0
#
# Impact of +1 change in each criterion:
# Likelihood +1: 80.0 ā 85.0 (Ī+5.0)
# Severity_People +1: 80.0 ā 80.0 (Ī+0.0) # Already at max (5)
# Detectability -1: 80.0 ā 79.0 (Ī-1.0)
Learning Objectives Review
Upon completing this chapter, you should be able to explain:
Fundamental Understanding
- ā Understand HAZOP principles and guide word application methods
- ā Know the deviation analysis process (cause-consequence-safeguards)
- ā Understand the differences between QRA and event tree analysis
- ā Explain F-N curves and ALARP concepts
Practical Skills
- ā Systematically apply HAZOP guide words
- ā Identify HAZOP nodes from P&IDs
- ā Evaluate event sequences using event tree analysis
- ā Calculate PFD using failure rate databases
- ā Create F-N curves and make ALARP judgments
- ā Generate comprehensive HAZOP reports
Application Skills
- ā Conduct HAZOP studies on actual chemical processes
- ā Practice quantitative risk assessment (QRA)
- ā Determine countermeasure priorities using multi-criteria decision analysis
- ā Auto-generate HAZOP reports to support decision-making
Next Steps
In Chapter 2, we learned about HAZOP, QRA, and F-N curves.
In Chapter 3:
- š FMEA (Failure Mode and Effects Analysis)
- š Reliability engineering and MTBF calculation
- š Fault Tree Analysis (FTA)
- š Preventive maintenance optimization (RCM)
will be covered.