第3章:Daskによる並列計算

Pythonでスケーラブルなデータ処理を実現する

📖 読了時間: 25-30分 📊 難易度: 中級 💻 コード例: 10個 📝 演習問題: 5問

学習目標

この章を読むことで、以下を習得できます:


3.1 Daskの概要

Daskとは

Daskは、Pythonネイティブな並列計算ライブラリで、NumPyやPandasのAPIと互換性を持ちながら、メモリに収まらないデータを処理できます。

「Dask = Pandas + 並列処理 + スケーラビリティ」- 既存のPythonコードを大規模データに拡張

Daskの主要な特徴

特徴 説明 利点
Pandas/NumPy互換 既存のAPIをそのまま利用 学習コストが低い
遅延評価 計算は必要になるまで実行されない 最適化の余地
分散処理 複数マシンで並列実行可能 スケーラビリティ
動的タスクスケジューリング 効率的なリソース利用 高速な処理

Daskのアーキテクチャ

graph TB A[Daskコレクション] --> B[タスクグラフ] B --> C[スケジューラ] C --> D[ワーカー1] C --> E[ワーカー2] C --> F[ワーカー3] D --> G[結果] E --> G F --> G style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#f3e5f5 style D fill:#e8f5e9 style E fill:#e8f5e9 style F fill:#e8f5e9 style G fill:#c8e6c9

インストールとセットアップ

import numpy as np
import pandas as pd
import dask
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
import matplotlib.pyplot as plt

# Daskバージョン確認
print(f"Dask version: {dask.__version__}")

# ローカルクラスタの起動
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
print(client)

# ダッシュボードURL(ブラウザで開ける)
print(f"\nDaskダッシュボード: {client.dashboard_link}")

出力

Dask version: 2023.10.1

<Client: 'tcp://127.0.0.1:8786' processes=4 threads=8, memory=8.00 GB>

Daskダッシュボード: http://127.0.0.1:8787/status

Pandas vs Dask DataFrame

import pandas as pd
import dask.dataframe as dd
import numpy as np

# Pandasデータフレーム
df_pandas = pd.DataFrame({
    'x': np.random.random(10000),
    'y': np.random.random(10000),
    'z': np.random.choice(['A', 'B', 'C'], 10000)
})

# Dask DataFrameに変換(4パーティションに分割)
df_dask = dd.from_pandas(df_pandas, npartitions=4)

print("=== Pandas DataFrame ===")
print(f"型: {type(df_pandas)}")
print(f"形状: {df_pandas.shape}")
print(f"メモリ使用量: {df_pandas.memory_usage(deep=True).sum() / 1024:.2f} KB")

print("\n=== Dask DataFrame ===")
print(f"型: {type(df_dask)}")
print(f"パーティション数: {df_dask.npartitions}")
print(f"列: {df_dask.columns.tolist()}")

# Daskは遅延評価:compute()で実際に実行
print("\n平均値の計算:")
print(f"Pandas: {df_pandas['x'].mean():.6f}")
print(f"Dask: {df_dask['x'].mean().compute():.6f}")

出力

=== Pandas DataFrame ===
型: <class 'pandas.core.frame.DataFrame'>
形状: (10000, 3)
メモリ使用量: 235.47 KB

=== Dask DataFrame ===
型: <class 'dask.dataframe.core.DataFrame'>
パーティション数: 4
列: ['x', 'y', 'z']

平均値の計算:
Pandas: 0.499845
Dask: 0.499845

タスクグラフの可視化

import dask.array as da

# 簡単な計算のタスクグラフを可視化
x = da.random.random((1000, 1000), chunks=(100, 100))
y = x + x.T
z = y.mean(axis=0)

# タスクグラフの表示
z.visualize(filename='dask_task_graph.png', optimize_graph=True)
print("タスクグラフを dask_task_graph.png に保存しました")

# タスク数の確認
print(f"\nタスク数: {len(z.__dask_graph__())}")
print(f"チャンク数: {x.npartitions}")

重要: Daskは計算を自動的に最適化し、並列実行可能なタスクを特定します。


3.2 Dask Arrays & DataFrames

Dask Array:大規模NumPy配列

Dask Arrayは、NumPy配列をチャンクに分割し、並列処理を可能にします。

基本的な操作

import dask.array as da
import numpy as np

# 大規模な配列の作成(メモリに収まらないサイズ)
# 10GB相当の配列(10000 x 10000 x 100の float64)
x = da.random.random((10000, 10000, 100), chunks=(1000, 1000, 10))

print("=== Dask Array ===")
print(f"形状: {x.shape}")
print(f"データ型: {x.dtype}")
print(f"チャンクサイズ: {x.chunks}")
print(f"チャンク数: {x.npartitions}")
print(f"推定サイズ: {x.nbytes / 1e9:.2f} GB")

# NumPy互換の操作
mean_value = x.mean()
std_value = x.std()
max_value = x.max()

# 遅延評価のため、まだ計算されていない
print(f"\n平均値(遅延): {mean_value}")

# compute()で実際に計算を実行
print(f"平均値(実行): {mean_value.compute():.6f}")
print(f"標準偏差: {std_value.compute():.6f}")
print(f"最大値: {max_value.compute():.6f}")

出力

=== Dask Array ===
形状: (10000, 10000, 100)
データ型: float64
チャンクサイズ: ((1000, 1000, ...), (1000, 1000, ...), (10, 10, ...))
チャンク数: 1000
推定サイズ: 80.00 GB

平均値(遅延): dask.array<mean_agg-aggregate, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>

平均値(実行): 0.500021
標準偏差: 0.288668
最大値: 0.999999

線形代数操作

import dask.array as da

# 大規模行列演算
A = da.random.random((5000, 5000), chunks=(1000, 1000))
B = da.random.random((5000, 5000), chunks=(1000, 1000))

# 行列積(並列計算)
C = da.matmul(A, B)

print("=== 行列演算 ===")
print(f"A形状: {A.shape}, チャンク数: {A.npartitions}")
print(f"B形状: {B.shape}, チャンク数: {B.npartitions}")
print(f"C形状: {C.shape}, チャンク数: {C.npartitions}")

# SVD(特異値分解)
U, s, V = da.linalg.svd_compressed(A, k=50)

print(f"\n特異値分解:")
print(f"U形状: {U.shape}")
print(f"特異値数: {len(s)}")
print(f"V形状: {V.shape}")

# 上位5つの特異値を計算
top_5_singular_values = s[:5].compute()
print(f"\n上位5特異値: {top_5_singular_values}")

Dask DataFrame:大規模データフレーム

CSVファイルの読み込み

import dask.dataframe as dd
import pandas as pd
import numpy as np

# サンプルCSVファイルの作成(大規模データをシミュレート)
for i in range(5):
    df = pd.DataFrame({
        'id': range(i * 1000000, (i + 1) * 1000000),
        'value': np.random.randn(1000000),
        'category': np.random.choice(['A', 'B', 'C', 'D'], 1000000),
        'timestamp': pd.date_range('2024-01-01', periods=1000000, freq='s')
    })
    df.to_csv(f'data_part_{i}.csv', index=False)

# Daskで複数CSVファイルを並列読み込み
ddf = dd.read_csv('data_part_*.csv', parse_dates=['timestamp'])

print("=== Dask DataFrame ===")
print(f"パーティション数: {ddf.npartitions}")
print(f"列: {ddf.columns.tolist()}")
print(f"推定行数: ~{len(ddf)}行")  # compute()なしで推定可能

# データ型の確認
print(f"\nデータ型:")
print(ddf.dtypes)

# 最初の数行を表示(一部のみ計算)
print(f"\n最初の5行:")
print(ddf.head())

DataFrame操作

import dask.dataframe as dd

# グループ集計
category_stats = ddf.groupby('category')['value'].agg(['mean', 'std', 'count'])

print("=== カテゴリ別統計(遅延評価)===")
print(category_stats)

# compute()で実行
print("\n=== カテゴリ別統計(実行結果)===")
result = category_stats.compute()
print(result)

# フィルタリングと変換
filtered = ddf[ddf['value'] > 0]
filtered['value_squared'] = filtered['value'] ** 2

# 時系列集計
daily_stats = ddf.set_index('timestamp').resample('D')['value'].mean()

print("\n=== 日次平均(最初の5日)===")
print(daily_stats.head())

# 複数の計算を一度に実行(効率的)
mean_val, std_val, filtered_count = dask.compute(
    ddf['value'].mean(),
    ddf['value'].std(),
    len(filtered)
)

print(f"\n全体統計:")
print(f"平均: {mean_val:.6f}")
print(f"標準偏差: {std_val:.6f}")
print(f"正の値の数: {filtered_count:,}")

パーティション最適化

import dask.dataframe as dd

# パーティションのリバランス
print(f"元のパーティション数: {ddf.npartitions}")

# パーティション数を調整(メモリとCPUのバランス)
ddf_optimized = ddf.repartition(npartitions=20)
print(f"最適化後のパーティション数: {ddf_optimized.npartitions}")

# インデックスによるパーティション分割
ddf_indexed = ddf.set_index('category', sorted=True)
print(f"\nインデックス設定後:")
print(f"パーティション数: {ddf_indexed.npartitions}")
print(f"既知のディビジョン: {ddf_indexed.known_divisions}")

# パーティションサイズの確認
partition_sizes = ddf.map_partitions(len).compute()
print(f"\n各パーティションのサイズ: {partition_sizes.tolist()[:10]}")  # 最初の10個

ベストプラクティス: パーティションサイズは100MB-1GB程度が理想的です。


3.3 Dask-ML:スケーラブル機械学習

Dask-MLとは

Dask-MLは、scikit-learnのAPIを拡張し、大規模データセットでの機械学習を可能にします。

データ前処理

import dask.dataframe as dd
import dask.array as da
from dask_ml.preprocessing import StandardScaler, LabelEncoder
from dask_ml.model_selection import train_test_split

# 大規模データセットの作成
ddf = dd.read_csv('data_part_*.csv', parse_dates=['timestamp'])

# 特徴量の抽出
ddf['hour'] = ddf['timestamp'].dt.hour
ddf['day_of_week'] = ddf['timestamp'].dt.dayofweek

# ラベルエンコーディング
le = LabelEncoder()
ddf['category_encoded'] = le.fit_transform(ddf['category'])

# 特徴量とターゲットの分離
X = ddf[['value', 'hour', 'day_of_week', 'category_encoded']].to_dask_array(lengths=True)
y = (ddf['value'] > 0).astype(int).to_dask_array(lengths=True)

print("=== 特徴量 ===")
print(f"X形状: {X.shape}")
print(f"y形状: {y.shape}")

# データ分割
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, shuffle=True
)

print(f"\n訓練データ: {X_train.shape[0].compute():,}行")
print(f"テストデータ: {X_test.shape[0].compute():,}行")

# 標準化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"\n標準化後のデータ型: {type(X_train_scaled)}")

増分学習(Incremental Learning)

from dask_ml.linear_model import LogisticRegression
from dask_ml.metrics import accuracy_score, log_loss

# ロジスティック回帰(増分学習)
clf = LogisticRegression(max_iter=100, solver='lbfgs', random_state=42)

# 並列学習
clf.fit(X_train_scaled, y_train)

# 予測
y_pred = clf.predict(X_test_scaled)
y_pred_proba = clf.predict_proba(X_test_scaled)

# 評価
accuracy = accuracy_score(y_test, y_pred)
loss = log_loss(y_test, y_pred_proba)

print("=== モデル性能 ===")
print(f"精度: {accuracy.compute():.4f}")
print(f"対数損失: {loss.compute():.4f}")

# 係数の確認
print(f"\nモデル係数: {clf.coef_}")
print(f"切片: {clf.intercept_}")

ハイパーパラメータチューニング

from dask_ml.model_selection import GridSearchCV
from dask_ml.linear_model import LogisticRegression
import numpy as np

# パラメータグリッド
param_grid = {
    'C': [0.001, 0.01, 0.1, 1, 10],
    'penalty': ['l1', 'l2'],
    'solver': ['saga']
}

# グリッドサーチ(並列実行)
clf = LogisticRegression(max_iter=100, random_state=42)
grid_search = GridSearchCV(
    clf,
    param_grid,
    cv=3,
    scoring='accuracy',
    n_jobs=-1
)

print("=== グリッドサーチ開始 ===")
print(f"パラメータ組み合わせ数: {len(param_grid['C']) * len(param_grid['penalty'])}")

# 学習(サンプリングしたデータで実行)
X_train_sample = X_train_scaled[:100000].compute()
y_train_sample = y_train[:100000].compute()

grid_search.fit(X_train_sample, y_train_sample)

print("\n=== グリッドサーチ結果 ===")
print(f"最良パラメータ: {grid_search.best_params_}")
print(f"最良スコア: {grid_search.best_score_:.4f}")

# 結果の詳細
results_df = pd.DataFrame(grid_search.cv_results_)
print("\nTop 3 パラメータ組み合わせ:")
print(results_df[['params', 'mean_test_score', 'std_test_score']].nlargest(3, 'mean_test_score'))

Random Forest with Dask-ML

from dask_ml.ensemble import RandomForestClassifier
from dask_ml.metrics import accuracy_score, classification_report

# ランダムフォレスト
rf_clf = RandomForestClassifier(
    n_estimators=10,
    max_depth=10,
    random_state=42,
    n_jobs=-1
)

print("=== ランダムフォレスト学習 ===")
rf_clf.fit(X_train_scaled, y_train)

# 予測
y_pred_rf = rf_clf.predict(X_test_scaled)

# 評価
accuracy_rf = accuracy_score(y_test, y_pred_rf)

print(f"精度: {accuracy_rf.compute():.4f}")

# 特徴量重要度
feature_importance = rf_clf.feature_importances_
feature_names = ['value', 'hour', 'day_of_week', 'category_encoded']

print("\n特徴量重要度:")
for name, importance in zip(feature_names, feature_importance):
    print(f"  {name}: {importance:.4f}")

パイプライン構築

from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline

# 前処理パイプライン
numeric_features = ['value', 'hour', 'day_of_week']
categorical_features = ['category_encoded']

preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(), categorical_features)
    ]
)

# 完全なパイプライン
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=100, random_state=42))
])

print("=== パイプライン ===")
print(pipeline)

# パイプラインの学習と評価
pipeline.fit(X_train, y_train)
y_pred_pipeline = pipeline.predict(X_test)
accuracy_pipeline = accuracy_score(y_test, y_pred_pipeline)

print(f"\nパイプライン精度: {accuracy_pipeline.compute():.4f}")

3.4 並列計算パターン

dask.delayed:関数の遅延実行

dask.delayedは、任意のPython関数を遅延評価に変換します。

import dask
from dask import delayed
import time

# 通常の関数
def process_data(x):
    time.sleep(1)  # 処理に時間がかかるシミュレーション
    return x ** 2

def aggregate(results):
    return sum(results)

# 逐次実行
print("=== 逐次実行 ===")
start = time.time()
results = []
for i in range(8):
    results.append(process_data(i))
total = aggregate(results)
print(f"結果: {total}")
print(f"実行時間: {time.time() - start:.2f}秒")

# 並列実行(dask.delayed)
print("\n=== 並列実行(dask.delayed)===")
start = time.time()
results_delayed = []
for i in range(8):
    result = delayed(process_data)(i)
    results_delayed.append(result)

total_delayed = delayed(aggregate)(results_delayed)
total = total_delayed.compute()

print(f"結果: {total}")
print(f"実行時間: {time.time() - start:.2f}秒")

# タスクグラフの可視化
total_delayed.visualize(filename='delayed_task_graph.png')
print("\nタスクグラフを delayed_task_graph.png に保存しました")

出力

=== 逐次実行 ===
結果: 140
実行時間: 8.02秒

=== 並列実行(dask.delayed)===
結果: 140
実行時間: 2.03秒

注目: 4ワーカーで並列実行したため、約4倍高速化されました。

dask.bag:非構造化データ処理

import dask.bag as db
import json

# JSONファイルの作成(ログデータをシミュレート)
logs = [
    {'timestamp': '2024-01-01 10:00:00', 'level': 'INFO', 'message': 'User login'},
    {'timestamp': '2024-01-01 10:01:00', 'level': 'ERROR', 'message': 'Connection failed'},
    {'timestamp': '2024-01-01 10:02:00', 'level': 'INFO', 'message': 'User logout'},
    {'timestamp': '2024-01-01 10:03:00', 'level': 'WARNING', 'message': 'Slow query'},
] * 1000

with open('logs.json', 'w') as f:
    for log in logs:
        f.write(json.dumps(log) + '\n')

# Dask Bagでの読み込み
bag = db.read_text('logs.json').map(json.loads)

print("=== Dask Bag ===")
print(f"パーティション数: {bag.npartitions}")

# 各ログレベルの集計
level_counts = bag.pluck('level').frequencies()
print(f"\nログレベル別カウント:")
print(level_counts.compute())

# エラーログのフィルタリング
errors = bag.filter(lambda x: x['level'] == 'ERROR')
print(f"\nエラーログ数: {errors.count().compute():,}")

# カスタム処理
def extract_hour(log):
    timestamp = log['timestamp']
    return timestamp.split()[1].split(':')[0]

hourly_distribution = bag.map(extract_hour).frequencies()
print(f"\n時間別分布:")
print(hourly_distribution.compute())

カスタムタスクグラフ

import dask
from dask.threaded import get

# カスタムタスクグラフの定義
# DAG (Directed Acyclic Graph) 形式
task_graph = {
    'x': 1,
    'y': 2,
    'z': (lambda a, b: a + b, 'x', 'y'),
    'w': (lambda a: a * 2, 'z'),
    'result': (lambda a, b: a ** b, 'w', 'y')
}

# タスクグラフの実行
result = get(task_graph, 'result')
print(f"=== カスタムタスクグラフ ===")
print(f"結果: {result}")

# 複雑なタスクグラフ
def load_data(source):
    print(f"Loading from {source}")
    return f"data_{source}"

def process(data):
    print(f"Processing {data}")
    return f"processed_{data}"

def merge(data1, data2):
    print(f"Merging {data1} and {data2}")
    return f"merged_{data1}_{data2}"

complex_graph = {
    'load_a': (load_data, 'source_A'),
    'load_b': (load_data, 'source_B'),
    'process_a': (process, 'load_a'),
    'process_b': (process, 'load_b'),
    'final': (merge, 'process_a', 'process_b')
}

final_result = get(complex_graph, 'final')
print(f"\n最終結果: {final_result}")

並列map/apply

import dask.dataframe as dd
import pandas as pd
import numpy as np

# サンプルデータ
ddf = dd.from_pandas(
    pd.DataFrame({
        'A': np.random.randn(10000),
        'B': np.random.randn(10000),
        'C': np.random.choice(['X', 'Y', 'Z'], 10000)
    }),
    npartitions=4
)

# map_partitions: 各パーティションに関数を適用
def custom_processing(partition):
    # パーティションごとのカスタム処理
    partition['A_squared'] = partition['A'] ** 2
    partition['B_log'] = np.log1p(np.abs(partition['B']))
    return partition

ddf_processed = ddf.map_partitions(custom_processing)

print("=== map_partitions ===")
print(ddf_processed.head())

# apply: 各行に関数を適用
def row_function(row):
    return row['A'] * row['B']

ddf['A_times_B'] = ddf.apply(row_function, axis=1, meta=('A_times_B', 'f8'))

print("\n=== apply ===")
print(ddf.head())

# 集計関数のカスタマイズ
def custom_agg(partition):
    return pd.Series({
        'mean': partition['A'].mean(),
        'std': partition['A'].std(),
        'min': partition['A'].min(),
        'max': partition['A'].max()
    })

stats = ddf.map_partitions(custom_agg).compute()
print("\n=== カスタム集計 ===")
print(stats)

3.5 Daskクラスタ管理

LocalCluster:ローカル並列処理

from dask.distributed import Client, LocalCluster
import dask.array as da

# LocalClusterの詳細設定
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='2GB',
    dashboard_address=':8787'
)

client = Client(cluster)

print("=== LocalCluster 情報 ===")
print(f"ワーカー数: {len(client.scheduler_info()['workers'])}")
print(f"スレッド数: {sum(w['nthreads'] for w in client.scheduler_info()['workers'].values())}")
print(f"メモリ制限: {cluster.worker_spec[0]['options']['memory_limit']}")
print(f"ダッシュボード: {client.dashboard_link}")

# ワーカー情報の詳細
for worker_id, info in client.scheduler_info()['workers'].items():
    print(f"\nワーカー {worker_id}:")
    print(f"  スレッド: {info['nthreads']}")
    print(f"  メモリ: {info['memory_limit'] / 1e9:.2f} GB")

# 計算の実行
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = (x + x.T).mean().compute()

print(f"\n計算結果: {result:.6f}")

# クラスタのクローズ
client.close()
cluster.close()

分散スケジューラ

from dask.distributed import Client, progress
import dask.array as da
import time

# クライアントの起動
client = Client(n_workers=4, threads_per_worker=2)

# 大規模計算のスケジューリング
x = da.random.random((50000, 50000), chunks=(5000, 5000))
y = da.random.random((50000, 50000), chunks=(5000, 5000))

# 複数の計算を同時にスケジュール
results = []
for i in range(5):
    result = (x + y * i).sum()
    results.append(result)

# 進捗状況の表示
futures = client.compute(results)
progress(futures)

# 結果の取得
final_results = client.gather(futures)

print("\n=== 計算結果 ===")
for i, result in enumerate(final_results):
    print(f"計算 {i + 1}: {result:.2f}")

# スケジューラの統計
print("\n=== スケジューラ統計 ===")
print(f"完了したタスク数: {client.scheduler_info()['total_occupancy']}")
print(f"アクティブなワーカー: {len(client.scheduler_info()['workers'])}")

パフォーマンス最適化

from dask.distributed import Client, performance_report
import dask.dataframe as dd
import dask.array as da

client = Client(n_workers=4)

# パフォーマンスレポートの生成
with performance_report(filename="dask_performance.html"):
    # データフレーム操作
    ddf = dd.read_csv('data_part_*.csv')
    result1 = ddf.groupby('category')['value'].mean().compute()

    # 配列操作
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    result2 = (x + x.T).mean().compute()

    print("計算完了")

print("パフォーマンスレポートを dask_performance.html に保存しました")

# メモリ使用量の確認
memory_info = client.run(lambda: {
    'used': sum(v['memory'] for v in client.scheduler_info()['workers'].values()),
    'limit': sum(v['memory_limit'] for v in client.scheduler_info()['workers'].values())
})

print("\n=== メモリ使用状況 ===")
for worker, info in memory_info.items():
    print(f"ワーカー {worker}: 使用率 N/A")

# タスク実行統計
print("\n=== タスク実行統計 ===")
print(f"処理されたバイト数: {client.scheduler_info().get('total_occupancy', 'N/A')}")

クラスタのスケーリング

from dask.distributed import Client
from dask_kubernetes import KubeCluster

# Kubernetesクラスタの設定(例)
"""
cluster = KubeCluster(
    name='dask-cluster',
    namespace='default',
    image='daskdev/dask:latest',
    n_workers=10,
    resources={
        'requests': {'memory': '4Gi', 'cpu': '2'},
        'limits': {'memory': '8Gi', 'cpu': '4'}
    }
)

client = Client(cluster)

# 動的スケーリング
cluster.adapt(minimum=2, maximum=20)

print(f"クラスタ情報: {cluster}")
"""

# ローカルでのアダプティブスケーリング
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

# ワーカー数を動的に調整
cluster.adapt(minimum=2, maximum=8)

print("=== アダプティブスケーリング ===")
print(f"最小ワーカー数: 2")
print(f"最大ワーカー数: 8")
print(f"現在のワーカー数: {len(client.scheduler_info()['workers'])}")

# 負荷をかけてスケーリングを確認
import dask.array as da
x = da.random.random((50000, 50000), chunks=(1000, 1000))
result = x.sum().compute()

print(f"\n計算後のワーカー数: {len(client.scheduler_info()['workers'])}")

モニタリングとデバッグ

from dask.distributed import Client
import dask.array as da

client = Client(n_workers=4)

# タスクの監視
x = da.random.random((10000, 10000), chunks=(1000, 1000))
future = client.compute(x.sum())

# タスクの状態確認
print("=== タスク状態 ===")
print(f"状態: {future.status}")
print(f"キー: {future.key}")

# 結果を待つ
result = future.result()
print(f"結果: {result:.6f}")

# ワーカーログの取得
logs = client.get_worker_logs()
print("\n=== ワーカーログ(最初のワーカー)===")
first_worker = list(logs.keys())[0]
print(f"ワーカー: {first_worker}")
print(logs[first_worker][:500])  # 最初の500文字

# タスクグラフの統計
print("\n=== タスクグラフ統計 ===")
print(f"タスク数: {len(x.__dask_graph__())}")
print(f"レイヤー数: {len(x.__dask_layers__())}")

3.6 本章のまとめ

学んだこと

  1. Daskの基本

    • Pandas/NumPy互換のAPI
    • 遅延評価とタスクグラフ
    • 分散並列処理の仕組み
  2. Dask コレクション

    • Dask Array: 大規模NumPy配列
    • Dask DataFrame: 大規模データフレーム
    • Dask Bag: 非構造化データ処理
  3. Dask-ML

    • スケーラブルな機械学習
    • 増分学習とハイパーパラメータチューニング
    • 前処理パイプライン
  4. 並列計算パターン

    • dask.delayed: 関数の遅延実行
    • dask.bag: 非構造化データ
    • カスタムタスクグラフ
    • map_partitions/apply
  5. クラスタ管理

    • LocalCluster: ローカル並列処理
    • 分散スケジューラ
    • パフォーマンス最適化
    • 動的スケーリング

Daskのベストプラクティス

項目 推奨事項
チャンクサイズ 100MB-1GB程度が理想的
パーティション数 ワーカー数の2-4倍
compute()の使用 複数の計算を一度にcompute()で実行
persist()の活用 再利用するデータはメモリに保持
インデックス設定 sorted=Trueで高速なフィルタリング

Spark vs Dask 比較

項目 Spark Dask
言語 Scala/Java/Python Python専用
API 独自API Pandas/NumPy互換
学習曲線 緩やか
エコシステム 大規模・成熟 小規模・成長中
適用場面 超大規模バッチ処理 中規模・対話的処理
メモリ管理 JVMベース Pythonネイティブ

次の章へ

第4章では、データベースとストレージ最適化を学びます:


演習問題

問題1(難易度:easy)

DaskとPandasの主な違いを3つ挙げ、それぞれの特徴を説明してください。

解答例

解答

  1. 実行モデル

    • Pandas: 即座に実行(Eager Evaluation)
    • Dask: 遅延評価(Lazy Evaluation)- compute()で実行
  2. スケーラビリティ

    • Pandas: メモリに収まるデータのみ
    • Dask: メモリに収まらないデータも処理可能
  3. 並列処理

    • Pandas: 単一プロセス
    • Dask: 複数ワーカーで並列処理可能

使い分け

問題2(難易度:medium)

以下のコードを実行し、遅延評価の仕組みを確認してください。なぜ2つの出力が異なるのか説明してください。

import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
y = x + 1
z = y * 2

print("1.", z)
print("2.", z.compute())
解答例
import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
y = x + 1
z = y * 2

print("1.", z)
print("2.", z.compute())

出力

1. dask.array<mul, shape=(1000, 1000), dtype=float64, chunksize=(100, 100), chunktype=numpy.ndarray>
2. [[1.234 2.567 ...] [3.890 1.456 ...] ...]

説明

  1. 1つ目の出力(遅延オブジェクト)

    • zは遅延評価オブジェクトで、計算はまだ実行されていない
    • タスクグラフのみが構築されている状態
    • メタデータ(形状、データ型、チャンクサイズ)のみ表示
  2. 2つ目の出力(計算結果)

    • compute()を呼ぶことで実際に計算が実行される
    • タスクグラフが実行され、結果がNumPy配列として返される

遅延評価の利点

問題3(難易度:medium)

Dask DataFrameで1億行のデータを処理する際の適切なパーティション数を計算してください。各パーティションが約100MBになるようにします。1行あたり50バイトと仮定します。

解答例

解答

# 与えられた情報
total_rows = 100_000_000  # 1億行
bytes_per_row = 50  # 1行あたり50バイト
target_partition_size_mb = 100  # 目標パーティションサイズ(MB)

# 計算
total_size_bytes = total_rows * bytes_per_row
total_size_mb = total_size_bytes / (1024 ** 2)

partition_count = total_size_mb / target_partition_size_mb

print("=== パーティション数の計算 ===")
print(f"総データサイズ: {total_size_mb:.2f} MB ({total_size_bytes / 1e9:.2f} GB)")
print(f"目標パーティションサイズ: {target_partition_size_mb} MB")
print(f"必要なパーティション数: {partition_count:.0f}")
print(f"各パーティションの行数: {total_rows / partition_count:,.0f}行")

# Dask DataFrameでの実装例
import dask.dataframe as dd
import pandas as pd
import numpy as np

# サンプルデータ(実際は1億行)
df = pd.DataFrame({
    'id': range(1000000),
    'value': np.random.randn(1000000)
})

# 計算したパーティション数で分割
npartitions = int(partition_count)
ddf = dd.from_pandas(df, npartitions=npartitions)

print(f"\nDask DataFrame:")
print(f"  パーティション数: {ddf.npartitions}")
print(f"  各パーティションの推定サイズ: {total_size_mb / npartitions:.2f} MB")

出力

=== パーティション数の計算 ===
総データサイズ: 4768.37 MB (5.00 GB)
目標パーティションサイズ: 100 MB
必要なパーティション数: 48
各パーティションの行数: 2,083,333行

Dask DataFrame:
  パーティション数: 48
  各パーティションの推定サイズ: 99.34 MB

ベストプラクティス

問題4(難易度:hard)

dask.delayedを使って、以下の依存関係を持つタスクを並列実行してください。タスクグラフも可視化してください。

解答例
import dask
from dask import delayed
import time

# タスク定義
@delayed
def task_a():
    time.sleep(2)
    print("タスクA完了")
    return 10

@delayed
def task_b():
    time.sleep(2)
    print("タスクB完了")
    return 20

@delayed
def task_c(a_result, b_result):
    time.sleep(1)
    print("タスクC完了")
    return a_result + b_result

@delayed
def task_d(c_result):
    time.sleep(1)
    print("タスクD完了")
    return c_result * 2

# タスクグラフの構築
print("=== タスクグラフ構築 ===")
a = task_a()
b = task_b()
c = task_c(a, b)
d = task_d(c)

print("タスクグラフ構築完了(まだ実行されていません)")

# タスクグラフの可視化
d.visualize(filename='task_dependency_graph.png')
print("タスクグラフを task_dependency_graph.png に保存しました")

# 実行
print("\n=== タスク実行開始 ===")
start_time = time.time()
result = d.compute()
end_time = time.time()

print(f"\n=== 結果 ===")
print(f"最終結果: {result}")
print(f"実行時間: {end_time - start_time:.2f}秒")

# 期待される実行時間
print(f"\n期待される実行時間:")
print(f"  逐次実行: 2 + 2 + 1 + 1 = 6秒")
print(f"  並列実行: max(2, 2) + 1 + 1 = 4秒")

出力

=== タスクグラフ構築 ===
タスクグラフ構築完了(まだ実行されていません)
タスクグラフを task_dependency_graph.png に保存しました

=== タスク実行開始 ===
タスクA完了
タスクB完了
タスクC完了
タスクD完了

=== 結果 ===
最終結果: 60
実行時間: 4.02秒

期待される実行時間:
  逐次実行: 2 + 2 + 1 + 1 = 6秒
  並列実行: max(2, 2) + 1 + 1 = 4秒

タスクグラフの説明

問題5(難易度:hard)

Dask DataFrameで大規模なCSVファイルを読み込み、以下の処理を実行してください:

  1. 欠損値を含む行を削除
  2. 特定のカラムでグループ化し、平均を計算
  3. 結果をParquetファイルに保存
  4. パフォーマンスを最適化する
解答例
import dask.dataframe as dd
import pandas as pd
import numpy as np
import time
from dask.distributed import Client, performance_report

# サンプルデータの作成(大規模データをシミュレート)
print("=== サンプルデータ作成 ===")
for i in range(10):
    df = pd.DataFrame({
        'id': range(i * 100000, (i + 1) * 100000),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], 100000),
        'value1': np.random.randn(100000),
        'value2': np.random.randn(100000),
        'timestamp': pd.date_range('2024-01-01', periods=100000, freq='s')
    })
    # 意図的に欠損値を追加
    df.loc[np.random.choice(100000, 1000, replace=False), 'value1'] = np.nan
    df.to_csv(f'large_data_{i}.csv', index=False)

print("サンプルデータ作成完了")

# Daskクラスタの起動
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
print(f"\nDaskクライアント: {client}")

# パフォーマンスレポート付きで処理
with performance_report(filename="processing_performance.html"):

    print("\n=== Step 1: データ読み込み ===")
    start = time.time()

    # CSVファイルの並列読み込み
    ddf = dd.read_csv(
        'large_data_*.csv',
        parse_dates=['timestamp'],
        assume_missing=True
    )

    print(f"読み込み完了: {time.time() - start:.2f}秒")
    print(f"パーティション数: {ddf.npartitions}")
    print(f"推定行数: ~{len(ddf):,}行")

    print("\n=== Step 2: 欠損値削除 ===")
    start = time.time()

    # 欠損値を含む行を削除
    ddf_clean = ddf.dropna()

    print(f"欠損値削除完了: {time.time() - start:.2f}秒")

    print("\n=== Step 3: グループ集計 ===")
    start = time.time()

    # カテゴリ別の平均計算
    result = ddf_clean.groupby('category').agg({
        'value1': ['mean', 'std', 'count'],
        'value2': ['mean', 'std', 'count']
    })

    # 計算の実行
    result_computed = result.compute()

    print(f"集計完了: {time.time() - start:.2f}秒")
    print("\n集計結果:")
    print(result_computed)

    print("\n=== Step 4: Parquet保存 ===")
    start = time.time()

    # Parquetフォーマットで保存(パーティション分割)
    ddf_clean.to_parquet(
        'output_data.parquet',
        engine='pyarrow',
        partition_on=['category'],
        compression='snappy'
    )

    print(f"保存完了: {time.time() - start:.2f}秒")

print("\n=== 最適化のポイント ===")
print("1. パーティション数を調整(データサイズに応じて)")
print("2. インデックスを設定(高速なフィルタリング)")
print("3. persist()で中間結果をメモリに保持")
print("4. Parquetで保存(カラムナーストレージ)")
print("5. パフォーマンスレポートで分析")

# パーティション最適化の例
print("\n=== パーティション最適化 ===")

# 元のパーティション数
print(f"元のパーティション数: {ddf.npartitions}")

# 最適化(ワーカー数の2-4倍が推奨)
n_workers = len(client.scheduler_info()['workers'])
optimal_partitions = n_workers * 3

ddf_optimized = ddf.repartition(npartitions=optimal_partitions)
print(f"最適化後のパーティション数: {ddf_optimized.npartitions}")

# インデックス設定による高速化
ddf_indexed = ddf_clean.set_index('category', sorted=True)
print(f"インデックス設定後: {ddf_indexed.npartitions}パーティション")

# クラスタのクローズ
client.close()

print("\nパフォーマンスレポート: processing_performance.html")
print("処理完了!")

出力例

=== サンプルデータ作成 ===
サンプルデータ作成完了

Daskクライアント: <Client: 'tcp://127.0.0.1:xxxxx' processes=4 threads=8>

=== Step 1: データ読み込み ===
読み込み完了: 0.15秒
パーティション数: 10
推定行数: ~1,000,000行

=== Step 2: 欠損値削除 ===
欠損値削除完了: 0.01秒

=== Step 3: グループ集計 ===
集計完了: 1.23秒

集計結果:
          value1                    value2
            mean       std  count      mean       std  count
category
A        0.0012  0.999845 200145  -0.0008  1.000234 200145
B       -0.0023  1.001234 199876   0.0015  0.999876 199876
C        0.0034  0.998765 200234  -0.0021  1.001345 200234
D       -0.0011  1.000987 199987   0.0028  0.998654 199987
E        0.0019  0.999543 199758  -0.0013  1.000789 199758

=== Step 4: Parquet保存 ===
保存完了: 2.45秒

=== 最適化のポイント ===
1. パーティション数を調整(データサイズに応じて)
2. インデックスを設定(高速なフィルタリング)
3. persist()で中間結果をメモリに保持
4. Parquetで保存(カラムナーストレージ)
5. パフォーマンスレポートで分析

=== パーティション最適化 ===
元のパーティション数: 10
最適化後のパーティション数: 12
インデックス設定後: 5パーティション

パフォーマンスレポート: processing_performance.html
処理完了!

参考文献

  1. Dask Development Team. (2024). Dask: Scalable analytics in Python. https://docs.dask.org/
  2. Rocklin, M. (2015). Dask: Parallel Computation with Blocked algorithms and Task Scheduling. Proceedings of the 14th Python in Science Conference.
  3. McKinney, W. (2017). Python for Data Analysis (2nd ed.). O'Reilly Media.
  4. VanderPlas, J. (2016). Python Data Science Handbook. O'Reilly Media.
  5. Dask-ML Documentation. https://ml.dask.org/

免責事項