🌐 JP | 🇬🇧 EN | Last sync: 2025-11-16

第1章:大規模データ処理の基礎

スケーラビリティと分散処理の原理を理解する

📖 読了時間: 25-30分 📊 難易度: 初級〜中級 💻 コード例: 7個 📝 演習問題: 5問

学習目標

この章を読むことで、以下を習得できます:


1.1 スケーラビリティの課題

データサイズの増大

現代の機械学習プロジェクトでは、データ量が爆発的に増加しています。

「単一マシンでは処理しきれないデータ量に直面することは、もはや例外ではなく標準となっています。」

データ規模 サイズの目安 処理方法
小規模 〜1GB 単一マシンのメモリ内処理
中規模 1GB〜100GB メモリ最適化、チャンク処理
大規模 100GB〜1TB 分散処理、並列化
超大規模 1TB以上 クラスタ、分散ファイルシステム

メモリ制約

最も一般的な問題は、データがメモリに収まらないことです。

import numpy as np
import sys

# メモリ使用量の確認
def memory_usage_mb(data):
    """データのメモリ使用量をMBで返す"""
    return sys.getsizeof(data) / (1024 ** 2)

# 大規模データの例
n_samples = 10_000_000  # 1000万サンプル
n_features = 100

# 通常の配列(全データをメモリに読み込み)
# これは約7.5GBのメモリを消費
# data = np.random.random((n_samples, n_features))  # メモリ不足の可能性

# より小さいデータで確認
n_samples_small = 1_000_000  # 100万サンプル
data_small = np.random.random((n_samples_small, n_features))

print("=== メモリ使用量の確認 ===")
print(f"データ形状: {data_small.shape}")
print(f"メモリ使用量: {memory_usage_mb(data_small):.2f} MB")
print(f"\n推定: {n_samples:,}サンプルの場合")
print(f"メモリ使用量: {memory_usage_mb(data_small) * 10:.2f} MB ({memory_usage_mb(data_small) * 10 / 1024:.2f} GB)")

出力

=== メモリ使用量の確認 ===
データ形状: (1000000, 100)
メモリ使用量: 762.94 MB

推定: 10,000,000サンプルの場合
メモリ使用量: 7629.40 MB (7.45 GB)

計算時間の問題

データサイズが増加すると、計算時間が非線形に増加します。

import time
import numpy as np
import matplotlib.pyplot as plt

# 異なるサイズでの計算時間を測定
sizes = [1000, 5000, 10000, 50000, 100000]
times = []

print("=== 計算時間の測定 ===")
for size in sizes:
    X = np.random.random((size, 100))

    start = time.time()
    # 簡単な行列演算
    result = X @ X.T
    elapsed = time.time() - start

    times.append(elapsed)
    print(f"サイズ {size:6d}: {elapsed:.4f}秒")

# 可視化
plt.figure(figsize=(10, 6))
plt.plot(sizes, times, marker='o', linewidth=2, markersize=8)
plt.xlabel('データサイズ(サンプル数)', fontsize=12)
plt.ylabel('計算時間(秒)', fontsize=12)
plt.title('データサイズと計算時間の関係', fontsize=14)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 時間複雑度の推定
print(f"\n10倍のサイズ増加による時間増加率: {times[-1] / times[0]:.1f}x")

I/Oボトルネック

ディスクI/Oは、大規模データ処理における主要なボトルネックです。

ストレージ種類 読み取り速度 相対性能
メモリ(RAM) 〜50 GB/s 1,000倍
SSD 〜500 MB/s 10倍
HDD 〜100 MB/s 1倍(基準)
ネットワーク(1Gbps) 〜125 MB/s 1.25倍

1.2 分散処理の概念

水平スケーリング vs 垂直スケーリング

スケーラビリティを実現するアプローチには2種類あります。

種類 説明 長所 短所
垂直スケーリング
(Scale Up)
単一マシンの性能向上
(CPU、メモリ、ストレージ増強)
・シンプルな実装
・通信オーバーヘッドなし
・物理的限界あり
・コストが非線形に増加
水平スケーリング
(Scale Out)
複数マシンで分散処理
(ノード数を増やす)
・理論上無限に拡張可能
・耐障害性向上
・実装が複雑
・通信コスト発生

実務での選択: 通常、垂直スケーリングを限界まで行い、その後に水平スケーリングに移行します。

マスター・ワーカーアーキテクチャ

分散処理の最も一般的なパターンは、マスター・ワーカー(Master-Worker)アーキテクチャです。

graph TD M[マスターノード
タスク分配・結果集約] --> W1[ワーカー 1
部分計算] M --> W2[ワーカー 2
部分計算] M --> W3[ワーカー 3
部分計算] M --> W4[ワーカー 4
部分計算] W1 --> R[結果統合] W2 --> R W3 --> R W4 --> R style M fill:#9d4edd style W1 fill:#e3f2fd style W2 fill:#e3f2fd style W3 fill:#e3f2fd style W4 fill:#e3f2fd style R fill:#c8e6c9

役割分担

データ分割とシャーディング

シャーディング(Sharding)は、データを複数のパーティションに分割する技術です。

分割戦略

戦略 説明 使用例
水平分割 行単位で分割 時系列データ、ユーザーデータ
垂直分割 列単位で分割 特徴量が多い場合
ハッシュベース キーのハッシュ値で分割 均等な分散が必要
範囲ベース 値の範囲で分割 ソート済みデータ
import numpy as np
import pandas as pd

# サンプルデータ
n_samples = 1000
data = pd.DataFrame({
    'user_id': np.random.randint(1, 100, n_samples),
    'timestamp': pd.date_range('2024-01-01', periods=n_samples, freq='1min'),
    'value': np.random.random(n_samples)
})

# 1. 水平分割(行単位)
n_partitions = 4
partition_size = len(data) // n_partitions

horizontal_shards = []
for i in range(n_partitions):
    start = i * partition_size
    end = start + partition_size if i < n_partitions - 1 else len(data)
    shard = data.iloc[start:end]
    horizontal_shards.append(shard)
    print(f"シャード {i+1}: {len(shard)}行")

# 2. ハッシュベース分割
def hash_partition(user_id, n_partitions):
    return hash(user_id) % n_partitions

data['partition'] = data['user_id'].apply(lambda x: hash_partition(x, n_partitions))

hash_shards = []
for i in range(n_partitions):
    shard = data[data['partition'] == i]
    hash_shards.append(shard)
    print(f"ハッシュシャード {i+1}: {len(shard)}行")

# 分散の確認
print("\n=== 分割バランスの確認 ===")
hash_sizes = [len(shard) for shard in hash_shards]
print(f"最小: {min(hash_sizes)}, 最大: {max(hash_sizes)}, 平均: {np.mean(hash_sizes):.1f}")

分散処理アーキテクチャの図解

graph LR subgraph "入力データ" D[大規模データセット
1TB] end subgraph "分散ストレージ" S1[シャード 1
250GB] S2[シャード 2
250GB] S3[シャード 3
250GB] S4[シャード 4
250GB] end subgraph "並列処理" P1[プロセス 1] P2[プロセス 2] P3[プロセス 3] P4[プロセス 4] end subgraph "結果集約" A[集約・統合] end D --> S1 D --> S2 D --> S3 D --> S4 S1 --> P1 S2 --> P2 S3 --> P3 S4 --> P4 P1 --> A P2 --> A P3 --> A P4 --> A style D fill:#ffebee style S1 fill:#e3f2fd style S2 fill:#e3f2fd style S3 fill:#e3f2fd style S4 fill:#e3f2fd style P1 fill:#fff3e0 style P2 fill:#fff3e0 style P3 fill:#fff3e0 style P4 fill:#fff3e0 style A fill:#c8e6c9

1.3 並列化戦略

データ並列化

データ並列化(Data Parallelism)は、データを分割し、各パーティションで同じ処理を並列実行します。

最も一般的で実装しやすい並列化手法です。

import numpy as np
import multiprocessing as mp
import time

# 処理関数
def process_chunk(data_chunk):
    """データチャンクに対する処理(例:平均計算)"""
    return np.mean(data_chunk, axis=0)

# シングルプロセス版
def single_process_compute(data):
    start = time.time()
    result = np.mean(data, axis=0)
    elapsed = time.time() - start
    return result, elapsed

# マルチプロセス版(データ並列化)
def multi_process_compute(data, n_workers=4):
    start = time.time()

    # データを分割
    chunks = np.array_split(data, n_workers)

    # 並列処理
    with mp.Pool(n_workers) as pool:
        results = pool.map(process_chunk, chunks)

    # 結果を統合
    final_result = np.mean(results, axis=0)
    elapsed = time.time() - start

    return final_result, elapsed

# テスト
if __name__ == '__main__':
    # サンプルデータ
    data = np.random.random((10_000_000, 10))

    print("=== データ並列化の比較 ===")
    print(f"データサイズ: {data.shape}")

    # シングルプロセス
    result_single, time_single = single_process_compute(data)
    print(f"\nシングルプロセス: {time_single:.4f}秒")

    # マルチプロセス
    n_workers = mp.cpu_count()
    result_multi, time_multi = multi_process_compute(data, n_workers)
    print(f"マルチプロセス ({n_workers}ワーカー): {time_multi:.4f}秒")

    # スピードアップ
    speedup = time_single / time_multi
    print(f"\nスピードアップ: {speedup:.2f}x")
    print(f"効率: {speedup / n_workers * 100:.1f}%")

モデル並列化

モデル並列化(Model Parallelism)は、モデル自体を分割して複数デバイスに配置します。

大規模なニューラルネットワークで使用されます:

import numpy as np

# 概念的な例:大規模モデルの分割
class DistributedModel:
    """モデル並列化の概念実装"""

    def __init__(self, layer_sizes):
        self.layers = []
        for i in range(len(layer_sizes) - 1):
            # 各層を異なるデバイス(ここでは配列)に配置
            weight = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.01
            self.layers.append({
                'weight': weight,
                'device': f'GPU_{i % 4}'  # 4つのGPUに分散
            })

    def forward(self, x):
        """順伝播(各層は異なるデバイスで実行)"""
        activation = x
        for i, layer in enumerate(self.layers):
            print(f"層 {i+1} を {layer['device']} で実行")
            activation = np.dot(activation, layer['weight'])
            activation = np.maximum(0, activation)  # ReLU
        return activation

# 使用例
print("=== モデル並列化の例 ===")
layer_sizes = [1000, 2000, 2000, 2000, 100]  # 大規模モデル
model = DistributedModel(layer_sizes)

# 入力データ
x = np.random.randn(1, 1000)
output = model.forward(x)

print(f"\n入力形状: {x.shape}")
print(f"出力形状: {output.shape}")
print(f"総パラメータ数: {sum(layer['weight'].size for layer in model.layers):,}")

パイプライン並列化

パイプライン並列化(Pipeline Parallelism)は、処理を複数のステージに分割し、各ステージを並列実行します。

import multiprocessing as mp
from queue import Queue
import time

# パイプラインのステージ
def stage1_preprocess(input_queue, output_queue):
    """ステージ1: 前処理"""
    while True:
        item = input_queue.get()
        if item is None:
            output_queue.put(None)
            break
        # 前処理(例:正規化)
        processed = item / 255.0
        output_queue.put(processed)

def stage2_feature_extract(input_queue, output_queue):
    """ステージ2: 特徴抽出"""
    while True:
        item = input_queue.get()
        if item is None:
            output_queue.put(None)
            break
        # 特徴抽出(例:統計量計算)
        features = [item.mean(), item.std(), item.max(), item.min()]
        output_queue.put(features)

def stage3_predict(input_queue, results):
    """ステージ3: 予測"""
    while True:
        item = input_queue.get()
        if item is None:
            break
        # 予測(簡易版)
        prediction = sum(item) > 2.0
        results.append(prediction)

# パイプライン並列化の実装例(概念)
print("=== パイプライン並列化の概念 ===")
print("ステージ1: 前処理 → ステージ2: 特徴抽出 → ステージ3: 予測")
print("\n各ステージが異なるプロセスで並列実行されることで、")
print("スループットが向上します。")

並列化戦略の比較

戦略 適用場面 長所 短所
データ並列化 大規模データ、同一処理 実装が簡単、スケーラブル 通信コスト、メモリ複製
モデル並列化 大規模モデル、GPU制約 メモリ制約を回避 実装が複雑、デバイス間通信
パイプライン並列化 多段階処理、ETL スループット向上 レイテンシ増加、バランス調整

1.4 分散システムの課題

通信コスト

分散処理における最大のオーバーヘッドは、ノード間の通信です。

Amdahlの法則: 並列化できない部分(通信など)が全体の性能を制限します。

$$ \text{Speedup} = \frac{1}{(1-P) + \frac{P}{N}} $$

import numpy as np
import matplotlib.pyplot as plt

# Amdahlの法則の可視化
def amdahl_speedup(P, N):
    """Amdahlの法則によるスピードアップ計算"""
    return 1 / ((1 - P) + P / N)

# 並列化率の異なるケース
P_values = [0.5, 0.75, 0.9, 0.95, 0.99]
N_range = np.arange(1, 65)

plt.figure(figsize=(10, 6))
for P in P_values:
    speedups = [amdahl_speedup(P, N) for N in N_range]
    plt.plot(N_range, speedups, label=f'P = {P:.0%}', linewidth=2)

plt.xlabel('プロセッサ数', fontsize=12)
plt.ylabel('スピードアップ', fontsize=12)
plt.title('Amdahlの法則:並列化率と性能', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print("=== Amdahlの法則の示唆 ===")
print("並列化率が低いと、プロセッサを増やしても性能向上は限定的")
print(f"例: P=90%, 64プロセッサで最大{amdahl_speedup(0.9, 64):.1f}倍")

同期 vs 非同期

方式 説明 長所 短所
同期処理 全ワーカーが完了を待つ 実装がシンプル、一貫性保証 最も遅いワーカーに依存
非同期処理 完了を待たずに次の処理 スループット向上 一貫性の管理が複雑

耐障害性

分散システムでは、ノードやネットワークの障害が避けられません。

障害対策の手法

デバッグの困難さ

分散システムのデバッグは、以下の理由で困難です:


1.5 ツールとエコシステム

Apache Hadoop / Spark

Apache HadoopApache Sparkは、大規模データ処理のデファクトスタンダードです。

ツール 特徴 使用例
Hadoop ・MapReduceベース
・ディスク中心の処理
・バッチ処理向き
大規模ETL、ログ解析
Spark ・インメモリ処理
・高速(Hadoopの100倍)
・機械学習ライブラリ(MLlib)
反復計算、機械学習
# Apache Spark の概念的な使用例(PySpark)
# 注: 実行にはSparkのインストールが必要

"""
from pyspark.sql import SparkSession

# Sparkセッション作成
spark = SparkSession.builder \
    .appName("LargeScaleProcessing") \
    .getOrCreate()

# 大規模データの読み込み
df = spark.read.parquet("hdfs://path/to/large/data")

# 分散処理
result = df.groupBy("category") \
    .agg({"value": "mean"}) \
    .orderBy("category")

# 結果の保存
result.write.parquet("hdfs://path/to/output")

spark.stop()
"""

print("=== Apache Spark の特徴 ===")
print("1. 遅延評価(Lazy Evaluation): 最適な実行計画を自動生成")
print("2. インメモリ処理: 中間結果をメモリにキャッシュ")
print("3. 耐障害性: RDD(Resilient Distributed Dataset)による自動復旧")
print("4. 統合API: SQL、機械学習、グラフ処理を統一的に扱える")

Dask

Daskは、Pythonネイティブの並列処理ライブラリです。

import numpy as np

# Daskの概念的な使用例
"""
import dask.array as da
import dask.dataframe as dd

# Dask配列(NumPyライクなAPI)
x = da.random.random((100000, 10000), chunks=(1000, 1000))
result = x.mean(axis=0).compute()  # 遅延評価 → 実行

# Dask DataFrame(pandasライクなAPI)
df = dd.read_csv('large_file_*.csv')
result = df.groupby('category').value.mean().compute()
"""

print("=== Dask の特徴 ===")
print("1. NumPy/pandas互換API: 既存コードの移行が容易")
print("2. タスクグラフ: 処理の依存関係を自動管理")
print("3. スケーラブル: 単一マシン → クラスタへシームレスに拡張")
print("4. Pythonエコシステムとの統合: scikit-learn、XGBoostなど")

# 簡単なDask風の処理例(概念)
print("\n=== チャンク処理の例 ===")
data = np.random.random((10000, 100))
chunk_size = 1000
results = []

for i in range(0, len(data), chunk_size):
    chunk = data[i:i+chunk_size]
    result = np.mean(chunk, axis=0)
    results.append(result)

final_result = np.mean(results, axis=0)
print(f"チャンク数: {len(results)}")
print(f"最終結果形状: {final_result.shape}")

Ray

Rayは、分散アプリケーション用の統合フレームワークです。

# Rayの概念的な使用例
"""
import ray

# Ray初期化
ray.init()

# リモート関数
@ray.remote
def process_data(data):
    return data.sum()

# 並列実行
data_chunks = [np.random.random(1000) for _ in range(10)]
futures = [process_data.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)  # 結果を取得

ray.shutdown()
"""

print("=== Ray の特徴 ===")
print("1. 低レベル制御: タスク・アクターモデルで柔軟な並列化")
print("2. 高性能: 分散スケジューリングと共有メモリ")
print("3. エコシステム: Ray Tune(ハイパーパラメータ調整)、RLlib(強化学習)")
print("4. 使いやすさ: Pythonデコレータで簡単に並列化")

選択基準

状況 推奨ツール 理由
大規模バッチ処理(TB級) Apache Spark 成熟したエコシステム、耐障害性
Python中心の開発 Dask NumPy/pandas互換、学習コスト低
複雑な分散アプリ Ray 柔軟な制御、高性能
単一マシン高速化 multiprocessing, joblib シンプル、追加インストール不要
機械学習パイプライン Spark MLlib, Ray Tune 統合された機械学習ツール

1.6 本章のまとめ

学んだこと

  1. スケーラビリティの課題

    • データサイズ、メモリ、計算時間、I/Oの制約
    • 単一マシンの限界と分散処理の必要性
  2. 分散処理の概念

    • 水平スケーリング vs 垂直スケーリング
    • マスター・ワーカーアーキテクチャ
    • データ分割とシャーディング戦略
  3. 並列化戦略

    • データ並列化:最も一般的、実装が容易
    • モデル並列化:大規模モデル向け
    • パイプライン並列化:多段階処理に有効
  4. 分散システムの課題

    • 通信コストとAmdahlの法則
    • 同期 vs 非同期のトレードオフ
    • 耐障害性とデバッグの困難さ
  5. ツールとエコシステム

    • Hadoop/Spark: 大規模バッチ処理
    • Dask: Python中心の並列化
    • Ray: 柔軟な分散アプリケーション

重要な原則

原則 説明
最適化の順序 まずアルゴリズム、次に実装、最後に並列化
通信最小化 ノード間通信を減らすことが性能向上の鍵
適切な粒度 タスク分割が細かすぎるとオーバーヘッド増
測定駆動 推測せず、実際の性能を測定して判断
段階的スケーリング 小規模で検証してから大規模化

次の章へ

第2章では、MapReduceとSpark基礎を学びます:


演習問題

問題1(難易度:easy)

水平スケーリングと垂直スケーリングの違いを説明し、それぞれの長所と短所を述べてください。

解答例

解答

垂直スケーリング(Scale Up)

水平スケーリング(Scale Out)

実務での選択:通常、垂直スケーリングを限界まで行い、その後に水平スケーリングに移行するのが一般的です。

問題2(難易度:medium)

Amdahlの法則を用いて、並列化率が80%のプログラムを16プロセッサで実行した場合のスピードアップを計算してください。

解答例
def amdahl_speedup(P, N):
    """
    Amdahlの法則によるスピードアップ計算

    Parameters:
    P: 並列化可能な部分の割合(0〜1)
    N: プロセッサ数

    Returns:
    スピードアップ倍率
    """
    return 1 / ((1 - P) + P / N)

# 問題の計算
P = 0.8  # 80%
N = 16   # 16プロセッサ

speedup = amdahl_speedup(P, N)

print("=== Amdahlの法則による計算 ===")
print(f"並列化率: {P:.0%}")
print(f"プロセッサ数: {N}")
print(f"スピードアップ: {speedup:.2f}x")
print(f"\n解説:")
print(f"理論上の最大スピードアップ(無限プロセッサ): {1/(1-P):.2f}x")
print(f"効率: {speedup/N*100:.1f}%")

出力

=== Amdahlの法則による計算 ===
並列化率: 80%
プロセッサ数: 16
スピードアップ: 4.21x

解説:
理論上の最大スピードアップ(無限プロセッサ): 5.00x
効率: 26.3%

計算式

$$ \text{Speedup} = \frac{1}{(1-0.8) + \frac{0.8}{16}} = \frac{1}{0.2 + 0.05} = \frac{1}{0.25} = 4 $$

解説

問題3(難易度:medium)

以下のコードを、multiprocessingを使ってデータ並列化してください。

import numpy as np

# 処理対象のデータ
data = np.random.random((1_000_000, 10))

# 各行の統計量を計算
result = []
for row in data:
    stats = {
        'mean': row.mean(),
        'std': row.std(),
        'max': row.max(),
        'min': row.min()
    }
    result.append(stats)
解答例
import numpy as np
import multiprocessing as mp
import time

# 処理関数
def compute_stats(chunk):
    """データチャンクの統計量を計算"""
    result = []
    for row in chunk:
        stats = {
            'mean': row.mean(),
            'std': row.std(),
            'max': row.max(),
            'min': row.min()
        }
        result.append(stats)
    return result

# シングルプロセス版
def single_process_version(data):
    start = time.time()
    result = []
    for row in data:
        stats = {
            'mean': row.mean(),
            'std': row.std(),
            'max': row.max(),
            'min': row.min()
        }
        result.append(stats)
    elapsed = time.time() - start
    return result, elapsed

# マルチプロセス版(データ並列化)
def multi_process_version(data, n_workers=4):
    start = time.time()

    # データをチャンクに分割
    chunks = np.array_split(data, n_workers)

    # 並列処理
    with mp.Pool(n_workers) as pool:
        results = pool.map(compute_stats, chunks)

    # 結果を統合
    final_result = []
    for chunk_result in results:
        final_result.extend(chunk_result)

    elapsed = time.time() - start
    return final_result, elapsed

if __name__ == '__main__':
    # テストデータ
    data = np.random.random((100_000, 10))  # サイズを調整

    print("=== データ並列化の実装 ===")
    print(f"データ形状: {data.shape}")

    # シングルプロセス
    result_single, time_single = single_process_version(data)
    print(f"\nシングルプロセス: {time_single:.4f}秒")

    # マルチプロセス
    n_workers = mp.cpu_count()
    result_multi, time_multi = multi_process_version(data, n_workers)
    print(f"マルチプロセス ({n_workers}ワーカー): {time_multi:.4f}秒")

    # 検証
    assert len(result_single) == len(result_multi), "結果の長さが異なります"
    print(f"\nスピードアップ: {time_single / time_multi:.2f}x")
    print(f"✓ 並列化成功")

ポイント

問題4(難易度:hard)

1000万行のデータセットを処理する場合、メモリ制約を考慮したチャンク処理を実装してください。各チャンクは100万行とし、結果を集約してください。

解答例
import numpy as np
import time

def process_chunk(chunk):
    """チャンクごとの処理(例:平均、標準偏差、合計)"""
    stats = {
        'mean': chunk.mean(axis=0),
        'std': chunk.std(axis=0),
        'sum': chunk.sum(axis=0),
        'count': len(chunk)
    }
    return stats

def aggregate_results(chunk_results):
    """チャンク結果を最終的に集約"""
    # 全体の合計を計算
    total_sum = sum(r['sum'] for r in chunk_results)
    total_count = sum(r['count'] for r in chunk_results)

    # 全体の平均
    global_mean = total_sum / total_count

    # 全体の標準偏差(加重平均)
    # より正確には分散の加重平均の平方根
    weighted_var = sum(
        r['count'] * (r['std']**2 + (r['mean'] - global_mean)**2)
        for r in chunk_results
    ) / total_count
    global_std = np.sqrt(weighted_var)

    return {
        'global_mean': global_mean,
        'global_std': global_std,
        'total_count': total_count
    }

def chunked_processing(n_samples=10_000_000, n_features=10, chunk_size=1_000_000):
    """メモリ効率的なチャンク処理"""
    print(f"=== チャンク処理 ===")
    print(f"総サンプル数: {n_samples:,}")
    print(f"チャンクサイズ: {chunk_size:,}")
    print(f"チャンク数: {n_samples // chunk_size}")

    start = time.time()
    chunk_results = []

    # チャンクごとに処理
    for i in range(0, n_samples, chunk_size):
        # チャンクのサイズを決定
        current_chunk_size = min(chunk_size, n_samples - i)

        # チャンクデータを生成(実際にはファイルから読み込み)
        chunk = np.random.random((current_chunk_size, n_features))

        # チャンク処理
        result = process_chunk(chunk)
        chunk_results.append(result)

        print(f"チャンク {len(chunk_results)}: {current_chunk_size:,}行処理完了")

    # 結果を集約
    final_result = aggregate_results(chunk_results)

    elapsed = time.time() - start

    print(f"\n=== 処理結果 ===")
    print(f"処理時間: {elapsed:.2f}秒")
    print(f"全体の平均(最初の3次元): {final_result['global_mean'][:3]}")
    print(f"全体の標準偏差(最初の3次元): {final_result['global_std'][:3]}")
    print(f"総サンプル数: {final_result['total_count']:,}")

    # メモリ効率の確認
    import sys
    chunk_memory = sys.getsizeof(np.random.random((chunk_size, n_features))) / (1024**2)
    print(f"\nチャンクあたりのメモリ使用量: {chunk_memory:.2f} MB")
    print(f"(全データを一度に読み込む場合の1/{n_samples//chunk_size}のメモリで処理)")

if __name__ == '__main__':
    # 実行(実際のサイズだと時間がかかるため、縮小版)
    chunked_processing(n_samples=1_000_000, n_features=10, chunk_size=100_000)

出力例

=== チャンク処理 ===
総サンプル数: 1,000,000
チャンクサイズ: 100,000
チャンク数: 10
チャンク 1: 100,000行処理完了
チャンク 2: 100,000行処理完了
...
チャンク 10: 100,000行処理完了

=== 処理結果 ===
処理時間: 2.45秒
全体の平均(最初の3次元): [0.500 0.499 0.501]
全体の標準偏差(最初の3次元): [0.289 0.288 0.290]
総サンプル数: 1,000,000

チャンクあたりのメモリ使用量: 7.63 MB
(全データを一度に読み込む場合の1/10のメモリで処理)

ポイント

問題5(難易度:hard)

データ並列化、モデル並列化、パイプライン並列化の3つの戦略について、それぞれの適用場面と、組み合わせて使う場合の考慮点を説明してください。

解答例

解答

1. データ並列化(Data Parallelism)

適用場面

2. モデル並列化(Model Parallelism)

適用場面

3. パイプライン並列化(Pipeline Parallelism)

適用場面

組み合わせ使用の考慮点

1. データ並列化 + モデル並列化

"""
超大規模モデルの訓練

例: GPT-3の訓練
- モデル並列化: 各層を複数GPUに分割
- データ並列化: 異なるミニバッチを複数のモデルレプリカで処理
- 結果: 数千GPUで効率的に訓練可能
"""

考慮点

2. データ並列化 + パイプライン並列化

"""
大規模ETLと機械学習パイプライン

例: ストリーミング予測システム
- パイプライン: データ取得 → 前処理 → 推論 → 後処理
- データ並列: 各ステージで複数ワーカーが並列実行
- 結果: 高スループットの予測システム
"""

考慮点

3. 3つ全ての組み合わせ

"""
超大規模分散訓練システム

例: 大規模推薦システム
- データ並列: 異なるユーザーセグメントを並列処理
- モデル並列: 埋め込み層を複数GPUに分散
- パイプライン: 特徴抽出 → モデル訓練 → 評価のステージ化
"""

考慮点

選択ガイドライン

ボトルネック 推奨戦略 優先順位
データサイズが大きい データ並列化 1st
モデルサイズが大きい モデル並列化 1st
処理ステージが多い パイプライン並列化 2nd
両方大きい データ+モデル並列化 段階的
リアルタイム要件 パイプライン並列化 1st

実装の原則

  1. まず単一戦略で最適化
  2. 測定して次のボトルネックを特定
  3. 必要に応じて追加の戦略を導入
  4. 常にシステム全体の効率を測定

参考文献

  1. Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified data processing on large clusters. Communications of the ACM, 51(1), 107-113.
  2. Zaharia, M., et al. (2016). Apache Spark: A unified engine for big data processing. Communications of the ACM, 59(11), 56-65.
  3. Moritz, P., et al. (2018). Ray: A distributed framework for emerging AI applications. OSDI, 561-577.
  4. Rocklin, M. (2015). Dask: Parallel computation with blocked algorithms and task scheduling. SciPy, 126-132.
  5. Barroso, L. A., Clidaras, J., & Hölzle, U. (2013). The datacenter as a computer (2nd ed.). Morgan & Claypool Publishers.

免責事項