This chapter covers model management and the model registry. You will learn model versioning, model metadata management, and model packaging.
Learning Objectives
By reading this chapter, you will be able to:
- Understand the challenges and importance of model management
- Register and manage models using MLflow Model Registry
- Implement model versioning and stage management
- Properly manage model metadata and schemas
- Understand model packaging and different formats
- Implement model governance and compliance
4.1 Challenges in Model Management
What is Model Management?
Model Management is the process of systematically managing the entire lifecycle of machine learning models.
"There is no MLOps success without proper model management" - The foundation of model operations in production
Key Challenges in Model Management
1. Model Versioning
| Challenge | Description | Impact |
|---|---|---|
| Version Tracking | When models were created | Lack of reproducibility |
| Model Comparison | Performance comparison across versions | Difficulty selecting optimal model |
| Rollback | Reverting to old versions when issues occur | Increased downtime |
| Dependency Management | Linking models to training code | Retraining failures |
2. Metadata Management
Managing important information about models (a minimal record sketch follows this list):
- Training Metadata: Hyperparameters, training data information
- Performance Metrics: Accuracy, recall, F1 score
- Input/Output Schema: Expected input and output formats
- Dependent Libraries: Python, scikit-learn, PyTorch versions
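For concreteness, the sketch below shows what a single, complete metadata record might contain. The field names and values are illustrative examples, not a required standard.
import json

# Illustrative example of a complete metadata record for one model version
model_metadata = {
    "name": "credit-risk-classifier",          # hypothetical model name
    "version": 3,
    "training": {
        "dataset": "customer_credit_2024_q1",  # hypothetical dataset identifier
        "hyperparameters": {"n_estimators": 100, "max_depth": 15},
    },
    "metrics": {"accuracy": 0.91, "recall": 0.88, "f1_score": 0.89},
    "schema": {"inputs": "20 numeric features", "output": "binary label"},
    "dependencies": {"python": "3.9", "scikit-learn": "1.3.0"},
    "author": "data-science-team",
    "created_at": "2024-01-15T10:30:00",
}

print(json.dumps(model_metadata, indent=2))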
3. Model Lifecycle
A model moves through a repeating lifecycle: it is developed, promoted through staging into production, and then monitored. When monitoring detects performance degradation, the version is archived and the cycle begins again with a new model; otherwise it stays in production. A minimal sketch of this stage flow as code follows.
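The sketch below expresses the stage flow in code. The stage names mirror the ones MLflow uses later in this chapter; the transition rules themselves are illustrative assumptions, not an MLflow API.
from enum import Enum

class Stage(Enum):
    NONE = "None"
    STAGING = "Staging"
    PRODUCTION = "Production"
    ARCHIVED = "Archived"

# Illustrative transition rules: which stage moves are allowed
ALLOWED_TRANSITIONS = {
    Stage.NONE: {Stage.STAGING, Stage.ARCHIVED},
    Stage.STAGING: {Stage.PRODUCTION, Stage.ARCHIVED},
    Stage.PRODUCTION: {Stage.ARCHIVED},
    Stage.ARCHIVED: {Stage.STAGING},  # e.g. reactivating an old version
}

def can_transition(current: Stage, target: Stage) -> bool:
    """Return True if moving from `current` to `target` is allowed."""
    return target in ALLOWED_TRANSITIONS[current]

print(can_transition(Stage.STAGING, Stage.PRODUCTION))  # True
print(can_transition(Stage.PRODUCTION, Stage.STAGING))  # False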
4. Governance Requirements
| Requirement | Purpose | Implementation Method |
|---|---|---|
| Access Control | Permission management | RBAC, API authentication |
| Audit Logs | Change history tracking | Event logs, timestamps |
| Compliance | Regulatory compliance | Model cards, accountability |
| Approval Process | Quality assurance | Review, testing |
Implementation Challenges in Model Management
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
import os
import json
from datetime import datetime
import numpy as np
# Example demonstrating typical challenges in model management
class ModelManagementChallenges:
"""Class demonstrating model management challenges"""
def __init__(self):
self.models = {}
self.challenges = []
def demonstrate_version_chaos(self):
"""Demonstrate version management chaos"""
# Challenge 1: Inconsistent version naming
model_files = [
"model.pkl",
"model_v2.pkl",
"model_final.pkl",
"model_final_v2.pkl",
"model_REALLY_final.pkl",
"model_2024_01_15.pkl"
]
print("=== Challenge 1: Version Management Chaos ===")
print("Unsystematic file names:")
for f in model_files:
print(f" - {f}")
print("\nProblems:")
print(" - Which is the latest version?")
print(" - Creation order unknown")
print(" - Differences between versions unclear")
return model_files
def demonstrate_metadata_loss(self):
"""Demonstrate metadata loss"""
print("\n=== Challenge 2: Missing Metadata ===")
# Only model file is saved
model_info = {
"filename": "model.pkl",
"size_mb": 45.2
}
print("Saved information:")
print(json.dumps(model_info, indent=2))
print("\nMissing critical information:")
missing_metadata = [
"Training dataset used",
"Hyperparameters",
"Performance metrics",
"Input/output schema",
"Dependency library versions",
"Author and creation date",
"Training environment (GPU, CPU specs)"
]
for item in missing_metadata:
print(f" ✗ {item}")
def demonstrate_deployment_risk(self):
"""Demonstrate deployment risks"""
print("\n=== Challenge 3: Deployment Risks ===")
scenarios = [
{
"scenario": "Deploying wrong model",
"cause": "Lack of version control",
"impact": "Performance degradation, business loss"
},
{
"scenario": "Unable to rollback",
"cause": "Insufficient storage of old versions",
"impact": "Extended downtime"
},
{
"scenario": "Dependency mismatch",
"cause": "Environment information not recorded",
"impact": "Runtime errors"
}
]
for s in scenarios:
print(f"\nScenario: {s['scenario']}")
print(f" Cause: {s['cause']}")
print(f" Impact: {s['impact']}")
def demonstrate_governance_gaps(self):
"""Demonstrate governance gaps"""
print("\n=== Challenge 4: Lack of Governance ===")
governance_issues = [
"Unknown who deployed model to production",
"Model changes implemented without approval process",
"No audit logs exist",
"Access control not implemented",
"Compliance requirements not addressed"
]
print("Common governance problems:")
for issue in governance_issues:
print(f" ⚠️ {issue}")
# Execution example
challenges = ModelManagementChallenges()
challenges.demonstrate_version_chaos()
challenges.demonstrate_metadata_loss()
challenges.demonstrate_deployment_risk()
challenges.demonstrate_governance_gaps()
print("\n" + "="*60)
print("Conclusion: A systematic model management system is needed")
print("="*60)
Output:
=== Challenge 1: Version Management Chaos ===
Unsystematic file names:
- model.pkl
- model_v2.pkl
- model_final.pkl
- model_final_v2.pkl
- model_REALLY_final.pkl
- model_2024_01_15.pkl
Problems:
- Which is the latest version?
- Creation order unknown
- Differences between versions unclear
=== Challenge 2: Missing Metadata ===
Saved information:
{
"filename": "model.pkl",
"size_mb": 45.2
}
Missing critical information:
✗ Training dataset used
✗ Hyperparameters
✗ Performance metrics
✗ Input/output schema
✗ Dependency library versions
✗ Author and creation date
✗ Training environment (GPU, CPU specs)
=== Challenge 3: Deployment Risks ===
Scenario: Deploying wrong model
Cause: Lack of version control
Impact: Performance degradation, business loss
Scenario: Unable to rollback
Cause: Insufficient storage of old versions
Impact: Extended downtime
Scenario: Dependency mismatch
Cause: Environment information not recorded
Impact: Runtime errors
=== Challenge 4: Lack of Governance ===
Common governance problems:
⚠️ Unknown who deployed model to production
⚠️ Model changes implemented without approval process
⚠️ No audit logs exist
⚠️ Access control not implemented
⚠️ Compliance requirements not addressed
============================================================
Conclusion: A systematic model management system is needed
============================================================
4.2 Model Registry
What is MLflow Model Registry?
MLflow Model Registry is a central repository for managing the entire lifecycle of machine learning models.
Key Features of Model Registry
| Feature | Description | Benefits |
|---|---|---|
| Model Registration | Register models with names | Unified management |
| Version Control | Automatic version numbering | History tracking |
| Stage Management | Staging/Production/Archive | Clear environment separation |
| Metadata Storage | Descriptions, tags, annotations | Improved searchability |
| Access Control | Permission-based management | Security |
Setting up MLflow Model Registry
# Requirements:
# - Python 3.9+
# - mlflow>=2.4.0
# - numpy>=1.24.0, <2.0.0
"""
Example: Setting up MLflow Model Registry
Purpose: Demonstrate core concepts and implementation patterns
Target: Advanced
Execution time: 1-5 minutes
Dependencies: None
"""
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
# MLflow tracking server configuration
mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment("model-registry-demo")
# Create client
client = MlflowClient()
print("=== MLflow Model Registry Setup ===")
print(f"Tracking URI: {mlflow.get_tracking_uri()}")
print(f"Experiment name: {mlflow.get_experiment_by_name('model-registry-demo').name}")
# Data preparation
X, y = make_classification(
n_samples=1000,
n_features=20,
n_informative=15,
n_redundant=5,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
print(f"\nDataset: {X_train.shape[0]} training, {X_test.shape[0]} test")
Model Versioning
def train_and_register_model(model_name, n_estimators, max_depth):
"""Train model and register to Model Registry"""
with mlflow.start_run(run_name=f"rf_v{n_estimators}_{max_depth}") as run:
# Model training
model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42
)
model.fit(X_train, y_train)
# Prediction and evaluation
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# Log metrics
mlflow.log_params({
"n_estimators": n_estimators,
"max_depth": max_depth
})
mlflow.log_metrics({
"accuracy": accuracy,
"f1_score": f1
})
# Log model
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name=model_name
)
print(f"\n✓ Model training complete: {model_name}")
print(f" Run ID: {run.info.run_id}")
print(f" Accuracy: {accuracy:.4f}")
print(f" F1 Score: {f1:.4f}")
return run.info.run_id, accuracy, f1
# Create multiple model versions
model_name = "credit-risk-classifier"
print("\n=== Creating Model Versions ===")
# Version 1: Small model
run_id_v1, acc_v1, f1_v1 = train_and_register_model(
model_name, n_estimators=10, max_depth=5
)
# Version 2: Medium model
run_id_v2, acc_v2, f1_v2 = train_and_register_model(
model_name, n_estimators=50, max_depth=10
)
# Version 3: Large model
run_id_v3, acc_v3, f1_v3 = train_and_register_model(
model_name, n_estimators=100, max_depth=15
)
# Check registered model versions
print(f"\n=== {model_name} Version List ===")
for mv in client.search_model_versions(f"name='{model_name}'"):
print(f"\nVersion: {mv.version}")
print(f" Run ID: {mv.run_id}")
print(f" Stage: {mv.current_stage}")
print(f" Created: {mv.creation_timestamp}")
Stage Transitions
def transition_model_stage(model_name, version, stage, description=""):
"""Transition model to specified stage"""
client.transition_model_version_stage(
name=model_name,
version=version,
stage=stage,
archive_existing_versions=False
)
# Add description
if description:
client.update_model_version(
name=model_name,
version=version,
description=description
)
print(f"✓ {model_name} v{version} transitioned to {stage}")
print("\n=== Stage Management ===")
# Version 1: Staging (testing in development environment)
transition_model_stage(
model_name,
version=1,
stage="Staging",
description="Initial model. Lightweight but lower accuracy."
)
# Version 2: Production (production environment)
transition_model_stage(
model_name,
version=2,
stage="Production",
description="Current production model. Balanced performance."
)
# Version 3: Staging (under evaluation)
transition_model_stage(
model_name,
version=3,
stage="Staging",
description="Latest model. High accuracy but potentially longer inference time."
)
# Get models by stage
print("\n=== Models by Stage ===")
def get_models_by_stage(model_name, stage):
"""Get models in specific stage"""
versions = client.get_latest_versions(model_name, stages=[stage])
return versions
# Production environment models
prod_models = get_models_by_stage(model_name, "Production")
for model in prod_models:
print(f"\nProduction: {model_name} v{model.version}")
print(f" Description: {model.description}")
# Staging environment models
staging_models = get_models_by_stage(model_name, "Staging")
print(f"\nNumber of Staging models: {len(staging_models)}")
for model in staging_models:
print(f" - v{model.version}: {model.description}")
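Once stages are assigned, downstream code does not need to know concrete version numbers: the models:/<name>/<stage> URI resolves to the latest version in that stage. A minimal sketch, reusing model_name and X_test from above:
import mlflow.sklearn

# Load whichever version currently holds the Staging stage
staging_model = mlflow.sklearn.load_model(f"models:/{model_name}/Staging")
print(staging_model.predict(X_test[:3]))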
Complete Model Registry Example
class ModelRegistry:
"""Comprehensive management class for MLflow Model Registry"""
def __init__(self, tracking_uri="sqlite:///mlflow.db"):
mlflow.set_tracking_uri(tracking_uri)
self.client = MlflowClient()
def register_model(self, model, model_name, run_id,
params, metrics, tags=None):
"""Register model"""
# Log model
with mlflow.start_run(run_id=run_id):
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name=model_name
)
mlflow.log_params(params)
mlflow.log_metrics(metrics)
if tags:
mlflow.set_tags(tags)
# Get latest version
versions = self.client.search_model_versions(
f"name='{model_name}'"
)
latest_version = max([int(v.version) for v in versions])
print(f"✓ Model registration complete: {model_name} v{latest_version}")
return latest_version
def promote_to_production(self, model_name, version,
archive_old=True):
"""Promote model to production"""
# Archive existing Production models
if archive_old:
prod_models = self.client.get_latest_versions(
model_name, stages=["Production"]
)
for model in prod_models:
self.client.transition_model_version_stage(
name=model_name,
version=model.version,
stage="Archived"
)
print(f" Archived old version v{model.version}")
# Transition new version to Production
self.client.transition_model_version_stage(
name=model_name,
version=version,
stage="Production"
)
print(f"✓ {model_name} v{version} promoted to production")
def compare_versions(self, model_name, version1, version2):
"""Compare two versions"""
print(f"\n=== {model_name}: v{version1} vs v{version2} ===")
for version in [version1, version2]:
mv = self.client.get_model_version(model_name, version)
run = self.client.get_run(mv.run_id)
print(f"\nVersion {version}:")
print(f" Stage: {mv.current_stage}")
print(f" Parameters: {run.data.params}")
print(f" Metrics: {run.data.metrics}")
def get_production_model(self, model_name):
"""Get production model"""
versions = self.client.get_latest_versions(
model_name, stages=["Production"]
)
if not versions:
raise ValueError(f"{model_name} does not exist in production")
model_uri = f"models:/{model_name}/Production"
model = mlflow.sklearn.load_model(model_uri)
print(f"✓ Production model loaded: {model_name} v{versions[0].version}")
return model
def add_model_alias(self, model_name, version, alias):
"""Add alias to model"""
self.client.set_registered_model_alias(
model_name, alias, version
)
print(f"✓ Alias '{alias}' set to v{version}")
def delete_model_version(self, model_name, version):
"""Delete specific version"""
self.client.delete_model_version(model_name, version)
print(f"✓ {model_name} v{version} deleted")
# Usage example
registry = ModelRegistry()
print("\n=== Advanced Model Registry Usage ===")
# Version comparison
registry.compare_versions(model_name, version1=1, version2=3)
# Promotion to production
registry.promote_to_production(model_name, version=3, archive_old=True)
# Get production model and inference
prod_model = registry.get_production_model(model_name)
sample_prediction = prod_model.predict(X_test[:5])
print(f"\nSample prediction: {sample_prediction}")
4.3 Model Metadata Management
Model Signature
Model signature defines the input/output schema of a model and ensures type safety.
# Requirements:
# - Python 3.9+
# - mlflow>=2.4.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
"""
Example: Model signature defines the input/output schema of a model and ensures type safety
Purpose: Demonstrate machine learning model training and evaluation
Target: Advanced
Execution time: 30-60 seconds
Dependencies: None
"""
import mlflow
from mlflow.models.signature import infer_signature, ModelSignature
from mlflow.types.schema import Schema, ColSpec
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
# Data preparation
X_train_df = pd.DataFrame(X_train, columns=[f"feature_{i}" for i in range(20)])
y_train_series = pd.Series(y_train, name="target")
# Model training
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_train_df, y_train_series)
# Prediction
predictions = model.predict(X_train_df[:5])
predict_proba = model.predict_proba(X_train_df[:5])
print("=== Creating Model Signature ===\n")
# Method 1: Automatic inference
signature = infer_signature(X_train_df, predictions)
print("Automatically inferred signature:")
print(signature)
# Method 2: Explicit definition
from mlflow.types import Schema, ColSpec
input_schema = Schema([
ColSpec("double", f"feature_{i}") for i in range(20)
])
output_schema = Schema([ColSpec("long")])
explicit_signature = ModelSignature(
inputs=input_schema,
outputs=output_schema
)
print("\nExplicitly defined signature:")
print(explicit_signature)
# Save model with signature
with mlflow.start_run(run_name="model-with-signature"):
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
signature=signature,
input_example=X_train_df[:5]
)
print("\n✓ Model with signature saved")
# Signature validation
print("\n=== Signature Validation ===")
# Correct input
correct_input = pd.DataFrame(
np.random.randn(3, 20),
columns=[f"feature_{i}" for i in range(20)]
)
print("✓ Correct input format: OK")
# Wrong input (different number of columns)
try:
wrong_input = pd.DataFrame(
np.random.randn(3, 15), # Fewer columns
columns=[f"feature_{i}" for i in range(15)]
)
# MLflow checks signature
print("✗ Wrong input format: Error should be detected")
except Exception as e:
print(f"✓ Error detected: {type(e).__name__}")
Input/Output Schema Management
from mlflow.types.schema import Schema, ColSpec, DataType
from mlflow.models.signature import ModelSignature
import json
class SchemaManager:
"""Model schema management class"""
@staticmethod
def create_detailed_schema(feature_info):
"""Create detailed schema"""
col_specs = []
for name, dtype, description in feature_info:
col_spec = ColSpec(
type=dtype,
name=name
)
col_specs.append(col_spec)
return Schema(col_specs)
@staticmethod
def validate_input(data, schema):
"""Validate that input data conforms to the model signature (schema is a ModelSignature)"""
errors = []
# Check number of columns
if len(data.columns) != len(schema.inputs.inputs):
errors.append(
f"Column count mismatch: expected={len(schema.inputs)}, "
f"actual={len(data.columns)}"
)
# Check column names
expected_cols = [col.name for col in schema.inputs.inputs]
actual_cols = list(data.columns)
if expected_cols != actual_cols:
errors.append(f"Column name mismatch: {set(expected_cols) ^ set(actual_cols)}")
# Check data types
for col_spec in schema.inputs.inputs:
if col_spec.name in data.columns:
actual_dtype = data[col_spec.name].dtype
# Simple type check
if col_spec.type == DataType.double:
if not np.issubdtype(actual_dtype, np.floating):
errors.append(
f"{col_spec.name}: type mismatch "
f"(expected=float, actual={actual_dtype})"
)
return len(errors) == 0, errors
@staticmethod
def export_schema_json(signature, filepath):
"""Export schema to JSON format"""
schema_dict = {
"inputs": [
{
"name": col.name,
"type": str(col.type)
}
for col in signature.inputs.inputs
],
"outputs": [
{
"name": col.name if hasattr(col, 'name') else "prediction",
"type": str(col.type)
}
for col in signature.outputs.inputs
]
}
with open(filepath, 'w') as f:
json.dump(schema_dict, f, indent=2)
print(f"✓ Schema exported: {filepath}")
# Usage example
print("\n=== Detailed Schema Management ===")
# Define feature information
feature_info = [
("age", DataType.long, "Age"),
("income", DataType.double, "Annual income"),
("credit_score", DataType.double, "Credit score"),
("loan_amount", DataType.double, "Loan amount"),
]
# Create schema
manager = SchemaManager()
input_schema = manager.create_detailed_schema(feature_info)
output_schema = Schema([
ColSpec(DataType.long, "prediction"),
ColSpec(DataType.double, "probability")
])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
print("Created schema:")
print(signature)
# Export schema
manager.export_schema_json(signature, "model_schema.json")
# Validation example
test_data_valid = pd.DataFrame({
"age": [35, 42],
"income": [50000.0, 75000.0],
"credit_score": [720.0, 680.0],
"loan_amount": [25000.0, 40000.0]
})
test_data_invalid = pd.DataFrame({
"age": [35, 42],
"income": [50000.0, 75000.0],
"credit_score": [720.0, 680.0]
# loan_amount is missing
})
print("\n=== Input Validation ===")
valid, errors = manager.validate_input(test_data_valid, signature)
print(f"Valid input: {valid}")
valid, errors = manager.validate_input(test_data_invalid, signature)
print(f"Invalid input: {valid}")
if errors:
for error in errors:
print(f" - {error}")
Dependency Management
# Requirements:
# - Python 3.9+
# - mlflow>=2.4.0
import mlflow
from mlflow.models import make_metric
import cloudpickle
import sys
def log_model_with_dependencies(model, model_name, conda_env=None,
pip_requirements=None):
"""Save model with dependencies"""
with mlflow.start_run(run_name="model-with-deps"):
# Log current environment information
mlflow.log_param("python_version", sys.version)
mlflow.log_param("mlflow_version", mlflow.__version__)
# Define Conda environment
if conda_env is None:
conda_env = {
"name": "model_env",
"channels": ["conda-forge"],
"dependencies": [
f"python={sys.version_info.major}.{sys.version_info.minor}",
"pip",
{
"pip": [
f"mlflow=={mlflow.__version__}",
"scikit-learn==1.3.0",
"pandas==2.0.3",
"numpy==1.24.3"
]
}
]
}
# pip requirements
if pip_requirements is None:
pip_requirements = [
"scikit-learn==1.3.0",
"pandas==2.0.3",
"numpy==1.24.3"
]
# Save model
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
conda_env=conda_env,
pip_requirements=pip_requirements,
registered_model_name=model_name
)
print(f"✓ Model and dependencies saved: {model_name}")
print(f"\nConda environment:")
print(f" Python: {conda_env['dependencies'][0]}")
print(f" Packages: {len(pip_requirements)} items")
return mlflow.active_run().info.run_id
# Usage example
print("=== Saving Model with Dependencies ===\n")
model = GradientBoostingClassifier(n_estimators=50, random_state=42)
model.fit(X_train, y_train)
run_id = log_model_with_dependencies(
model=model,
model_name="credit-model-with-deps"
)
print(f"\nRun ID: {run_id}")
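As a follow-up sketch, the dependencies MLflow recorded alongside the model can be read back later, for example when rebuilding a serving environment. This assumes mlflow.pyfunc.get_model_dependencies is available (MLflow 2.x) and reuses run_id from above.
import mlflow.pyfunc

model_uri = f"runs:/{run_id}/model"

# Download the pip requirements file stored with the model and print it
requirements_path = mlflow.pyfunc.get_model_dependencies(model_uri, format="pip")
with open(requirements_path) as f:
    print("Recorded pip requirements:")
    print(f.read())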
Performance Metrics Management
from sklearn.metrics import (
accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, confusion_matrix
)
import json
from datetime import datetime
class PerformanceMetricsManager:
"""Comprehensive model performance metrics management"""
def __init__(self):
self.metrics_history = []
def compute_classification_metrics(self, y_true, y_pred, y_prob=None):
"""Compute comprehensive metrics for classification"""
metrics = {
"accuracy": accuracy_score(y_true, y_pred),
"precision": precision_score(y_true, y_pred, average='binary'),
"recall": recall_score(y_true, y_pred, average='binary'),
"f1_score": f1_score(y_true, y_pred, average='binary')
}
if y_prob is not None:
metrics["roc_auc"] = roc_auc_score(y_true, y_prob)
# Confusion matrix
cm = confusion_matrix(y_true, y_pred)
metrics["confusion_matrix"] = {
"tn": int(cm[0, 0]),
"fp": int(cm[0, 1]),
"fn": int(cm[1, 0]),
"tp": int(cm[1, 1])
}
# Business metrics
metrics["false_positive_rate"] = cm[0, 1] / (cm[0, 0] + cm[0, 1])
metrics["false_negative_rate"] = cm[1, 0] / (cm[1, 0] + cm[1, 1])
return metrics
def log_metrics_to_mlflow(self, metrics, model_version=None):
"""Log metrics to MLflow"""
# Save confusion matrix separately
cm = metrics.pop("confusion_matrix", None)
# Log scalar metrics
mlflow.log_metrics(metrics)
# Save confusion matrix as JSON
if cm:
mlflow.log_dict(cm, "confusion_matrix.json")
# Add to history with timestamp
metrics_with_time = {
"timestamp": datetime.now().isoformat(),
"model_version": model_version,
**metrics,
"confusion_matrix": cm
}
self.metrics_history.append(metrics_with_time)
print("✓ Metrics logged to MLflow")
def compare_model_performance(self, metrics1, metrics2,
model1_name="Model 1",
model2_name="Model 2"):
"""Compare performance of two models"""
print(f"\n=== {model1_name} vs {model2_name} ===\n")
comparison = {}
for metric in ["accuracy", "precision", "recall", "f1_score", "roc_auc"]:
if metric in metrics1 and metric in metrics2:
val1 = metrics1[metric]
val2 = metrics2[metric]
diff = val2 - val1
pct_change = (diff / val1) * 100 if val1 > 0 else 0
comparison[metric] = {
model1_name: val1,
model2_name: val2,
"difference": diff,
"pct_change": pct_change
}
print(f"{metric}:")
print(f" {model1_name}: {val1:.4f}")
print(f" {model2_name}: {val2:.4f}")
print(f" Difference: {diff:+.4f} ({pct_change:+.2f}%)")
print()
return comparison
def export_metrics_report(self, filepath="metrics_report.json"):
"""Export metrics history as report"""
with open(filepath, 'w') as f:
json.dump(self.metrics_history, f, indent=2)
print(f"✓ Metrics report exported: {filepath}")
# Usage example
print("\n=== Performance Metrics Management ===")
metrics_manager = PerformanceMetricsManager()
# Evaluate Model 1
model1 = RandomForestClassifier(n_estimators=10, random_state=42)
model1.fit(X_train, y_train)
y_pred1 = model1.predict(X_test)
y_prob1 = model1.predict_proba(X_test)[:, 1]
metrics1 = metrics_manager.compute_classification_metrics(
y_test, y_pred1, y_prob1
)
# Evaluate Model 2
model2 = RandomForestClassifier(n_estimators=100, random_state=42)
model2.fit(X_train, y_train)
y_pred2 = model2.predict(X_test)
y_prob2 = model2.predict_proba(X_test)[:, 1]
metrics2 = metrics_manager.compute_classification_metrics(
y_test, y_pred2, y_prob2
)
# Comparison
comparison = metrics_manager.compare_model_performance(
metrics1, metrics2,
model1_name="RF-10",
model2_name="RF-100"
)
# Log to MLflow
with mlflow.start_run(run_name="rf-10"):
metrics_manager.log_metrics_to_mlflow(metrics1.copy(), model_version=1)
with mlflow.start_run(run_name="rf-100"):
metrics_manager.log_metrics_to_mlflow(metrics2.copy(), model_version=2)
# Export report
metrics_manager.export_metrics_report()
4.4 Model Packaging
ONNX Format
ONNX (Open Neural Network Exchange) is an open format that enables model exchange between different frameworks.
# Requirements:
# - Python 3.9+
# - mlflow>=2.4.0
# - numpy>=1.24.0, <2.0.0
# - skl2onnx and onnxruntime (for ONNX conversion and inference)
"""
Example: ONNX (Open Neural Network Exchange) is an open format that enables model exchange between different frameworks
Purpose: Demonstrate machine learning model training and evaluation
Target: Advanced
Execution time: 30-60 seconds
Dependencies: None
"""
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
import onnxruntime as rt
import mlflow
print("=== Converting to ONNX Format ===\n")
# Model training
model = RandomForestClassifier(n_estimators=10, max_depth=5, random_state=42)
model.fit(X_train, y_train)
# Convert to ONNX format
initial_type = [('float_input', FloatTensorType([None, X_train.shape[1]]))]
onnx_model = convert_sklearn(
model,
initial_types=initial_type,
target_opset=12
)
# Save ONNX model
onnx_path = "model.onnx"
with open(onnx_path, "wb") as f:
f.write(onnx_model.SerializeToString())
print(f"✓ ONNX model saved: {onnx_path}")
# Inference with ONNX Runtime
print("\n=== Inference with ONNX Runtime ===")
sess = rt.InferenceSession(onnx_path)
input_name = sess.get_inputs()[0].name
output_name = sess.get_outputs()[0].name
# Execute inference
X_test_float = X_test.astype(np.float32)
onnx_pred = sess.run([output_name], {input_name: X_test_float})[0]
# Compare with scikit-learn predictions
sklearn_pred = model.predict(X_test)
print(f"ONNX predictions: {onnx_pred[:5]}")
print(f"sklearn predictions: {sklearn_pred[:5]}")
print(f"Match rate: {np.mean(onnx_pred == sklearn_pred):.2%}")
# Save to MLflow
with mlflow.start_run(run_name="onnx-model"):
mlflow.onnx.log_model(onnx_model, "onnx_model")
mlflow.log_metric("accuracy", accuracy_score(y_test, onnx_pred))
print("\n✓ ONNX model saved to MLflow")
print("\nAdvantages:")
print(" - Framework independence")
print(" - Fast inference")
print(" - Edge device support")
print(" - Cross-platform")
BentoML
BentoML is a framework for packaging ML models as production-ready API services.
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - bentoml (1.x) and scikit-learn
"""
Example: BentoML is a framework for packaging ML models as production-ready API services
Purpose: Demonstrate machine learning model training and evaluation
Target: Advanced
Execution time: 30-60 seconds
Dependencies: None
"""
import bentoml
from bentoml.io import NumpyNdarray, JSON
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
print("=== Model Packaging with BentoML ===\n")
# Model training
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Save to BentoML
saved_model = bentoml.sklearn.save_model(
"credit_risk_model",
model,
signatures={
"predict": {
"batchable": True,
"batch_dim": 0,
}
},
labels={
"owner": "data-science-team",
"stage": "production"
},
metadata={
"accuracy": float(accuracy_score(y_test, model.predict(X_test))),
"model_type": "GradientBoosting",
"features": X_train.shape[1]
}
)
print(f"✓ Model saved: {saved_model.tag}")
print(f" Path: {saved_model.path}")
# Create service definition
service_code = '''
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON
# Get model reference
credit_model_runner = bentoml.sklearn.get("credit_risk_model:latest").to_runner()
# Define service
svc = bentoml.Service("credit_risk_classifier", runners=[credit_model_runner])
@svc.api(input=NumpyNdarray(), output=JSON())
async def classify(input_data: np.ndarray) -> dict:
"""Credit risk classification API"""
# Execute prediction
prediction = await credit_model_runner.predict.async_run(input_data)
probabilities = await credit_model_runner.predict_proba.async_run(input_data)
return {
"predictions": prediction.tolist(),
"probabilities": probabilities.tolist()
}
'''
# Save as service.py
with open("service.py", "w") as f:
f.write(service_code)
print("\n✓ Service definition created: service.py")
# Bento creation configuration
bentofile_content = '''
service: "service:svc"
labels:
owner: data-science-team
project: credit-risk
include:
- "service.py"
python:
packages:
- scikit-learn==1.3.0
- pandas==2.0.3
- numpy==1.24.3
'''
with open("bentofile.yaml", "w") as f:
f.write(bentofile_content)
print("✓ Bento configuration created: bentofile.yaml")
print("\nNext steps:")
print(" 1. bentoml build # Build Bento")
print(" 2. bentoml containerize credit_risk_classifier:latest # Create Docker image")
print(" 3. bentoml serve service:svc # Start service locally")
print("\nBentoML advantages:")
print(" - Easy API creation")
print(" - Auto-scaling")
print(" - Batch processing support")
print(" - Monitoring integration")
print(" - Docker containerization")
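As a quick usage sketch, once `bentoml serve service:svc` is running, the classify endpoint can be called over HTTP. This assumes BentoML's default port 3000 and the service definition above; the sample payload is random.
import numpy as np
import requests

# One sample with 20 features, sent as a JSON array to the /classify endpoint
sample = np.random.randn(1, 20).tolist()
response = requests.post("http://localhost:3000/classify", json=sample)
print(response.json())  # expected shape: {"predictions": [...], "probabilities": [...]}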
TorchScript
TorchScript is a format for optimizing and serializing PyTorch models.
# Requirements:
# - Python 3.9+
# - torch>=2.0.0, <2.3.0
"""
Example: TorchScript is a format for optimizing and serializing PyTorch models
Purpose: Demonstrate neural network implementation
Target: Advanced
Execution time: 1-5 minutes
Dependencies: None
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
print("=== Model Packaging with TorchScript ===\n")
# Simple neural network definition
class SimpleClassifier(nn.Module):
def __init__(self, input_size, hidden_size, num_classes):
super(SimpleClassifier, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, num_classes)
self.dropout = nn.Dropout(0.2)
def forward(self, x):
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = F.relu(self.fc2(x))
x = self.dropout(x)
x = self.fc3(x)
return x
# Model instantiation
input_size = X_train.shape[1]
hidden_size = 64
num_classes = 2
model = SimpleClassifier(input_size, hidden_size, num_classes)
# Simple training
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.LongTensor(y_train)
dataset = TensorDataset(X_train_tensor, y_train_tensor)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Training loop (simplified)
model.train()
for epoch in range(5):
for batch_X, batch_y in dataloader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
print("✓ Model training complete")
# Convert to TorchScript (Tracing method)
model.eval()
example_input = torch.randn(1, input_size)
traced_model = torch.jit.trace(model, example_input)
# Save
traced_model.save("model_traced.pt")
print("\n✓ TorchScript (traced) saved: model_traced.pt")
# Convert to TorchScript (Scripting method)
scripted_model = torch.jit.script(model)
scripted_model.save("model_scripted.pt")
print("✓ TorchScript (scripted) saved: model_scripted.pt")
# Load and inference
print("\n=== Loading and Inference with TorchScript Model ===")
loaded_model = torch.jit.load("model_traced.pt")
loaded_model.eval()
# Inference on test data
X_test_tensor = torch.FloatTensor(X_test[:5])
with torch.no_grad():
outputs = loaded_model(X_test_tensor)
predictions = torch.argmax(outputs, dim=1)
print(f"Prediction results: {predictions.numpy()}")
print(f"Actual labels: {y_test[:5]}")
print("\nTorchScript advantages:")
print(" - Executable without Python dependency")
print(" - Usable from C++")
print(" - Mobile/edge device support")
print(" - Optimization for speed")
print(" - Ideal for production environments")
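One practical note on choosing between the two conversion methods: tracing records only the operations executed for the example input, so data-dependent control flow is frozen to whichever branch happened to run, while scripting compiles the Python source and keeps both branches. A minimal sketch with a hypothetical ThresholdGate module:
import torch
import torch.nn as nn

class ThresholdGate(nn.Module):
    """Toy module with data-dependent control flow."""
    def forward(self, x):
        if x.sum() > 0:
            return x + 10
        return x - 10

gate = ThresholdGate()
positive = torch.ones(3)
negative = -torch.ones(3)

traced = torch.jit.trace(gate, positive)  # records only the x.sum() > 0 branch
scripted = torch.jit.script(gate)         # compiles both branches

print(traced(negative))    # tensor([9., 9., 9.]) -- the traced branch was baked in
print(scripted(negative))  # tensor([-11., -11., -11.]) -- control flow preserved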
Comparison of Model Serialization Methods
# Requirements:
# - Python 3.9+
# - joblib>=1.3.0
# - mlflow>=2.4.0 (optional: skl2onnx, onnxruntime for the ONNX test)
import pickle
import joblib
import json
import os
from datetime import datetime
import time
class ModelSerializationComparison:
"""Compare different serialization methods"""
def __init__(self, model):
self.model = model
self.results = {}
def compare_formats(self, X_test_sample):
"""Compare each format"""
print("=== Model Serialization Comparison ===\n")
# 1. Pickle
self._test_pickle(X_test_sample)
# 2. Joblib
self._test_joblib(X_test_sample)
# 3. MLflow
self._test_mlflow(X_test_sample)
# 4. ONNX
self._test_onnx(X_test_sample)
# Display results
self._display_results()
def _test_pickle(self, X_test):
"""Test Pickle format"""
filepath = "model.pkl"
# Save
start = time.time()
with open(filepath, 'wb') as f:
pickle.dump(self.model, f)
save_time = time.time() - start
# Load
start = time.time()
with open(filepath, 'rb') as f:
loaded_model = pickle.load(f)
load_time = time.time() - start
# Inference
start = time.time()
predictions = loaded_model.predict(X_test)
inference_time = time.time() - start
self.results['Pickle'] = {
'size_mb': os.path.getsize(filepath) / 1024 / 1024,
'save_time': save_time,
'load_time': load_time,
'inference_time': inference_time
}
os.remove(filepath)
def _test_joblib(self, X_test):
"""Test Joblib format"""
filepath = "model.joblib"
start = time.time()
joblib.dump(self.model, filepath)
save_time = time.time() - start
start = time.time()
loaded_model = joblib.load(filepath)
load_time = time.time() - start
start = time.time()
predictions = loaded_model.predict(X_test)
inference_time = time.time() - start
self.results['Joblib'] = {
'size_mb': os.path.getsize(filepath) / 1024 / 1024,
'save_time': save_time,
'load_time': load_time,
'inference_time': inference_time
}
os.remove(filepath)
def _test_mlflow(self, X_test):
"""Test MLflow format"""
model_path = "mlflow_model"
start = time.time()
mlflow.sklearn.save_model(self.model, model_path)
save_time = time.time() - start
start = time.time()
loaded_model = mlflow.sklearn.load_model(model_path)
load_time = time.time() - start
start = time.time()
predictions = loaded_model.predict(X_test)
inference_time = time.time() - start
# Calculate directory size
total_size = sum(
os.path.getsize(os.path.join(dirpath, filename))
for dirpath, dirnames, filenames in os.walk(model_path)
for filename in filenames
)
self.results['MLflow'] = {
'size_mb': total_size / 1024 / 1024,
'save_time': save_time,
'load_time': load_time,
'inference_time': inference_time
}
# Cleanup
import shutil
shutil.rmtree(model_path)
def _test_onnx(self, X_test):
"""Test ONNX format"""
try:
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
import onnxruntime as rt
filepath = "model.onnx"
initial_type = [('float_input', FloatTensorType([None, X_test.shape[1]]))]
start = time.time()
onnx_model = convert_sklearn(self.model, initial_types=initial_type)
with open(filepath, "wb") as f:
f.write(onnx_model.SerializeToString())
save_time = time.time() - start
start = time.time()
sess = rt.InferenceSession(filepath)
load_time = time.time() - start
input_name = sess.get_inputs()[0].name
output_name = sess.get_outputs()[0].name
start = time.time()
predictions = sess.run([output_name], {input_name: X_test.astype(np.float32)})[0]
inference_time = time.time() - start
self.results['ONNX'] = {
'size_mb': os.path.getsize(filepath) / 1024 / 1024,
'save_time': save_time,
'load_time': load_time,
'inference_time': inference_time
}
os.remove(filepath)
except ImportError:
print("⚠️ ONNX: Libraries not installed")
def _display_results(self):
"""Display results in table format"""
print("\n" + "="*70)
print(f"{'Format':<15} {'Size (MB)':<12} {'Save Time (s)':<12} {'Load Time (s)':<12} {'Inference (s)':<12}")
print("="*70)
for format_name, metrics in self.results.items():
print(f"{format_name:<15} "
f"{metrics['size_mb']:<12.3f} "
f"{metrics['save_time']:<12.4f} "
f"{metrics['load_time']:<12.4f} "
f"{metrics['inference_time']:<12.4f}")
print("="*70)
# Usage example
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
comparison = ModelSerializationComparison(model)
comparison.compare_formats(X_test[:100])
print("\nRecommendations:")
print(" - Development/Experimentation: Pickle, Joblib")
print(" - MLOps: MLflow")
print(" - Production: ONNX, TorchScript")
print(" - API Creation: BentoML")
4.5 Model Governance
Access Control
Access control ensures that only authorized users can view, modify, deploy, or delete models.
from enum import Enum
from datetime import datetime
import hashlib
class UserRole(Enum):
"""User role definitions"""
DATA_SCIENTIST = "data_scientist"
ML_ENGINEER = "ml_engineer"
ADMIN = "admin"
VIEWER = "viewer"
class Permission(Enum):
"""Permission definitions"""
READ = "read"
WRITE = "write"
DEPLOY = "deploy"
DELETE = "delete"
class AccessControl:
"""Model access control system"""
# Role-permission mapping
ROLE_PERMISSIONS = {
UserRole.VIEWER: [Permission.READ],
UserRole.DATA_SCIENTIST: [Permission.READ, Permission.WRITE],
UserRole.ML_ENGINEER: [Permission.READ, Permission.WRITE, Permission.DEPLOY],
UserRole.ADMIN: [Permission.READ, Permission.WRITE, Permission.DEPLOY, Permission.DELETE]
}
def __init__(self):
self.users = {}
self.access_log = []
def add_user(self, username, role):
"""Add user"""
self.users[username] = {
'role': role,
'created_at': datetime.now(),
'api_key': self._generate_api_key(username)
}
print(f"✓ User added: {username} ({role.value})")
def _generate_api_key(self, username):
"""Generate API key"""
data = f"{username}-{datetime.now().isoformat()}".encode()
return hashlib.sha256(data).hexdigest()[:32]
def check_permission(self, username, permission):
"""Check permission"""
if username not in self.users:
return False
user_role = self.users[username]['role']
allowed_permissions = self.ROLE_PERMISSIONS.get(user_role, [])
return permission in allowed_permissions
def access_model(self, username, model_name, action):
"""Attempt to access model"""
# Record access log
log_entry = {
'timestamp': datetime.now().isoformat(),
'username': username,
'model_name': model_name,
'action': action.value,
'granted': False
}
# Permission check
if not self.check_permission(username, action):
log_entry['reason'] = 'Insufficient permissions'
self.access_log.append(log_entry)
print(f"✗ Access denied: {username} - {action.value} on {model_name}")
return False
log_entry['granted'] = True
self.access_log.append(log_entry)
print(f"✓ Access granted: {username} - {action.value} on {model_name}")
return True
def get_access_log(self, username=None):
"""Get access log"""
if username:
return [log for log in self.access_log if log['username'] == username]
return self.access_log
def export_access_log(self, filepath="access_log.json"):
"""Export access log"""
import json
with open(filepath, 'w') as f:
json.dump(self.access_log, f, indent=2)
print(f"✓ Access log exported: {filepath}")
# Usage example
print("=== Access Control System ===\n")
ac = AccessControl()
# Add users
ac.add_user("alice", UserRole.DATA_SCIENTIST)
ac.add_user("bob", UserRole.ML_ENGINEER)
ac.add_user("charlie", UserRole.VIEWER)
ac.add_user("admin", UserRole.ADMIN)
print("\n--- Access Tests ---")
# Various access attempts
ac.access_model("alice", "credit-model", Permission.READ) # OK
ac.access_model("alice", "credit-model", Permission.WRITE) # OK
ac.access_model("alice", "credit-model", Permission.DEPLOY) # NG
ac.access_model("bob", "credit-model", Permission.DEPLOY) # OK
ac.access_model("charlie", "credit-model", Permission.READ) # OK
ac.access_model("charlie", "credit-model", Permission.WRITE) # NG
ac.access_model("admin", "credit-model", Permission.DELETE) # OK
# Export logs
ac.export_access_log()
print(f"\nTotal access count: {len(ac.access_log)}")
print(f"Denied count: {sum(1 for log in ac.access_log if not log['granted'])}")
Audit Logs
import json
from datetime import datetime
from enum import Enum
class AuditEventType(Enum):
"""Audit event types"""
MODEL_REGISTERED = "model_registered"
MODEL_UPDATED = "model_updated"
MODEL_DEPLOYED = "model_deployed"
MODEL_ARCHIVED = "model_archived"
MODEL_DELETED = "model_deleted"
STAGE_TRANSITION = "stage_transition"
PERMISSION_CHANGED = "permission_changed"
class AuditLogger:
"""Comprehensive audit log system"""
def __init__(self, log_file="audit_log.json"):
self.log_file = log_file
self.events = []
def log_event(self, event_type, model_name, user, details=None):
"""Record event to log"""
event = {
'timestamp': datetime.now().isoformat(),
'event_type': event_type.value,
'model_name': model_name,
'user': user,
'details': details or {}
}
self.events.append(event)
# Append to file
with open(self.log_file, 'a') as f:
f.write(json.dumps(event) + '\n')
print(f"Audit log recorded: {event_type.value} - {model_name} by {user}")
def get_events(self, model_name=None, user=None, event_type=None):
"""Get events with filtering"""
filtered = self.events
if model_name:
filtered = [e for e in filtered if e['model_name'] == model_name]
if user:
filtered = [e for e in filtered if e['user'] == user]
if event_type:
filtered = [e for e in filtered if e['event_type'] == event_type.value]
return filtered
def generate_audit_report(self, model_name):
"""Generate audit report for model"""
events = self.get_events(model_name=model_name)
print(f"\n=== {model_name} Audit Report ===")
print(f"Total events: {len(events)}\n")
for event in events:
print(f"{event['timestamp']}")
print(f" Event: {event['event_type']}")
print(f" Executed by: {event['user']}")
if event['details']:
print(f" Details: {event['details']}")
print()
def check_compliance(self, model_name, required_events):
"""Check compliance"""
events = self.get_events(model_name=model_name)
event_types = set(e['event_type'] for e in events)
compliance_status = {}
for required in required_events:
compliance_status[required.value] = required.value in event_types
return compliance_status
# Usage example
print("=== Audit Log System ===\n")
audit = AuditLogger()
# Record various events
audit.log_event(
AuditEventType.MODEL_REGISTERED,
"credit-model",
"alice",
{"version": 1, "accuracy": 0.85}
)
audit.log_event(
AuditEventType.STAGE_TRANSITION,
"credit-model",
"bob",
{"from_stage": "None", "to_stage": "Staging", "version": 1}
)
audit.log_event(
AuditEventType.MODEL_DEPLOYED,
"credit-model",
"admin",
{"version": 1, "environment": "production", "approved_by": "manager"}
)
# Generate audit report
audit.generate_audit_report("credit-model")
# Compliance check
print("\n=== Compliance Check ===")
required = [
AuditEventType.MODEL_REGISTERED,
AuditEventType.MODEL_DEPLOYED
]
compliance = audit.check_compliance("credit-model", required)
for req, status in compliance.items():
symbol = "✓" if status else "✗"
print(f"{symbol} {req}: {'Compliant' if status else 'Non-compliant'}")
Model Cards
Model cards are a standard format for documenting a model's intent, performance, and limitations.
from dataclasses import dataclass, asdict
from typing import List, Dict
import json
@dataclass
class ModelCard:
"""Model Card - Comprehensive model documentation"""
# Basic information
model_name: str
version: str
date: str
authors: List[str]
# Model details
model_type: str
architecture: str
training_data: Dict
# Performance
performance_metrics: Dict
test_data: Dict
# Intended use
intended_use: str
out_of_scope_use: List[str]
# Limitations
limitations: List[str]
biases: List[str]
# Ethical considerations
ethical_considerations: List[str]
# Recommendations
recommendations: List[str]
def to_dict(self):
"""Convert to dictionary"""
return asdict(self)
def to_json(self, filepath):
"""Save in JSON format"""
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)
print(f"✓ Model card saved: {filepath}")
def to_markdown(self, filepath):
"""Save in Markdown format"""
md_content = f"""# Model Card: {self.model_name}
## Basic Information
- **Model Name**: {self.model_name}
- **Version**: {self.version}
- **Created**: {self.date}
- **Authors**: {', '.join(self.authors)}
## Model Details
- **Model Type**: {self.model_type}
- **Architecture**: {self.architecture}
### Training Data
"""
for key, value in self.training_data.items():
md_content += f"- **{key}**: {value}\n"
md_content += f"""
## Performance Metrics
### Test Data
"""
for key, value in self.test_data.items():
md_content += f"- **{key}**: {value}\n"
md_content += "\n### Performance\n"
for metric, value in self.performance_metrics.items():
md_content += f"- **{metric}**: {value}\n"
md_content += f"""
## Intended Use
### Intended Usage
{self.intended_use}
### Out-of-Scope Use
"""
for item in self.out_of_scope_use:
md_content += f"- {item}\n"
md_content += "\n## Limitations\n"
for limitation in self.limitations:
md_content += f"- {limitation}\n"
md_content += "\n## Biases\n"
for bias in self.biases:
md_content += f"- {bias}\n"
md_content += "\n## Ethical Considerations\n"
for consideration in self.ethical_considerations:
md_content += f"- {consideration}\n"
md_content += "\n## Recommendations\n"
for recommendation in self.recommendations:
md_content += f"- {recommendation}\n"
with open(filepath, 'w', encoding='utf-8') as f:
f.write(md_content)
print(f"✓ Model card (Markdown) saved: {filepath}")
# Usage example
print("=== Creating Model Card ===\n")
model_card = ModelCard(
model_name="Credit Risk Classification Model",
version="1.2.0",
date="2025-10-21",
authors=["Data Science Team", "ML Engineering Team"],
model_type="Random Forest Classifier",
architecture="100 estimators, max_depth=10",
training_data={
"Dataset": "Customer Credit Data 2023-2024",
"Samples": "100,000",
"Features": "20",
"Classes": "Approved/Rejected (balanced)"
},
performance_metrics={
"Accuracy": "0.892",
"Precision": "0.885",
"Recall": "0.901",
"F1 Score": "0.893",
"ROC AUC": "0.945"
},
test_data={
"Dataset": "Holdout test set",
"Samples": "20,000",
"Period": "2024 Q3"
},
intended_use="Credit risk assessment for personal loans. Used as a supplementary tool for lending decisions.",
out_of_scope_use=[
"Business loan evaluation",
"Employment decisions",
"Insurance premium setting",
"Automated approval without human review"
],
limitations=[
"Based on past 2 years of data; may not adapt to rapid economic changes",
"Only applicable to individuals aged 18 and above; not suitable for minors",
"Income data is self-reported and not verified",
"Does not fully account for regional differences in credit practices"
],
biases=[
"Limited samples for younger age groups may result in lower prediction accuracy for this demographic",
"Data distribution skewed toward high-income individuals, resulting in conservative predictions for low-income groups",
"Predominance of urban data; caution needed when applying to rural areas"
],
ethical_considerations=[
"Model predictions are reference information; final decisions must be made by humans",
"Establish system to explain rejection reasons to customers",
"Regularly monitor model fairness and detect biases",
"Implement data management in compliance with personal information protection laws"
],
recommendations=[
"Monitor model performance quarterly and retrain if degradation is observed",
"Use in combination with human review processes",
"Ensure decisions based on model predictions comply with relevant regulations",
"Conduct regular bias audits with new data",
"Ensure transparency to stakeholders and make model operations explainable"
]
)
# Save in JSON format
model_card.to_json("model_card.json")
# Save in Markdown format
model_card.to_markdown("model_card.md")
print("\nModel card benefits:")
print(" - Improved transparency")
print(" - Ensured accountability")
print(" - Promoted appropriate use")
print(" - Clarified risks")
print(" - Compliance support")
4.6 Chapter Summary
What We Learned
Challenges in Model Management
- Versioning, metadata, lifecycle, governance
- Importance of systematic management
Model Registry
- Centralized management with MLflow Model Registry
- Version control and stage transitions
- Model promotion and rollback
Model Metadata Management
- Type safety through model signatures
- Input/output schema definition and validation
- Dependencies and performance metrics
Model Packaging
- ONNX: Framework independence
- BentoML: API creation and deployment
- TorchScript: Optimization and acceleration
- Choosing the right format
Model Governance
- Access control and RBAC
- Audit logs and compliance
- Documentation through model cards
Model Management Best Practices
| Practice | Description | Benefits |
|---|---|---|
| Unified Registry | Manage all models in one place | Visibility, traceability |
| Automatic Versioning | Automatically record all changes | Reproducibility, audit |
| Stage Management | Clarify development/staging/production | Risk management |
| Rich Metadata | Record all related information | Searchability, understanding |
| Access Control | Role-based permission management | Security |
| Audit Logs | Record all operations | Compliance |
| Model Cards | Document intent, performance, limitations | Transparency, responsibility |
Next Chapter
In Chapter 5, we'll learn about Model Deployment:
- Batch inference and real-time inference
- Model serving (FastAPI, BentoML)
- Containerization and Kubernetes
- A/B testing and canary deployment
- Monitoring and alerting
Exercises
Question 1 (Difficulty: Easy)
List three key features of a model registry and explain the importance of each.
Sample Answer
Answer:
Version Control
- Feature: Automatically track each version of models
- Importance: Ensures reproducibility, enables rollback when issues occur, allows comparison between models
Stage Management
- Feature: Define stages such as Staging, Production, Archived
- Importance: Clarifies environments, reduces deployment risks, implements approval processes
Metadata Storage
- Feature: Save hyperparameters, metrics, descriptions, etc.
- Importance: Improves model searchability, supports decision-making, enables audit and compliance
Question 2 (Difficulty: Medium)
Write code to register a model using MLflow Model Registry and promote it from Staging to Production.
Sample Answer
# Requirements:
# - Python 3.9+
# - mlflow>=2.4.0
"""
Example: Register a model with MLflow Model Registry and promote it from Staging to Production
Purpose: Demonstrate machine learning model training and evaluation
Target: Advanced
Execution time: 30-60 seconds
Dependencies: None
"""
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
# Setup
mlflow.set_tracking_uri("sqlite:///mlflow.db")
client = MlflowClient()
# Data preparation
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# Model training and registration
model_name = "my_classifier"
with mlflow.start_run():
# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Calculate metrics
accuracy = model.score(X_test, y_test)
mlflow.log_metric("accuracy", accuracy)
# Register model
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name=model_name
)
# Get latest version
versions = client.search_model_versions(f"name='{model_name}'")
latest_version = max([int(v.version) for v in versions])
# Transition to Staging
client.transition_model_version_stage(
name=model_name,
version=latest_version,
stage="Staging"
)
print(f"✓ Version {latest_version} transitioned to Staging")
# After testing, promote to Production
client.transition_model_version_stage(
name=model_name,
version=latest_version,
stage="Production",
archive_existing_versions=True # Archive existing Production
)
print(f"✓ Version {latest_version} promoted to Production")
# Get Production model
prod_model = mlflow.sklearn.load_model(f"models:/{model_name}/Production")
print("✓ Production model loaded")
Question 3 (Difficulty: Medium)
Write code to create a model signature and perform input data validation.
Sample Answer
# Requirements:
# - Python 3.9+
# - mlflow>=2.4.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0
"""
Example: Create a model signature and perform input data validation
Purpose: Demonstrate machine learning model training and evaluation
Target: Advanced
Execution time: 30-60 seconds
Dependencies: None
"""
import mlflow
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from mlflow.models.signature import infer_signature
# Data preparation
np.random.seed(42)
X_train = pd.DataFrame({
'feature_1': np.random.randn(100),
'feature_2': np.random.randn(100),
'feature_3': np.random.randn(100)
})
y_train = np.random.randint(0, 2, 100)
# Model training
model = GradientBoostingClassifier(random_state=42)
model.fit(X_train, y_train)
# Prediction (for signature inference)
predictions = model.predict(X_train[:5])
# Create signature
signature = infer_signature(X_train, predictions)
print("=== Model Signature ===")
print(signature)
# Save model with signature
with mlflow.start_run():
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
signature=signature,
input_example=X_train[:5]
)
print("\n✓ Model with signature saved")
# Validation: Correct input
print("\n=== Input Validation ===")
correct_input = pd.DataFrame({
'feature_1': [1.0],
'feature_2': [2.0],
'feature_3': [3.0]
})
print(f"✓ Correct input format (columns: {len(correct_input.columns)})")
# Validation: Wrong input (missing column)
try:
wrong_input = pd.DataFrame({
'feature_1': [1.0],
'feature_2': [2.0]
# feature_3 is missing
})
print(f"✗ Wrong input format (columns: {len(wrong_input.columns)})")
print(" → MLflow will detect error during actual deployment")
except Exception as e:
print(f"✓ Error detected: {e}")
Question 4 (Difficulty: Hard)
Implement an access control system that restricts which operations users with different roles can perform on models. Include audit logging.
Sample Answer
from enum import Enum
from datetime import datetime
import json
class UserRole(Enum):
VIEWER = "viewer"
DATA_SCIENTIST = "data_scientist"
ML_ENGINEER = "ml_engineer"
ADMIN = "admin"
class Permission(Enum):
READ = "read"
WRITE = "write"
DEPLOY = "deploy"
DELETE = "delete"
class ModelAccessControl:
"""Model access control and audit logging"""
ROLE_PERMISSIONS = {
UserRole.VIEWER: [Permission.READ],
UserRole.DATA_SCIENTIST: [Permission.READ, Permission.WRITE],
UserRole.ML_ENGINEER: [Permission.READ, Permission.WRITE, Permission.DEPLOY],
UserRole.ADMIN: [Permission.READ, Permission.WRITE, Permission.DEPLOY, Permission.DELETE]
}
def __init__(self):
self.users = {}
self.audit_log = []
def add_user(self, username, role):
"""Add user"""
self.users[username] = {'role': role, 'created_at': datetime.now()}
self._log_audit("USER_ADDED", None, username, {"role": role.value})
def check_permission(self, username, permission):
"""Check permission"""
if username not in self.users:
return False
user_role = self.users[username]['role']
return permission in self.ROLE_PERMISSIONS.get(user_role, [])
def execute_action(self, username, model_name, action):
"""Execute action (with permission check)"""
# Permission check
if not self.check_permission(username, action):
self._log_audit(
"ACCESS_DENIED",
model_name,
username,
{"action": action.value, "reason": "insufficient_permissions"}
)
print(f"✗ Access denied: {username} - {action.value}")
return False
# Execute action
self._log_audit("ACTION_EXECUTED", model_name, username, {"action": action.value})
print(f"✓ Action executed: {username} - {action.value} on {model_name}")
return True
def _log_audit(self, event_type, model_name, username, details):
"""Record to audit log"""
event = {
'timestamp': datetime.now().isoformat(),
'event_type': event_type,
'model_name': model_name,
'username': username,
'details': details
}
self.audit_log.append(event)
def export_audit_log(self, filepath="audit.json"):
"""Export audit log"""
with open(filepath, 'w') as f:
json.dump(self.audit_log, f, indent=2)
print(f"\n✓ Audit log exported: {filepath}")
def get_user_activity(self, username):
"""Get user activity"""
return [log for log in self.audit_log if log['username'] == username]
def get_model_activity(self, model_name):
"""Get model activity"""
return [log for log in self.audit_log
if log['model_name'] == model_name]
# Usage example
print("=== Access Control and Audit Logging ===\n")
access_control = ModelAccessControl()
# Add users
access_control.add_user("alice", UserRole.DATA_SCIENTIST)
access_control.add_user("bob", UserRole.ML_ENGINEER)
access_control.add_user("charlie", UserRole.VIEWER)
access_control.add_user("admin", UserRole.ADMIN)
print("\n--- Action Execution ---")
# Various actions
access_control.execute_action("alice", "credit-model", Permission.READ)
access_control.execute_action("alice", "credit-model", Permission.WRITE)
access_control.execute_action("alice", "credit-model", Permission.DEPLOY) # Fail
access_control.execute_action("bob", "credit-model", Permission.DEPLOY)
access_control.execute_action("charlie", "credit-model", Permission.READ)
access_control.execute_action("charlie", "credit-model", Permission.WRITE) # Fail
access_control.execute_action("admin", "credit-model", Permission.DELETE)
# Export audit log
access_control.export_audit_log()
# User activity
print("\n--- Alice's Activity ---")
alice_activity = access_control.get_user_activity("alice")
for activity in alice_activity:
print(f"{activity['timestamp']}: {activity['event_type']} - {activity.get('details', {})}")
print(f"\nTotal audit events: {len(access_control.audit_log)}")
Question 5 (Difficulty: Hard)
Create a model card and export it in both JSON and Markdown formats. Include limitations and biases.
Sample Answer
from dataclasses import dataclass, asdict
import json
@dataclass
class ModelCard:
"""Comprehensive model card"""
model_name: str
version: str
date: str
authors: list
model_type: str
intended_use: str
performance: dict
limitations: list
biases: list
ethical_considerations: list
def to_json(self, filepath):
"""Export to JSON format"""
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(asdict(self), f, indent=2, ensure_ascii=False)
print(f"✓ Saved in JSON format: {filepath}")
def to_markdown(self, filepath):
"""Export to Markdown format"""
md = f"""# Model Card: {self.model_name}
## Basic Information
- Model Name: {self.model_name}
- Version: {self.version}
- Created: {self.date}
- Authors: {', '.join(self.authors)}
- Model Type: {self.model_type}
## Intended Use
{self.intended_use}
## Performance Metrics
"""
for metric, value in self.performance.items():
md += f"- {metric}: {value}\n"
md += "\n## Limitations\n"
for limitation in self.limitations:
md += f"- {limitation}\n"
md += "\n## Biases\n"
for bias in self.biases:
md += f"- {bias}\n"
md += "\n## Ethical Considerations\n"
for consideration in self.ethical_considerations:
md += f"- {consideration}\n"
with open(filepath, 'w', encoding='utf-8') as f:
f.write(md)
print(f"✓ Saved in Markdown format: {filepath}")
# Create model card
card = ModelCard(
model_name="Mortgage Approval Model",
version="2.1.0",
date="2025-10-21",
authors=["Data Science Team", "Risk Management Team"],
model_type="Gradient Boosting Classifier",
intended_use="Assessing approval probability in initial review of mortgage applications",
performance={
"Accuracy": "0.87",
"Precision": "0.84",
"Recall": "0.89",
"F1 Score": "0.865",
"ROC AUC": "0.92"
},
limitations=[
"Training data limited to past 3 years; not adapted to long-term economic fluctuations",
"For applicants under 25 years old, limited samples result in lower prediction accuracy",
"Insufficient data on self-employed individuals; predictions for this group tend to be conservative"
],
biases=[
"More urban than rural data; prediction accuracy varies by region",
"More high-income data; predictions tend to be stricter for low-income groups",
"Differences in approval rates by gender observed; regular monitoring required"
],
ethical_considerations=[
"Model predictions are reference information; final decisions made by human reviewers",
"Provide explanations for rejections in an accessible format",
"Regularly monitor fairness metrics to ensure no unjust discrimination",
"Implement data management compliant with personal information protection regulations"
]
)
# Export
print("=== Model Card Export ===\n")
card.to_json("model_card.json")
card.to_markdown("MODEL_CARD.md")
print("\nModel card includes:")
print(" ✓ Basic information and metadata")
print(" ✓ Performance metrics")
print(" ✓ Stated limitations")
print(" ✓ Disclosed biases")
print(" ✓ Ethical considerations")
References
- Sato, D., Wider, A., & Windheuser, C. (2019). Continuous Delivery for Machine Learning. Martin Fowler's Blog.
- Polyzotis, N., et al. (2018). Data Lifecycle Challenges in Production Machine Learning: A Survey. ACM SIGMOD Record.
- Mitchell, M., et al. (2019). Model Cards for Model Reporting. Proceedings of FAT* 2019.
- Paleyes, A., Urma, R. G., & Lawrence, N. D. (2022). Challenges in Deploying Machine Learning: A Survey of Case Studies. ACM Computing Surveys.
- Sculley, D., et al. (2015). Hidden Technical Debt in Machine Learning Systems. NIPS 2015.