
Chapter 4: Safety Monitoring and Incident Prediction

Real-time Monitoring and Predictive Safety Management with Machine Learning

📚 Introduction to Process Safety Assessment ⏱️ Reading time: 30-35 minutes 🎯 Difficulty: Intermediate-Advanced

What You'll Learn in This Chapter

- Real-time monitoring of safety parameters with multi-level alarm limits (4.1)
- Multivariate anomaly detection with PCA-based T² and SPE statistics (4.1.2)
- Defining, tracking, and scoring leading indicators (4.2)
- Predicting incident risk with machine learning (4.3)
- Calculating and benchmarking safety KPIs such as TRIR and LTIR (4.4)
- Building an integrated safety management system (4.5)

4.1 Real-time Safety Monitoring

Ensuring process safety requires continuous monitoring of process variables and early detection of anomalies. In this section we build a system that monitors critical parameters such as temperature, pressure, and flow rate in real time and flags approaches to safety limits.

4.1.1 Safety Parameter Monitoring System

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

# Example 1: Real-time Safety Parameter Monitoring
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum

class SafetyLevel(Enum):
    """Safety levels"""
    NORMAL = "Normal"
    CAUTION = "Caution"
    WARNING = "Warning"
    CRITICAL = "Critical"

@dataclass
class SafetyLimit:
    """Safety limit values"""
    low_critical: float      # Lower Critical limit
    low_warning: float       # Lower Warning limit
    low_caution: float       # Lower Caution limit
    high_caution: float      # Upper Caution limit
    high_warning: float      # Upper Warning limit
    high_critical: float     # Upper Critical limit

class SafetyMonitor:
    """Real-time Safety Monitoring System

    Continuously monitors process variables and detects approach to safety limits
    """

    def __init__(self):
        self.limits: Dict[str, SafetyLimit] = {}
        self.current_values: Dict[str, float] = {}
        self.alert_history: List[Dict] = []

    def set_safety_limits(self, parameter: str, limits: SafetyLimit):
        """Set safety limits"""
        self.limits[parameter] = limits

    def update_value(self, parameter: str, value: float,
                    timestamp: Optional[datetime] = None) -> Dict:
        """Update parameter value and assess safety

        Args:
            parameter: Parameter name
            value: Measured value
            timestamp: Timestamp

        Returns:
            Assessment result (level, message)
        """
        if timestamp is None:
            timestamp = datetime.now()

        self.current_values[parameter] = value

        # Safety level determination
        safety_assessment = self._assess_safety(parameter, value)

        # Record in alert history
        if safety_assessment['level'] != SafetyLevel.NORMAL:
            self.alert_history.append({
                'timestamp': timestamp,
                'parameter': parameter,
                'value': value,
                'level': safety_assessment['level'],
                'message': safety_assessment['message']
            })

        return safety_assessment

    def _assess_safety(self, parameter: str, value: float) -> Dict:
        """Assess safety"""
        if parameter not in self.limits:
            return {
                'level': SafetyLevel.NORMAL,
                'message': 'No limits defined',
                'distance_to_limit': None
            }

        limits = self.limits[parameter]

        # Distance to the nearest critical limit (same for every level)
        distance = min(
            abs(value - limits.low_critical),
            abs(value - limits.high_critical)
        )

        # Critical range
        if value <= limits.low_critical or value >= limits.high_critical:
            level = SafetyLevel.CRITICAL
            message = "Critical limit exceeded! Immediate action required."

        # Warning range
        elif value <= limits.low_warning or value >= limits.high_warning:
            level = SafetyLevel.WARNING
            message = "Warning: Approaching critical limit"

        # Caution range
        elif value <= limits.low_caution or value >= limits.high_caution:
            level = SafetyLevel.CAUTION
            message = "Caution: Monitor closely"

        # Normal range
        else:
            level = SafetyLevel.NORMAL
            message = "Normal operation"

        return {
            'level': level,
            'message': message,
            'distance_to_limit': distance,
            'value': value
        }

    def get_current_status(self) -> pd.DataFrame:
        """Get current safety status"""
        status_data = []

        for param, value in self.current_values.items():
            assessment = self._assess_safety(param, value)
            status_data.append({
                'Parameter': param,
                'Current Value': value,
                'Safety Level': assessment['level'].value,
                'Distance to Limit': assessment['distance_to_limit'],
                'Message': assessment['message']
            })

        df = pd.DataFrame(status_data)
        return df.sort_values('Distance to Limit')

    def get_alert_summary(self, hours: int = 24) -> pd.DataFrame:
        """Get alert statistics for specified period"""
        if not self.alert_history:
            return pd.DataFrame()

        df = pd.DataFrame(self.alert_history)
        df['timestamp'] = pd.to_datetime(df['timestamp'])

        # Filter by specified period
        cutoff_time = datetime.now() - timedelta(hours=hours)
        df_recent = df[df['timestamp'] >= cutoff_time]

        # Aggregate by level
        summary = df_recent.groupby(['parameter', 'level']).size().unstack(fill_value=0)

        return summary

# Practical example: Monitoring chemical reactor
monitor = SafetyMonitor()

# Set safety limits for reactor temperature
monitor.set_safety_limits(
    'Reactor Temperature',
    SafetyLimit(
        low_critical=50,    # Critical below 50°C
        low_warning=60,
        low_caution=70,
        high_caution=180,
        high_warning=190,
        high_critical=200   # Critical above 200°C
    )
)

# Set safety limits for reactor pressure
monitor.set_safety_limits(
    'Reactor Pressure',
    SafetyLimit(
        low_critical=0.5,   # Critical below 0.5 MPa
        low_warning=1.0,
        low_caution=1.5,
        high_caution=8.0,
        high_warning=9.0,
        high_critical=10.0  # Critical above 10 MPa
    )
)

# Set safety limits for cooling water flow
monitor.set_safety_limits(
    'Cooling Water Flow',
    SafetyLimit(
        low_critical=50,    # Critical below 50 L/min
        low_warning=70,
        low_caution=90,
        high_caution=500,
        high_warning=550,
        high_critical=600
    )
)

# Update real-time measurements (simulation)
np.random.seed(42)

# Normal operation
print("=== Normal Operation ===")
monitor.update_value('Reactor Temperature', 150)
monitor.update_value('Reactor Pressure', 5.0)
monitor.update_value('Cooling Water Flow', 200)

status = monitor.get_current_status()
print(status.to_string(index=False))

# Abnormal state simulation
print("\n=== Abnormal State (Temperature Rise) ===")
result = monitor.update_value('Reactor Temperature', 195)
print(f"Temperature 195°C: {result['level'].value} - {result['message']}")

result = monitor.update_value('Reactor Temperature', 205)
print(f"Temperature 205°C: {result['level'].value} - {result['message']}")

# Pressure abnormality
print("\n=== Abnormal State (Pressure Rise) ===")
result = monitor.update_value('Reactor Pressure', 9.5)
print(f"Pressure 9.5 MPa: {result['level'].value} - {result['message']}")

# Cooling water flow abnormality
print("\n=== Abnormal State (Cooling Water Flow Drop) ===")
result = monitor.update_value('Cooling Water Flow', 65)
print(f"Flow 65 L/min: {result['level'].value} - {result['message']}")

# Alert summary
print("\n=== Alert Summary (24 hours) ===")
alert_summary = monitor.get_alert_summary(hours=24)
if not alert_summary.empty:
    print(alert_summary)

# Output example:
# === Normal Operation ===
# Parameter             Current Value Safety Level  Distance to Limit                    Message
# Reactor Pressure                5.0       Normal                4.5         Normal operation
# Reactor Temperature           150.0       Normal               50.0         Normal operation
# Cooling Water Flow            200.0       Normal              150.0         Normal operation
#
# === Abnormal State (Temperature Rise) ===
# Temperature 195°C: Warning - Warning: Approaching critical limit
# Temperature 205°C: Critical - Critical limit exceeded! Immediate action required.
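
In practice, a value hovering near a limit would re-trigger the same alert on every scan. Below is a minimal sketch of one common mitigation, built on the SafetyMonitor above; the subclass name and the is_new_alert field are illustrative assumptions, not part of the chapter's API.

# Only flag an alert when the safety level actually changes
class DebouncedSafetyMonitor(SafetyMonitor):
    """SafetyMonitor variant that suppresses repeated alerts at the same level"""

    def __init__(self):
        super().__init__()
        self._last_level: Dict[str, SafetyLevel] = {}

    def update_value(self, parameter: str, value: float,
                     timestamp: Optional[datetime] = None) -> Dict:
        result = super().update_value(parameter, value, timestamp)
        # New alert only if the level differs from the previous scan
        result['is_new_alert'] = result['level'] != self._last_level.get(parameter)
        self._last_level[parameter] = result['level']
        return result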

4.1.2 Multivariate Anomaly Detection (MSPC)

Multivariate Statistical Process Control (MSPC) monitors multiple process variables simultaneously and detects anomalies while accounting for the correlations between them, using PCA-based T² statistics and Q (SPE) statistics.
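
For a standardized observation x with scores t = (t₁, ..., t_a) on the first a principal components (eigenvalues λ₁, ..., λ_a) and PCA reconstruction x̂, the two statistics are

    T² = Σᵢ tᵢ² / λᵢ        SPE (Q) = ‖x − x̂‖² = Σⱼ (xⱼ − x̂ⱼ)²

T² measures unusually large variation within the model plane, while SPE measures distance off that plane, which is why the two statistics catch different kinds of faults.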

# Requirements:
# - Python 3.9+
# - scipy>=1.11.0
# - scikit-learn>=1.2.0

# Example 2: Multivariate Anomaly Detection (MSPC)
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import scipy.stats as stats

class MSPCMonitor:
    """Multivariate Statistical Process Control (MSPC)

    Anomaly detection using PCA-based T² statistics and SPE (Q statistics)
    """

    def __init__(self, n_components: Optional[int] = None, alpha: float = 0.01):
        """
        Args:
            n_components: Number of principal components (None for auto)
            alpha: Significance level (default 1%)
        """
        self.n_components = n_components
        self.alpha = alpha
        self.scaler = StandardScaler()
        self.pca = None
        self.t2_limit = None
        self.spe_limit = None
        self.is_trained = False

    def train(self, normal_data: np.ndarray):
        """Build statistical model from normal operation data

        Args:
            normal_data: Normal operation data (n_samples, n_features)
        """
        # Standardize data
        X_scaled = self.scaler.fit_transform(normal_data)

        # Build PCA model
        if self.n_components is None:
            # Auto-select number of components for 90%+ cumulative variance
            pca_temp = PCA()
            pca_temp.fit(X_scaled)
            cumsum = np.cumsum(pca_temp.explained_variance_ratio_)
            self.n_components = np.argmax(cumsum >= 0.90) + 1

        self.pca = PCA(n_components=self.n_components)
        T = self.pca.fit_transform(X_scaled)

        # T² control limit (F distribution)
        n, p = normal_data.shape
        a = self.n_components
        f_value = stats.f.ppf(1 - self.alpha, a, n - a)
        self.t2_limit = (a * (n - 1) * (n + 1)) / (n * (n - a)) * f_value

        # SPE (Q statistics) control limit
        residuals = X_scaled - self.pca.inverse_transform(T)
        spe_values = np.sum(residuals ** 2, axis=1)

        # Control limit via the moment-matching chi-square approximation
        # (Box / Nomikos-MacGregor): SPE ~ g * chi2(h), with g = Var/(2*Mean)
        # and h = 2*Mean^2/Var estimated from the training residuals
        spe_mean = np.mean(spe_values)
        spe_var = np.var(spe_values)

        g = spe_var / (2 * spe_mean)
        h = 2 * spe_mean ** 2 / spe_var

        self.spe_limit = g * stats.chi2.ppf(1 - self.alpha, h)

        self.is_trained = True

        return {
            'n_components': self.n_components,
            'variance_explained': self.pca.explained_variance_ratio_.sum(),
            't2_limit': self.t2_limit,
            'spe_limit': self.spe_limit
        }

    def detect(self, new_data: np.ndarray) -> Dict:
        """Anomaly detection for new data

        Args:
            new_data: Data to monitor (n_samples, n_features)

        Returns:
            Anomaly detection results
        """
        if not self.is_trained:
            raise ValueError("Model not trained. Call train() first.")

        # Standardize data
        X_scaled = self.scaler.transform(new_data)

        # Calculate T² statistics
        T = self.pca.transform(X_scaled)
        t2_values = np.sum(
            (T ** 2) / self.pca.explained_variance_, axis=1
        )

        # Calculate SPE
        residuals = X_scaled - self.pca.inverse_transform(T)
        spe_values = np.sum(residuals ** 2, axis=1)

        # Anomaly determination
        t2_anomaly = t2_values > self.t2_limit
        spe_anomaly = spe_values > self.spe_limit
        any_anomaly = t2_anomaly | spe_anomaly

        return {
            't2': t2_values,
            't2_limit': self.t2_limit,
            't2_anomaly': t2_anomaly,
            'spe': spe_values,
            'spe_limit': self.spe_limit,
            'spe_anomaly': spe_anomaly,
            'anomaly': any_anomaly,
            'anomaly_rate': any_anomaly.mean()
        }

# Practical example: Multivariate monitoring of reactor
np.random.seed(42)

# Generate normal operation data (5 correlated variables)
n_samples = 500
mean_normal = [150, 5.0, 200, 80, 0.85]  # Temperature, pressure, flow, conversion, purity
cov_normal = np.array([
    [25, 0.5, 5, 2, 0.01],
    [0.5, 0.04, 0.1, 0.02, 0.001],
    [5, 0.1, 100, 10, 0.1],
    [2, 0.02, 10, 4, 0.05],
    [0.01, 0.001, 0.1, 0.05, 0.0025]
])

normal_data = np.random.multivariate_normal(mean_normal, cov_normal, n_samples)

# Build MSPC model
mspc = MSPCMonitor(alpha=0.01)
train_result = mspc.train(normal_data)

print("=== MSPC Model Building ===")
print(f"Number of principal components: {train_result['n_components']}")
print(f"Cumulative variance explained: {train_result['variance_explained']:.2%}")
print(f"T² control limit: {train_result['t2_limit']:.2f}")
print(f"SPE control limit: {train_result['spe_limit']:.2f}")

# Validation with normal data
print("\n=== Normal Data Validation ===")
normal_test = np.random.multivariate_normal(mean_normal, cov_normal, 100)
result_normal = mspc.detect(normal_test)
print(f"Anomaly rate: {result_normal['anomaly_rate']*100:.1f}% "
      f"(Expected: {mspc.alpha*100:.1f}%)")

# Validation with abnormal data (temperature anomaly)
print("\n=== Abnormal Data Validation (Temperature +20°C rise) ===")
mean_abnormal = [170, 5.5, 200, 75, 0.82]  # Temperature rise, pressure rise, conversion drop
abnormal_data = np.random.multivariate_normal(mean_abnormal, cov_normal, 50)
result_abnormal = mspc.detect(abnormal_data)

print(f"Anomaly rate: {result_abnormal['anomaly_rate']*100:.1f}%")
print(f"T² anomalies detected: {result_abnormal['t2_anomaly'].sum()} cases")
print(f"SPE anomalies detected: {result_abnormal['spe_anomaly'].sum()} cases")

# Detailed analysis
anomaly_indices = np.where(result_abnormal['anomaly'])[0]
if len(anomaly_indices) > 0:
    print(f"\nAnomaly sample examples (first 5 cases):")
    for i in anomaly_indices[:5]:
        print(f"  Sample {i}: T²={result_abnormal['t2'][i]:.2f} "
              f"(limit={mspc.t2_limit:.2f}), "
              f"SPE={result_abnormal['spe'][i]:.2f} "
              f"(limit={mspc.spe_limit:.2f})")

# Output example:
# === MSPC Model Building ===
# Number of principal components: 3
# Cumulative variance explained: 91.23%
# T² control limit: 11.34
# SPE control limit: 8.76
#
# === Normal Data Validation ===
# Anomaly rate: 1.0% (Expected: 1.0%)
#
# === Abnormal Data Validation (Temperature +20°C rise) ===
# Anomaly rate: 94.0%
# T² anomalies detected: 47 cases
# SPE anomalies detected: 12 cases
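
When MSPC flags a sample, the natural next question is which variable drove it. Below is a minimal contribution-analysis sketch, reusing the trained mspc model and anomaly_indices from above; the helper function spe_contributions is an illustrative assumption, not part of MSPCMonitor.

# Per-variable squared residuals: large values point to the offending measurement
def spe_contributions(model: MSPCMonitor, sample: np.ndarray) -> np.ndarray:
    x = model.scaler.transform(sample.reshape(1, -1))
    x_hat = model.pca.inverse_transform(model.pca.transform(x))
    return ((x - x_hat) ** 2).ravel()

if len(anomaly_indices) > 0:
    contrib = spe_contributions(mspc, abnormal_data[anomaly_indices[0]])
    var_names = ['Temperature', 'Pressure', 'Flow', 'Conversion', 'Purity']
    for name, c in zip(var_names, contrib):
        print(f"  {name}: SPE contribution = {c:.3f}")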

4.2 Leading Indicators Management

Leading indicators are metrics for detecting precursors to accidents and major disasters. Unlike lagging indicators (outcome measures such as accident rates), they measure safety management activities and risk states before incidents occur, enabling preventive action.

4.2.1 Leading Indicator Tracking System

# Example 3: Leading Indicators Tracking System
from typing import List, Tuple
from collections import deque

class LeadingIndicator:
    """Leading indicator class"""

    def __init__(self, name: str, target: float, unit: str,
                 direction: str = 'lower'):
        """
        Args:
            name: Indicator name
            target: Target value
            unit: Unit
            direction: 'lower' (lower is better) or 'higher' (higher is better)
        """
        self.name = name
        self.target = target
        self.unit = unit
        self.direction = direction
        self.values = deque(maxlen=12)  # Last 12 months
        self.timestamps = deque(maxlen=12)

    def add_value(self, value: float, timestamp: datetime):
        """Add indicator value"""
        self.values.append(value)
        self.timestamps.append(timestamp)

    def get_current_value(self) -> Optional[float]:
        """Get current value"""
        return self.values[-1] if self.values else None

    def get_trend(self) -> str:
        """Determine trend (Improving/Stable/Deteriorating)"""
        if len(self.values) < 3:
            return "Insufficient Data"

        # Average change rate over last 3 months
        recent_3 = list(self.values)[-3:]
        trend_value = (recent_3[-1] - recent_3[0]) / abs(recent_3[0]) if recent_3[0] != 0 else 0

        # Consider direction
        if self.direction == 'lower':
            # Improvement when value decreases
            if trend_value < -0.05:
                return "Improving"
            elif trend_value > 0.05:
                return "Deteriorating"
            else:
                return "Stable"
        else:
            # Improvement when value increases
            if trend_value > 0.05:
                return "Improving"
            elif trend_value < -0.05:
                return "Deteriorating"
            else:
                return "Stable"

    def is_target_met(self) -> bool:
        """Check if target is met"""
        current = self.get_current_value()
        if current is None:
            return False

        if self.direction == 'lower':
            return current <= self.target
        else:
            return current >= self.target

class LeadingIndicatorTracker:
    """Leading indicator tracking system"""

    def __init__(self):
        self.indicators: Dict[str, LeadingIndicator] = {}

    def add_indicator(self, indicator: LeadingIndicator):
        """Add indicator"""
        self.indicators[indicator.name] = indicator

    def update_indicator(self, name: str, value: float,
                        timestamp: Optional[datetime] = None):
        """Update indicator value"""
        if name not in self.indicators:
            raise ValueError(f"Indicator '{name}' not found")

        if timestamp is None:
            timestamp = datetime.now()

        self.indicators[name].add_value(value, timestamp)

    def get_dashboard(self) -> pd.DataFrame:
        """Generate dashboard data"""
        data = []
        for name, indicator in self.indicators.items():
            current = indicator.get_current_value()
            target_met = indicator.is_target_met()
            trend = indicator.get_trend()

            data.append({
                'Indicator': name,
                'Current Value': current,
                'Target': indicator.target,
                'Unit': indicator.unit,
                'Target Met': '✓' if target_met else '✗',
                'Trend': trend
            })

        return pd.DataFrame(data)

    def get_risk_score(self) -> float:
        """Calculate overall risk score (0-100, lower is better)"""
        if not self.indicators:
            return 0

        scores = []
        for indicator in self.indicators.values():
            current = indicator.get_current_value()
            if current is None or indicator.target == 0:
                continue

            # Shortfall vs. target, counted only in the adverse direction
            # (beating a target should not add risk)
            if indicator.direction == 'lower':
                shortfall = max(current - indicator.target, 0)
            else:
                shortfall = max(indicator.target - current, 0)
            deviation = shortfall / abs(indicator.target)

            # Consider trend (weight deteriorating trends)
            trend = indicator.get_trend()
            weight = 1.5 if trend == "Deteriorating" else 1.0

            scores.append(min(deviation * 100 * weight, 100))

        return np.mean(scores) if scores else 0

# Practical example
tracker = LeadingIndicatorTracker()

# Define leading indicators
indicators_config = [
    ('Near Miss Reports', 10, 'reports/month', 'higher'),   # Target 10+ per month
    ('Safety Training Hours', 8, 'hours/person/month', 'higher'),
    ('Overdue Inspections', 5, 'count', 'lower'),          # Target 5 or less
    ('Safety Walk Completions', 20, 'walks/month', 'higher'),
    ('Permit Violations', 2, 'violations/month', 'lower'),
    ('Maintenance Backlog', 15, 'work orders', 'lower')
]

for name, target, unit, direction in indicators_config:
    indicator = LeadingIndicator(name, target, unit, direction)
    tracker.add_indicator(indicator)

# Input 12 months of data (simulation)
np.random.seed(42)
base_date = datetime(2024, 1, 1)

# Generate monthly data
monthly_data = {
    'Near Miss Reports': [8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
    'Safety Training Hours': [7, 7.5, 8, 8.2, 8.5, 8.7, 9, 9.2, 9.5, 9.8, 10, 10.2],
    'Overdue Inspections': [12, 10, 9, 8, 7, 6, 5, 4, 4, 3, 3, 2],
    'Safety Walk Completions': [15, 16, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27],
    'Permit Violations': [5, 4, 4, 3, 3, 2, 2, 1, 1, 1, 0, 0],
    'Maintenance Backlog': [25, 23, 21, 20, 18, 17, 16, 15, 14, 13, 12, 11]
}

for month in range(12):
    timestamp = base_date + timedelta(days=30*month)
    for indicator_name, values in monthly_data.items():
        tracker.update_indicator(indicator_name, values[month], timestamp)

# Display dashboard
print("=== Leading Indicators Dashboard (Latest Month) ===")
dashboard = tracker.get_dashboard()
print(dashboard.to_string(index=False))

# Overall risk score
risk_score = tracker.get_risk_score()
print(f"\n【Overall Risk Score】: {risk_score:.1f}/100")
if risk_score < 20:
    print("Assessment: Excellent (Safety management functioning effectively)")
elif risk_score < 40:
    print("Assessment: Good (Continuous improvement observed)")
elif risk_score < 60:
    print("Assessment: Caution (Room for improvement)")
else:
    print("Assessment: Needs improvement (Urgent action required)")

# Trend analysis
print("\n=== Trend Analysis ===")
improving = [name for name, ind in tracker.indicators.items()
             if ind.get_trend() == "Improving"]
deteriorating = [name for name, ind in tracker.indicators.items()
                if ind.get_trend() == "Deteriorating"]

print(f"Improving indicators ({len(improving)} cases):")
for name in improving:
    print(f"  ✓ {name}")

if deteriorating:
    print(f"\nDeteriorating indicators ({len(deteriorating)} cases):")
    for name in deteriorating:
        print(f"  ⚠️  {name}")

# Output example:
# === Leading Indicators Dashboard (Latest Month) ===
# Indicator                   Current Value  Target                Unit  Target Met     Trend
# Near Miss Reports                    20.0    10.0   reports/month           ✓  Improving
# Safety Training Hours                10.2     8.0  hours/person/month       ✓     Stable
# Overdue Inspections                   2.0     5.0           count           ✓  Improving
# Safety Walk Completions              27.0    20.0     walks/month           ✓  Improving
# Permit Violations                     0.0     2.0  violations/month         ✓  Improving
# Maintenance Backlog                  11.0    15.0     work orders           ✓  Improving
#
# 【Overall Risk Score】: 0.0/100
# Assessment: Excellent (Safety management functioning effectively)

4.3 Incident Prediction with Machine Learning

Using historical incident data, near-miss data, and operational data, we build machine learning models to predict incident probability. Ensemble methods such as Random Forest and Gradient Boosting typically perform well on tabular data of this kind.

4.3.1 Building Incident Prediction Models

# Example 4: Incident Prediction with Machine Learning
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

class IncidentPredictor:
    """Machine learning-based incident prediction system

    Predicts incident risk from operational data and safety indicators
    """

    def __init__(self, model_type: str = 'random_forest'):
        """
        Args:
            model_type: 'random_forest' or 'gradient_boosting'
        """
        if model_type == 'random_forest':
            self.model = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=10,
                random_state=42
            )
        elif model_type == 'gradient_boosting':
            self.model = GradientBoostingClassifier(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=5,
                random_state=42
            )
        else:
            raise ValueError("model_type must be 'random_forest' or 'gradient_boosting'")

        self.feature_names = None
        self.is_trained = False

    def train(self, X: np.ndarray, y: np.ndarray,
              feature_names: Optional[List[str]] = None) -> Dict:
        """Train the model

        Args:
            X: Features (n_samples, n_features)
            y: Labels (0: normal, 1: incident)
            feature_names: List of feature names

        Returns:
            Training result statistics
        """
        self.feature_names = feature_names

        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Train model
        self.model.fit(X_train, y_train)

        # Performance evaluation
        y_pred = self.model.predict(X_test)
        y_prob = self.model.predict_proba(X_test)[:, 1]

        # Cross-validation
        cv_scores = cross_val_score(self.model, X, y, cv=5, scoring='roc_auc')

        self.is_trained = True

        return {
            'train_score': self.model.score(X_train, y_train),
            'test_score': self.model.score(X_test, y_test),
            'roc_auc': roc_auc_score(y_test, y_prob),
            'cv_mean': cv_scores.mean(),
            'cv_std': cv_scores.std(),
            'confusion_matrix': confusion_matrix(y_test, y_pred),
            'classification_report': classification_report(y_test, y_pred)
        }

    def predict_risk(self, X: np.ndarray) -> np.ndarray:
        """Predict incident risk probability

        Args:
            X: Features

        Returns:
            Incident probability (0-1)
        """
        if not self.is_trained:
            raise ValueError("Model not trained")

        return self.model.predict_proba(X)[:, 1]

    def get_feature_importance(self) -> pd.DataFrame:
        """Get feature importance"""
        if not self.is_trained:
            raise ValueError("Model not trained")

        importances = self.model.feature_importances_
        indices = np.argsort(importances)[::-1]

        df = pd.DataFrame({
            'Feature': [self.feature_names[i] if self.feature_names else f'Feature_{i}'
                       for i in indices],
            'Importance': importances[indices]
        })

        return df

# Practical example: Building incident prediction model
np.random.seed(42)

# Generate training data (1000 samples, ~10% labeled as incidents below)
n_samples = 1000

# Generate features
features = {
    'Temperature Deviation': np.random.randn(n_samples) * 10,  # Temperature deviation
    'Pressure Deviation': np.random.randn(n_samples) * 0.5,   # Pressure deviation
    'Near Miss Count': np.random.poisson(2, n_samples),        # Near miss count
    'Overdue Maintenance': np.random.poisson(3, n_samples),    # Overdue maintenance
    'Operator Experience': np.random.uniform(1, 20, n_samples), # Years of experience
    'Safety Training Hours': np.random.uniform(5, 15, n_samples), # Training hours
    'Equipment Age': np.random.uniform(0, 30, n_samples),      # Equipment age
    'Alarm Rate': np.random.poisson(10, n_samples)             # Alarm rate
}

# Generate incident labels (dependent on features)
incident_score = (
    np.abs(features['Temperature Deviation']) * 0.1 +
    np.abs(features['Pressure Deviation']) * 0.5 +
    features['Near Miss Count'] * 0.3 +
    features['Overdue Maintenance'] * 0.2 +
    (20 - features['Operator Experience']) * 0.05 +
    (15 - features['Safety Training Hours']) * 0.1 +
    features['Equipment Age'] * 0.05 +
    features['Alarm Rate'] * 0.1 +
    np.random.randn(n_samples) * 2  # Noise
)

# Label top 10% as incidents
threshold = np.percentile(incident_score, 90)
y = (incident_score > threshold).astype(int)

# Create feature matrix
X = np.column_stack([features[k] for k in features.keys()])
feature_names = list(features.keys())

# Build model
predictor = IncidentPredictor(model_type='random_forest')
train_result = predictor.train(X, y, feature_names)

print("=== Incident Prediction Model Performance ===")
print(f"Training accuracy: {train_result['train_score']:.3f}")
print(f"Test accuracy: {train_result['test_score']:.3f}")
print(f"ROC-AUC: {train_result['roc_auc']:.3f}")
print(f"CV mean: {train_result['cv_mean']:.3f} (±{train_result['cv_std']:.3f})")

print("\n=== Confusion Matrix ===")
cm = train_result['confusion_matrix']
print(f"True Negative: {cm[0,0]}, False Positive: {cm[0,1]}")
print(f"False Negative: {cm[1,0]}, True Positive: {cm[1,1]}")

print("\n=== Classification Report ===")
print(train_result['classification_report'])

# Feature importance
print("=== Feature Importance ===")
importance_df = predictor.get_feature_importance()
print(importance_df.to_string(index=False))

# Predict new data
print("\n=== New Data Prediction Examples ===")
new_samples = np.array([
    [15, 1.0, 5, 8, 3, 6, 25, 20],   # High risk case
    [2, 0.1, 0, 1, 15, 12, 5, 5],    # Low risk case
    [8, 0.5, 3, 4, 10, 10, 15, 12]   # Medium risk case
])

risk_probs = predictor.predict_risk(new_samples)
for i, prob in enumerate(risk_probs):
    risk_level = "High" if prob > 0.5 else "Medium" if prob > 0.2 else "Low"
    print(f"Sample {i+1}: Incident risk={prob*100:.1f}% ({risk_level} risk)")

# Output example:
# === Incident Prediction Model Performance ===
# Training accuracy: 0.975
# Test accuracy: 0.885
# ROC-AUC: 0.932
# CV mean: 0.918 (±0.023)
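
The simulated data above contains 10% incidents; real incident data is often far more imbalanced. Below is a minimal sketch, reusing X and y from Example 4, of two standard countermeasures: class re-weighting during training and a precision-recall metric for evaluation. The variable names are illustrative.

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import average_precision_score
from sklearn.model_selection import train_test_split

X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, test_size=0.2, random_state=0, stratify=y
)

# class_weight='balanced' up-weights the rare incident class during training
rf_balanced = RandomForestClassifier(
    n_estimators=100, class_weight='balanced', random_state=0
)
rf_balanced.fit(X_tr, y_tr)

# Average precision summarizes the precision-recall curve, which is more
# informative than accuracy when positives are rare
ap = average_precision_score(y_te, rf_balanced.predict_proba(X_te)[:, 1])
print(f"Average precision (class-weighted RF): {ap:.3f}")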

4.4 Safety Performance Metrics

Safety performance is measured using standardized metrics such as TRIR (Total Recordable Incident Rate) and LTIR (Lost Time Incident Rate). These enable cross-industry and inter-company comparisons.
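
For example, a site that records 8 recordable incidents over 1,000,000 hours worked has TRIR = 8 × 200,000 / 1,000,000 = 1.6, i.e. 1.6 recordable incidents per 100 full-time workers per year.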

4.4.1 Safety KPI Calculation System

# Example 5: Safety Performance Metrics Calculation
from dataclasses import dataclass
from typing import List

@dataclass
class Incident:
    """Incident record"""
    date: datetime
    type: str  # 'fatality', 'lost_time', 'restricted_work', 'medical_treatment', 'first_aid'
    days_away: int = 0  # Days away from work
    description: str = ""

class SafetyPerformanceCalculator:
    """Safety performance metrics calculation system

    Calculates various safety metrics based on OSHA standards
    """

    def __init__(self):
        self.incidents: List[Incident] = []
        self.total_hours_worked = 0

    def add_incident(self, incident: Incident):
        """Add incident"""
        self.incidents.append(incident)

    def set_hours_worked(self, hours: float):
        """Set total hours worked"""
        self.total_hours_worked = hours

    def calculate_trir(self) -> float:
        """Calculate TRIR (Total Recordable Incident Rate)

        TRIR = (Number of recordable incidents × 200,000) / Total hours worked

        Recordable: Fatality, lost time, restricted work, medical treatment
        200,000 = Hours worked by 100 employees annually (100 × 2,000 hours)
        """
        recordable = [i for i in self.incidents
                     if i.type in ['fatality', 'lost_time',
                                  'restricted_work', 'medical_treatment']]

        if self.total_hours_worked == 0:
            return 0

        return (len(recordable) * 200000) / self.total_hours_worked

    def calculate_ltir(self) -> float:
        """Calculate LTIR (Lost Time Incident Rate)

        LTIR = (Number of lost time incidents × 200,000) / Total hours worked
        """
        lost_time_incidents = [i for i in self.incidents
                              if i.type in ['fatality', 'lost_time']]

        if self.total_hours_worked == 0:
            return 0

        return (len(lost_time_incidents) * 200000) / self.total_hours_worked

    def calculate_severity_rate(self) -> float:
        """Calculate severity rate

        Severity Rate = (Total days lost × 200,000) / Total hours worked

        Only lost-time cases (including fatalities) contribute days lost;
        restricted-work days are not counted as days away.
        """
        total_days_lost = sum(i.days_away for i in self.incidents
                              if i.type in ['fatality', 'lost_time'])

        if self.total_hours_worked == 0:
            return 0

        return (total_days_lost * 200000) / self.total_hours_worked

    def calculate_fatality_rate(self) -> float:
        """Calculate fatality rate

        Fatality Rate = (Number of fatalities × 200,000) / Total hours worked
        """
        fatalities = [i for i in self.incidents if i.type == 'fatality']

        if self.total_hours_worked == 0:
            return 0

        return (len(fatalities) * 200000) / self.total_hours_worked

    def get_incident_breakdown(self) -> pd.DataFrame:
        """Get incident breakdown by type"""
        if not self.incidents:
            return pd.DataFrame()

        type_counts = {}
        for incident in self.incidents:
            type_counts[incident.type] = type_counts.get(incident.type, 0) + 1

        df = pd.DataFrame({
            'Incident Type': list(type_counts.keys()),
            'Count': list(type_counts.values())
        })

        # Calculate percentage
        df['Percentage'] = (df['Count'] / df['Count'].sum() * 100).round(1)

        return df.sort_values('Count', ascending=False)

    def get_comprehensive_report(self) -> Dict:
        """Generate comprehensive safety report"""
        return {
            'Total Incidents': len(self.incidents),
            'Total Hours Worked': self.total_hours_worked,
            'TRIR': self.calculate_trir(),
            'LTIR': self.calculate_ltir(),
            'Severity Rate': self.calculate_severity_rate(),
            'Fatality Rate': self.calculate_fatality_rate(),
            'Incident Breakdown': self.get_incident_breakdown()
        }

    def benchmark_against_industry(self, industry_trir: float,
                                   industry_ltir: float) -> Dict:
        """Benchmark against industry average

        Args:
            industry_trir: Industry average TRIR
            industry_ltir: Industry average LTIR

        Returns:
            Benchmark results
        """
        company_trir = self.calculate_trir()
        company_ltir = self.calculate_ltir()

        return {
            'Company TRIR': company_trir,
            'Industry TRIR': industry_trir,
            'TRIR Performance': ('Better than industry'
                                 if company_trir < industry_trir
                                 else 'Worse than industry'),
            'TRIR Difference': company_trir - industry_trir,
            'Company LTIR': company_ltir,
            'Industry LTIR': industry_ltir,
            'LTIR Performance': ('Better than industry'
                                 if company_ltir < industry_ltir
                                 else 'Worse than industry'),
            'LTIR Difference': company_ltir - industry_ltir
        }

# Practical example
spc = SafetyPerformanceCalculator()

# Annual total hours worked (500 employees, 2000 hours/person/year)
spc.set_hours_worked(500 * 2000)  # 1,000,000 hours

# Add incident data (one year)
incidents_data = [
    Incident(datetime(2024, 1, 15), 'first_aid', 0, "Minor cut"),
    Incident(datetime(2024, 2, 3), 'medical_treatment', 0, "Bruise, medical treatment"),
    Incident(datetime(2024, 3, 12), 'lost_time', 5, "Fall, ankle sprain"),
    Incident(datetime(2024, 4, 8), 'first_aid', 0, "Debris in eye"),
    Incident(datetime(2024, 5, 20), 'restricted_work', 3, "Back pain, light duty only"),
    Incident(datetime(2024, 6, 15), 'lost_time', 10, "Machine pinch, finger fracture"),
    Incident(datetime(2024, 7, 22), 'medical_treatment', 0, "Chemical splash"),
    Incident(datetime(2024, 8, 9), 'first_aid', 0, "Minor burn"),
    Incident(datetime(2024, 9, 14), 'lost_time', 15, "Fall from height, leg fracture"),
    Incident(datetime(2024, 10, 5), 'medical_treatment', 0, "Gas inhalation"),
    Incident(datetime(2024, 11, 18), 'first_aid', 0, "Abrasion"),
    Incident(datetime(2024, 12, 2), 'restricted_work', 2, "Wrist pain")
]

for incident in incidents_data:
    spc.add_incident(incident)

# Comprehensive report
report = spc.get_comprehensive_report()

print("=== Annual Safety Performance Report ===\n")
print(f"Total incidents: {report['Total Incidents']} cases")
print(f"Total hours worked: {report['Total Hours Worked']:,} hours")
print(f"\n【Key Metrics】")
print(f"TRIR (Total Recordable Incident Rate): {report['TRIR']:.2f}")
print(f"LTIR (Lost Time Incident Rate): {report['LTIR']:.2f}")
print(f"Severity Rate: {report['Severity Rate']:.2f}")
print(f"Fatality Rate: {report['Fatality Rate']:.2f}")

print("\n=== Incident Breakdown by Type ===")
print(report['Incident Breakdown'].to_string(index=False))

# Industry benchmark (typical chemical industry values)
print("\n=== Industry Benchmark ===")
industry_trir = 1.2  # Chemical industry average TRIR
industry_ltir = 0.5  # Chemical industry average LTIR

benchmark = spc.benchmark_against_industry(industry_trir, industry_ltir)
print(f"Company TRIR: {benchmark['Company TRIR']:.2f}")
print(f"Industry average TRIR: {benchmark['Industry TRIR']:.2f}")
print(f"Performance: {benchmark['TRIR Performance']}")
print(f"Difference: {benchmark['TRIR Difference']:+.2f}")

print(f"\nCompany LTIR: {benchmark['Company LTIR']:.2f}")
print(f"Industry average LTIR: {benchmark['Industry LTIR']:.2f}")
print(f"Performance: {benchmark['LTIR Performance']}")
print(f"Difference: {benchmark['LTIR Difference']:+.2f}")

# Output example:
# === Annual Safety Performance Report ===
#
# Total incidents: 12 cases
# Total hours worked: 1,000,000 hours
#
# 【Key Metrics】
# TRIR (Total Recordable Incident Rate): 1.60
# LTIR (Lost Time Incident Rate): 0.60
# Severity Rate: 6.00
# Fatality Rate: 0.00

4.4.2 Near-Miss Analysis System

# Example 6: Near-Miss Analysis System
class NearMissAnalyzer:
    """Near-miss analysis system

    Preventive safety management based on Heinrich's Law
    """

    def __init__(self):
        self.near_misses: List[Dict] = []

    def add_near_miss(self, category: str, severity: int,
                     description: str, timestamp: Optional[datetime] = None):
        """Add near-miss

        Args:
            category: Category ('slip_trip', 'chemical', 'equipment', etc.)
            severity: Severity (1-5, 5 is most severe)
            description: Description
            timestamp: Occurrence time
        """
        if timestamp is None:
            timestamp = datetime.now()

        self.near_misses.append({
            'timestamp': timestamp,
            'category': category,
            'severity': severity,
            'description': description
        })

    def get_heinrich_ratio_analysis(self, actual_incidents: int) -> Dict:
        """Analysis based on Heinrich's Law

        Heinrich's Law: behind 1 major accident there are roughly
        29 minor injuries and 300 near misses. Treat this as an
        order-of-magnitude heuristic, not an exact law.
        """
        near_miss_count = len(self.near_misses)

        # Expected number of major accidents
        expected_major = near_miss_count / 300

        return {
            'Near Miss Count': near_miss_count,
            'Actual Incidents': actual_incidents,
            'Expected Major Incidents (Heinrich)': expected_major,
            'Heinrich Ratio': f"1:{actual_incidents}:{near_miss_count}",
            'Prevention Effectiveness': (
                (expected_major - actual_incidents) / expected_major * 100
                if expected_major > 0 else 0
            )
        }

    def get_category_analysis(self) -> pd.DataFrame:
        """Analysis by category"""
        if not self.near_misses:
            return pd.DataFrame()

        df = pd.DataFrame(self.near_misses)

        # Aggregate by category
        category_stats = df.groupby('category').agg({
            'severity': ['count', 'mean', 'max'],
            'description': 'count'
        }).round(2)

        category_stats.columns = ['Count', 'Avg Severity', 'Max Severity', 'Total']
        category_stats = category_stats[['Count', 'Avg Severity', 'Max Severity']]

        return category_stats.sort_values('Count', ascending=False)

    def get_high_severity_near_misses(self, threshold: int = 4) -> pd.DataFrame:
        """Extract high-severity near misses

        Args:
            threshold: Severity threshold (default 4 or higher)
        """
        if not self.near_misses:
            return pd.DataFrame()

        df = pd.DataFrame(self.near_misses)
        high_severity = df[df['severity'] >= threshold]

        return high_severity[['timestamp', 'category', 'severity', 'description']]

    def calculate_near_miss_rate(self, hours_worked: float) -> float:
        """Calculate near-miss rate

        Near Miss Rate = (Near miss count × 200,000) / Total hours worked
        """
        if hours_worked == 0:
            return 0

        return (len(self.near_misses) * 200000) / hours_worked

# Practical example
nma = NearMissAnalyzer()

# Add near-miss data (3 months)
near_miss_data = [
    ('slip_trip', 2, "Water puddle on floor, almost fell"),
    ('chemical', 4, "Tank valve slightly open, chemical dripping"),
    ('equipment', 3, "Safety cover removed, almost started operation"),
    ('slip_trip', 1, "Stumbled on step"),
    ('electrical', 5, "Damaged wiring, electric shock hazard"),
    ('chemical', 3, "Almost handled chemicals without protective equipment"),
    ('equipment', 4, "Pump abnormal noise, sign of failure"),
    ('slip_trip', 2, "Obstacle in pathway, stumbled"),
    ('chemical', 2, "Chemical container label unclear"),
    ('equipment', 3, "Pressure gauge showed abnormal value"),
    ('electrical', 4, "Outlet overheating"),
    ('slip_trip', 1, "Floor slippery"),
    ('chemical', 5, "Gas leak smell from pipe connection"),
    ('equipment', 2, "Tool not in designated location"),
]

base_time = datetime(2024, 10, 1)
for i, (cat, sev, desc) in enumerate(near_miss_data):
    timestamp = base_time + timedelta(days=i*3)
    nma.add_near_miss(cat, sev, desc, timestamp)

# Category analysis
print("=== Near-Miss Category Analysis ===")
category_analysis = nma.get_category_analysis()
print(category_analysis)

# Heinrich's Law analysis
print("\n=== Heinrich's Law Analysis ===")
actual_incidents = 3  # Actual incident count in same period
heinrich = nma.get_heinrich_ratio_analysis(actual_incidents)
print(f"Near-miss count: {heinrich['Near Miss Count']}")
print(f"Actual incident count: {heinrich['Actual Incidents']}")
print(f"Expected major incidents (Law): {heinrich['Expected Major Incidents (Heinrich)']:.2f}")
print(f"Ratio: {heinrich['Heinrich Ratio']}")
print(f"Prevention effectiveness: {heinrich['Prevention Effectiveness']:.1f}%")

# High-severity near misses
print("\n=== High-Severity Near Misses (Severity ≥4) ===")
high_severity = nma.get_high_severity_near_misses(threshold=4)
for _, row in high_severity.iterrows():
    print(f"[{row['timestamp'].strftime('%Y-%m-%d')}] "
          f"{row['category']} (Severity {row['severity']}): {row['description']}")

# Near-miss rate calculation
hours_worked = 500 * 2000  # 500 people × 2000 hours/year
nm_rate = nma.calculate_near_miss_rate(hours_worked)
print(f"\nNear-miss rate: {nm_rate:.2f} (per 200,000 hours)")

# Output example:
# === Near-Miss Category Analysis ===
#             Count  Avg Severity  Max Severity
# category
# slip_trip       4          1.50             2
# chemical        4          3.50             5
# equipment       4          3.00             4
# electrical      2          4.50             5

4.5 Integrated Safety Management System

We integrate all previous components to build a comprehensive safety management system. Real-time monitoring, prediction, and KPI management are centralized.

# Example 7: Integrated Safety Management System
class IntegratedSafetyManagementSystem:
    """Integrated safety management system

    Comprehensive safety platform integrating monitoring, prediction, and KPI management
    """

    def __init__(self):
        self.safety_monitor = SafetyMonitor()
        self.mspc_monitor = None  # Set after training
        self.leading_indicator_tracker = LeadingIndicatorTracker()
        self.incident_predictor = None  # Set after training
        self.performance_calculator = SafetyPerformanceCalculator()
        self.near_miss_analyzer = NearMissAnalyzer()

    def initialize_monitoring(self, safety_limits: Dict[str, SafetyLimit]):
        """Initialize monitoring system"""
        for param, limits in safety_limits.items():
            self.safety_monitor.set_safety_limits(param, limits)

    def initialize_leading_indicators(self, indicators: List[LeadingIndicator]):
        """Initialize leading indicators"""
        for indicator in indicators:
            self.leading_indicator_tracker.add_indicator(indicator)

    def train_anomaly_detector(self, normal_data: np.ndarray):
        """Train anomaly detection model"""
        self.mspc_monitor = MSPCMonitor()
        return self.mspc_monitor.train(normal_data)

    def train_incident_predictor(self, X: np.ndarray, y: np.ndarray,
                                feature_names: List[str]):
        """Train incident prediction model"""
        self.incident_predictor = IncidentPredictor()
        return self.incident_predictor.train(X, y, feature_names)

    def get_real_time_status(self) -> Dict:
        """Get real-time status"""
        return {
            'Process Safety': self.safety_monitor.get_current_status(),
            'Leading Indicators': self.leading_indicator_tracker.get_dashboard(),
            'Risk Score': self.leading_indicator_tracker.get_risk_score()
        }

    def generate_safety_dashboard(self) -> str:
        """Generate comprehensive safety dashboard"""
        lines = []
        lines.append("=" * 70)
        lines.append("Integrated Safety Management Dashboard".center(70))
        lines.append("=" * 70)
        lines.append("")

        # Process safety status
        lines.append("【Process Safety Status】")
        process_status = self.safety_monitor.get_current_status()
        for _, row in process_status.iterrows():
            status_icon = "✓" if row['Safety Level'] == 'Normal' else "⚠️"
            lines.append(f"  {status_icon} {row['Parameter']}: "
                        f"{row['Current Value']:.1f} ({row['Safety Level']})")
        lines.append("")

        # Leading indicators
        lines.append("【Leading Indicators】")
        indicators = self.leading_indicator_tracker.get_dashboard()
        for _, row in indicators.iterrows():
            lines.append(f"  {row['Target Met']} {row['Indicator']}: "
                        f"{row['Current Value']:.1f} ({row['Trend']})")

        risk_score = self.leading_indicator_tracker.get_risk_score()
        lines.append(f"\n  Overall risk score: {risk_score:.1f}/100")
        lines.append("")

        # Safety performance
        if self.performance_calculator.total_hours_worked > 0:
            lines.append("【Safety Performance】")
            lines.append(f"  TRIR: {self.performance_calculator.calculate_trir():.2f}")
            lines.append(f"  LTIR: {self.performance_calculator.calculate_ltir():.2f}")
            lines.append("")

        # Near-miss statistics
        if len(self.near_miss_analyzer.near_misses) > 0:
            lines.append("【Near Misses】")
            lines.append(f"  Total count: {len(self.near_miss_analyzer.near_misses)}")
            high_severity = self.near_miss_analyzer.get_high_severity_near_misses()
            lines.append(f"  High severity (≥4): {len(high_severity)} cases")
            lines.append("")

        return "\n".join(lines)

    def predict_incident_risk(self, current_conditions: np.ndarray) -> Dict:
        """Predict incident risk from current conditions"""
        if self.incident_predictor is None:
            return {'error': 'Incident predictor not trained'}

        risk_prob = self.incident_predictor.predict_risk(current_conditions)[0]

        risk_level = (
            "Critical" if risk_prob > 0.7 else
            "High" if risk_prob > 0.5 else
            "Medium" if risk_prob > 0.3 else
            "Low"
        )

        return {
            'risk_probability': risk_prob,
            'risk_level': risk_level,
            'recommendation': self._get_recommendation(risk_level)
        }

    def _get_recommendation(self, risk_level: str) -> str:
        """Generate recommended action based on risk level"""
        recommendations = {
            'Critical': "Consider immediate shutdown. Convene emergency response team.",
            'High': "Review operating conditions, strengthen monitoring. Report to management.",
            'Medium': "Continue monitoring. Watch for anomaly signs.",
            'Low': "Continue normal operation. Maintain regular monitoring."
        }
        return recommendations.get(risk_level, "")

# Practical example (complete integrated system)
isms = IntegratedSafetyManagementSystem()

# 1. Initialize monitoring system
safety_limits_config = {
    'Reactor Temperature': SafetyLimit(50, 60, 70, 180, 190, 200),
    'Reactor Pressure': SafetyLimit(0.5, 1.0, 1.5, 8.0, 9.0, 10.0)
}
isms.initialize_monitoring(safety_limits_config)

# 2. Initialize leading indicators
indicators = [
    LeadingIndicator('Near Miss Reports', 10, 'reports/month', 'higher'),
    LeadingIndicator('Safety Training Hours', 8, 'hours/person/month', 'higher')
]
isms.initialize_leading_indicators(indicators)

# 3. Update process data
isms.safety_monitor.update_value('Reactor Temperature', 150)
isms.safety_monitor.update_value('Reactor Pressure', 5.0)

# 4. Update leading indicators
isms.leading_indicator_tracker.update_indicator('Near Miss Reports', 15)
isms.leading_indicator_tracker.update_indicator('Safety Training Hours', 9.5)

# 5. Generate dashboard
print(isms.generate_safety_dashboard())

# 6. Predict incident risk (if trained model available)
# current_conditions = np.array([[10, 0.8, 3, 5, 8, 10, 15, 12]])
# risk_prediction = isms.predict_incident_risk(current_conditions)
# print(f"\n【Incident Risk Prediction】")
# print(f"Risk probability: {risk_prediction['risk_probability']*100:.1f}%")
# print(f"Risk level: {risk_prediction['risk_level']}")
# print(f"Recommended action: {risk_prediction['recommendation']}")

# Output example:
# ======================================================================
#                  Integrated Safety Management Dashboard
# ======================================================================
#
# 【Process Safety Status】
#   ✓ Reactor Pressure: 5.0 (Normal)
#   ✓ Reactor Temperature: 150.0 (Normal)
#
# 【Leading Indicators】
#   ✓ Near Miss Reports: 15.0 (Insufficient Data)
#   ✓ Safety Training Hours: 9.5 (Insufficient Data)
#
#   Overall risk score: 0.0/100

Learning Objectives Review

Upon completing this chapter, you will be able to:

- Build a real-time monitor that classifies process variables into Normal/Caution/Warning/Critical bands
- Detect multivariate anomalies with PCA-based T² and SPE statistics and trace them to the offending variable
- Define, track, and score leading indicators for preventive safety management
- Train and evaluate machine learning models that predict incident risk
- Calculate and benchmark safety KPIs such as TRIR, LTIR, and severity rate, and analyze near misses
- Combine these components into an integrated safety management system

