学習目標
この章を読むことで、以下を習得できます:
- ✅ 大規模データ処理におけるスケーラビリティの課題を理解する
- ✅ 分散処理の基本概念とアーキテクチャを把握する
- ✅ 並列化戦略の種類と使い分けを学ぶ
- ✅ 分散システムの課題と解決策を知る
- ✅ 主要な分散処理ツールとエコシステムを理解する
- ✅ 実際のコードで並列化を実装できる
1.1 スケーラビリティの課題
データサイズの増大
現代の機械学習プロジェクトでは、データ量が爆発的に増加しています。
「単一マシンでは処理しきれないデータ量に直面することは、もはや例外ではなく標準となっています。」
| データ規模 | サイズの目安 | 処理方法 |
|---|---|---|
| 小規模 | 〜1GB | 単一マシンのメモリ内処理 |
| 中規模 | 1GB〜100GB | メモリ最適化、チャンク処理 |
| 大規模 | 100GB〜1TB | 分散処理、並列化 |
| 超大規模 | 1TB以上 | クラスタ、分散ファイルシステム |
メモリ制約
最も一般的な問題は、データがメモリに収まらないことです。
import numpy as np
import sys
# メモリ使用量の確認
def memory_usage_mb(data):
"""データのメモリ使用量をMBで返す"""
return sys.getsizeof(data) / (1024 ** 2)
# 大規模データの例
n_samples = 10_000_000 # 1000万サンプル
n_features = 100
# 通常の配列(全データをメモリに読み込み)
# これは約7.5GBのメモリを消費
# data = np.random.random((n_samples, n_features)) # メモリ不足の可能性
# より小さいデータで確認
n_samples_small = 1_000_000 # 100万サンプル
data_small = np.random.random((n_samples_small, n_features))
print("=== メモリ使用量の確認 ===")
print(f"データ形状: {data_small.shape}")
print(f"メモリ使用量: {memory_usage_mb(data_small):.2f} MB")
print(f"\n推定: {n_samples:,}サンプルの場合")
print(f"メモリ使用量: {memory_usage_mb(data_small) * 10:.2f} MB ({memory_usage_mb(data_small) * 10 / 1024:.2f} GB)")
出力:
=== メモリ使用量の確認 ===
データ形状: (1000000, 100)
メモリ使用量: 762.94 MB
推定: 10,000,000サンプルの場合
メモリ使用量: 7629.40 MB (7.45 GB)
計算時間の問題
データサイズが増加すると、計算時間が非線形に増加します。
import time
import numpy as np
import matplotlib.pyplot as plt
# 異なるサイズでの計算時間を測定
sizes = [1000, 5000, 10000, 50000, 100000]
times = []
print("=== 計算時間の測定 ===")
for size in sizes:
X = np.random.random((size, 100))
start = time.time()
# 簡単な行列演算
result = X @ X.T
elapsed = time.time() - start
times.append(elapsed)
print(f"サイズ {size:6d}: {elapsed:.4f}秒")
# 可視化
plt.figure(figsize=(10, 6))
plt.plot(sizes, times, marker='o', linewidth=2, markersize=8)
plt.xlabel('データサイズ(サンプル数)', fontsize=12)
plt.ylabel('計算時間(秒)', fontsize=12)
plt.title('データサイズと計算時間の関係', fontsize=14)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 時間複雑度の推定
print(f"\n10倍のサイズ増加による時間増加率: {times[-1] / times[0]:.1f}x")
I/Oボトルネック
ディスクI/Oは、大規模データ処理における主要なボトルネックです。
| ストレージ種類 | 読み取り速度 | 相対性能 |
|---|---|---|
| メモリ(RAM) | 〜50 GB/s | 1,000倍 |
| SSD | 〜500 MB/s | 10倍 |
| HDD | 〜100 MB/s | 1倍(基準) |
| ネットワーク(1Gbps) | 〜125 MB/s | 1.25倍 |
1.2 分散処理の概念
水平スケーリング vs 垂直スケーリング
スケーラビリティを実現するアプローチには2種類あります。
| 種類 | 説明 | 長所 | 短所 |
|---|---|---|---|
| 垂直スケーリング (Scale Up) |
単一マシンの性能向上 (CPU、メモリ、ストレージ増強) |
・シンプルな実装 ・通信オーバーヘッドなし |
・物理的限界あり ・コストが非線形に増加 |
| 水平スケーリング (Scale Out) |
複数マシンで分散処理 (ノード数を増やす) |
・理論上無限に拡張可能 ・耐障害性向上 |
・実装が複雑 ・通信コスト発生 |
実務での選択: 通常、垂直スケーリングを限界まで行い、その後に水平スケーリングに移行します。
マスター・ワーカーアーキテクチャ
分散処理の最も一般的なパターンは、マスター・ワーカー(Master-Worker)アーキテクチャです。
タスク分配・結果集約] --> W1[ワーカー 1
部分計算] M --> W2[ワーカー 2
部分計算] M --> W3[ワーカー 3
部分計算] M --> W4[ワーカー 4
部分計算] W1 --> R[結果統合] W2 --> R W3 --> R W4 --> R style M fill:#9d4edd style W1 fill:#e3f2fd style W2 fill:#e3f2fd style W3 fill:#e3f2fd style W4 fill:#e3f2fd style R fill:#c8e6c9
役割分担
- マスターノード:
- タスクの分割と割り当て
- ワーカーの監視
- 結果の集約
- 障害の検出と回復
- ワーカーノード:
- 割り当てられたタスクの実行
- 結果の返送
- ステータスの報告
データ分割とシャーディング
シャーディング(Sharding)は、データを複数のパーティションに分割する技術です。
分割戦略
| 戦略 | 説明 | 使用例 |
|---|---|---|
| 水平分割 | 行単位で分割 | 時系列データ、ユーザーデータ |
| 垂直分割 | 列単位で分割 | 特徴量が多い場合 |
| ハッシュベース | キーのハッシュ値で分割 | 均等な分散が必要 |
| 範囲ベース | 値の範囲で分割 | ソート済みデータ |
import numpy as np
import pandas as pd
# サンプルデータ
n_samples = 1000
data = pd.DataFrame({
'user_id': np.random.randint(1, 100, n_samples),
'timestamp': pd.date_range('2024-01-01', periods=n_samples, freq='1min'),
'value': np.random.random(n_samples)
})
# 1. 水平分割(行単位)
n_partitions = 4
partition_size = len(data) // n_partitions
horizontal_shards = []
for i in range(n_partitions):
start = i * partition_size
end = start + partition_size if i < n_partitions - 1 else len(data)
shard = data.iloc[start:end]
horizontal_shards.append(shard)
print(f"シャード {i+1}: {len(shard)}行")
# 2. ハッシュベース分割
def hash_partition(user_id, n_partitions):
return hash(user_id) % n_partitions
data['partition'] = data['user_id'].apply(lambda x: hash_partition(x, n_partitions))
hash_shards = []
for i in range(n_partitions):
shard = data[data['partition'] == i]
hash_shards.append(shard)
print(f"ハッシュシャード {i+1}: {len(shard)}行")
# 分散の確認
print("\n=== 分割バランスの確認 ===")
hash_sizes = [len(shard) for shard in hash_shards]
print(f"最小: {min(hash_sizes)}, 最大: {max(hash_sizes)}, 平均: {np.mean(hash_sizes):.1f}")
分散処理アーキテクチャの図解
1TB] end subgraph "分散ストレージ" S1[シャード 1
250GB] S2[シャード 2
250GB] S3[シャード 3
250GB] S4[シャード 4
250GB] end subgraph "並列処理" P1[プロセス 1] P2[プロセス 2] P3[プロセス 3] P4[プロセス 4] end subgraph "結果集約" A[集約・統合] end D --> S1 D --> S2 D --> S3 D --> S4 S1 --> P1 S2 --> P2 S3 --> P3 S4 --> P4 P1 --> A P2 --> A P3 --> A P4 --> A style D fill:#ffebee style S1 fill:#e3f2fd style S2 fill:#e3f2fd style S3 fill:#e3f2fd style S4 fill:#e3f2fd style P1 fill:#fff3e0 style P2 fill:#fff3e0 style P3 fill:#fff3e0 style P4 fill:#fff3e0 style A fill:#c8e6c9
1.3 並列化戦略
データ並列化
データ並列化(Data Parallelism)は、データを分割し、各パーティションで同じ処理を並列実行します。
最も一般的で実装しやすい並列化手法です。
import numpy as np
import multiprocessing as mp
import time
# 処理関数
def process_chunk(data_chunk):
"""データチャンクに対する処理(例:平均計算)"""
return np.mean(data_chunk, axis=0)
# シングルプロセス版
def single_process_compute(data):
start = time.time()
result = np.mean(data, axis=0)
elapsed = time.time() - start
return result, elapsed
# マルチプロセス版(データ並列化)
def multi_process_compute(data, n_workers=4):
start = time.time()
# データを分割
chunks = np.array_split(data, n_workers)
# 並列処理
with mp.Pool(n_workers) as pool:
results = pool.map(process_chunk, chunks)
# 結果を統合
final_result = np.mean(results, axis=0)
elapsed = time.time() - start
return final_result, elapsed
# テスト
if __name__ == '__main__':
# サンプルデータ
data = np.random.random((10_000_000, 10))
print("=== データ並列化の比較 ===")
print(f"データサイズ: {data.shape}")
# シングルプロセス
result_single, time_single = single_process_compute(data)
print(f"\nシングルプロセス: {time_single:.4f}秒")
# マルチプロセス
n_workers = mp.cpu_count()
result_multi, time_multi = multi_process_compute(data, n_workers)
print(f"マルチプロセス ({n_workers}ワーカー): {time_multi:.4f}秒")
# スピードアップ
speedup = time_single / time_multi
print(f"\nスピードアップ: {speedup:.2f}x")
print(f"効率: {speedup / n_workers * 100:.1f}%")
モデル並列化
モデル並列化(Model Parallelism)は、モデル自体を分割して複数デバイスに配置します。
大規模なニューラルネットワークで使用されます:
- 各層を異なるGPUに配置
- モデルパラメータが単一デバイスのメモリに収まらない場合
import numpy as np
# 概念的な例:大規模モデルの分割
class DistributedModel:
"""モデル並列化の概念実装"""
def __init__(self, layer_sizes):
self.layers = []
for i in range(len(layer_sizes) - 1):
# 各層を異なるデバイス(ここでは配列)に配置
weight = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.01
self.layers.append({
'weight': weight,
'device': f'GPU_{i % 4}' # 4つのGPUに分散
})
def forward(self, x):
"""順伝播(各層は異なるデバイスで実行)"""
activation = x
for i, layer in enumerate(self.layers):
print(f"層 {i+1} を {layer['device']} で実行")
activation = np.dot(activation, layer['weight'])
activation = np.maximum(0, activation) # ReLU
return activation
# 使用例
print("=== モデル並列化の例 ===")
layer_sizes = [1000, 2000, 2000, 2000, 100] # 大規模モデル
model = DistributedModel(layer_sizes)
# 入力データ
x = np.random.randn(1, 1000)
output = model.forward(x)
print(f"\n入力形状: {x.shape}")
print(f"出力形状: {output.shape}")
print(f"総パラメータ数: {sum(layer['weight'].size for layer in model.layers):,}")
パイプライン並列化
パイプライン並列化(Pipeline Parallelism)は、処理を複数のステージに分割し、各ステージを並列実行します。
import multiprocessing as mp
from queue import Queue
import time
# パイプラインのステージ
def stage1_preprocess(input_queue, output_queue):
"""ステージ1: 前処理"""
while True:
item = input_queue.get()
if item is None:
output_queue.put(None)
break
# 前処理(例:正規化)
processed = item / 255.0
output_queue.put(processed)
def stage2_feature_extract(input_queue, output_queue):
"""ステージ2: 特徴抽出"""
while True:
item = input_queue.get()
if item is None:
output_queue.put(None)
break
# 特徴抽出(例:統計量計算)
features = [item.mean(), item.std(), item.max(), item.min()]
output_queue.put(features)
def stage3_predict(input_queue, results):
"""ステージ3: 予測"""
while True:
item = input_queue.get()
if item is None:
break
# 予測(簡易版)
prediction = sum(item) > 2.0
results.append(prediction)
# パイプライン並列化の実装例(概念)
print("=== パイプライン並列化の概念 ===")
print("ステージ1: 前処理 → ステージ2: 特徴抽出 → ステージ3: 予測")
print("\n各ステージが異なるプロセスで並列実行されることで、")
print("スループットが向上します。")
並列化戦略の比較
| 戦略 | 適用場面 | 長所 | 短所 |
|---|---|---|---|
| データ並列化 | 大規模データ、同一処理 | 実装が簡単、スケーラブル | 通信コスト、メモリ複製 |
| モデル並列化 | 大規模モデル、GPU制約 | メモリ制約を回避 | 実装が複雑、デバイス間通信 |
| パイプライン並列化 | 多段階処理、ETL | スループット向上 | レイテンシ増加、バランス調整 |
1.4 分散システムの課題
通信コスト
分散処理における最大のオーバーヘッドは、ノード間の通信です。
Amdahlの法則: 並列化できない部分(通信など)が全体の性能を制限します。
$$ \text{Speedup} = \frac{1}{(1-P) + \frac{P}{N}} $$
- $P$: 並列化可能な部分の割合
- $N$: プロセッサ数
import numpy as np
import matplotlib.pyplot as plt
# Amdahlの法則の可視化
def amdahl_speedup(P, N):
"""Amdahlの法則によるスピードアップ計算"""
return 1 / ((1 - P) + P / N)
# 並列化率の異なるケース
P_values = [0.5, 0.75, 0.9, 0.95, 0.99]
N_range = np.arange(1, 65)
plt.figure(figsize=(10, 6))
for P in P_values:
speedups = [amdahl_speedup(P, N) for N in N_range]
plt.plot(N_range, speedups, label=f'P = {P:.0%}', linewidth=2)
plt.xlabel('プロセッサ数', fontsize=12)
plt.ylabel('スピードアップ', fontsize=12)
plt.title('Amdahlの法則:並列化率と性能', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("=== Amdahlの法則の示唆 ===")
print("並列化率が低いと、プロセッサを増やしても性能向上は限定的")
print(f"例: P=90%, 64プロセッサで最大{amdahl_speedup(0.9, 64):.1f}倍")
同期 vs 非同期
| 方式 | 説明 | 長所 | 短所 |
|---|---|---|---|
| 同期処理 | 全ワーカーが完了を待つ | 実装がシンプル、一貫性保証 | 最も遅いワーカーに依存 |
| 非同期処理 | 完了を待たずに次の処理 | スループット向上 | 一貫性の管理が複雑 |
耐障害性
分散システムでは、ノードやネットワークの障害が避けられません。
障害対策の手法
- チェックポイント: 定期的に状態を保存
- レプリケーション: データを複数ノードに複製
- リトライ: 失敗したタスクを再実行
- 冗長性: 複数ノードで同じ処理を実行
デバッグの困難さ
分散システムのデバッグは、以下の理由で困難です:
- 非決定的な実行順序
- タイミングに依存するバグ
- 複数ノードにまたがるログ
- 再現が困難
1.5 ツールとエコシステム
Apache Hadoop / Spark
Apache HadoopとApache Sparkは、大規模データ処理のデファクトスタンダードです。
| ツール | 特徴 | 使用例 |
|---|---|---|
| Hadoop | ・MapReduceベース ・ディスク中心の処理 ・バッチ処理向き |
大規模ETL、ログ解析 |
| Spark | ・インメモリ処理 ・高速(Hadoopの100倍) ・機械学習ライブラリ(MLlib) |
反復計算、機械学習 |
# Apache Spark の概念的な使用例(PySpark)
# 注: 実行にはSparkのインストールが必要
"""
from pyspark.sql import SparkSession
# Sparkセッション作成
spark = SparkSession.builder \
.appName("LargeScaleProcessing") \
.getOrCreate()
# 大規模データの読み込み
df = spark.read.parquet("hdfs://path/to/large/data")
# 分散処理
result = df.groupBy("category") \
.agg({"value": "mean"}) \
.orderBy("category")
# 結果の保存
result.write.parquet("hdfs://path/to/output")
spark.stop()
"""
print("=== Apache Spark の特徴 ===")
print("1. 遅延評価(Lazy Evaluation): 最適な実行計画を自動生成")
print("2. インメモリ処理: 中間結果をメモリにキャッシュ")
print("3. 耐障害性: RDD(Resilient Distributed Dataset)による自動復旧")
print("4. 統合API: SQL、機械学習、グラフ処理を統一的に扱える")
Dask
Daskは、Pythonネイティブの並列処理ライブラリです。
import numpy as np
# Daskの概念的な使用例
"""
import dask.array as da
import dask.dataframe as dd
# Dask配列(NumPyライクなAPI)
x = da.random.random((100000, 10000), chunks=(1000, 1000))
result = x.mean(axis=0).compute() # 遅延評価 → 実行
# Dask DataFrame(pandasライクなAPI)
df = dd.read_csv('large_file_*.csv')
result = df.groupby('category').value.mean().compute()
"""
print("=== Dask の特徴 ===")
print("1. NumPy/pandas互換API: 既存コードの移行が容易")
print("2. タスクグラフ: 処理の依存関係を自動管理")
print("3. スケーラブル: 単一マシン → クラスタへシームレスに拡張")
print("4. Pythonエコシステムとの統合: scikit-learn、XGBoostなど")
# 簡単なDask風の処理例(概念)
print("\n=== チャンク処理の例 ===")
data = np.random.random((10000, 100))
chunk_size = 1000
results = []
for i in range(0, len(data), chunk_size):
chunk = data[i:i+chunk_size]
result = np.mean(chunk, axis=0)
results.append(result)
final_result = np.mean(results, axis=0)
print(f"チャンク数: {len(results)}")
print(f"最終結果形状: {final_result.shape}")
Ray
Rayは、分散アプリケーション用の統合フレームワークです。
# Rayの概念的な使用例
"""
import ray
# Ray初期化
ray.init()
# リモート関数
@ray.remote
def process_data(data):
return data.sum()
# 並列実行
data_chunks = [np.random.random(1000) for _ in range(10)]
futures = [process_data.remote(chunk) for chunk in data_chunks]
results = ray.get(futures) # 結果を取得
ray.shutdown()
"""
print("=== Ray の特徴 ===")
print("1. 低レベル制御: タスク・アクターモデルで柔軟な並列化")
print("2. 高性能: 分散スケジューリングと共有メモリ")
print("3. エコシステム: Ray Tune(ハイパーパラメータ調整)、RLlib(強化学習)")
print("4. 使いやすさ: Pythonデコレータで簡単に並列化")
選択基準
| 状況 | 推奨ツール | 理由 |
|---|---|---|
| 大規模バッチ処理(TB級) | Apache Spark | 成熟したエコシステム、耐障害性 |
| Python中心の開発 | Dask | NumPy/pandas互換、学習コスト低 |
| 複雑な分散アプリ | Ray | 柔軟な制御、高性能 |
| 単一マシン高速化 | multiprocessing, joblib | シンプル、追加インストール不要 |
| 機械学習パイプライン | Spark MLlib, Ray Tune | 統合された機械学習ツール |
1.6 本章のまとめ
学んだこと
スケーラビリティの課題
- データサイズ、メモリ、計算時間、I/Oの制約
- 単一マシンの限界と分散処理の必要性
分散処理の概念
- 水平スケーリング vs 垂直スケーリング
- マスター・ワーカーアーキテクチャ
- データ分割とシャーディング戦略
並列化戦略
- データ並列化:最も一般的、実装が容易
- モデル並列化:大規模モデル向け
- パイプライン並列化:多段階処理に有効
分散システムの課題
- 通信コストとAmdahlの法則
- 同期 vs 非同期のトレードオフ
- 耐障害性とデバッグの困難さ
ツールとエコシステム
- Hadoop/Spark: 大規模バッチ処理
- Dask: Python中心の並列化
- Ray: 柔軟な分散アプリケーション
重要な原則
| 原則 | 説明 |
|---|---|
| 最適化の順序 | まずアルゴリズム、次に実装、最後に並列化 |
| 通信最小化 | ノード間通信を減らすことが性能向上の鍵 |
| 適切な粒度 | タスク分割が細かすぎるとオーバーヘッド増 |
| 測定駆動 | 推測せず、実際の性能を測定して判断 |
| 段階的スケーリング | 小規模で検証してから大規模化 |
次の章へ
第2章では、MapReduceとSpark基礎を学びます:
- MapReduceプログラミングモデル
- Apache Sparkの基本操作
- RDDとDataFrame
- 実践的な分散処理パイプライン
演習問題
問題1(難易度:easy)
水平スケーリングと垂直スケーリングの違いを説明し、それぞれの長所と短所を述べてください。
解答例
解答:
垂直スケーリング(Scale Up):
- 定義: 単一マシンの性能を向上(CPU、メモリ、ストレージの増強)
- 長所:
- 実装がシンプル(既存コードをそのまま使用可能)
- ノード間通信のオーバーヘッドがない
- データの一貫性が保ちやすい
- 短所:
- 物理的な限界がある
- コストが非線形に増加(高性能ハードウェアは割高)
- 単一障害点(SPOF)のリスク
水平スケーリング(Scale Out):
- 定義: マシン(ノード)の数を増やして分散処理
- 長所:
- 理論上無限に拡張可能
- コストが線形(通常のサーバーを追加)
- 耐障害性が高い(ノードの冗長化)
- 短所:
- 実装が複雑(分散処理のロジック必要)
- ノード間通信のコスト発生
- データの一貫性管理が難しい
実務での選択:通常、垂直スケーリングを限界まで行い、その後に水平スケーリングに移行するのが一般的です。
問題2(難易度:medium)
Amdahlの法則を用いて、並列化率が80%のプログラムを16プロセッサで実行した場合のスピードアップを計算してください。
解答例
def amdahl_speedup(P, N):
"""
Amdahlの法則によるスピードアップ計算
Parameters:
P: 並列化可能な部分の割合(0〜1)
N: プロセッサ数
Returns:
スピードアップ倍率
"""
return 1 / ((1 - P) + P / N)
# 問題の計算
P = 0.8 # 80%
N = 16 # 16プロセッサ
speedup = amdahl_speedup(P, N)
print("=== Amdahlの法則による計算 ===")
print(f"並列化率: {P:.0%}")
print(f"プロセッサ数: {N}")
print(f"スピードアップ: {speedup:.2f}x")
print(f"\n解説:")
print(f"理論上の最大スピードアップ(無限プロセッサ): {1/(1-P):.2f}x")
print(f"効率: {speedup/N*100:.1f}%")
出力:
=== Amdahlの法則による計算 ===
並列化率: 80%
プロセッサ数: 16
スピードアップ: 4.21x
解説:
理論上の最大スピードアップ(無限プロセッサ): 5.00x
効率: 26.3%
計算式:
$$ \text{Speedup} = \frac{1}{(1-0.8) + \frac{0.8}{16}} = \frac{1}{0.2 + 0.05} = \frac{1}{0.25} = 4 $$
解説:
- 16プロセッサでも4.21倍のスピードアップにとどまる
- 並列化できない20%の部分が性能のボトルネックになる
- プロセッサを増やしても5倍以上のスピードアップは不可能
問題3(難易度:medium)
以下のコードを、multiprocessingを使ってデータ並列化してください。
import numpy as np
# 処理対象のデータ
data = np.random.random((1_000_000, 10))
# 各行の統計量を計算
result = []
for row in data:
stats = {
'mean': row.mean(),
'std': row.std(),
'max': row.max(),
'min': row.min()
}
result.append(stats)
解答例
import numpy as np
import multiprocessing as mp
import time
# 処理関数
def compute_stats(chunk):
"""データチャンクの統計量を計算"""
result = []
for row in chunk:
stats = {
'mean': row.mean(),
'std': row.std(),
'max': row.max(),
'min': row.min()
}
result.append(stats)
return result
# シングルプロセス版
def single_process_version(data):
start = time.time()
result = []
for row in data:
stats = {
'mean': row.mean(),
'std': row.std(),
'max': row.max(),
'min': row.min()
}
result.append(stats)
elapsed = time.time() - start
return result, elapsed
# マルチプロセス版(データ並列化)
def multi_process_version(data, n_workers=4):
start = time.time()
# データをチャンクに分割
chunks = np.array_split(data, n_workers)
# 並列処理
with mp.Pool(n_workers) as pool:
results = pool.map(compute_stats, chunks)
# 結果を統合
final_result = []
for chunk_result in results:
final_result.extend(chunk_result)
elapsed = time.time() - start
return final_result, elapsed
if __name__ == '__main__':
# テストデータ
data = np.random.random((100_000, 10)) # サイズを調整
print("=== データ並列化の実装 ===")
print(f"データ形状: {data.shape}")
# シングルプロセス
result_single, time_single = single_process_version(data)
print(f"\nシングルプロセス: {time_single:.4f}秒")
# マルチプロセス
n_workers = mp.cpu_count()
result_multi, time_multi = multi_process_version(data, n_workers)
print(f"マルチプロセス ({n_workers}ワーカー): {time_multi:.4f}秒")
# 検証
assert len(result_single) == len(result_multi), "結果の長さが異なります"
print(f"\nスピードアップ: {time_single / time_multi:.2f}x")
print(f"✓ 並列化成功")
ポイント:
np.array_split()でデータを均等に分割multiprocessing.Poolでワーカープールを作成pool.map()で各チャンクを並列処理- 結果を
extend()で統合
問題4(難易度:hard)
1000万行のデータセットを処理する場合、メモリ制約を考慮したチャンク処理を実装してください。各チャンクは100万行とし、結果を集約してください。
解答例
import numpy as np
import time
def process_chunk(chunk):
"""チャンクごとの処理(例:平均、標準偏差、合計)"""
stats = {
'mean': chunk.mean(axis=0),
'std': chunk.std(axis=0),
'sum': chunk.sum(axis=0),
'count': len(chunk)
}
return stats
def aggregate_results(chunk_results):
"""チャンク結果を最終的に集約"""
# 全体の合計を計算
total_sum = sum(r['sum'] for r in chunk_results)
total_count = sum(r['count'] for r in chunk_results)
# 全体の平均
global_mean = total_sum / total_count
# 全体の標準偏差(加重平均)
# より正確には分散の加重平均の平方根
weighted_var = sum(
r['count'] * (r['std']**2 + (r['mean'] - global_mean)**2)
for r in chunk_results
) / total_count
global_std = np.sqrt(weighted_var)
return {
'global_mean': global_mean,
'global_std': global_std,
'total_count': total_count
}
def chunked_processing(n_samples=10_000_000, n_features=10, chunk_size=1_000_000):
"""メモリ効率的なチャンク処理"""
print(f"=== チャンク処理 ===")
print(f"総サンプル数: {n_samples:,}")
print(f"チャンクサイズ: {chunk_size:,}")
print(f"チャンク数: {n_samples // chunk_size}")
start = time.time()
chunk_results = []
# チャンクごとに処理
for i in range(0, n_samples, chunk_size):
# チャンクのサイズを決定
current_chunk_size = min(chunk_size, n_samples - i)
# チャンクデータを生成(実際にはファイルから読み込み)
chunk = np.random.random((current_chunk_size, n_features))
# チャンク処理
result = process_chunk(chunk)
chunk_results.append(result)
print(f"チャンク {len(chunk_results)}: {current_chunk_size:,}行処理完了")
# 結果を集約
final_result = aggregate_results(chunk_results)
elapsed = time.time() - start
print(f"\n=== 処理結果 ===")
print(f"処理時間: {elapsed:.2f}秒")
print(f"全体の平均(最初の3次元): {final_result['global_mean'][:3]}")
print(f"全体の標準偏差(最初の3次元): {final_result['global_std'][:3]}")
print(f"総サンプル数: {final_result['total_count']:,}")
# メモリ効率の確認
import sys
chunk_memory = sys.getsizeof(np.random.random((chunk_size, n_features))) / (1024**2)
print(f"\nチャンクあたりのメモリ使用量: {chunk_memory:.2f} MB")
print(f"(全データを一度に読み込む場合の1/{n_samples//chunk_size}のメモリで処理)")
if __name__ == '__main__':
# 実行(実際のサイズだと時間がかかるため、縮小版)
chunked_processing(n_samples=1_000_000, n_features=10, chunk_size=100_000)
出力例:
=== チャンク処理 ===
総サンプル数: 1,000,000
チャンクサイズ: 100,000
チャンク数: 10
チャンク 1: 100,000行処理完了
チャンク 2: 100,000行処理完了
...
チャンク 10: 100,000行処理完了
=== 処理結果 ===
処理時間: 2.45秒
全体の平均(最初の3次元): [0.500 0.499 0.501]
全体の標準偏差(最初の3次元): [0.289 0.288 0.290]
総サンプル数: 1,000,000
チャンクあたりのメモリ使用量: 7.63 MB
(全データを一度に読み込む場合の1/10のメモリで処理)
ポイント:
- チャンク単位で処理してメモリ使用量を制限
- 各チャンクの統計量を保存
- 最後に統計的に正しい方法で集約(加重平均)
- 実際にはファイルI/Oやデータベースクエリと組み合わせる
問題5(難易度:hard)
データ並列化、モデル並列化、パイプライン並列化の3つの戦略について、それぞれの適用場面と、組み合わせて使う場合の考慮点を説明してください。
解答例
解答:
1. データ並列化(Data Parallelism)
適用場面:
- 大規模データセット(TB級)の処理
- 各サンプルが独立に処理可能
- 同じモデルを異なるデータで訓練(ミニバッチ学習)
例:
- 画像分類の大規模データセット訓練
- ログデータの集約・分析
- バッチ予測処理
2. モデル並列化(Model Parallelism)
適用場面:
- モデルが単一デバイスのメモリに収まらない
- 大規模なニューラルネットワーク(数十億パラメータ)
- 計算グラフを分割できる場合
例:
- GPT-3のような大規模言語モデル
- 高解像度画像処理ネットワーク
- グラフニューラルネットワーク
3. パイプライン並列化(Pipeline Parallelism)
適用場面:
- 複数の処理ステージがある
- 各ステージが異なるリソース要件
- ストリーミングデータ処理
例:
- ETLパイプライン(抽出→変換→ロード)
- リアルタイムデータ処理(前処理→推論→後処理)
- 深層学習の層間パイプライン
組み合わせ使用の考慮点
1. データ並列化 + モデル並列化:
"""
超大規模モデルの訓練
例: GPT-3の訓練
- モデル並列化: 各層を複数GPUに分割
- データ並列化: 異なるミニバッチを複数のモデルレプリカで処理
- 結果: 数千GPUで効率的に訓練可能
"""
考慮点:
- 通信パターンの複雑化(層間通信 + レプリカ間勾配同期)
- メモリ管理の最適化(アクティベーション、勾配の保存場所)
- 負荷バランシング(データとモデルの両方)
2. データ並列化 + パイプライン並列化:
"""
大規模ETLと機械学習パイプライン
例: ストリーミング予測システム
- パイプライン: データ取得 → 前処理 → 推論 → 後処理
- データ並列: 各ステージで複数ワーカーが並列実行
- 結果: 高スループットの予測システム
"""
考慮点:
- ステージ間のバッファ管理
- 背圧制御(遅いステージが速いステージをブロック)
- エンドツーエンドのレイテンシ管理
3. 3つ全ての組み合わせ:
"""
超大規模分散訓練システム
例: 大規模推薦システム
- データ並列: 異なるユーザーセグメントを並列処理
- モデル並列: 埋め込み層を複数GPUに分散
- パイプライン: 特徴抽出 → モデル訓練 → 評価のステージ化
"""
考慮点:
- システム複雑性の管理(デバッグ、監視が困難)
- 全体的な効率の最適化(どの戦略が最もインパクトがあるか)
- 段階的な実装(まず単純な戦略から開始)
選択ガイドライン
| ボトルネック | 推奨戦略 | 優先順位 |
|---|---|---|
| データサイズが大きい | データ並列化 | 1st |
| モデルサイズが大きい | モデル並列化 | 1st |
| 処理ステージが多い | パイプライン並列化 | 2nd |
| 両方大きい | データ+モデル並列化 | 段階的 |
| リアルタイム要件 | パイプライン並列化 | 1st |
実装の原則:
- まず単一戦略で最適化
- 測定して次のボトルネックを特定
- 必要に応じて追加の戦略を導入
- 常にシステム全体の効率を測定
参考文献
- Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified data processing on large clusters. Communications of the ACM, 51(1), 107-113.
- Zaharia, M., et al. (2016). Apache Spark: A unified engine for big data processing. Communications of the ACM, 59(11), 56-65.
- Moritz, P., et al. (2018). Ray: A distributed framework for emerging AI applications. OSDI, 561-577.
- Rocklin, M. (2015). Dask: Parallel computation with blocked algorithms and task scheduling. SciPy, 126-132.
- Barroso, L. A., Clidaras, J., & Hölzle, U. (2013). The datacenter as a computer (2nd ed.). Morgan & Claypool Publishers.