Chapter 5: Large-Scale Machine Learning Pipelines

Building End-to-End Distributed Machine Learning Systems

📖 Reading time: 35-40 minutes 📊 Difficulty: Advanced 💻 Code examples: 8 📝 Exercises: 5

Learning Objectives

By reading this chapter, you will learn to:

- Apply the design principles of scalable, fault-tolerant ML pipelines
- Build distributed ETL and data-validation pipelines with Apache Spark
- Run multi-node training and hyperparameter optimization with PyTorch DDP and Ray Tune
- Profile pipelines, identify bottlenecks, and optimize I/O and GPU usage
- Operate production pipelines with monitoring, alerting, and cost controls

5.1 Pipeline Design Principles

Requirements for Scalable ML Pipelines

Key considerations when designing a large-scale machine learning pipeline:

| Requirement | Description | Implementation Technologies |
|---|---|---|
| Data scalability | Processing data at TB to PB scale | Spark, Dask, distributed storage |
| Compute scalability | Multi-node parallel processing | Ray, Horovod, Kubernetes |
| Fault tolerance | Automatic recovery from failures | Checkpointing, redundancy |
| Reproducibility | Full reproduction of experiments and models | Version control, fixed seeds |
| Monitoring | Real-time performance monitoring | Prometheus, Grafana, MLflow |
| Cost efficiency | Optimizing resource usage | Autoscaling, spot instances |

Pipeline Architecture Patterns

graph TB
    subgraph "Data Layer"
        A[Raw Data<br/>HDFS/S3] --> B[Data Validation<br/>Great Expectations]
        B --> C[Distributed ETL<br/>Apache Spark]
    end
    subgraph "Feature Layer"
        C --> D[Feature Engineering<br/>Spark ML]
        D --> E[Feature Store<br/>Feast/Tecton]
    end
    subgraph "Training Layer"
        E --> F[Distributed Training<br/>Ray/Horovod]
        F --> G[Hyperparameter Optimization<br/>Ray Tune]
        G --> H[Model Registry<br/>MLflow]
    end
    subgraph "Serving Layer"
        H --> I[Model Deployment<br/>Kubernetes]
        I --> J[Prediction Service<br/>TorchServe]
    end
    subgraph "Monitoring Layer"
        J --> K[Metrics Collection<br/>Prometheus]
        K --> L[Visualization<br/>Grafana]
        L --> M[Alerting<br/>PagerDuty]
    end

Code Example 1: Pipeline Configuration Class

"""
Configuration management for a large-scale ML pipeline
"""
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import yaml
from pathlib import Path


@dataclass
class DataConfig:
    """Data processing settings"""
    source_path: str
    output_path: str
    num_partitions: int = 1000
    file_format: str = "parquet"
    compression: str = "snappy"
    validation_rules: Dict[str, Any] = field(default_factory=dict)


@dataclass
class TrainingConfig:
    """Training settings"""
    model_type: str
    num_workers: int = 4
    num_gpus_per_worker: int = 1
    batch_size: int = 256
    max_epochs: int = 100
    learning_rate: float = 0.001
    checkpoint_freq: int = 10
    early_stopping_patience: int = 5


@dataclass
class ResourceConfig:
    """Resource settings"""
    num_nodes: int = 4
    cpus_per_node: int = 16
    memory_per_node: str = "64GB"
    gpus_per_node: int = 4
    storage_type: str = "ssd"
    network_bandwidth: str = "10Gbps"


@dataclass
class MonitoringConfig:
    """Monitoring settings"""
    metrics_interval: int = 60  # seconds
    log_level: str = "INFO"
    alert_thresholds: Dict[str, float] = field(default_factory=dict)
    dashboard_url: Optional[str] = None


@dataclass
class PipelineConfig:
    """Combined pipeline configuration"""
    pipeline_name: str
    version: str
    data: DataConfig
    training: TrainingConfig
    resources: ResourceConfig
    monitoring: MonitoringConfig

    @classmethod
    def from_yaml(cls, config_path: Path) -> 'PipelineConfig':
        """Load the configuration from a YAML file"""
        with open(config_path) as f:
            config_dict = yaml.safe_load(f)

        return cls(
            pipeline_name=config_dict['pipeline_name'],
            version=config_dict['version'],
            data=DataConfig(**config_dict['data']),
            training=TrainingConfig(**config_dict['training']),
            resources=ResourceConfig(**config_dict['resources']),
            monitoring=MonitoringConfig(**config_dict['monitoring'])
        )

    def to_yaml(self, output_path: Path):
        """Save the configuration to a YAML file"""
        config_dict = {
            'pipeline_name': self.pipeline_name,
            'version': self.version,
            'data': self.data.__dict__,
            'training': self.training.__dict__,
            'resources': self.resources.__dict__,
            'monitoring': self.monitoring.__dict__
        }

        with open(output_path, 'w') as f:
            yaml.dump(config_dict, f, default_flow_style=False)


# Usage example
if __name__ == "__main__":
    # Build a configuration
    config = PipelineConfig(
        pipeline_name="customer_churn_prediction",
        version="v1.0.0",
        data=DataConfig(
            source_path="s3://my-bucket/raw-data/",
            output_path="s3://my-bucket/processed-data/",
            num_partitions=2000,
            validation_rules={
                "min_records": 1000000,
                "required_columns": ["customer_id", "features", "label"]
            }
        ),
        training=TrainingConfig(
            model_type="neural_network",
            num_workers=8,
            num_gpus_per_worker=2,
            batch_size=512,
            max_epochs=50
        ),
        resources=ResourceConfig(
            num_nodes=8,
            cpus_per_node=32,
            memory_per_node="128GB",
            gpus_per_node=4
        ),
        monitoring=MonitoringConfig(
            alert_thresholds={
                "accuracy": 0.85,
                "latency_p99": 100.0,  # milliseconds
                "error_rate": 0.01
            }
        )
    )

    # Save the configuration
    config.to_yaml(Path("pipeline_config.yaml"))

    # Load it back
    loaded_config = PipelineConfig.from_yaml(Path("pipeline_config.yaml"))
    print(f"Pipeline: {loaded_config.pipeline_name}")
    print(f"Workers: {loaded_config.training.num_workers}")
    print(f"Partitions: {loaded_config.data.num_partitions}")

💡 Design points:

- Dataclasses give typed, self-documenting configuration with sensible defaults.
- YAML round-tripping (to_yaml / from_yaml) lets configurations be versioned alongside code, which supports reproducibility.
- Grouping settings by concern (data, training, resources, monitoring) keeps each pipeline stage independently configurable.

Error Handling and Retry Strategies

Code Example 2: A Robust Pipeline Execution Framework

"""
フォールトトレラントなパイプライン実行
"""
import time
import logging
from typing import Callable, Any, Optional, List
from functools import wraps
from dataclasses import dataclass
from enum import Enum


class TaskStatus(Enum):
    """タスク実行状態"""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRYING = "retrying"


@dataclass
class TaskResult:
    """タスク実行結果"""
    status: TaskStatus
    result: Any = None
    error: Optional[Exception] = None
    execution_time: float = 0.0
    retry_count: int = 0


class RetryStrategy:
    """リトライ戦略"""

    def __init__(
        self,
        max_retries: int = 3,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_delay: float = 60.0,
        retryable_exceptions: List[type] = None
    ):
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.backoff_factor = backoff_factor
        self.max_delay = max_delay
        self.retryable_exceptions = retryable_exceptions or [Exception]

    def get_delay(self, retry_count: int) -> float:
        """リトライ待機時間を計算(指数バックオフ)"""
        delay = self.initial_delay * (self.backoff_factor ** retry_count)
        return min(delay, self.max_delay)

    def should_retry(self, exception: Exception) -> bool:
        """リトライすべきか判定"""
        return any(isinstance(exception, exc_type)
                  for exc_type in self.retryable_exceptions)


def with_retry(retry_strategy: RetryStrategy):
    """リトライ機能を追加するデコレータ"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> TaskResult:
            retry_count = 0
            start_time = time.time()

            while retry_count <= retry_strategy.max_retries:
                try:
                    result = func(*args, **kwargs)
                    execution_time = time.time() - start_time

                    return TaskResult(
                        status=TaskStatus.SUCCESS,
                        result=result,
                        execution_time=execution_time,
                        retry_count=retry_count
                    )

                except Exception as e:
                    if (retry_count < retry_strategy.max_retries and
                        retry_strategy.should_retry(e)):

                        delay = retry_strategy.get_delay(retry_count)
                        logging.warning(
                            f"Task failed (attempt {retry_count + 1}/"
                            f"{retry_strategy.max_retries + 1}): {str(e)}"
                            f"\nRetrying in {delay:.1f} seconds..."
                        )

                        time.sleep(delay)
                        retry_count += 1
                    else:
                        execution_time = time.time() - start_time
                        return TaskResult(
                            status=TaskStatus.FAILED,
                            error=e,
                            execution_time=execution_time,
                            retry_count=retry_count
                        )

            return TaskResult(
                status=TaskStatus.FAILED,
                error=Exception("Max retries exceeded"),
                execution_time=time.time() - start_time,
                retry_count=retry_count
            )

        return wrapper
    return decorator


class PipelineExecutor:
    """パイプライン実行エンジン"""

    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = checkpoint_dir
        self.logger = logging.getLogger(__name__)

    def execute_stage(
        self,
        stage_name: str,
        task_func: Callable,
        retry_strategy: Optional[RetryStrategy] = None,
        checkpoint: bool = True
    ) -> TaskResult:
        """パイプラインステージを実行"""
        self.logger.info(f"Starting stage: {stage_name}")

        # チェックポイントがあれば復元
        if checkpoint and self._checkpoint_exists(stage_name):
            self.logger.info(f"Restoring from checkpoint: {stage_name}")
            return self._load_checkpoint(stage_name)

        # リトライ戦略を適用
        if retry_strategy:
            task_func = with_retry(retry_strategy)(task_func)

        # タスク実行
        result = task_func()

        # チェックポイント保存
        if checkpoint and result.status == TaskStatus.SUCCESS:
            self._save_checkpoint(stage_name, result)

        self.logger.info(
            f"Stage {stage_name} completed: {result.status.value} "
            f"(time: {result.execution_time:.2f}s, "
            f"retries: {result.retry_count})"
        )

        return result

    def _checkpoint_exists(self, stage_name: str) -> bool:
        """チェックポイントの存在確認"""
        from pathlib import Path
        checkpoint_path = Path(self.checkpoint_dir) / f"{stage_name}.ckpt"
        return checkpoint_path.exists()

    def _save_checkpoint(self, stage_name: str, result: TaskResult):
        """チェックポイント保存"""
        import pickle
        from pathlib import Path

        Path(self.checkpoint_dir).mkdir(exist_ok=True)
        checkpoint_path = Path(self.checkpoint_dir) / f"{stage_name}.ckpt"

        with open(checkpoint_path, 'wb') as f:
            pickle.dump(result, f)

    def _load_checkpoint(self, stage_name: str) -> TaskResult:
        """チェックポイント読み込み"""
        import pickle
        from pathlib import Path

        checkpoint_path = Path(self.checkpoint_dir) / f"{stage_name}.ckpt"

        with open(checkpoint_path, 'rb') as f:
            return pickle.load(f)


# 使用例
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # リトライ戦略定義
    retry_strategy = RetryStrategy(
        max_retries=3,
        initial_delay=2.0,
        backoff_factor=2.0,
        retryable_exceptions=[ConnectionError, TimeoutError]
    )

    # パイプライン実行
    executor = PipelineExecutor(checkpoint_dir="./pipeline_checkpoints")

    # ステージ1: データ読み込み(失敗する可能性あり)
    def load_data():
        import random
        if random.random() < 0.3:  # 30%の確率で失敗
            raise ConnectionError("Failed to connect to data source")
        return {"data": list(range(1000))}

    result1 = executor.execute_stage(
        "data_loading",
        load_data,
        retry_strategy=retry_strategy
    )

    if result1.status == TaskStatus.SUCCESS:
        print(f"Data loaded: {len(result1.result['data'])} records")

        # ステージ2: データ処理
        def process_data():
            data = result1.result['data']
            processed = [x * 2 for x in data]
            return {"processed_data": processed}

        result2 = executor.execute_stage(
            "data_processing",
            process_data,
            checkpoint=True
        )

        if result2.status == TaskStatus.SUCCESS:
            print(f"Processing completed successfully")

⚠️ Caveats for retry strategies:

- Only retry transient failures such as network errors and timeouts; retrying a deterministic bug just wastes time.
- Tasks must be idempotent, since a retried task may have partially completed before failing.
- Cap the backoff delay (max_delay) so a long outage does not stall the pipeline indefinitely.

5.2 Data Processing Pipelines

Building a Distributed ETL Pipeline

Code Example 3: Large-Scale Data ETL with Spark

"""
Apache Sparkによる大規模データETLパイプライン
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from typing import List, Dict
import logging


class DistributedETLPipeline:
    """分散ETLパイプライン"""

    def __init__(
        self,
        app_name: str = "MLDataPipeline",
        master: str = "spark://master:7077",
        num_partitions: int = 1000
    ):
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .master(master) \
            .config("spark.sql.shuffle.partitions", num_partitions) \
            .config("spark.default.parallelism", num_partitions) \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .getOrCreate()

        self.logger = logging.getLogger(__name__)

    def extract(
        self,
        source_path: str,
        file_format: str = "parquet",
        schema: StructType = None
    ) -> DataFrame:
        """データ抽出"""
        self.logger.info(f"Extracting data from {source_path}")

        if schema:
            df = self.spark.read.schema(schema).format(file_format).load(source_path)
        else:
            df = self.spark.read.format(file_format).load(source_path)

        self.logger.info(f"Extracted {df.count()} records with {len(df.columns)} columns")
        return df

    def validate(self, df: DataFrame, validation_rules: Dict) -> DataFrame:
        """データ検証"""
        self.logger.info("Validating data quality")

        # 必須カラムチェック
        if "required_columns" in validation_rules:
            missing_cols = set(validation_rules["required_columns"]) - set(df.columns)
            if missing_cols:
                raise ValueError(f"Missing required columns: {missing_cols}")

        # Null値チェック
        if "non_null_columns" in validation_rules:
            for col in validation_rules["non_null_columns"]:
                null_count = df.filter(F.col(col).isNull()).count()
                if null_count > 0:
                    self.logger.warning(f"Column {col} has {null_count} null values")

        # 値範囲チェック
        if "value_ranges" in validation_rules:
            for col, (min_val, max_val) in validation_rules["value_ranges"].items():
                df = df.filter(
                    (F.col(col) >= min_val) & (F.col(col) <= max_val)
                )

        # 重複チェック
        if "unique_columns" in validation_rules:
            unique_cols = validation_rules["unique_columns"]
            initial_count = df.count()
            df = df.dropDuplicates(unique_cols)
            duplicates_removed = initial_count - df.count()
            if duplicates_removed > 0:
                self.logger.warning(f"Removed {duplicates_removed} duplicate records")

        return df

    def transform(
        self,
        df: DataFrame,
        feature_columns: List[str],
        label_column: str = None,
        normalize: bool = True
    ) -> DataFrame:
        """データ変換と特徴量エンジニアリング"""
        self.logger.info("Transforming data")

        # 欠損値処理
        df = self._handle_missing_values(df, feature_columns)

        # カテゴリ変数エンコーディング
        df = self._encode_categorical_features(df, feature_columns)

        # 特徴量ベクトル作成
        assembler = VectorAssembler(
            inputCols=feature_columns,
            outputCol="features_raw"
        )
        df = assembler.transform(df)

        # 正規化
        if normalize:
            scaler = StandardScaler(
                inputCol="features_raw",
                outputCol="features",
                withMean=True,
                withStd=True
            )
            scaler_model = scaler.fit(df)
            df = scaler_model.transform(df)
        else:
            df = df.withColumnRenamed("features_raw", "features")

        # 必要なカラムのみ選択
        select_cols = ["features"]
        if label_column:
            select_cols.append(label_column)

        return df.select(select_cols)

    def _handle_missing_values(
        self,
        df: DataFrame,
        columns: List[str],
        strategy: str = "mean"
    ) -> DataFrame:
        """欠損値処理"""
        for col in columns:
            if strategy == "mean":
                mean_val = df.select(F.mean(col)).first()[0]
                df = df.fillna({col: mean_val})
            elif strategy == "median":
                median_val = df.approxQuantile(col, [0.5], 0.01)[0]
                df = df.fillna({col: median_val})
            elif strategy == "drop":
                df = df.dropna(subset=[col])

        return df

    def _encode_categorical_features(
        self,
        df: DataFrame,
        feature_columns: List[str]
    ) -> DataFrame:
        """カテゴリ変数をエンコーディング"""
        from pyspark.ml.feature import StringIndexer, OneHotEncoder

        categorical_cols = [
            col for col in feature_columns
            if dict(df.dtypes)[col] == 'string'
        ]

        for col in categorical_cols:
            # StringIndexer
            indexer = StringIndexer(
                inputCol=col,
                outputCol=f"{col}_index",
                handleInvalid="keep"
            )
            df = indexer.fit(df).transform(df)

            # OneHotEncoder
            encoder = OneHotEncoder(
                inputCol=f"{col}_index",
                outputCol=f"{col}_encoded"
            )
            df = encoder.fit(df).transform(df)

        return df

    def load(
        self,
        df: DataFrame,
        output_path: str,
        file_format: str = "parquet",
        mode: str = "overwrite",
        partition_by: List[str] = None
    ):
        """データ書き込み"""
        self.logger.info(f"Loading data to {output_path}")

        writer = df.write.mode(mode).format(file_format)

        if partition_by:
            writer = writer.partitionBy(partition_by)

        writer.save(output_path)
        self.logger.info(f"Data successfully loaded to {output_path}")

    def run_etl(
        self,
        source_path: str,
        output_path: str,
        feature_columns: List[str],
        label_column: str = None,
        validation_rules: Dict = None,
        partition_by: List[str] = None
    ):
        """完全なETLパイプライン実行"""
        # Extract
        df = self.extract(source_path)

        # Validate
        if validation_rules:
            df = self.validate(df, validation_rules)

        # Transform
        df = self.transform(df, feature_columns, label_column)

        # Load
        self.load(df, output_path, partition_by=partition_by)

        return df


# 使用例
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # ETLパイプライン初期化
    pipeline = DistributedETLPipeline(
        app_name="CustomerChurnETL",
        num_partitions=2000
    )

    # 検証ルール定義
    validation_rules = {
        "required_columns": ["customer_id", "age", "balance", "churn"],
        "non_null_columns": ["customer_id", "churn"],
        "value_ranges": {
            "age": (18, 100),
            "balance": (0, 1000000)
        },
        "unique_columns": ["customer_id"]
    }

    # ETL実行
    feature_columns = [
        "age", "balance", "num_products", "credit_score",
        "country", "gender", "is_active_member"
    ]

    pipeline.run_etl(
        source_path="s3://my-bucket/raw-data/customers/",
        output_path="s3://my-bucket/processed-data/customers/",
        feature_columns=feature_columns,
        label_column="churn",
        validation_rules=validation_rules,
        partition_by=["country"]
    )

✅ Spark ETL best practices:

- Enable Adaptive Query Execution (spark.sql.adaptive.enabled) so shuffle partition counts adapt to actual data sizes.
- Prefer columnar formats (Parquet with Snappy compression) and partition output by frequently filtered columns.
- Use actions such as count() sparingly on large DataFrames; each one triggers a full job.
- Validate data early so bad records fail fast, before expensive transformations run.

5.3 Distributed Model Training

Multi-Node Training and Hyperparameter Optimization

Code Example 4: Distributed Hyperparameter Optimization with Ray Tune

"""
Ray Tuneを使った大規模ハイパーパラメータ最適化
"""
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from typing import Dict


class NeuralNetwork(nn.Module):
    """シンプルなニューラルネットワーク"""

    def __init__(self, input_dim: int, hidden_dims: list, output_dim: int, dropout: float):
        super().__init__()

        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            prev_dim = hidden_dim

        layers.append(nn.Linear(prev_dim, output_dim))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)


def train_model(config: Dict, checkpoint_dir=None):
    """
    訓練関数(Ray Tuneから呼び出される)

    Args:
        config: ハイパーパラメータ設定
        checkpoint_dir: チェックポイントディレクトリ
    """
    # データ準備(実際はSparkから読み込む)
    np.random.seed(42)
    X_train = np.random.randn(10000, 50).astype(np.float32)
    y_train = np.random.randint(0, 2, 10000).astype(np.int64)
    X_val = np.random.randn(2000, 50).astype(np.float32)
    y_val = np.random.randint(0, 2, 2000).astype(np.int64)

    train_dataset = TensorDataset(
        torch.from_numpy(X_train),
        torch.from_numpy(y_train)
    )
    val_dataset = TensorDataset(
        torch.from_numpy(X_val),
        torch.from_numpy(y_val)
    )

    train_loader = DataLoader(
        train_dataset,
        batch_size=config["batch_size"],
        shuffle=True,
        num_workers=2
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=config["batch_size"],
        num_workers=2
    )

    # モデル初期化
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = NeuralNetwork(
        input_dim=50,
        hidden_dims=config["hidden_dims"],
        output_dim=2,
        dropout=config["dropout"]
    ).to(device)

    # オプティマイザと損失関数
    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=config["lr"],
        weight_decay=config["weight_decay"]
    )
    criterion = nn.CrossEntropyLoss()

    # チェックポイントから復元
    if checkpoint_dir:
        checkpoint = torch.load(checkpoint_dir + "/checkpoint.pt")
        model.load_state_dict(checkpoint["model_state"])
        optimizer.load_state_dict(checkpoint["optimizer_state"])
        start_epoch = checkpoint["epoch"]
    else:
        start_epoch = 0

    # 訓練ループ
    for epoch in range(start_epoch, config["max_epochs"]):
        # 訓練フェーズ
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()

        # 検証フェーズ
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()

        # メトリクス計算
        train_acc = train_correct / train_total
        val_acc = val_correct / val_total

        # チェックポイント保存
        with tune.checkpoint_dir(step=epoch) as checkpoint_dir:
            torch.save({
                "epoch": epoch + 1,
                "model_state": model.state_dict(),
                "optimizer_state": optimizer.state_dict(),
            }, checkpoint_dir + "/checkpoint.pt")

        # Ray Tuneにメトリクス報告
        tune.report(
            train_loss=train_loss / len(train_loader),
            train_accuracy=train_acc,
            val_loss=val_loss / len(val_loader),
            val_accuracy=val_acc
        )


def run_hyperparameter_optimization():
    """分散ハイパーパラメータ最適化実行"""

    # Ray初期化
    ray.init(
        address="auto",  # 既存のRayクラスタに接続
        _redis_password="password"
    )

    # 探索空間定義
    config = {
        "lr": tune.loguniform(1e-5, 1e-2),
        "batch_size": tune.choice([128, 256, 512]),
        "hidden_dims": tune.choice([
            [128, 64],
            [256, 128],
            [512, 256, 128]
        ]),
        "dropout": tune.uniform(0.1, 0.5),
        "weight_decay": tune.loguniform(1e-6, 1e-3),
        "max_epochs": 50
    }

    # スケジューラ(早期停止)
    scheduler = ASHAScheduler(
        metric="val_accuracy",
        mode="max",
        max_t=50,
        grace_period=10,
        reduction_factor=2
    )

    # サーチアルゴリズム(Optuna)
    search_alg = OptunaSearch(
        metric="val_accuracy",
        mode="max"
    )

    # レポーター設定
    reporter = CLIReporter(
        metric_columns=["train_loss", "train_accuracy", "val_loss", "val_accuracy"],
        max_progress_rows=20
    )

    # チューニング実行
    analysis = tune.run(
        train_model,
        config=config,
        num_samples=100,  # 試行回数
        scheduler=scheduler,
        search_alg=search_alg,
        progress_reporter=reporter,
        resources_per_trial={
            "cpu": 4,
            "gpu": 1
        },
        checkpoint_at_end=True,
        checkpoint_freq=10,
        local_dir="./ray_results",
        name="neural_network_hpo"
    )

    # 最良の設定を取得
    best_config = analysis.get_best_config(metric="val_accuracy", mode="max")
    best_trial = analysis.get_best_trial(metric="val_accuracy", mode="max")

    print("\n" + "="*80)
    print("Best Hyperparameters:")
    print("="*80)
    for key, value in best_config.items():
        print(f"{key:20s}: {value}")

    print(f"\nBest Validation Accuracy: {best_trial.last_result['val_accuracy']:.4f}")

    # 結果をDataFrameとして取得
    df = analysis.dataframe()
    df.to_csv("hpo_results.csv", index=False)

    ray.shutdown()

    return best_config, analysis


# 使用例
if __name__ == "__main__":
    best_config, analysis = run_hyperparameter_optimization()

Distributed Training Strategies

Code Example 5: PyTorch Distributed Data Parallel (DDP)

"""
PyTorch DDPによるマルチノード分散訓練
"""
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import os
from typing import Optional


def setup_distributed(rank: int, world_size: int, backend: str = "nccl"):
    """
    分散環境セットアップ

    Args:
        rank: 現在のプロセスのランク
        world_size: 総プロセス数
        backend: 通信バックエンド(nccl, gloo, mpi)
    """
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # プロセスグループ初期化
    dist.init_process_group(backend, rank=rank, world_size=world_size)

    # GPUデバイス設定
    torch.cuda.set_device(rank)


def cleanup_distributed():
    """分散環境クリーンアップ"""
    dist.destroy_process_group()


class DistributedTrainer:
    """分散訓練マネージャー"""

    def __init__(
        self,
        model: nn.Module,
        train_dataset,
        val_dataset,
        rank: int,
        world_size: int,
        batch_size: int = 256,
        learning_rate: float = 0.001,
        checkpoint_dir: str = "./checkpoints"
    ):
        self.rank = rank
        self.world_size = world_size
        self.checkpoint_dir = checkpoint_dir

        # デバイス設定
        self.device = torch.device(f"cuda:{rank}")

        # モデルをDDPでラップ
        self.model = model.to(self.device)
        self.model = DDP(self.model, device_ids=[rank])

        # データローダー(DistributedSamplerを使用)
        train_sampler = DistributedSampler(
            train_dataset,
            num_replicas=world_size,
            rank=rank,
            shuffle=True
        )

        self.train_loader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            sampler=train_sampler,
            num_workers=4,
            pin_memory=True
        )

        val_sampler = DistributedSampler(
            val_dataset,
            num_replicas=world_size,
            rank=rank,
            shuffle=False
        )

        self.val_loader = DataLoader(
            val_dataset,
            batch_size=batch_size,
            sampler=val_sampler,
            num_workers=4,
            pin_memory=True
        )

        # オプティマイザと損失関数
        self.optimizer = torch.optim.Adam(
            self.model.parameters(),
            lr=learning_rate
        )
        self.criterion = nn.CrossEntropyLoss()

    def train_epoch(self, epoch: int):
        """1エポック訓練"""
        self.model.train()
        self.train_loader.sampler.set_epoch(epoch)  # シャッフルのシード設定

        total_loss = 0.0
        total_correct = 0
        total_samples = 0

        for batch_idx, (inputs, labels) in enumerate(self.train_loader):
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            # Forward pass
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, labels)

            # Backward pass
            loss.backward()
            self.optimizer.step()

            # メトリクス計算
            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total_correct += predicted.eq(labels).sum().item()
            total_samples += labels.size(0)

            if batch_idx % 100 == 0 and self.rank == 0:
                print(f"Epoch {epoch} [{batch_idx}/{len(self.train_loader)}] "
                      f"Loss: {loss.item():.4f}")

        # 全プロセスでメトリクスを集約
        avg_loss = self._aggregate_metric(total_loss / len(self.train_loader))
        accuracy = self._aggregate_metric(total_correct / total_samples)

        return avg_loss, accuracy

    def validate(self):
        """検証"""
        self.model.eval()

        total_loss = 0.0
        total_correct = 0
        total_samples = 0

        with torch.no_grad():
            for inputs, labels in self.val_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)

                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)

                total_loss += loss.item()
                _, predicted = outputs.max(1)
                total_correct += predicted.eq(labels).sum().item()
                total_samples += labels.size(0)

        # 全プロセスでメトリクスを集約
        avg_loss = self._aggregate_metric(total_loss / len(self.val_loader))
        accuracy = self._aggregate_metric(total_correct / total_samples)

        return avg_loss, accuracy

    def _aggregate_metric(self, value: float) -> float:
        """全プロセスでメトリクスを集約"""
        tensor = torch.tensor(value).to(self.device)
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        return (tensor / self.world_size).item()

    def save_checkpoint(self, epoch: int, val_accuracy: float):
        """チェックポイント保存(ランク0のみ)"""
        if self.rank == 0:
            os.makedirs(self.checkpoint_dir, exist_ok=True)
            checkpoint_path = os.path.join(
                self.checkpoint_dir,
                f"checkpoint_epoch_{epoch}.pt"
            )

            torch.save({
                'epoch': epoch,
                'model_state_dict': self.model.module.state_dict(),
                'optimizer_state_dict': self.optimizer.state_dict(),
                'val_accuracy': val_accuracy,
            }, checkpoint_path)

            print(f"Checkpoint saved: {checkpoint_path}")

    def load_checkpoint(self, checkpoint_path: str):
        """チェックポイント読み込み"""
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        self.model.module.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        return checkpoint['epoch']

    def train(self, num_epochs: int, save_freq: int = 10):
        """完全な訓練ループ"""
        for epoch in range(num_epochs):
            # 訓練
            train_loss, train_acc = self.train_epoch(epoch)

            # 検証
            val_loss, val_acc = self.validate()

            # ログ出力(ランク0のみ)
            if self.rank == 0:
                print(f"\nEpoch {epoch + 1}/{num_epochs}")
                print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
                print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

            # チェックポイント保存
            if (epoch + 1) % save_freq == 0:
                self.save_checkpoint(epoch + 1, val_acc)


def distributed_training_worker(
    rank: int,
    world_size: int,
    model_class,
    train_dataset,
    val_dataset
):
    """分散訓練ワーカー関数"""
    # 分散環境セットアップ
    setup_distributed(rank, world_size)

    # モデル作成
    model = model_class(input_dim=50, hidden_dims=[256, 128], output_dim=2, dropout=0.3)

    # トレーナー初期化
    trainer = DistributedTrainer(
        model=model,
        train_dataset=train_dataset,
        val_dataset=val_dataset,
        rank=rank,
        world_size=world_size,
        batch_size=256,
        learning_rate=0.001
    )

    # 訓練実行
    trainer.train(num_epochs=50, save_freq=10)

    # クリーンアップ
    cleanup_distributed()


# 使用例
if __name__ == "__main__":
    import torch.multiprocessing as mp
    from torch.utils.data import TensorDataset
    import numpy as np

    # ダミーデータ作成
    np.random.seed(42)
    X_train = torch.from_numpy(np.random.randn(100000, 50).astype(np.float32))
    y_train = torch.from_numpy(np.random.randint(0, 2, 100000).astype(np.int64))
    X_val = torch.from_numpy(np.random.randn(20000, 50).astype(np.float32))
    y_val = torch.from_numpy(np.random.randint(0, 2, 20000).astype(np.int64))

    train_dataset = TensorDataset(X_train, y_train)
    val_dataset = TensorDataset(X_val, y_val)

    # 分散訓練起動(4 GPUs)
    world_size = 4
    mp.spawn(
        distributed_training_worker,
        args=(world_size, NeuralNetwork, train_dataset, val_dataset),
        nprocs=world_size,
        join=True
    )

💡 Choosing a distributed training approach:

- DDP: the standard choice for PyTorch; one process per GPU with gradient all-reduce.
- Horovod: framework-agnostic (TensorFlow, PyTorch, MXNet) with a similar ring-allreduce design.
- Ray Train/Tune: convenient when training must be combined with large-scale hyperparameter search.
- Data parallelism covers most workloads; model parallelism is only needed when the model itself exceeds a single GPU's memory. See the launcher sketch below for multi-node runs.
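
On a real multi-node cluster, mp.spawn is typically replaced by a launcher such as torchrun, which sets RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, and MASTER_PORT for every process it starts. A minimal entry point then looks like the sketch below; the environment-variable names follow torchrun's documented convention, and the dataset and trainer setup are placeholders.

"""
Sketch: torchrun-style entry point for the DistributedTrainer above
Launch example: torchrun --nnodes=2 --nproc_per_node=4 train.py
"""
import os
import torch
import torch.distributed as dist


def main():
    # torchrun exports these environment variables for each process it spawns
    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = int(os.environ["WORLD_SIZE"])

    # init_method="env://" reads MASTER_ADDR / MASTER_PORT from the environment
    dist.init_process_group("nccl", init_method="env://",
                            rank=rank, world_size=world_size)
    torch.cuda.set_device(local_rank)

    # ... build datasets and a DistributedTrainer(rank=local_rank, ...) here ...

    dist.destroy_process_group()


if __name__ == "__main__":
    main()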

5.4 Performance Optimization

Profiling and Bottleneck Identification

Code Example 6: Profiling a Distributed System

"""
分散機械学習パイプラインのプロファイリングと最適化
"""
import time
import psutil
import torch
from contextlib import contextmanager
from typing import Dict, List
import json
from dataclasses import dataclass, asdict
import numpy as np


@dataclass
class ProfileMetrics:
    """プロファイリングメトリクス"""
    stage_name: str
    execution_time: float
    cpu_percent: float
    memory_mb: float
    gpu_memory_mb: float = 0.0
    io_read_mb: float = 0.0
    io_write_mb: float = 0.0
    network_sent_mb: float = 0.0
    network_recv_mb: float = 0.0


class PerformanceProfiler:
    """パフォーマンスプロファイラー"""

    def __init__(self, enable_gpu: bool = True):
        self.enable_gpu = enable_gpu and torch.cuda.is_available()
        self.metrics: List[ProfileMetrics] = []
        self.process = psutil.Process()

    @contextmanager
    def profile(self, stage_name: str):
        """ステージのプロファイリング"""
        # 開始時のメトリクス
        start_time = time.time()
        start_cpu = self.process.cpu_percent()
        start_memory = self.process.memory_info().rss / 1024 / 1024  # MB

        io_start = self.process.io_counters()
        net_start = psutil.net_io_counters()

        if self.enable_gpu:
            torch.cuda.reset_peak_memory_stats()
            start_gpu_memory = torch.cuda.memory_allocated() / 1024 / 1024

        try:
            yield
        finally:
            # 終了時のメトリクス
            end_time = time.time()
            end_cpu = self.process.cpu_percent()
            end_memory = self.process.memory_info().rss / 1024 / 1024

            io_end = self.process.io_counters()
            net_end = psutil.net_io_counters()

            # GPU メモリ
            if self.enable_gpu:
                end_gpu_memory = torch.cuda.max_memory_allocated() / 1024 / 1024
            else:
                end_gpu_memory = 0.0

            # メトリクス記録
            metrics = ProfileMetrics(
                stage_name=stage_name,
                execution_time=end_time - start_time,
                cpu_percent=(start_cpu + end_cpu) / 2,
                memory_mb=end_memory - start_memory,
                gpu_memory_mb=end_gpu_memory - start_gpu_memory if self.enable_gpu else 0.0,
                io_read_mb=(io_end.read_bytes - io_start.read_bytes) / 1024 / 1024,
                io_write_mb=(io_end.write_bytes - io_start.write_bytes) / 1024 / 1024,
                network_sent_mb=(net_end.bytes_sent - net_start.bytes_sent) / 1024 / 1024,
                network_recv_mb=(net_end.bytes_recv - net_start.bytes_recv) / 1024 / 1024
            )

            self.metrics.append(metrics)

    def print_summary(self):
        """プロファイリング結果サマリー表示"""
        print("\n" + "="*100)
        print("Performance Profiling Summary")
        print("="*100)
        print(f"{'Stage':<30} {'Time (s)':<12} {'CPU %':<10} {'Mem (MB)':<12} "
              f"{'GPU (MB)':<12} {'I/O Read':<12} {'I/O Write':<12}")
        print("-"*100)

        total_time = 0.0
        for m in self.metrics:
            print(f"{m.stage_name:<30} {m.execution_time:<12.2f} {m.cpu_percent:<10.1f} "
                  f"{m.memory_mb:<12.1f} {m.gpu_memory_mb:<12.1f} "
                  f"{m.io_read_mb:<12.1f} {m.io_write_mb:<12.1f}")
            total_time += m.execution_time

        print("-"*100)
        print(f"{'Total':<30} {total_time:<12.2f}")
        print("="*100)

    def get_bottlenecks(self, top_k: int = 3) -> List[ProfileMetrics]:
        """ボトルネック特定"""
        sorted_metrics = sorted(
            self.metrics,
            key=lambda m: m.execution_time,
            reverse=True
        )
        return sorted_metrics[:top_k]

    def export_json(self, output_path: str):
        """結果をJSONにエクスポート"""
        data = [asdict(m) for m in self.metrics]
        with open(output_path, 'w') as f:
            json.dump(data, f, indent=2)


class DataLoaderOptimizer:
    """DataLoader最適化ヘルパー"""

    @staticmethod
    def benchmark_dataloader(
        dataset,
        batch_sizes: List[int],
        num_workers_list: List[int],
        num_iterations: int = 100
    ) -> Dict:
        """DataLoader設定の最適化"""
        results = []

        for batch_size in batch_sizes:
            for num_workers in num_workers_list:
                loader = torch.utils.data.DataLoader(
                    dataset,
                    batch_size=batch_size,
                    num_workers=num_workers,
                    pin_memory=True
                )

                start_time = time.time()
                for i, batch in enumerate(loader):
                    if i >= num_iterations:
                        break
                    _ = batch  # データロード
                elapsed = time.time() - start_time

                throughput = (batch_size * num_iterations) / elapsed

                results.append({
                    'batch_size': batch_size,
                    'num_workers': num_workers,
                    'throughput': throughput,
                    'time_per_batch': elapsed / num_iterations
                })

        # 最適設定を見つける
        best_config = max(results, key=lambda x: x['throughput'])

        print("\nDataLoader Optimization Results:")
        print(f"Best Configuration: batch_size={best_config['batch_size']}, "
              f"num_workers={best_config['num_workers']}")
        print(f"Throughput: {best_config['throughput']:.2f} samples/sec")

        return best_config


class GPUOptimizer:
    """GPU最適化ヘルパー"""

    @staticmethod
    def optimize_memory():
        """GPU メモリ最適化"""
        if torch.cuda.is_available():
            # 未使用キャッシュクリア
            torch.cuda.empty_cache()

            # メモリ統計表示
            allocated = torch.cuda.memory_allocated() / 1024**3
            reserved = torch.cuda.memory_reserved() / 1024**3

            print(f"\nGPU Memory Status:")
            print(f"Allocated: {allocated:.2f} GB")
            print(f"Reserved: {reserved:.2f} GB")

    @staticmethod
    def enable_auto_mixed_precision():
        """自動混合精度訓練を有効化"""
        return torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 7

    @staticmethod
    def benchmark_precision(model, sample_input, num_iterations: int = 100):
        """精度別パフォーマンス比較"""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)
        sample_input = sample_input.to(device)

        results = {}

        # FP32
        model.float()
        start = time.time()
        for _ in range(num_iterations):
            with torch.no_grad():
                _ = model(sample_input.float())
        torch.cuda.synchronize()
        results['fp32'] = time.time() - start

        # FP16 (if available)
        if GPUOptimizer.enable_auto_mixed_precision():
            model.half()
            start = time.time()
            for _ in range(num_iterations):
                with torch.no_grad():
                    _ = model(sample_input.half())
            torch.cuda.synchronize()
            results['fp16'] = time.time() - start

        print("\nPrecision Benchmark:")
        for precision, elapsed in results.items():
            print(f"{precision.upper()}: {elapsed:.4f}s "
                  f"({num_iterations/elapsed:.2f} iter/s)")

        return results


# 使用例
if __name__ == "__main__":
    # プロファイラー初期化
    profiler = PerformanceProfiler(enable_gpu=True)

    # データ準備のプロファイリング
    with profiler.profile("Data Loading"):
        data = torch.randn(100000, 50)
        time.sleep(0.5)  # シミュレーション

    # 前処理のプロファイリング
    with profiler.profile("Preprocessing"):
        normalized_data = (data - data.mean()) / data.std()
        time.sleep(0.3)

    # モデル訓練のプロファイリング
    with profiler.profile("Model Training"):
        model = torch.nn.Linear(50, 10)
        optimizer = torch.optim.Adam(model.parameters())

        for _ in range(100):
            outputs = model(normalized_data)
            loss = outputs.sum()
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

    # 結果表示
    profiler.print_summary()

    # ボトルネック特定
    bottlenecks = profiler.get_bottlenecks(top_k=2)
    print("\nTop Bottlenecks:")
    for i, m in enumerate(bottlenecks, 1):
        print(f"{i}. {m.stage_name}: {m.execution_time:.2f}s")

    # 結果エクスポート
    profiler.export_json("profiling_results.json")

    # DataLoader最適化
    dataset = torch.utils.data.TensorDataset(data, torch.zeros(len(data)))
    DataLoaderOptimizer.benchmark_dataloader(
        dataset,
        batch_sizes=[128, 256, 512],
        num_workers_list=[2, 4, 8],
        num_iterations=50
    )

    # GPU最適化
    GPUOptimizer.optimize_memory()
    if torch.cuda.is_available():
        sample_model = torch.nn.Sequential(
            torch.nn.Linear(50, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 10)
        )
        sample_input = torch.randn(32, 50)
        GPUOptimizer.benchmark_precision(sample_model, sample_input)
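
Note that calling model.half() as in the benchmark above casts every weight to FP16, which can destabilize training. For actual training runs, automatic mixed precision (AMP) via torch.cuda.amp is the safer approach: it keeps master weights in FP32 and scales the loss to avoid gradient underflow. Below is a minimal sketch of one AMP training step; the model, batch, and optimizer are placeholder assumptions, and a CUDA device is required.

"""
Sketch: a single AMP training step (assumes a CUDA device is available)
"""
import torch
import torch.nn as nn

device = torch.device("cuda")
model = nn.Linear(50, 10).to(device)           # placeholder model
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
scaler = torch.cuda.amp.GradScaler()           # scales loss to prevent FP16 underflow

inputs = torch.randn(32, 50, device=device)    # dummy batch
labels = torch.randint(0, 10, (32,), device=device)

optimizer.zero_grad()
with torch.cuda.amp.autocast():                # ops run in FP16/FP32 as appropriate
    outputs = model(inputs)
    loss = criterion(outputs, labels)

scaler.scale(loss).backward()                  # backward on the scaled loss
scaler.step(optimizer)                         # unscales gradients, then steps
scaler.update()                                # adjusts the scale factor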

I/O Optimization and Network Efficiency

💡 I/O optimization best practices:

- Store training data in columnar formats (Parquet/ORC) with lightweight compression such as Snappy.
- Overlap data loading with computation using multiple DataLoader workers, pinned memory, and prefetching (see the sketch below).
- Keep data close to compute (same region and zone) to avoid paying network latency and egress costs.
- Consolidate many tiny files into larger ones; per-file overhead otherwise dominates I/O time.
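
As a concrete illustration of the loader-side points above, here is a minimal sketch of I/O-friendly DataLoader settings (PyTorch 1.7+; the dataset and parameter values are illustrative, not tuned):

"""
Sketch: DataLoader settings that overlap data loading with compute
"""
import torch
from torch.utils.data import DataLoader, TensorDataset

dataset = TensorDataset(torch.randn(10000, 50),
                        torch.zeros(10000, dtype=torch.long))

loader = DataLoader(
    dataset,
    batch_size=256,
    num_workers=4,            # parallel loader processes
    pin_memory=True,          # page-locked host memory for faster H2D copies
    persistent_workers=True,  # keep workers alive between epochs
    prefetch_factor=4         # batches prefetched per worker
)

for inputs, labels in loader:
    # non_blocking=True overlaps the host-to-device copy with compute
    # (effective only with pin_memory=True and a CUDA device)
    if torch.cuda.is_available():
        inputs = inputs.to("cuda", non_blocking=True)
        labels = labels.to("cuda", non_blocking=True)
    break  # one batch is enough for this sketch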

5.5 An End-to-End Implementation Example

A Complete Large-Scale ML Pipeline

Code Example 7: An Integrated Spark + PyTorch Pipeline

"""
SparkとPyTorchを統合した大規模MLパイプライン
"""
from pyspark.sql import SparkSession
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from typing import List, Tuple
import pickle


class SparkDatasetConverter:
    """Spark DataFrameをPyTorch Datasetに変換"""

    @staticmethod
    def spark_to_pytorch(
        spark_df,
        feature_column: str = "features",
        label_column: str = "label",
        output_path: str = "/tmp/pytorch_data"
    ):
        """
        Spark DataFrameをPyTorchで読み込み可能な形式で保存

        Args:
            spark_df: Spark DataFrame
            feature_column: 特徴量カラム名
            label_column: ラベルカラム名
            output_path: 出力パス
        """
        # Pandas経由で変換(小〜中規模データ)
        # 大規模データの場合はパーティションごとに処理

        def convert_partition(iterator):
            """各パーティションを変換"""
            data_list = []
            for row in iterator:
                features = row[feature_column].toArray()
                label = row[label_column]
                data_list.append((features, label))

            # パーティションごとにファイル保存
            import random
            partition_id = random.randint(0, 10000)
            output_file = f"{output_path}/partition_{partition_id}.pkl"

            with open(output_file, 'wb') as f:
                pickle.dump(data_list, f)

            yield (output_file, len(data_list))

        # 各パーティションを処理
        spark_df.rdd.mapPartitions(convert_partition).collect()


class DistributedDataset(Dataset):
    """分散保存されたデータを読み込むDataset"""

    def __init__(self, data_dir: str):
        from pathlib import Path

        self.data_files = list(Path(data_dir).glob("partition_*.pkl"))

        # 全データのインデックスを構築
        self.file_indices = []
        for file_path in self.data_files:
            with open(file_path, 'rb') as f:
                data = pickle.load(f)
                self.file_indices.append((file_path, len(data)))

        self.total_samples = sum(count for _, count in self.file_indices)

        # キャッシュ(メモリに余裕があれば)
        self.cache = {}

    def __len__(self):
        return self.total_samples

    def __getitem__(self, idx):
        # どのファイルのどのインデックスか計算
        file_idx = 0
        cumsum = 0

        for i, (_, count) in enumerate(self.file_indices):
            if idx < cumsum + count:
                file_idx = i
                local_idx = idx - cumsum
                break
            cumsum += count

        # ファイルからデータ読み込み
        file_path = self.file_indices[file_idx][0]

        if file_path not in self.cache:
            with open(file_path, 'rb') as f:
                self.cache[file_path] = pickle.load(f)

        features, label = self.cache[file_path][local_idx]

        return torch.FloatTensor(features), torch.LongTensor([label])[0]


class EndToEndMLPipeline:
    """エンドツーエンドMLパイプライン"""

    def __init__(
        self,
        spark_master: str = "local[*]",
        app_name: str = "EndToEndML"
    ):
        # Spark初期化
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .master(spark_master) \
            .config("spark.sql.adaptive.enabled", "true") \
            .getOrCreate()

        self.profiler = PerformanceProfiler()

    def run_pipeline(
        self,
        data_path: str,
        model_class,
        model_params: dict,
        training_config: dict,
        output_dir: str = "./pipeline_output"
    ):
        """完全なパイプライン実行"""

        # ステップ1: データ読み込み(Spark)
        with self.profiler.profile("1. Data Loading (Spark)"):
            raw_df = self.spark.read.parquet(data_path)
            print(f"Loaded {raw_df.count()} records")

        # ステップ2: データ検証
        with self.profiler.profile("2. Data Validation"):
            # 基本統計量
            raw_df.describe().show()

            # Null値チェック
            null_counts = raw_df.select([
                F.count(F.when(F.col(c).isNull(), c)).alias(c)
                for c in raw_df.columns
            ])
            null_counts.show()

        # ステップ3: 特徴量エンジニアリング(Spark)
        with self.profiler.profile("3. Feature Engineering (Spark)"):
            from pyspark.ml.feature import VectorAssembler, StandardScaler

            feature_cols = [c for c in raw_df.columns if c != 'label']

            assembler = VectorAssembler(
                inputCols=feature_cols,
                outputCol="features_raw"
            )
            df_assembled = assembler.transform(raw_df)

            scaler = StandardScaler(
                inputCol="features_raw",
                outputCol="features",
                withMean=True,
                withStd=True
            )
            scaler_model = scaler.fit(df_assembled)
            df_scaled = scaler_model.transform(df_assembled)

            # 訓練/テストデータ分割
            train_df, test_df = df_scaled.randomSplit([0.8, 0.2], seed=42)

        # ステップ4: PyTorch用データ変換
        with self.profiler.profile("4. Spark to PyTorch Conversion"):
            train_data_dir = f"{output_dir}/train_data"
            test_data_dir = f"{output_dir}/test_data"

            SparkDatasetConverter.spark_to_pytorch(
                train_df.select("features", "label"),
                output_path=train_data_dir
            )
            SparkDatasetConverter.spark_to_pytorch(
                test_df.select("features", "label"),
                output_path=test_data_dir
            )

        # ステップ5: PyTorch Dataset/DataLoader作成
        with self.profiler.profile("5. PyTorch DataLoader Setup"):
            train_dataset = DistributedDataset(train_data_dir)
            test_dataset = DistributedDataset(test_data_dir)

            train_loader = DataLoader(
                train_dataset,
                batch_size=training_config['batch_size'],
                shuffle=True,
                num_workers=4,
                pin_memory=True
            )
            test_loader = DataLoader(
                test_dataset,
                batch_size=training_config['batch_size'],
                num_workers=4
            )

        # ステップ6: モデル訓練
        with self.profiler.profile("6. Model Training"):
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model = model_class(**model_params).to(device)

            optimizer = torch.optim.Adam(
                model.parameters(),
                lr=training_config['learning_rate']
            )
            criterion = nn.CrossEntropyLoss()

            # 訓練ループ
            for epoch in range(training_config['num_epochs']):
                model.train()
                train_loss = 0.0

                for inputs, labels in train_loader:
                    inputs, labels = inputs.to(device), labels.to(device)

                    optimizer.zero_grad()
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()

                    train_loss += loss.item()

                if (epoch + 1) % 10 == 0:
                    print(f"Epoch {epoch+1}: Loss = {train_loss/len(train_loader):.4f}")

        # ステップ7: モデル評価
        with self.profiler.profile("7. Model Evaluation"):
            model.eval()
            correct = 0
            total = 0

            with torch.no_grad():
                for inputs, labels in test_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    _, predicted = outputs.max(1)
                    total += labels.size(0)
                    correct += predicted.eq(labels).sum().item()

            accuracy = correct / total
            print(f"\nTest Accuracy: {accuracy:.4f}")

        # ステップ8: モデル保存
        with self.profiler.profile("8. Model Saving"):
            model_path = f"{output_dir}/model.pt"
            torch.save({
                'model_state_dict': model.state_dict(),
                'model_params': model_params,
                'accuracy': accuracy
            }, model_path)
            print(f"Model saved to {model_path}")

        # プロファイリング結果
        self.profiler.print_summary()
        self.profiler.export_json(f"{output_dir}/profiling.json")

        return model, accuracy


# 使用例
if __name__ == "__main__":
    # パイプライン実行
    pipeline = EndToEndMLPipeline(
        spark_master="spark://master:7077",
        app_name="CustomerChurnPrediction"
    )

    # モデル定義
    class ChurnPredictor(nn.Module):
        def __init__(self, input_dim, hidden_dims, output_dim):
            super().__init__()
            layers = []
            prev_dim = input_dim
            for hidden_dim in hidden_dims:
                layers.extend([
                    nn.Linear(prev_dim, hidden_dim),
                    nn.ReLU(),
                    nn.Dropout(0.3)
                ])
                prev_dim = hidden_dim
            layers.append(nn.Linear(prev_dim, output_dim))
            self.network = nn.Sequential(*layers)

        def forward(self, x):
            return self.network(x)

    # パイプライン実行
    model, accuracy = pipeline.run_pipeline(
        data_path="s3://my-bucket/customer-data/",
        model_class=ChurnPredictor,
        model_params={
            'input_dim': 50,
            'hidden_dims': [256, 128, 64],
            'output_dim': 2
        },
        training_config={
            'batch_size': 512,
            'learning_rate': 0.001,
            'num_epochs': 50
        },
        output_dir="./churn_prediction_output"
    )

Monitoring and Maintenance

Code Example 8: A Real-Time Monitoring System

"""
MLパイプラインの監視とアラートシステム
"""
import time
import threading
from dataclasses import dataclass
from typing import Dict, List, Callable, Optional
from datetime import datetime
import json


@dataclass
class Metric:
    """メトリクス"""
    name: str
    value: float
    timestamp: datetime
    tags: Dict[str, str] = None


class MetricsCollector:
    """メトリクス収集"""

    def __init__(self):
        self.metrics: List[Metric] = []
        self.lock = threading.Lock()

    def record(self, name: str, value: float, tags: Dict[str, str] = None):
        """メトリクス記録"""
        with self.lock:
            metric = Metric(
                name=name,
                value=value,
                timestamp=datetime.now(),
                tags=tags or {}
            )
            self.metrics.append(metric)

    def get_latest(self, name: str, n: int = 1) -> List[Metric]:
        """最新のメトリクス取得"""
        with self.lock:
            filtered = [m for m in self.metrics if m.name == name]
            return sorted(filtered, key=lambda m: m.timestamp, reverse=True)[:n]

    def get_average(self, name: str, window_seconds: int = 60) -> Optional[float]:
        """期間内の平均値"""
        now = datetime.now()
        with self.lock:
            recent = [
                m for m in self.metrics
                if m.name == name and (now - m.timestamp).total_seconds() <= window_seconds
            ]
            if not recent:
                return None
            return sum(m.value for m in recent) / len(recent)


class AlertRule:
    """アラートルール"""

    def __init__(
        self,
        name: str,
        metric_name: str,
        threshold: float,
        comparison: str = "greater",  # greater, less, equal
        window_seconds: int = 60,
        callback: Callable = None
    ):
        self.name = name
        self.metric_name = metric_name
        self.threshold = threshold
        self.comparison = comparison
        self.window_seconds = window_seconds
        self.callback = callback or self.default_callback

    def check(self, collector: MetricsCollector) -> bool:
        """ルールチェック"""
        avg_value = collector.get_average(self.metric_name, self.window_seconds)

        if avg_value is None:
            return False

        if self.comparison == "greater":
            triggered = avg_value > self.threshold
        elif self.comparison == "less":
            triggered = avg_value < self.threshold
        else:  # equal
            triggered = abs(avg_value - self.threshold) < 0.0001

        if triggered:
            self.callback(self.name, self.metric_name, avg_value, self.threshold)

        return triggered

    def default_callback(self, rule_name, metric_name, value, threshold):
        """デフォルトのアラートコールバック"""
        print(f"\n⚠️  ALERT: {rule_name}")
        print(f"   Metric: {metric_name} = {value:.4f}")
        print(f"   Threshold: {threshold:.4f}")
        print(f"   Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


class MonitoringSystem:
    """統合監視システム"""

    def __init__(self, check_interval: int = 10):
        self.collector = MetricsCollector()
        self.alert_rules: List[AlertRule] = []
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None

    def add_alert_rule(self, rule: AlertRule):
        """アラートルール追加"""
        self.alert_rules.append(rule)

    def start(self):
        """監視開始"""
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        print("Monitoring system started")

    def stop(self):
        """監視停止"""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()
        print("Monitoring system stopped")

    def _monitor_loop(self):
        """監視ループ"""
        while self.running:
            for rule in self.alert_rules:
                rule.check(self.collector)
            time.sleep(self.check_interval)

    def export_metrics(self, output_path: str):
        """メトリクスエクスポート"""
        with self.collector.lock:
            data = [
                {
                    'name': m.name,
                    'value': m.value,
                    'timestamp': m.timestamp.isoformat(),
                    'tags': m.tags
                }
                for m in self.collector.metrics
            ]

        with open(output_path, 'w') as f:
            json.dump(data, f, indent=2)


# 使用例
if __name__ == "__main__":
    # 監視システム初期化
    monitor = MonitoringSystem(check_interval=5)

    # アラートルール設定
    monitor.add_alert_rule(AlertRule(
        name="High Training Loss",
        metric_name="train_loss",
        threshold=0.5,
        comparison="greater",
        window_seconds=30
    ))

    monitor.add_alert_rule(AlertRule(
        name="Low Validation Accuracy",
        metric_name="val_accuracy",
        threshold=0.80,
        comparison="less",
        window_seconds=60
    ))

    monitor.add_alert_rule(AlertRule(
        name="High GPU Memory Usage",
        metric_name="gpu_memory_percent",
        threshold=90.0,
        comparison="greater",
        window_seconds=30
    ))

    # 監視開始
    monitor.start()

    # シミュレーション: メトリクス記録
    try:
        for i in range(50):
            # 訓練メトリクス(徐々に改善)
            train_loss = 1.0 / (i + 1) + 0.1
            val_acc = min(0.95, 0.5 + i * 0.01)
            gpu_mem = 70 + (i % 5) * 5

            monitor.collector.record("train_loss", train_loss)
            monitor.collector.record("val_accuracy", val_acc)
            monitor.collector.record("gpu_memory_percent", gpu_mem)

            # 異常値を時々注入
            if i == 20:
                monitor.collector.record("train_loss", 0.8)  # アラート発火
            if i == 30:
                monitor.collector.record("val_accuracy", 0.75)  # アラート発火
            if i == 40:
                monitor.collector.record("gpu_memory_percent", 95.0)  # アラート発火

            time.sleep(1)

    except KeyboardInterrupt:
        pass
    finally:
        # 監視停止とメトリクスエクスポート
        monitor.stop()
        monitor.export_metrics("monitoring_metrics.json")
        print("\nMetrics exported to monitoring_metrics.json")

✅ What to monitor in production:

- Model quality: accuracy/precision/recall on live traffic, plus prediction and data drift.
- Serving health: request latency (p50/p99), throughput, and error rate.
- Resources: CPU/GPU utilization, memory, disk I/O, and network bandwidth.
- Pipeline health: job success rate, stage durations, and data freshness. A sketch of exporting such metrics to Prometheus follows.
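
The in-process collector above is handy for a single job, but the architecture diagram in 5.1 routes metrics through Prometheus. A minimal sketch of that integration is shown below; it assumes the prometheus_client package (pip install prometheus-client), and the metric names and values are illustrative.

"""
Sketch: exposing pipeline metrics to Prometheus via prometheus_client
"""
import random
import time
from prometheus_client import Counter, Gauge, start_http_server

# Metric objects register themselves globally on creation
TRAIN_LOSS = Gauge("ml_train_loss", "Latest training loss")
PREDICTIONS = Counter("ml_predictions_total", "Total predictions served")

# Prometheus scrapes http://localhost:8000/metrics
start_http_server(8000)

for step in range(10):
    TRAIN_LOSS.set(1.0 / (step + 1))      # placeholder loss value
    PREDICTIONS.inc(random.randint(1, 100))
    time.sleep(1)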

Exercises

Exercise 1: Pipeline Design

Problem: Design a pipeline for a recommendation system that ingests 10 TB of new data per day. Propose a pipeline architecture that satisfies the following requirements:

Hints:

Exercise 2: Optimizing Distributed Training

Problem: You are training an image classification model on a cluster of 8 nodes x 4 GPUs (32 GPUs in total). Implement the following optimizations:

Expected improvements:

Exercise 3: Cost Optimization

Problem: An ML pipeline is costing $50,000 per month in cloud spend. Propose a strategy that cuts cost by 30% while maintaining model performance.

Factors to consider:

Exercise 4: Data Drift Detection

Problem: Implement a system that automatically detects data drift in production. Include the following:

Implementation template:

class DataDriftDetector:
    def detect_drift(self, reference_data, current_data):
        # Implement the Kolmogorov-Smirnov (KS) test
        pass

    def visualize_distributions(self, feature_name):
        # Plot the reference vs. current distributions
        pass

    def trigger_retraining(self, drift_score, threshold=0.05):
        # Trigger retraining when drift exceeds the threshold
        pass
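
As a starting point for the detect_drift method, here is a minimal sketch of a per-feature two-sample KS test using scipy.stats.ks_2samp; the threshold and the synthetic data are illustrative assumptions, and the visualization and retraining hooks are left as the exercise.

"""
Sketch: per-feature two-sample KS test (assumes numpy and scipy are installed)
"""
import numpy as np
from scipy import stats


def ks_drift_scores(reference: np.ndarray, current: np.ndarray) -> list:
    """Return (ks_statistic, p_value) for each feature column."""
    scores = []
    for col in range(reference.shape[1]):
        statistic, p_value = stats.ks_2samp(reference[:, col], current[:, col])
        scores.append((statistic, p_value))
    return scores


# A low p-value suggests the two samples come from different distributions
reference = np.random.randn(5000, 3)
current = np.random.randn(5000, 3) + np.array([0.0, 0.0, 0.5])  # drift in column 2
for i, (stat, p) in enumerate(ks_drift_scores(reference, current)):
    print(f"feature {i}: ks={stat:.3f}, p={p:.3g}, drift={'yes' if p < 0.05 else 'no'}")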

Exercise 5: End-to-End Pipeline Implementation

Problem: Implement a complete ML pipeline that satisfies the following requirements:

Data: the Kaggle "Credit Card Fraud Detection" dataset (284,807 records)

Requirements:

Evaluation criteria:

Summary

In this chapter we covered the design and implementation of large-scale machine learning pipelines:

| Topic | Key Content | Practical Skills |
|---|---|---|
| Pipeline design | Scalability, fault tolerance, monitoring | Configuration management, error handling, checkpointing |
| Data processing | Spark ETL, data validation, feature engineering | Distributed transformations, quality checks, optimization |
| Distributed training | DDP, Ray Tune, hyperparameter optimization | Multi-node training, efficient HPO |
| Performance optimization | Profiling, I/O optimization, GPU utilization | Bottleneck identification, mixed-precision training |
| Production operations | Monitoring, alerting, cost optimization | Metrics collection, anomaly detection, automation |

Next Steps

💡 Applying this in practice:

Large-scale ML pipelines power business-critical applications such as recommendation systems, fraud detection, and demand forecasting. The techniques covered in this chapter are key skills on the career path from data scientist to ML engineer.

