学習目標
この章を読むことで、以下を習得できます:
- ✅ 大規模機械学習パイプラインの設計原則を理解する
- ✅ SparkとPyTorchを統合した分散ETLパイプラインを構築できる
- ✅ 分散環境でのモデル訓練とハイパーパラメータ最適化を実装できる
- ✅ 大規模パイプラインのパフォーマンス最適化手法を適用できる
- ✅ エンドツーエンドの本番環境対応MLシステムを構築できる
- ✅ 監視・メンテナンス戦略を実装できる
5.1 パイプライン設計原則
スケーラブルなMLパイプラインの要件
大規模機械学習パイプラインを設計する際の主要な考慮事項:
| 要件 | 説明 | 実装技術 |
|---|---|---|
| データスケーラビリティ | TB〜PBスケールのデータ処理 | Spark、Dask、分散ストレージ |
| 計算スケーラビリティ | マルチノード並列処理 | Ray、Horovod、Kubernetes |
| フォールトトレランス | 障害時の自動リカバリー | チェックポイント、冗長化 |
| 再現性 | 実験・モデルの完全な再現 | バージョン管理、シード固定 |
| モニタリング | リアルタイム性能監視 | Prometheus、Grafana、MLflow |
| コスト効率 | リソース使用の最適化 | オートスケーリング、スポットインスタンス |
パイプラインアーキテクチャパターン
graph TB
subgraph "データ層"
A[生データ
HDFS/S3] --> B[データ検証
Great Expectations]
B --> C[分散ETL
Apache Spark]
end
subgraph "特徴量層"
C --> D[特徴量エンジニアリング
Spark ML]
D --> E[特徴量ストア
Feast/Tecton]
end
subgraph "訓練層"
E --> F[分散訓練
Ray/Horovod]
F --> G[ハイパーパラメータ最適化
Ray Tune]
G --> H[モデルレジストリ
MLflow]
end
subgraph "推論層"
H --> I[モデル配信
Kubernetes]
I --> J[予測サービス
TorchServe]
end
subgraph "監視層"
J --> K[メトリクス収集
Prometheus]
K --> L[可視化
Grafana]
L --> M[アラート
PagerDuty]
end
コード例1: パイプライン設定クラス
"""
大規模MLパイプラインの設定管理
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import yaml
from pathlib import Path
@dataclass
class DataConfig:
"""データ処理設定"""
source_path: str
output_path: str
num_partitions: int = 1000
file_format: str = "parquet"
compression: str = "snappy"
validation_rules: Dict[str, any] = field(default_factory=dict)
@dataclass
class TrainingConfig:
"""訓練設定"""
model_type: str
num_workers: int = 4
num_gpus_per_worker: int = 1
batch_size: int = 256
max_epochs: int = 100
learning_rate: float = 0.001
checkpoint_freq: int = 10
early_stopping_patience: int = 5
@dataclass
class ResourceConfig:
"""リソース設定"""
num_nodes: int = 4
cpus_per_node: int = 16
memory_per_node: str = "64GB"
gpus_per_node: int = 4
storage_type: str = "ssd"
network_bandwidth: str = "10Gbps"
@dataclass
class MonitoringConfig:
"""監視設定"""
metrics_interval: int = 60 # 秒
log_level: str = "INFO"
alert_thresholds: Dict[str, float] = field(default_factory=dict)
dashboard_url: Optional[str] = None
@dataclass
class PipelineConfig:
"""統合パイプライン設定"""
pipeline_name: str
version: str
data: DataConfig
training: TrainingConfig
resources: ResourceConfig
monitoring: MonitoringConfig
@classmethod
def from_yaml(cls, config_path: Path) -> 'PipelineConfig':
"""YAMLファイルから設定を読み込む"""
with open(config_path) as f:
config_dict = yaml.safe_load(f)
return cls(
pipeline_name=config_dict['pipeline_name'],
version=config_dict['version'],
data=DataConfig(**config_dict['data']),
training=TrainingConfig(**config_dict['training']),
resources=ResourceConfig(**config_dict['resources']),
monitoring=MonitoringConfig(**config_dict['monitoring'])
)
def to_yaml(self, output_path: Path):
"""設定をYAMLファイルに保存"""
config_dict = {
'pipeline_name': self.pipeline_name,
'version': self.version,
'data': self.data.__dict__,
'training': self.training.__dict__,
'resources': self.resources.__dict__,
'monitoring': self.monitoring.__dict__
}
with open(output_path, 'w') as f:
yaml.dump(config_dict, f, default_flow_style=False)
# 使用例
if __name__ == "__main__":
# 設定作成
config = PipelineConfig(
pipeline_name="customer_churn_prediction",
version="v1.0.0",
data=DataConfig(
source_path="s3://my-bucket/raw-data/",
output_path="s3://my-bucket/processed-data/",
num_partitions=2000,
validation_rules={
"min_records": 1000000,
"required_columns": ["customer_id", "features", "label"]
}
),
training=TrainingConfig(
model_type="neural_network",
num_workers=8,
num_gpus_per_worker=2,
batch_size=512,
max_epochs=50
),
resources=ResourceConfig(
num_nodes=8,
cpus_per_node=32,
memory_per_node="128GB",
gpus_per_node=4
),
monitoring=MonitoringConfig(
alert_thresholds={
"accuracy": 0.85,
"latency_p99": 100.0, # ミリ秒
"error_rate": 0.01
}
)
)
# 設定保存
config.to_yaml(Path("pipeline_config.yaml"))
# 設定読み込み
loaded_config = PipelineConfig.from_yaml(Path("pipeline_config.yaml"))
print(f"Pipeline: {loaded_config.pipeline_name}")
print(f"Workers: {loaded_config.training.num_workers}")
print(f"Partitions: {loaded_config.data.num_partitions}")
💡 設計のポイント:
- 設定を環境変数やコードに埋め込まず、YAMLで外部化
- データクラスで型安全性を確保
- バージョン管理で設定の履歴を追跡
- 環境(開発/本番)ごとに設定ファイルを分離
エラーハンドリングとリトライ戦略
コード例2: 堅牢なパイプライン実行フレームワーク
"""
フォールトトレラントなパイプライン実行
"""
import time
import logging
from typing import Callable, Any, Optional, List
from functools import wraps
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
"""タスク実行状態"""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
RETRYING = "retrying"
@dataclass
class TaskResult:
"""タスク実行結果"""
status: TaskStatus
result: Any = None
error: Optional[Exception] = None
execution_time: float = 0.0
retry_count: int = 0
class RetryStrategy:
"""リトライ戦略"""
def __init__(
self,
max_retries: int = 3,
initial_delay: float = 1.0,
backoff_factor: float = 2.0,
max_delay: float = 60.0,
retryable_exceptions: List[type] = None
):
self.max_retries = max_retries
self.initial_delay = initial_delay
self.backoff_factor = backoff_factor
self.max_delay = max_delay
self.retryable_exceptions = retryable_exceptions or [Exception]
def get_delay(self, retry_count: int) -> float:
"""リトライ待機時間を計算(指数バックオフ)"""
delay = self.initial_delay * (self.backoff_factor ** retry_count)
return min(delay, self.max_delay)
def should_retry(self, exception: Exception) -> bool:
"""リトライすべきか判定"""
return any(isinstance(exception, exc_type)
for exc_type in self.retryable_exceptions)
def with_retry(retry_strategy: RetryStrategy):
"""リトライ機能を追加するデコレータ"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> TaskResult:
retry_count = 0
start_time = time.time()
while retry_count <= retry_strategy.max_retries:
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
return TaskResult(
status=TaskStatus.SUCCESS,
result=result,
execution_time=execution_time,
retry_count=retry_count
)
except Exception as e:
if (retry_count < retry_strategy.max_retries and
retry_strategy.should_retry(e)):
delay = retry_strategy.get_delay(retry_count)
logging.warning(
f"Task failed (attempt {retry_count + 1}/"
f"{retry_strategy.max_retries + 1}): {str(e)}"
f"\nRetrying in {delay:.1f} seconds..."
)
time.sleep(delay)
retry_count += 1
else:
execution_time = time.time() - start_time
return TaskResult(
status=TaskStatus.FAILED,
error=e,
execution_time=execution_time,
retry_count=retry_count
)
return TaskResult(
status=TaskStatus.FAILED,
error=Exception("Max retries exceeded"),
execution_time=time.time() - start_time,
retry_count=retry_count
)
return wrapper
return decorator
class PipelineExecutor:
"""パイプライン実行エンジン"""
def __init__(self, checkpoint_dir: str = "./checkpoints"):
self.checkpoint_dir = checkpoint_dir
self.logger = logging.getLogger(__name__)
def execute_stage(
self,
stage_name: str,
task_func: Callable,
retry_strategy: Optional[RetryStrategy] = None,
checkpoint: bool = True
) -> TaskResult:
"""パイプラインステージを実行"""
self.logger.info(f"Starting stage: {stage_name}")
# チェックポイントがあれば復元
if checkpoint and self._checkpoint_exists(stage_name):
self.logger.info(f"Restoring from checkpoint: {stage_name}")
return self._load_checkpoint(stage_name)
# リトライ戦略を適用
if retry_strategy:
task_func = with_retry(retry_strategy)(task_func)
# タスク実行
result = task_func()
# チェックポイント保存
if checkpoint and result.status == TaskStatus.SUCCESS:
self._save_checkpoint(stage_name, result)
self.logger.info(
f"Stage {stage_name} completed: {result.status.value} "
f"(time: {result.execution_time:.2f}s, "
f"retries: {result.retry_count})"
)
return result
def _checkpoint_exists(self, stage_name: str) -> bool:
"""チェックポイントの存在確認"""
from pathlib import Path
checkpoint_path = Path(self.checkpoint_dir) / f"{stage_name}.ckpt"
return checkpoint_path.exists()
def _save_checkpoint(self, stage_name: str, result: TaskResult):
"""チェックポイント保存"""
import pickle
from pathlib import Path
Path(self.checkpoint_dir).mkdir(exist_ok=True)
checkpoint_path = Path(self.checkpoint_dir) / f"{stage_name}.ckpt"
with open(checkpoint_path, 'wb') as f:
pickle.dump(result, f)
def _load_checkpoint(self, stage_name: str) -> TaskResult:
"""チェックポイント読み込み"""
import pickle
from pathlib import Path
checkpoint_path = Path(self.checkpoint_dir) / f"{stage_name}.ckpt"
with open(checkpoint_path, 'rb') as f:
return pickle.load(f)
# 使用例
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# リトライ戦略定義
retry_strategy = RetryStrategy(
max_retries=3,
initial_delay=2.0,
backoff_factor=2.0,
retryable_exceptions=[ConnectionError, TimeoutError]
)
# パイプライン実行
executor = PipelineExecutor(checkpoint_dir="./pipeline_checkpoints")
# ステージ1: データ読み込み(失敗する可能性あり)
def load_data():
import random
if random.random() < 0.3: # 30%の確率で失敗
raise ConnectionError("Failed to connect to data source")
return {"data": list(range(1000))}
result1 = executor.execute_stage(
"data_loading",
load_data,
retry_strategy=retry_strategy
)
if result1.status == TaskStatus.SUCCESS:
print(f"Data loaded: {len(result1.result['data'])} records")
# ステージ2: データ処理
def process_data():
data = result1.result['data']
processed = [x * 2 for x in data]
return {"processed_data": processed}
result2 = executor.execute_stage(
"data_processing",
process_data,
checkpoint=True
)
if result2.status == TaskStatus.SUCCESS:
print(f"Processing completed successfully")
⚠️ リトライ戦略の注意点:
- 冪等性:同じ操作を複数回実行しても結果が変わらないように設計
- タイムアウト:無限ループを防ぐため最大リトライ回数を設定
- バックオフ:サービス過負荷を防ぐため待機時間を指数的に増加
- 選択的リトライ:一時的なエラーのみリトライし、永続的なエラーは即座に失敗
5.2 データ処理パイプライン
分散ETLパイプライン構築
コード例3: Sparkベースの大規模データETL
"""
Apache Sparkによる大規模データETLパイプライン
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from typing import List, Dict
import logging
class DistributedETLPipeline:
"""分散ETLパイプライン"""
def __init__(
self,
app_name: str = "MLDataPipeline",
master: str = "spark://master:7077",
num_partitions: int = 1000
):
self.spark = SparkSession.builder \
.appName(app_name) \
.master(master) \
.config("spark.sql.shuffle.partitions", num_partitions) \
.config("spark.default.parallelism", num_partitions) \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()
self.logger = logging.getLogger(__name__)
def extract(
self,
source_path: str,
file_format: str = "parquet",
schema: StructType = None
) -> DataFrame:
"""データ抽出"""
self.logger.info(f"Extracting data from {source_path}")
if schema:
df = self.spark.read.schema(schema).format(file_format).load(source_path)
else:
df = self.spark.read.format(file_format).load(source_path)
self.logger.info(f"Extracted {df.count()} records with {len(df.columns)} columns")
return df
def validate(self, df: DataFrame, validation_rules: Dict) -> DataFrame:
"""データ検証"""
self.logger.info("Validating data quality")
# 必須カラムチェック
if "required_columns" in validation_rules:
missing_cols = set(validation_rules["required_columns"]) - set(df.columns)
if missing_cols:
raise ValueError(f"Missing required columns: {missing_cols}")
# Null値チェック
if "non_null_columns" in validation_rules:
for col in validation_rules["non_null_columns"]:
null_count = df.filter(F.col(col).isNull()).count()
if null_count > 0:
self.logger.warning(f"Column {col} has {null_count} null values")
# 値範囲チェック
if "value_ranges" in validation_rules:
for col, (min_val, max_val) in validation_rules["value_ranges"].items():
df = df.filter(
(F.col(col) >= min_val) & (F.col(col) <= max_val)
)
# 重複チェック
if "unique_columns" in validation_rules:
unique_cols = validation_rules["unique_columns"]
initial_count = df.count()
df = df.dropDuplicates(unique_cols)
duplicates_removed = initial_count - df.count()
if duplicates_removed > 0:
self.logger.warning(f"Removed {duplicates_removed} duplicate records")
return df
def transform(
self,
df: DataFrame,
feature_columns: List[str],
label_column: str = None,
normalize: bool = True
) -> DataFrame:
"""データ変換と特徴量エンジニアリング"""
self.logger.info("Transforming data")
# 欠損値処理
df = self._handle_missing_values(df, feature_columns)
# カテゴリ変数エンコーディング
df = self._encode_categorical_features(df, feature_columns)
# 特徴量ベクトル作成
assembler = VectorAssembler(
inputCols=feature_columns,
outputCol="features_raw"
)
df = assembler.transform(df)
# 正規化
if normalize:
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features",
withMean=True,
withStd=True
)
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)
else:
df = df.withColumnRenamed("features_raw", "features")
# 必要なカラムのみ選択
select_cols = ["features"]
if label_column:
select_cols.append(label_column)
return df.select(select_cols)
def _handle_missing_values(
self,
df: DataFrame,
columns: List[str],
strategy: str = "mean"
) -> DataFrame:
"""欠損値処理"""
for col in columns:
if strategy == "mean":
mean_val = df.select(F.mean(col)).first()[0]
df = df.fillna({col: mean_val})
elif strategy == "median":
median_val = df.approxQuantile(col, [0.5], 0.01)[0]
df = df.fillna({col: median_val})
elif strategy == "drop":
df = df.dropna(subset=[col])
return df
def _encode_categorical_features(
self,
df: DataFrame,
feature_columns: List[str]
) -> DataFrame:
"""カテゴリ変数をエンコーディング"""
from pyspark.ml.feature import StringIndexer, OneHotEncoder
categorical_cols = [
col for col in feature_columns
if dict(df.dtypes)[col] == 'string'
]
for col in categorical_cols:
# StringIndexer
indexer = StringIndexer(
inputCol=col,
outputCol=f"{col}_index",
handleInvalid="keep"
)
df = indexer.fit(df).transform(df)
# OneHotEncoder
encoder = OneHotEncoder(
inputCol=f"{col}_index",
outputCol=f"{col}_encoded"
)
df = encoder.fit(df).transform(df)
return df
def load(
self,
df: DataFrame,
output_path: str,
file_format: str = "parquet",
mode: str = "overwrite",
partition_by: List[str] = None
):
"""データ書き込み"""
self.logger.info(f"Loading data to {output_path}")
writer = df.write.mode(mode).format(file_format)
if partition_by:
writer = writer.partitionBy(partition_by)
writer.save(output_path)
self.logger.info(f"Data successfully loaded to {output_path}")
def run_etl(
self,
source_path: str,
output_path: str,
feature_columns: List[str],
label_column: str = None,
validation_rules: Dict = None,
partition_by: List[str] = None
):
"""完全なETLパイプライン実行"""
# Extract
df = self.extract(source_path)
# Validate
if validation_rules:
df = self.validate(df, validation_rules)
# Transform
df = self.transform(df, feature_columns, label_column)
# Load
self.load(df, output_path, partition_by=partition_by)
return df
# 使用例
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# ETLパイプライン初期化
pipeline = DistributedETLPipeline(
app_name="CustomerChurnETL",
num_partitions=2000
)
# 検証ルール定義
validation_rules = {
"required_columns": ["customer_id", "age", "balance", "churn"],
"non_null_columns": ["customer_id", "churn"],
"value_ranges": {
"age": (18, 100),
"balance": (0, 1000000)
},
"unique_columns": ["customer_id"]
}
# ETL実行
feature_columns = [
"age", "balance", "num_products", "credit_score",
"country", "gender", "is_active_member"
]
pipeline.run_etl(
source_path="s3://my-bucket/raw-data/customers/",
output_path="s3://my-bucket/processed-data/customers/",
feature_columns=feature_columns,
label_column="churn",
validation_rules=validation_rules,
partition_by=["country"]
)
✅ Spark ETLのベストプラクティス:
- Adaptive Query Execution (AQE): クエリ実行計画を動的に最適化
- パーティション最適化: データサイズに応じて適切なパーティション数を設定
- キャッシング: 繰り返し使用するDataFrameをメモリにキャッシュ
- スキーマ推論回避: 大規模データでは明示的なスキーマ定義を使用
5.3 分散モデル訓練
マルチノード訓練とハイパーパラメータ最適化
コード例4: Ray Tuneによる分散ハイパーパラメータ最適化
"""
Ray Tuneを使った大規模ハイパーパラメータ最適化
"""
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from typing import Dict
class NeuralNetwork(nn.Module):
"""シンプルなニューラルネットワーク"""
def __init__(self, input_dim: int, hidden_dims: list, output_dim: int, dropout: float):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, output_dim))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
def train_model(config: Dict, checkpoint_dir=None):
"""
訓練関数(Ray Tuneから呼び出される)
Args:
config: ハイパーパラメータ設定
checkpoint_dir: チェックポイントディレクトリ
"""
# データ準備(実際はSparkから読み込む)
np.random.seed(42)
X_train = np.random.randn(10000, 50).astype(np.float32)
y_train = np.random.randint(0, 2, 10000).astype(np.int64)
X_val = np.random.randn(2000, 50).astype(np.float32)
y_val = np.random.randint(0, 2, 2000).astype(np.int64)
train_dataset = TensorDataset(
torch.from_numpy(X_train),
torch.from_numpy(y_train)
)
val_dataset = TensorDataset(
torch.from_numpy(X_val),
torch.from_numpy(y_val)
)
train_loader = DataLoader(
train_dataset,
batch_size=config["batch_size"],
shuffle=True,
num_workers=2
)
val_loader = DataLoader(
val_dataset,
batch_size=config["batch_size"],
num_workers=2
)
# モデル初期化
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NeuralNetwork(
input_dim=50,
hidden_dims=config["hidden_dims"],
output_dim=2,
dropout=config["dropout"]
).to(device)
# オプティマイザと損失関数
optimizer = torch.optim.Adam(
model.parameters(),
lr=config["lr"],
weight_decay=config["weight_decay"]
)
criterion = nn.CrossEntropyLoss()
# チェックポイントから復元
if checkpoint_dir:
checkpoint = torch.load(checkpoint_dir + "/checkpoint.pt")
model.load_state_dict(checkpoint["model_state"])
optimizer.load_state_dict(checkpoint["optimizer_state"])
start_epoch = checkpoint["epoch"]
else:
start_epoch = 0
# 訓練ループ
for epoch in range(start_epoch, config["max_epochs"]):
# 訓練フェーズ
model.train()
train_loss = 0.0
train_correct = 0
train_total = 0
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = outputs.max(1)
train_total += labels.size(0)
train_correct += predicted.eq(labels).sum().item()
# 検証フェーズ
model.eval()
val_loss = 0.0
val_correct = 0
val_total = 0
with torch.no_grad():
for inputs, labels in val_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
val_loss += loss.item()
_, predicted = outputs.max(1)
val_total += labels.size(0)
val_correct += predicted.eq(labels).sum().item()
# メトリクス計算
train_acc = train_correct / train_total
val_acc = val_correct / val_total
# チェックポイント保存
with tune.checkpoint_dir(step=epoch) as checkpoint_dir:
torch.save({
"epoch": epoch + 1,
"model_state": model.state_dict(),
"optimizer_state": optimizer.state_dict(),
}, checkpoint_dir + "/checkpoint.pt")
# Ray Tuneにメトリクス報告
tune.report(
train_loss=train_loss / len(train_loader),
train_accuracy=train_acc,
val_loss=val_loss / len(val_loader),
val_accuracy=val_acc
)
def run_hyperparameter_optimization():
"""分散ハイパーパラメータ最適化実行"""
# Ray初期化
ray.init(
address="auto", # 既存のRayクラスタに接続
_redis_password="password"
)
# 探索空間定義
config = {
"lr": tune.loguniform(1e-5, 1e-2),
"batch_size": tune.choice([128, 256, 512]),
"hidden_dims": tune.choice([
[128, 64],
[256, 128],
[512, 256, 128]
]),
"dropout": tune.uniform(0.1, 0.5),
"weight_decay": tune.loguniform(1e-6, 1e-3),
"max_epochs": 50
}
# スケジューラ(早期停止)
scheduler = ASHAScheduler(
metric="val_accuracy",
mode="max",
max_t=50,
grace_period=10,
reduction_factor=2
)
# サーチアルゴリズム(Optuna)
search_alg = OptunaSearch(
metric="val_accuracy",
mode="max"
)
# レポーター設定
reporter = CLIReporter(
metric_columns=["train_loss", "train_accuracy", "val_loss", "val_accuracy"],
max_progress_rows=20
)
# チューニング実行
analysis = tune.run(
train_model,
config=config,
num_samples=100, # 試行回数
scheduler=scheduler,
search_alg=search_alg,
progress_reporter=reporter,
resources_per_trial={
"cpu": 4,
"gpu": 1
},
checkpoint_at_end=True,
checkpoint_freq=10,
local_dir="./ray_results",
name="neural_network_hpo"
)
# 最良の設定を取得
best_config = analysis.get_best_config(metric="val_accuracy", mode="max")
best_trial = analysis.get_best_trial(metric="val_accuracy", mode="max")
print("\n" + "="*80)
print("Best Hyperparameters:")
print("="*80)
for key, value in best_config.items():
print(f"{key:20s}: {value}")
print(f"\nBest Validation Accuracy: {best_trial.last_result['val_accuracy']:.4f}")
# 結果をDataFrameとして取得
df = analysis.dataframe()
df.to_csv("hpo_results.csv", index=False)
ray.shutdown()
return best_config, analysis
# 使用例
if __name__ == "__main__":
best_config, analysis = run_hyperparameter_optimization()
分散訓練戦略
コード例5: PyTorch Distributed Data Parallel (DDP)
"""
PyTorch DDPによるマルチノード分散訓練
"""
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import os
from typing import Optional
def setup_distributed(rank: int, world_size: int, backend: str = "nccl"):
"""
分散環境セットアップ
Args:
rank: 現在のプロセスのランク
world_size: 総プロセス数
backend: 通信バックエンド(nccl, gloo, mpi)
"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# プロセスグループ初期化
dist.init_process_group(backend, rank=rank, world_size=world_size)
# GPUデバイス設定
torch.cuda.set_device(rank)
def cleanup_distributed():
"""分散環境クリーンアップ"""
dist.destroy_process_group()
class DistributedTrainer:
"""分散訓練マネージャー"""
def __init__(
self,
model: nn.Module,
train_dataset,
val_dataset,
rank: int,
world_size: int,
batch_size: int = 256,
learning_rate: float = 0.001,
checkpoint_dir: str = "./checkpoints"
):
self.rank = rank
self.world_size = world_size
self.checkpoint_dir = checkpoint_dir
# デバイス設定
self.device = torch.device(f"cuda:{rank}")
# モデルをDDPでラップ
self.model = model.to(self.device)
self.model = DDP(self.model, device_ids=[rank])
# データローダー(DistributedSamplerを使用)
train_sampler = DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=rank,
shuffle=True
)
self.train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
sampler=train_sampler,
num_workers=4,
pin_memory=True
)
val_sampler = DistributedSampler(
val_dataset,
num_replicas=world_size,
rank=rank,
shuffle=False
)
self.val_loader = DataLoader(
val_dataset,
batch_size=batch_size,
sampler=val_sampler,
num_workers=4,
pin_memory=True
)
# オプティマイザと損失関数
self.optimizer = torch.optim.Adam(
self.model.parameters(),
lr=learning_rate
)
self.criterion = nn.CrossEntropyLoss()
def train_epoch(self, epoch: int):
"""1エポック訓練"""
self.model.train()
self.train_loader.sampler.set_epoch(epoch) # シャッフルのシード設定
total_loss = 0.0
total_correct = 0
total_samples = 0
for batch_idx, (inputs, labels) in enumerate(self.train_loader):
inputs, labels = inputs.to(self.device), labels.to(self.device)
# Forward pass
self.optimizer.zero_grad()
outputs = self.model(inputs)
loss = self.criterion(outputs, labels)
# Backward pass
loss.backward()
self.optimizer.step()
# メトリクス計算
total_loss += loss.item()
_, predicted = outputs.max(1)
total_correct += predicted.eq(labels).sum().item()
total_samples += labels.size(0)
if batch_idx % 100 == 0 and self.rank == 0:
print(f"Epoch {epoch} [{batch_idx}/{len(self.train_loader)}] "
f"Loss: {loss.item():.4f}")
# 全プロセスでメトリクスを集約
avg_loss = self._aggregate_metric(total_loss / len(self.train_loader))
accuracy = self._aggregate_metric(total_correct / total_samples)
return avg_loss, accuracy
def validate(self):
"""検証"""
self.model.eval()
total_loss = 0.0
total_correct = 0
total_samples = 0
with torch.no_grad():
for inputs, labels in self.val_loader:
inputs, labels = inputs.to(self.device), labels.to(self.device)
outputs = self.model(inputs)
loss = self.criterion(outputs, labels)
total_loss += loss.item()
_, predicted = outputs.max(1)
total_correct += predicted.eq(labels).sum().item()
total_samples += labels.size(0)
# 全プロセスでメトリクスを集約
avg_loss = self._aggregate_metric(total_loss / len(self.val_loader))
accuracy = self._aggregate_metric(total_correct / total_samples)
return avg_loss, accuracy
def _aggregate_metric(self, value: float) -> float:
"""全プロセスでメトリクスを集約"""
tensor = torch.tensor(value).to(self.device)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
return (tensor / self.world_size).item()
def save_checkpoint(self, epoch: int, val_accuracy: float):
"""チェックポイント保存(ランク0のみ)"""
if self.rank == 0:
os.makedirs(self.checkpoint_dir, exist_ok=True)
checkpoint_path = os.path.join(
self.checkpoint_dir,
f"checkpoint_epoch_{epoch}.pt"
)
torch.save({
'epoch': epoch,
'model_state_dict': self.model.module.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'val_accuracy': val_accuracy,
}, checkpoint_path)
print(f"Checkpoint saved: {checkpoint_path}")
def load_checkpoint(self, checkpoint_path: str):
"""チェックポイント読み込み"""
checkpoint = torch.load(checkpoint_path, map_location=self.device)
self.model.module.load_state_dict(checkpoint['model_state_dict'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
return checkpoint['epoch']
def train(self, num_epochs: int, save_freq: int = 10):
"""完全な訓練ループ"""
for epoch in range(num_epochs):
# 訓練
train_loss, train_acc = self.train_epoch(epoch)
# 検証
val_loss, val_acc = self.validate()
# ログ出力(ランク0のみ)
if self.rank == 0:
print(f"\nEpoch {epoch + 1}/{num_epochs}")
print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
# チェックポイント保存
if (epoch + 1) % save_freq == 0:
self.save_checkpoint(epoch + 1, val_acc)
def distributed_training_worker(
rank: int,
world_size: int,
model_class,
train_dataset,
val_dataset
):
"""分散訓練ワーカー関数"""
# 分散環境セットアップ
setup_distributed(rank, world_size)
# モデル作成
model = model_class(input_dim=50, hidden_dims=[256, 128], output_dim=2, dropout=0.3)
# トレーナー初期化
trainer = DistributedTrainer(
model=model,
train_dataset=train_dataset,
val_dataset=val_dataset,
rank=rank,
world_size=world_size,
batch_size=256,
learning_rate=0.001
)
# 訓練実行
trainer.train(num_epochs=50, save_freq=10)
# クリーンアップ
cleanup_distributed()
# 使用例
if __name__ == "__main__":
import torch.multiprocessing as mp
from torch.utils.data import TensorDataset
import numpy as np
# ダミーデータ作成
np.random.seed(42)
X_train = torch.from_numpy(np.random.randn(100000, 50).astype(np.float32))
y_train = torch.from_numpy(np.random.randint(0, 2, 100000).astype(np.int64))
X_val = torch.from_numpy(np.random.randn(20000, 50).astype(np.float32))
y_val = torch.from_numpy(np.random.randint(0, 2, 20000).astype(np.int64))
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
# 分散訓練起動(4 GPUs)
world_size = 4
mp.spawn(
distributed_training_worker,
args=(world_size, NeuralNetwork, train_dataset, val_dataset),
nprocs=world_size,
join=True
)
💡 分散訓練の選択基準:
- Data Parallel (DP): 単一ノード、複数GPU(シンプルだが遅い)
- Distributed Data Parallel (DDP): マルチノード、効率的な勾配同期
- Fully Sharded Data Parallel (FSDP): 超大規模モデル(GPT-3クラス)
- Model Parallel: 単一GPUに乗らない巨大モデル
5.4 パフォーマンス最適化
プロファイリングとボトルネック特定
コード例6: 分散システムのプロファイリング
"""
分散機械学習パイプラインのプロファイリングと最適化
"""
import time
import psutil
import torch
from contextlib import contextmanager
from typing import Dict, List
import json
from dataclasses import dataclass, asdict
import numpy as np
@dataclass
class ProfileMetrics:
"""プロファイリングメトリクス"""
stage_name: str
execution_time: float
cpu_percent: float
memory_mb: float
gpu_memory_mb: float = 0.0
io_read_mb: float = 0.0
io_write_mb: float = 0.0
network_sent_mb: float = 0.0
network_recv_mb: float = 0.0
class PerformanceProfiler:
"""パフォーマンスプロファイラー"""
def __init__(self, enable_gpu: bool = True):
self.enable_gpu = enable_gpu and torch.cuda.is_available()
self.metrics: List[ProfileMetrics] = []
self.process = psutil.Process()
@contextmanager
def profile(self, stage_name: str):
"""ステージのプロファイリング"""
# 開始時のメトリクス
start_time = time.time()
start_cpu = self.process.cpu_percent()
start_memory = self.process.memory_info().rss / 1024 / 1024 # MB
io_start = self.process.io_counters()
net_start = psutil.net_io_counters()
if self.enable_gpu:
torch.cuda.reset_peak_memory_stats()
start_gpu_memory = torch.cuda.memory_allocated() / 1024 / 1024
try:
yield
finally:
# 終了時のメトリクス
end_time = time.time()
end_cpu = self.process.cpu_percent()
end_memory = self.process.memory_info().rss / 1024 / 1024
io_end = self.process.io_counters()
net_end = psutil.net_io_counters()
# GPU メモリ
if self.enable_gpu:
end_gpu_memory = torch.cuda.max_memory_allocated() / 1024 / 1024
else:
end_gpu_memory = 0.0
# メトリクス記録
metrics = ProfileMetrics(
stage_name=stage_name,
execution_time=end_time - start_time,
cpu_percent=(start_cpu + end_cpu) / 2,
memory_mb=end_memory - start_memory,
gpu_memory_mb=end_gpu_memory - start_gpu_memory if self.enable_gpu else 0.0,
io_read_mb=(io_end.read_bytes - io_start.read_bytes) / 1024 / 1024,
io_write_mb=(io_end.write_bytes - io_start.write_bytes) / 1024 / 1024,
network_sent_mb=(net_end.bytes_sent - net_start.bytes_sent) / 1024 / 1024,
network_recv_mb=(net_end.bytes_recv - net_start.bytes_recv) / 1024 / 1024
)
self.metrics.append(metrics)
def print_summary(self):
"""プロファイリング結果サマリー表示"""
print("\n" + "="*100)
print("Performance Profiling Summary")
print("="*100)
print(f"{'Stage':<30} {'Time (s)':<12} {'CPU %':<10} {'Mem (MB)':<12} "
f"{'GPU (MB)':<12} {'I/O Read':<12} {'I/O Write':<12}")
print("-"*100)
total_time = 0.0
for m in self.metrics:
print(f"{m.stage_name:<30} {m.execution_time:<12.2f} {m.cpu_percent:<10.1f} "
f"{m.memory_mb:<12.1f} {m.gpu_memory_mb:<12.1f} "
f"{m.io_read_mb:<12.1f} {m.io_write_mb:<12.1f}")
total_time += m.execution_time
print("-"*100)
print(f"{'Total':<30} {total_time:<12.2f}")
print("="*100)
def get_bottlenecks(self, top_k: int = 3) -> List[ProfileMetrics]:
"""ボトルネック特定"""
sorted_metrics = sorted(
self.metrics,
key=lambda m: m.execution_time,
reverse=True
)
return sorted_metrics[:top_k]
def export_json(self, output_path: str):
"""結果をJSONにエクスポート"""
data = [asdict(m) for m in self.metrics]
with open(output_path, 'w') as f:
json.dump(data, f, indent=2)
class DataLoaderOptimizer:
"""DataLoader最適化ヘルパー"""
@staticmethod
def benchmark_dataloader(
dataset,
batch_sizes: List[int],
num_workers_list: List[int],
num_iterations: int = 100
) -> Dict:
"""DataLoader設定の最適化"""
results = []
for batch_size in batch_sizes:
for num_workers in num_workers_list:
loader = torch.utils.data.DataLoader(
dataset,
batch_size=batch_size,
num_workers=num_workers,
pin_memory=True
)
start_time = time.time()
for i, batch in enumerate(loader):
if i >= num_iterations:
break
_ = batch # データロード
elapsed = time.time() - start_time
throughput = (batch_size * num_iterations) / elapsed
results.append({
'batch_size': batch_size,
'num_workers': num_workers,
'throughput': throughput,
'time_per_batch': elapsed / num_iterations
})
# 最適設定を見つける
best_config = max(results, key=lambda x: x['throughput'])
print("\nDataLoader Optimization Results:")
print(f"Best Configuration: batch_size={best_config['batch_size']}, "
f"num_workers={best_config['num_workers']}")
print(f"Throughput: {best_config['throughput']:.2f} samples/sec")
return best_config
class GPUOptimizer:
"""GPU最適化ヘルパー"""
@staticmethod
def optimize_memory():
"""GPU メモリ最適化"""
if torch.cuda.is_available():
# 未使用キャッシュクリア
torch.cuda.empty_cache()
# メモリ統計表示
allocated = torch.cuda.memory_allocated() / 1024**3
reserved = torch.cuda.memory_reserved() / 1024**3
print(f"\nGPU Memory Status:")
print(f"Allocated: {allocated:.2f} GB")
print(f"Reserved: {reserved:.2f} GB")
@staticmethod
def enable_auto_mixed_precision():
"""自動混合精度訓練を有効化"""
return torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 7
@staticmethod
def benchmark_precision(model, sample_input, num_iterations: int = 100):
"""精度別パフォーマンス比較"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
sample_input = sample_input.to(device)
results = {}
# FP32
model.float()
start = time.time()
for _ in range(num_iterations):
with torch.no_grad():
_ = model(sample_input.float())
torch.cuda.synchronize()
results['fp32'] = time.time() - start
# FP16 (if available)
if GPUOptimizer.enable_auto_mixed_precision():
model.half()
start = time.time()
for _ in range(num_iterations):
with torch.no_grad():
_ = model(sample_input.half())
torch.cuda.synchronize()
results['fp16'] = time.time() - start
print("\nPrecision Benchmark:")
for precision, elapsed in results.items():
print(f"{precision.upper()}: {elapsed:.4f}s "
f"({num_iterations/elapsed:.2f} iter/s)")
return results
# 使用例
if __name__ == "__main__":
# プロファイラー初期化
profiler = PerformanceProfiler(enable_gpu=True)
# データ準備のプロファイリング
with profiler.profile("Data Loading"):
data = torch.randn(100000, 50)
time.sleep(0.5) # シミュレーション
# 前処理のプロファイリング
with profiler.profile("Preprocessing"):
normalized_data = (data - data.mean()) / data.std()
time.sleep(0.3)
# モデル訓練のプロファイリング
with profiler.profile("Model Training"):
model = torch.nn.Linear(50, 10)
optimizer = torch.optim.Adam(model.parameters())
for _ in range(100):
outputs = model(normalized_data)
loss = outputs.sum()
loss.backward()
optimizer.step()
optimizer.zero_grad()
# 結果表示
profiler.print_summary()
# ボトルネック特定
bottlenecks = profiler.get_bottlenecks(top_k=2)
print("\nTop Bottlenecks:")
for i, m in enumerate(bottlenecks, 1):
print(f"{i}. {m.stage_name}: {m.execution_time:.2f}s")
# 結果エクスポート
profiler.export_json("profiling_results.json")
# DataLoader最適化
dataset = torch.utils.data.TensorDataset(data, torch.zeros(len(data)))
DataLoaderOptimizer.benchmark_dataloader(
dataset,
batch_sizes=[128, 256, 512],
num_workers_list=[2, 4, 8],
num_iterations=50
)
# GPU最適化
GPUOptimizer.optimize_memory()
if torch.cuda.is_available():
sample_model = torch.nn.Sequential(
torch.nn.Linear(50, 128),
torch.nn.ReLU(),
torch.nn.Linear(128, 10)
)
sample_input = torch.randn(32, 50)
GPUOptimizer.benchmark_precision(sample_model, sample_input)
I/O最適化とネットワーク効率化
💡 I/O最適化のベストプラクティス:
- データフォーマット: Parquet(列指向)は行指向フォーマット(CSV)より高速
- 圧縮: Snappy圧縮で読み込み速度とストレージ効率をバランス
- プリフェッチ: DataLoaderの`num_workers`でバックグラウンドロード
- メモリマッピング: 大規模ファイルはメモリマップで効率的にアクセス
- シャーディング: データを複数ファイルに分割し並列読み込み
5.5 エンドツーエンドの実装例
完全な大規模MLパイプライン
コード例7: Spark + PyTorch 統合パイプライン
"""
SparkとPyTorchを統合した大規模MLパイプライン
"""
from pyspark.sql import SparkSession
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from typing import List, Tuple
import pickle
class SparkDatasetConverter:
"""Spark DataFrameをPyTorch Datasetに変換"""
@staticmethod
def spark_to_pytorch(
spark_df,
feature_column: str = "features",
label_column: str = "label",
output_path: str = "/tmp/pytorch_data"
):
"""
Spark DataFrameをPyTorchで読み込み可能な形式で保存
Args:
spark_df: Spark DataFrame
feature_column: 特徴量カラム名
label_column: ラベルカラム名
output_path: 出力パス
"""
# Pandas経由で変換(小〜中規模データ)
# 大規模データの場合はパーティションごとに処理
def convert_partition(iterator):
"""各パーティションを変換"""
data_list = []
for row in iterator:
features = row[feature_column].toArray()
label = row[label_column]
data_list.append((features, label))
# パーティションごとにファイル保存
import random
partition_id = random.randint(0, 10000)
output_file = f"{output_path}/partition_{partition_id}.pkl"
with open(output_file, 'wb') as f:
pickle.dump(data_list, f)
yield (output_file, len(data_list))
# 各パーティションを処理
spark_df.rdd.mapPartitions(convert_partition).collect()
class DistributedDataset(Dataset):
"""分散保存されたデータを読み込むDataset"""
def __init__(self, data_dir: str):
from pathlib import Path
self.data_files = list(Path(data_dir).glob("partition_*.pkl"))
# 全データのインデックスを構築
self.file_indices = []
for file_path in self.data_files:
with open(file_path, 'rb') as f:
data = pickle.load(f)
self.file_indices.append((file_path, len(data)))
self.total_samples = sum(count for _, count in self.file_indices)
# キャッシュ(メモリに余裕があれば)
self.cache = {}
def __len__(self):
return self.total_samples
def __getitem__(self, idx):
# どのファイルのどのインデックスか計算
file_idx = 0
cumsum = 0
for i, (_, count) in enumerate(self.file_indices):
if idx < cumsum + count:
file_idx = i
local_idx = idx - cumsum
break
cumsum += count
# ファイルからデータ読み込み
file_path = self.file_indices[file_idx][0]
if file_path not in self.cache:
with open(file_path, 'rb') as f:
self.cache[file_path] = pickle.load(f)
features, label = self.cache[file_path][local_idx]
return torch.FloatTensor(features), torch.LongTensor([label])[0]
class EndToEndMLPipeline:
"""エンドツーエンドMLパイプライン"""
def __init__(
self,
spark_master: str = "local[*]",
app_name: str = "EndToEndML"
):
# Spark初期化
self.spark = SparkSession.builder \
.appName(app_name) \
.master(spark_master) \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
self.profiler = PerformanceProfiler()
def run_pipeline(
self,
data_path: str,
model_class,
model_params: dict,
training_config: dict,
output_dir: str = "./pipeline_output"
):
"""完全なパイプライン実行"""
# ステップ1: データ読み込み(Spark)
with self.profiler.profile("1. Data Loading (Spark)"):
raw_df = self.spark.read.parquet(data_path)
print(f"Loaded {raw_df.count()} records")
# ステップ2: データ検証
with self.profiler.profile("2. Data Validation"):
# 基本統計量
raw_df.describe().show()
# Null値チェック
null_counts = raw_df.select([
F.count(F.when(F.col(c).isNull(), c)).alias(c)
for c in raw_df.columns
])
null_counts.show()
# ステップ3: 特徴量エンジニアリング(Spark)
with self.profiler.profile("3. Feature Engineering (Spark)"):
from pyspark.ml.feature import VectorAssembler, StandardScaler
feature_cols = [c for c in raw_df.columns if c != 'label']
assembler = VectorAssembler(
inputCols=feature_cols,
outputCol="features_raw"
)
df_assembled = assembler.transform(raw_df)
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features",
withMean=True,
withStd=True
)
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)
# 訓練/テストデータ分割
train_df, test_df = df_scaled.randomSplit([0.8, 0.2], seed=42)
# ステップ4: PyTorch用データ変換
with self.profiler.profile("4. Spark to PyTorch Conversion"):
train_data_dir = f"{output_dir}/train_data"
test_data_dir = f"{output_dir}/test_data"
SparkDatasetConverter.spark_to_pytorch(
train_df.select("features", "label"),
output_path=train_data_dir
)
SparkDatasetConverter.spark_to_pytorch(
test_df.select("features", "label"),
output_path=test_data_dir
)
# ステップ5: PyTorch Dataset/DataLoader作成
with self.profiler.profile("5. PyTorch DataLoader Setup"):
train_dataset = DistributedDataset(train_data_dir)
test_dataset = DistributedDataset(test_data_dir)
train_loader = DataLoader(
train_dataset,
batch_size=training_config['batch_size'],
shuffle=True,
num_workers=4,
pin_memory=True
)
test_loader = DataLoader(
test_dataset,
batch_size=training_config['batch_size'],
num_workers=4
)
# ステップ6: モデル訓練
with self.profiler.profile("6. Model Training"):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model_class(**model_params).to(device)
optimizer = torch.optim.Adam(
model.parameters(),
lr=training_config['learning_rate']
)
criterion = nn.CrossEntropyLoss()
# 訓練ループ
for epoch in range(training_config['num_epochs']):
model.train()
train_loss = 0.0
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}: Loss = {train_loss/len(train_loader):.4f}")
# ステップ7: モデル評価
with self.profiler.profile("7. Model Evaluation"):
model.eval()
correct = 0
total = 0
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
accuracy = correct / total
print(f"\nTest Accuracy: {accuracy:.4f}")
# ステップ8: モデル保存
with self.profiler.profile("8. Model Saving"):
model_path = f"{output_dir}/model.pt"
torch.save({
'model_state_dict': model.state_dict(),
'model_params': model_params,
'accuracy': accuracy
}, model_path)
print(f"Model saved to {model_path}")
# プロファイリング結果
self.profiler.print_summary()
self.profiler.export_json(f"{output_dir}/profiling.json")
return model, accuracy
# 使用例
if __name__ == "__main__":
# パイプライン実行
pipeline = EndToEndMLPipeline(
spark_master="spark://master:7077",
app_name="CustomerChurnPrediction"
)
# モデル定義
class ChurnPredictor(nn.Module):
def __init__(self, input_dim, hidden_dims, output_dim):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.extend([
nn.Linear(prev_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.3)
])
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, output_dim))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# パイプライン実行
model, accuracy = pipeline.run_pipeline(
data_path="s3://my-bucket/customer-data/",
model_class=ChurnPredictor,
model_params={
'input_dim': 50,
'hidden_dims': [256, 128, 64],
'output_dim': 2
},
training_config={
'batch_size': 512,
'learning_rate': 0.001,
'num_epochs': 50
},
output_dir="./churn_prediction_output"
)
監視とメンテナンス
コード例8: リアルタイム監視システム
"""
MLパイプラインの監視とアラートシステム
"""
import time
import threading
from dataclasses import dataclass
from typing import Dict, List, Callable, Optional
from datetime import datetime
import json
@dataclass
class Metric:
"""メトリクス"""
name: str
value: float
timestamp: datetime
tags: Dict[str, str] = None
class MetricsCollector:
"""メトリクス収集"""
def __init__(self):
self.metrics: List[Metric] = []
self.lock = threading.Lock()
def record(self, name: str, value: float, tags: Dict[str, str] = None):
"""メトリクス記録"""
with self.lock:
metric = Metric(
name=name,
value=value,
timestamp=datetime.now(),
tags=tags or {}
)
self.metrics.append(metric)
def get_latest(self, name: str, n: int = 1) -> List[Metric]:
"""最新のメトリクス取得"""
with self.lock:
filtered = [m for m in self.metrics if m.name == name]
return sorted(filtered, key=lambda m: m.timestamp, reverse=True)[:n]
def get_average(self, name: str, window_seconds: int = 60) -> Optional[float]:
"""期間内の平均値"""
now = datetime.now()
with self.lock:
recent = [
m for m in self.metrics
if m.name == name and (now - m.timestamp).total_seconds() <= window_seconds
]
if not recent:
return None
return sum(m.value for m in recent) / len(recent)
class AlertRule:
"""アラートルール"""
def __init__(
self,
name: str,
metric_name: str,
threshold: float,
comparison: str = "greater", # greater, less, equal
window_seconds: int = 60,
callback: Callable = None
):
self.name = name
self.metric_name = metric_name
self.threshold = threshold
self.comparison = comparison
self.window_seconds = window_seconds
self.callback = callback or self.default_callback
def check(self, collector: MetricsCollector) -> bool:
"""ルールチェック"""
avg_value = collector.get_average(self.metric_name, self.window_seconds)
if avg_value is None:
return False
if self.comparison == "greater":
triggered = avg_value > self.threshold
elif self.comparison == "less":
triggered = avg_value < self.threshold
else: # equal
triggered = abs(avg_value - self.threshold) < 0.0001
if triggered:
self.callback(self.name, self.metric_name, avg_value, self.threshold)
return triggered
def default_callback(self, rule_name, metric_name, value, threshold):
"""デフォルトのアラートコールバック"""
print(f"\n⚠️ ALERT: {rule_name}")
print(f" Metric: {metric_name} = {value:.4f}")
print(f" Threshold: {threshold:.4f}")
print(f" Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
class MonitoringSystem:
"""統合監視システム"""
def __init__(self, check_interval: int = 10):
self.collector = MetricsCollector()
self.alert_rules: List[AlertRule] = []
self.check_interval = check_interval
self.running = False
self.monitor_thread = None
def add_alert_rule(self, rule: AlertRule):
"""アラートルール追加"""
self.alert_rules.append(rule)
def start(self):
"""監視開始"""
self.running = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print("Monitoring system started")
def stop(self):
"""監視停止"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join()
print("Monitoring system stopped")
def _monitor_loop(self):
"""監視ループ"""
while self.running:
for rule in self.alert_rules:
rule.check(self.collector)
time.sleep(self.check_interval)
def export_metrics(self, output_path: str):
"""メトリクスエクスポート"""
with self.collector.lock:
data = [
{
'name': m.name,
'value': m.value,
'timestamp': m.timestamp.isoformat(),
'tags': m.tags
}
for m in self.collector.metrics
]
with open(output_path, 'w') as f:
json.dump(data, f, indent=2)
# 使用例
if __name__ == "__main__":
# 監視システム初期化
monitor = MonitoringSystem(check_interval=5)
# アラートルール設定
monitor.add_alert_rule(AlertRule(
name="High Training Loss",
metric_name="train_loss",
threshold=0.5,
comparison="greater",
window_seconds=30
))
monitor.add_alert_rule(AlertRule(
name="Low Validation Accuracy",
metric_name="val_accuracy",
threshold=0.80,
comparison="less",
window_seconds=60
))
monitor.add_alert_rule(AlertRule(
name="High GPU Memory Usage",
metric_name="gpu_memory_percent",
threshold=90.0,
comparison="greater",
window_seconds=30
))
# 監視開始
monitor.start()
# シミュレーション: メトリクス記録
try:
for i in range(50):
# 訓練メトリクス(徐々に改善)
train_loss = 1.0 / (i + 1) + 0.1
val_acc = min(0.95, 0.5 + i * 0.01)
gpu_mem = 70 + (i % 5) * 5
monitor.collector.record("train_loss", train_loss)
monitor.collector.record("val_accuracy", val_acc)
monitor.collector.record("gpu_memory_percent", gpu_mem)
# 異常値を時々注入
if i == 20:
monitor.collector.record("train_loss", 0.8) # アラート発火
if i == 30:
monitor.collector.record("val_accuracy", 0.75) # アラート発火
if i == 40:
monitor.collector.record("gpu_memory_percent", 95.0) # アラート発火
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
# 監視停止とメトリクスエクスポート
monitor.stop()
monitor.export_metrics("monitoring_metrics.json")
print("\nMetrics exported to monitoring_metrics.json")
✅ 本番環境の監視項目:
- モデル性能: 精度、F1スコア、AUC-ROC、推論レイテンシ
- データ品質: 欠損率、異常値率、データドリフト検出
- システムリソース: CPU/GPU使用率、メモリ使用量、ディスクI/O
- 可用性: アップタイム、エラー率、リクエストスループット
- コスト: クラウドリソース使用量、推論コスト per request
練習問題
問題1: パイプライン設計
問題: 1日あたり10TBの新規データが生成される推薦システムを設計してください。以下の要件を満たすパイプラインアーキテクチャを提案してください:
- データ取り込みから推薦生成まで4時間以内
- 99.9%の可用性
- A/Bテスト機能
- モデルの自動再訓練
ヒント:
- 増分学習(Incremental Learning)を検討
- 特徴量ストア(Feature Store)でデータ再利用
- マルチアームバンディット for A/Bテスト
- モデル性能の自動監視とトリガー
問題2: 分散訓練の最適化
問題: 8ノード x 4 GPU (合計32 GPU) のクラスタで画像分類モデルを訓練します。以下の最適化を実装してください:
- 効率的な勾配同期戦略
- データローダーのパフォーマンスチューニング
- 混合精度訓練の適用
- チェックポイント戦略
期待される改善:
- 訓練時間を単一GPU比で25倍以上に短縮
- GPU使用率を90%以上に維持
- ネットワーク帯域幅の効率的利用
問題3: コスト最適化
問題: 月間のクラウドコストが$50,000に達しているMLパイプラインがあります。コストを30%削減しつつ、モデル性能を維持する戦略を提案してください。
考慮すべき要素:
- スポットインスタンスの活用
- オートスケーリング設定
- ストレージ階層化(Hot/Cold data)
- 計算リソースの最適化
- 不要なパイプライン実行の削減
問題4: データドリフト検出
問題: 本番環境でデータドリフトを自動検出するシステムを実装してください。以下を含めてください:
- 統計的検定(KS検定、カイ二乗検定)
- 分布の可視化
- アラート閾値の設定
- 自動再訓練のトリガー
実装例:
class DataDriftDetector:
def detect_drift(self, reference_data, current_data):
# KS検定実装
pass
def visualize_distributions(self, feature_name):
# 分布比較プロット
pass
def trigger_retraining(self, drift_score, threshold=0.05):
# 再訓練トリガー
pass
問題5: エンドツーエンドパイプライン実装
問題: 以下の要件を満たす完全なMLパイプラインを実装してください:
データ: Kaggleの "Credit Card Fraud Detection" データセット(284,807件)
要件:
- Sparkでデータ前処理(欠損値処理、正規化、不均衡データ対応)
- Ray Tuneでハイパーパラメータ最適化(100試行)
- 分散訓練(複数GPU対応)
- モデル評価(Precision, Recall, F1, AUC-ROC)
- パフォーマンスプロファイリング
- 監視システムの統合
評価基準:
- F1スコア > 0.85
- 訓練時間 < 30分(4 GPU環境)
- 完全な再現性(シード固定)
- プロファイリングレポート生成
まとめ
この章では、大規模機械学習パイプラインの設計と実装について学びました:
| トピック | 主要な学習内容 | 実践スキル |
|---|---|---|
| パイプライン設計 | スケーラビリティ、フォールトトレランス、監視 | 設定管理、エラーハンドリング、チェックポイント |
| データ処理 | Spark ETL、データ検証、特徴量エンジニアリング | 分散データ変換、品質チェック、最適化 |
| 分散訓練 | DDP、Ray Tune、ハイパーパラメータ最適化 | マルチノード訓練、効率的なHPO |
| パフォーマンス最適化 | プロファイリング、I/O最適化、GPU活用 | ボトルネック特定、混合精度訓練 |
| 本番運用 | 監視、アラート、コスト最適化 | メトリクス収集、異常検知、自動化 |
次のステップ
- Kubernetesでのデプロイ: MLパイプラインをコンテナ化し、K8sで運用
- ストリーミング処理: Kafka + Spark StreamingでリアルタイムML
- MLOps統合: MLflow、Kubeflow、SageMakerとの統合
- モデルサービング: TorchServe、TensorFlow Servingでの推論最適化
- AutoML: 自動特徴量エンジニアリング、ニューラルアーキテクチャ探索
💡 実務での応用:
大規模MLパイプラインは、推薦システム、不正検知、需要予測など、ビジネスクリティカルなアプリケーションで使用されます。本章で学んだ技術は、データサイエンティストからMLエンジニアへのキャリアパスにおいて重要なスキルです。
参考文献
書籍
- Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
- Ryza, S. et al. (2017). Advanced Analytics with Spark (2nd ed.). O'Reilly Media.
- Huyen, C. (2022). Designing Machine Learning Systems. O'Reilly Media.
- Gift, N. & Deza, A. (2021). Practical MLOps. O'Reilly Media.
論文
- Zaharia, M. et al. (2016). "Apache Spark: A Unified Engine for Big Data Processing". Communications of the ACM, 59(11), 56-65.
- Li, M. et al. (2014). "Scaling Distributed Machine Learning with the Parameter Server". OSDI, 14, 583-598.
- Goyal, P. et al. (2017). "Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour". arXiv:1706.02677.
- Liaw, R. et al. (2018). "Tune: A Research Platform for Distributed Model Selection and Training". arXiv:1807.05118.
公式ドキュメント
オンラインリソース
- Databricks Spark Deep Learning
- TensorFlow Distributed Training Guide
- Facebook: Scaling ML Infrastructure
- Netflix: Distributed Feature Generation
ツール・フレームワーク
- Apache Spark: https://spark.apache.org/
- Ray: https://www.ray.io/
- Horovod: https://horovod.ai/
- Kubeflow: https://www.kubeflow.org/
- Feast (Feature Store): https://feast.dev/