AI Technology Implementation Strategy for Real Plants
In this chapter, we turn to practical strategies for deploying the AI technologies covered so far in actual chemical plants. We systematically cover the key elements of implementation: data integration, model management, online updates, A/B testing, and ROI evaluation.
Requirements: Python 3.9+; numpy>=1.24.0,<2.0.0; pandas; scikit-learn (for Example 2).
<div class="code-header">Example 1: Data Integration Pipeline</div>
<div class="code-desc">
An ETL pipeline that integrates multiple plant data sources (here DCS, MES, and LIMS) and transforms them into inputs for AI models.
</div>
<pre><code>import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict

class PlantDataIntegrationPipeline:
    """Chemical Plant Data Integration Pipeline"""

    def __init__(self, data_sources: Dict[str, str]):
        self.data_sources = data_sources
        self.integrated_data = None

    def extract_dcs_data(self, start_time: datetime, end_time: datetime) -> pd.DataFrame:
        """Extract DCS (Distributed Control System) data"""
        # In practice, retrieve from OPC-UA or a plant historian
        print(f"Extracting DCS data: {start_time} ~ {end_time}")
        # Generate sample data at 1-minute intervals
        timestamps = pd.date_range(start_time, end_time, freq='1min')
        periods = len(timestamps)
        dcs_data = pd.DataFrame({
            'timestamp': timestamps,
            'reactor_temp': 350 + np.random.normal(0, 2, periods),
            'reactor_pressure': 5.0 + np.random.normal(0, 0.1, periods),
            'feed_flow': 100 + np.random.normal(0, 5, periods),
            'coolant_flow': 50 + np.random.normal(0, 2, periods)
        })
        return dcs_data

    def extract_mes_data(self, start_time: datetime, end_time: datetime) -> pd.DataFrame:
        """Extract MES (Manufacturing Execution System) data"""
        print(f"Extracting MES data: {start_time} ~ {end_time}")
        # Batch information: one batch every 2 days
        num_batches = int((end_time - start_time).days / 2)
        mes_data = pd.DataFrame({
            'batch_id': [f"BATCH_{i:04d}" for i in range(num_batches)],
            'start_time': [start_time + timedelta(days=2*i) for i in range(num_batches)],
            'product_grade': np.random.choice(['Grade_A', 'Grade_B'], num_batches),
            'target_yield': np.random.uniform(0.92, 0.98, num_batches)
        })
        return mes_data

    def extract_lims_data(self, start_time: datetime, end_time: datetime) -> pd.DataFrame:
        """Extract LIMS (Laboratory Information Management System) data"""
        print(f"Extracting LIMS data: {start_time} ~ {end_time}")
        # Quality measurement data: 3 samples per day
        num_samples = int((end_time - start_time).days * 3)
        lims_data = pd.DataFrame({
            'sample_time': pd.date_range(start_time, end_time, periods=num_samples),
            'purity': np.random.uniform(99.0, 99.9, num_samples),
            'viscosity': np.random.uniform(50, 70, num_samples),
            'color_index': np.random.uniform(1, 5, num_samples)
        })
        return lims_data

    def transform_data(self, dcs_df: pd.DataFrame, mes_df: pd.DataFrame,
                       lims_df: pd.DataFrame) -> pd.DataFrame:
        """Transform and integrate data"""
        print("Transforming and integrating data...")
        # Resample DCS data to 10-minute averages
        dcs_resampled = dcs_df.set_index('timestamp').resample('10min').mean()
        # Merge each LIMS sample onto the nearest DCS timestamp
        # (merge_asof requires both frames sorted on their time keys)
        merged = pd.merge_asof(
            dcs_resampled.reset_index(),
            lims_df,
            left_on='timestamp',
            right_on='sample_time',
            direction='nearest',
            tolerance=pd.Timedelta('4h')
        )
        # MES batch data (mes_df) could additionally be joined on batch
        # time windows; omitted here for brevity
        # Handle missing values
        merged = merged.ffill().bfill()
        # Feature engineering
        merged['temp_pressure_ratio'] = merged['reactor_temp'] / (merged['reactor_pressure'] * 100)
        merged['flow_ratio'] = merged['feed_flow'] / (merged['coolant_flow'] + 1e-6)
        return merged

    def validate_data_quality(self, data: pd.DataFrame) -> Dict:
        """Validate data quality"""
        quality_report = {
            'total_records': len(data),
            'missing_ratio': data.isnull().sum().sum() / (len(data) * len(data.columns)),
            'outliers_detected': 0,
            'time_gaps': 0
        }
        # Outlier detection (3-sigma rule)
        for col in data.select_dtypes(include=[np.number]).columns:
            mean = data[col].mean()
            std = data[col].std()
            outliers = ((data[col] < mean - 3*std) | (data[col] > mean + 3*std)).sum()
            quality_report['outliers_detected'] += int(outliers)
        # Time series gap detection
        if 'timestamp' in data.columns:
            time_diffs = data['timestamp'].diff()
            expected_interval = time_diffs.median()
            gaps = (time_diffs > expected_interval * 2).sum()
            quality_report['time_gaps'] = int(gaps)
        print("\nData Quality Report:")
        for key, value in quality_report.items():
            print(f"  {key}: {value}")
        return quality_report

    def run_pipeline(self, start_time: datetime, end_time: datetime) -> pd.DataFrame:
        """Execute the entire pipeline"""
        print(f"\n{'='*60}")
        print("Data Integration Pipeline Execution")
        print(f"{'='*60}\n")
        # Extract
        dcs_data = self.extract_dcs_data(start_time, end_time)
        mes_data = self.extract_mes_data(start_time, end_time)
        lims_data = self.extract_lims_data(start_time, end_time)
        # Transform
        integrated_data = self.transform_data(dcs_data, mes_data, lims_data)
        # Validate
        quality_report = self.validate_data_quality(integrated_data)
        self.integrated_data = integrated_data
        print(f"\nIntegrated data creation complete: {len(integrated_data)} records")
        return integrated_data

# Usage example
data_sources = {
    'dcs': 'opc://dcs-server:4840',
    'mes': 'sql://mes-db/production',
    'lims': 'api://lims-server/samples'
}
pipeline = PlantDataIntegrationPipeline(data_sources)
integrated_data = pipeline.run_pipeline(
    start_time=datetime(2025, 1, 1),
    end_time=datetime(2025, 1, 31)
)
print("\nIntegrated data sample:")
print(integrated_data.head())
</code></pre>
<div class="code-header">Example 2: Model Version Control System</div>
<div class="code-desc">
A system that implements version control, experiment tracking, and a model registry for AI models.
</div>
<pre><code>import json
import pickle
from pathlib import Path
from datetime import datetime
from typing import Dict, Any
import hashlib

class ModelVersionControl:
    """AI Model Version Control System"""

    def __init__(self, registry_path: str = "./model_registry"):
        self.registry_path = Path(registry_path)
        self.registry_path.mkdir(parents=True, exist_ok=True)
        self.metadata_file = self.registry_path / "registry_metadata.json"
        self._load_registry()

    def _load_registry(self):
        """Load the registry from disk, or start an empty one"""
        if self.metadata_file.exists():
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                self.registry = json.load(f)
        else:
            self.registry = {'models': {}, 'experiments': {}}

    def _save_registry(self):
        """Persist the registry to disk"""
        with open(self.metadata_file, 'w', encoding='utf-8') as f:
            json.dump(self.registry, f, indent=2, ensure_ascii=False)

    def register_model(self, model_name: str, model_obj: Any,
                       metadata: Dict) -> str:
        """Register a model and return its new version string"""
        # Hash the pickled model so each version is content-identifiable
        model_bytes = pickle.dumps(model_obj)
        model_hash = hashlib.sha256(model_bytes).hexdigest()[:12]
        # Determine the next version number
        if model_name in self.registry['models']:
            versions = self.registry['models'][model_name]['versions']
            version_numbers = [int(v.split('v')[-1]) for v in versions.keys()]
            new_version = f"v{max(version_numbers) + 1}"
        else:
            new_version = "v1"
            self.registry['models'][model_name] = {'versions': {}}
        # Save the model file
        model_dir = self.registry_path / model_name / new_version
        model_dir.mkdir(parents=True, exist_ok=True)
        model_file = model_dir / f"{model_name}_{new_version}.pkl"
        with open(model_file, 'wb') as f:
            pickle.dump(model_obj, f)
        # Save version metadata
        version_metadata = {
            'version': new_version,
            'model_hash': model_hash,
            'registered_at': datetime.now().isoformat(),
            'model_file': str(model_file),
            'metadata': metadata
        }
        self.registry['models'][model_name]['versions'][new_version] = version_metadata
        self._save_registry()
        print(f"Model registration complete: {model_name} {new_version}")
        print(f"  Hash: {model_hash}")
        print(f"  Registered at: {version_metadata['registered_at']}")
        return new_version

    def load_model(self, model_name: str, version: str = "latest") -> Any:
        """Load a model (the latest version by default)"""
        if model_name not in self.registry['models']:
            raise ValueError(f"Model '{model_name}' not found")
        versions = self.registry['models'][model_name]['versions']
        if version == "latest":
            version_numbers = [int(v.split('v')[-1]) for v in versions.keys()]
            version = f"v{max(version_numbers)}"
        if version not in versions:
            raise ValueError(f"Version '{version}' not found")
        model_file = Path(versions[version]['model_file'])
        with open(model_file, 'rb') as f:
            model_obj = pickle.load(f)
        print(f"Model loaded: {model_name} {version}")
        return model_obj

    def log_experiment(self, experiment_name: str, params: Dict,
                       metrics: Dict, artifacts: Dict = None):
        """Log an experiment's parameters and metrics"""
        experiment_id = f"exp_{len(self.registry['experiments']) + 1:04d}"
        experiment_data = {
            'experiment_id': experiment_id,
            'experiment_name': experiment_name,
            'timestamp': datetime.now().isoformat(),
            'parameters': params,
            'metrics': metrics,
            'artifacts': artifacts or {}
        }
        self.registry['experiments'][experiment_id] = experiment_data
        self._save_registry()
        print(f"\nExperiment logged: {experiment_id}")
        print(f"  Experiment name: {experiment_name}")
        print(f"  Parameters: {params}")
        print(f"  Metrics: {metrics}")
        return experiment_id

    def compare_experiments(self, metric_name: str, top_n: int = 5) -> list:
        """Rank experiments by a metric"""
        experiments = []
        for exp_id, exp_data in self.registry['experiments'].items():
            if metric_name in exp_data['metrics']:
                experiments.append({
                    'experiment_id': exp_id,
                    'experiment_name': exp_data['experiment_name'],
                    'metric_value': exp_data['metrics'][metric_name],
                    'timestamp': exp_data['timestamp']
                })
        # Sort descending; note this assumes a higher-is-better metric
        # such as r2 (for rmse or mae, sort ascending instead)
        sorted_experiments = sorted(
            experiments,
            key=lambda x: x['metric_value'],
            reverse=True
        )[:top_n]
        print(f"\nTop {top_n} experiments (by {metric_name}):")
        for i, exp in enumerate(sorted_experiments, 1):
            print(f"{i}. {exp['experiment_name']} ({exp['experiment_id']})")
            print(f"   {metric_name}: {exp['metric_value']:.4f}")
        return sorted_experiments

    def get_model_info(self, model_name: str, version: str = "latest") -> Dict:
        """Get a version's registry metadata"""
        if model_name not in self.registry['models']:
            raise ValueError(f"Model '{model_name}' not found")
        versions = self.registry['models'][model_name]['versions']
        if version == "latest":
            version_numbers = [int(v.split('v')[-1]) for v in versions.keys()]
            version = f"v{max(version_numbers)}"
        return versions[version]

# Usage example
mvc = ModelVersionControl(registry_path="./model_registry")

# Register a model (unfitted here; in practice, train it first)
from sklearn.ensemble import RandomForestRegressor
model = RandomForestRegressor(n_estimators=100, random_state=42)
metadata = {
    'model_type': 'RandomForestRegressor',
    'application': 'Reactor Temperature Prediction',
    'training_data_size': 10000,
    'features': ['pressure', 'flow_rate', 'concentration']
}
version = mvc.register_model('reactor_temp_model', model, metadata)

# Log an experiment
mvc.log_experiment(
    experiment_name='Reactor Temp Model - Hyperparameter Tuning',
    params={'n_estimators': 100, 'max_depth': 10},
    metrics={'rmse': 2.3, 'mae': 1.8, 'r2': 0.92}
)

# Get model information
info = mvc.get_model_info('reactor_temp_model', version='latest')
print("\nModel Information:")
print(json.dumps(info, indent=2, ensure_ascii=False))
</code></pre>
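The chapter's scope also includes A/B testing and ROI evaluation. The snippet below is a minimal, hypothetical sketch (not one of the chapter's worked examples): it compares batch yields from a baseline-control period against an AI-control period using Welch's t-test via scipy.stats.ttest_ind (assuming SciPy is available), then converts the observed yield uplift into a simple payback estimate. All production, margin, and cost figures are invented for illustration.
<pre><code>import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
# Illustrative batch yields (fractions) under the two control strategies
baseline_yield = rng.normal(0.940, 0.010, 40)  # conventional control
ai_yield = rng.normal(0.948, 0.010, 40)        # AI-assisted control

# Welch's t-test: is the mean yield difference statistically significant?
t_stat, p_value = stats.ttest_ind(ai_yield, baseline_yield, equal_var=False)
uplift = ai_yield.mean() - baseline_yield.mean()
print(f"Mean yield uplift: {uplift:.4f} (p = {p_value:.4f})")

# Simple ROI estimate (all figures hypothetical)
annual_production_t = 50_000   # t/year of product
product_margin = 800           # $/t contribution margin
project_cost = 500_000         # $ implementation cost
annual_benefit = uplift * annual_production_t * product_margin
payback_years = project_cost / annual_benefit if annual_benefit > 0 else float('inf')
print(f"Estimated annual benefit: ${annual_benefit:,.0f}")
print(f"Simple payback: {payback_years:.1f} years")
</code></pre>
In practice, batches should be randomized or alternated between the two strategies so the comparison is not confounded by feedstock variation or seasonal drift.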
Key Points for Successful Deployment
- Staged Rollout: move from pilot plant → selected units → full deployment
- Fallback Mechanism: automatically revert to conventional control when AI control fails (see the sketch after this list)
- Continuous Monitoring: detect model performance degradation early (also illustrated below)
- Leverage Domain Knowledge: model chemical engineering constraints explicitly
- Ensure Explainability: needed for regulatory compliance and for earning operator trust
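To make the fallback and monitoring points concrete, here is a minimal sketch of a supervisor that wraps an AI controller. It assumes a fitted model exposing a scikit-learn-style predict method, known safe output bounds, and a rolling-RMSE drift check; the class name and thresholds are hypothetical, and a production supervisor would also need interlocks and alarm handling.
<pre><code>from collections import deque
import numpy as np

class AIControlSupervisor:
    """Hypothetical wrapper: bounds checking plus drift-based fallback."""

    def __init__(self, ai_model, conventional_setpoint, bounds,
                 rmse_threshold=3.0, window=50):
        self.ai_model = ai_model
        self.conventional_setpoint = conventional_setpoint  # safe fallback value
        self.low, self.high = bounds                        # valid output range
        self.rmse_threshold = rmse_threshold                # drift alarm level
        self.residuals = deque(maxlen=window)               # rolling residual window
        self.fallback_active = False

    def record_residual(self, predicted, measured):
        """Track prediction error; trip fallback if rolling RMSE degrades."""
        self.residuals.append(predicted - measured)
        if len(self.residuals) == self.residuals.maxlen:
            rmse = float(np.sqrt(np.mean(np.array(self.residuals) ** 2)))
            if rmse > self.rmse_threshold:
                self.fallback_active = True
                print(f"Drift detected (rolling RMSE={rmse:.2f}); "
                      f"reverting to conventional control")

    def get_setpoint(self, features):
        """Return the AI setpoint if healthy and in-bounds, else the fallback."""
        if self.fallback_active:
            return self.conventional_setpoint
        try:
            sp = float(self.ai_model.predict([features])[0])
        except Exception:
            return self.conventional_setpoint  # any model failure -> safe fallback
        if not (self.low <= sp <= self.high):
            return self.conventional_setpoint  # out-of-bounds -> safe fallback
        return sp

# Hypothetical usage with a fitted regressor:
# supervisor = AIControlSupervisor(model, conventional_setpoint=350.0,
#                                  bounds=(340.0, 360.0), rmse_threshold=3.0)
</code></pre>
Checking the output against hard physical bounds before it is trusted also reflects the domain-knowledge point above: fixed engineering limits stay outside the learned model.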
Summary
In this chapter, we learned the practical elements necessary for implementing AI in real plants. We acquired comprehensive strategies for successful AI implementation, including data integration, model management, online updates, A/B testing, and ROI evaluation.
Chemical Plant AI Applications Series Summary
Through all five chapters, we learned practical applications of AI technology in chemical plants. From process monitoring to predictive maintenance, real-time optimization, supply chain management, and implementation strategies, we systematically acquired the knowledge and skills necessary for digital transformation in the chemical industry.
Applying these technologies to actual plants can deliver concrete results: higher productivity, better quality, lower costs, and improved safety.
Disclaimer
- This content is provided solely for educational, research, and informational purposes and does not constitute professional advice (legal, accounting, technical warranty, etc.).
- This content and accompanying code examples are provided "AS IS" without any warranty, express or implied, including but not limited to merchantability, fitness for a particular purpose, non-infringement, accuracy, completeness, operation, or safety.
- The author and Tohoku University assume no responsibility for the content, availability, or safety of external links, third-party data, tools, libraries, etc.
- To the maximum extent permitted by applicable law, the author and Tohoku University shall not be liable for any direct, indirect, incidental, special, consequential, or punitive damages arising from the use, execution, or interpretation of this content.
- The content may be changed, updated, or discontinued without notice.
- The copyright and license of this content are subject to the stated conditions (e.g., CC BY 4.0). Such licenses typically include no-warranty clauses.