
Chapter 3: Parallel Computing with Dask

Achieving Scalable Data Processing in Python

📖 Reading Time: 25-30 minutes 📊 Difficulty: Intermediate 💻 Code Examples: 10 📝 Exercises: 5

This chapter covers parallel computing with Dask. You will learn Dask's basic concepts, how to work with Dask Arrays and DataFrames, and how lazy evaluation works.

Learning Objectives

By reading this chapter, you will master the following:

  • Dask's basic concepts: lazy evaluation, task graphs, and distributed scheduling
  • Processing data that doesn't fit in memory with Dask Array and Dask DataFrame
  • Scalable machine learning with Dask-ML
  • Parallel computing patterns: dask.delayed, dask.bag, and custom task graphs
  • Managing, monitoring, and scaling Dask clusters


3.1 Overview of Dask

What is Dask

Dask is a Python-native parallel computing library that is compatible with NumPy and Pandas APIs while being capable of processing data that doesn't fit in memory.

"Dask = Pandas + Parallel Processing + Scalability" - Extending existing Python code to large-scale data

Key Features of Dask

| Feature | Description | Benefit |
|---|---|---|
| Pandas/NumPy Compatible | Use existing APIs as is | Low learning cost |
| Lazy Evaluation | Computation not executed until needed (see the sketch below) | Room for optimization |
| Distributed Processing | Can run in parallel on multiple machines | Scalability |
| Dynamic Task Scheduling | Efficient resource utilization | Fast processing |
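
The Lazy Evaluation and Dynamic Task Scheduling rows are the heart of this model. The following minimal sketch (assuming only that dask.array is installed) shows both: the expression merely builds a task graph, and nothing runs until compute() hands that graph to the scheduler.

import dask.array as da

# Lazy evaluation: these lines only build a task graph, no data is touched yet
x = da.ones((10_000, 10_000), chunks=(1_000, 1_000))
total = (x + 1).sum()
print(total)            # dask.array<sum-aggregate, shape=(), dtype=float64, ...>

# Dynamic task scheduling: compute() runs the chunk-wise tasks in parallel
print(total.compute())  # 200000000.0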

Dask Architecture

graph TB
    A[Dask Collections] --> B[Task Graph]
    B --> C[Scheduler]
    C --> D[Worker 1]
    C --> E[Worker 2]
    C --> F[Worker 3]
    D --> G[Result]
    E --> G
    F --> G
    style A fill:#e3f2fd
    style B fill:#fff3e0
    style C fill:#f3e5f5
    style D fill:#e8f5e9
    style E fill:#e8f5e9
    style F fill:#e8f5e9
    style G fill:#c8e6c9

Installation and Setup

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

"""
Example: Installation and Setup

Purpose: Verify the Dask installation and start a local cluster
Target: Beginner
Execution time: 2-5 seconds
Dependencies: None
"""

import numpy as np
import pandas as pd
import dask
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
import matplotlib.pyplot as plt

# Check Dask version
print(f"Dask version: {dask.__version__}")

# Start local cluster
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
print(client)

# Dashboard URL (can be opened in browser)
print(f"\nDask Dashboard: {client.dashboard_link}")

Output:

Dask version: 2023.10.1

<Client: 'tcp://127.0.0.1:8786' processes=4 threads=8, memory=8.00 GB>

Dask Dashboard: http://127.0.0.1:8787/status

Pandas vs Dask DataFrame

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

"""
Example: Pandas vs Dask DataFrame

Purpose: Demonstrate data manipulation and preprocessing
Target: Intermediate
Execution time: ~5 seconds
Dependencies: None
"""

import pandas as pd
import dask.dataframe as dd
import numpy as np

# Pandas DataFrame
df_pandas = pd.DataFrame({
    'x': np.random.random(10000),
    'y': np.random.random(10000),
    'z': np.random.choice(['A', 'B', 'C'], 10000)
})

# Convert to Dask DataFrame (split into 4 partitions)
df_dask = dd.from_pandas(df_pandas, npartitions=4)

print("=== Pandas DataFrame ===")
print(f"Type: {type(df_pandas)}")
print(f"Shape: {df_pandas.shape}")
print(f"Memory usage: {df_pandas.memory_usage(deep=True).sum() / 1024:.2f} KB")

print("\n=== Dask DataFrame ===")
print(f"Type: {type(df_dask)}")
print(f"Number of partitions: {df_dask.npartitions}")
print(f"Columns: {df_dask.columns.tolist()}")

# Dask uses lazy evaluation: execute with compute()
print("\nComputing mean:")
print(f"Pandas: {df_pandas['x'].mean():.6f}")
print(f"Dask: {df_dask['x'].mean().compute():.6f}")

Output:

=== Pandas DataFrame ===
Type: <class 'pandas.core.frame.DataFrame'>
Shape: (10000, 3)
Memory usage: 235.47 KB

=== Dask DataFrame ===
Type: <class 'dask.dataframe.core.DataFrame'>
Number of partitions: 4
Columns: ['x', 'y', 'z']

Computing mean:
Pandas: 0.499845
Dask: 0.499845

Visualizing Task Graphs

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Visualizing Task Graphs

Purpose: Demonstrate optimization techniques
Target: Beginner
Execution time: 10-30 seconds
Dependencies: None
"""

import dask.array as da

# Visualize task graph for a simple computation
x = da.random.random((1000, 1000), chunks=(100, 100))
y = x + x.T
z = y.mean(axis=0)

# Display task graph
z.visualize(filename='dask_task_graph.png', optimize_graph=True)
print("Task graph saved to dask_task_graph.png")

# Check number of tasks
print(f"\nNumber of tasks: {len(z.__dask_graph__())}")
print(f"Number of chunks: {x.npartitions}")

Important: Dask automatically optimizes computations and identifies tasks that can be executed in parallel.
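
As a rough sketch of what optimize_graph=True does before drawing (no assumptions beyond dask being installed), you can call dask.optimize yourself and compare the task counts before and after the graph is rewritten:

import dask
import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
z = (x + x.T).mean(axis=0)

# dask.optimize returns equivalent collections backed by a rewritten graph
# (e.g. chained chunk-wise operations fused into single tasks)
(z_opt,) = dask.optimize(z)

print(f"Tasks before optimization: {len(z.__dask_graph__())}")
print(f"Tasks after optimization:  {len(z_opt.__dask_graph__())}")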


3.2 Dask Arrays & DataFrames

Dask Array: Large-Scale NumPy Arrays

Dask Array splits NumPy arrays into chunks and enables parallel processing.

Basic Operations

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - numpy>=1.24.0, <2.0.0

"""
Example: Basic Operations

Purpose: Demonstrate core concepts and implementation patterns
Target: Beginner to Intermediate
Execution time: ~5 seconds
Dependencies: None
"""

import dask.array as da
import numpy as np

# Create large array (size that doesn't fit in memory)
# Array equivalent to 80 GB (10000 x 10000 x 100 float64)
x = da.random.random((10000, 10000, 100), chunks=(1000, 1000, 10))

print("=== Dask Array ===")
print(f"Shape: {x.shape}")
print(f"Data type: {x.dtype}")
print(f"Chunk size: {x.chunks}")
print(f"Number of chunks: {x.npartitions}")
print(f"Estimated size: {x.nbytes / 1e9:.2f} GB")

# NumPy-compatible operations
mean_value = x.mean()
std_value = x.std()
max_value = x.max()

# Not yet computed due to lazy evaluation
print(f"\nMean (lazy): {mean_value}")

# Execute actual computation with compute()
print(f"Mean (executed): {mean_value.compute():.6f}")
print(f"Standard deviation: {std_value.compute():.6f}")
print(f"Maximum value: {max_value.compute():.6f}")

Output:

=== Dask Array ===
Shape: (10000, 10000, 100)
Data type: float64
Chunk size: ((1000, 1000, ...), (1000, 1000, ...), (10, 10, ...))
Number of chunks: 1000
Estimated size: 80.00 GB

Mean (lazy): dask.array<mean_agg-aggregate, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>

Mean (executed): 0.500021
Standard deviation: 0.288668
Maximum value: 0.999999

Linear Algebra Operations

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Linear Algebra Operations

Purpose: Demonstrate core concepts and implementation patterns
Target: Beginner
Execution time: ~5 seconds
Dependencies: None
"""

import dask.array as da

# Large-scale matrix operations
A = da.random.random((5000, 5000), chunks=(1000, 1000))
B = da.random.random((5000, 5000), chunks=(1000, 1000))

# Matrix multiplication (parallel computation)
C = da.matmul(A, B)

print("=== Matrix Operations ===")
print(f"A shape: {A.shape}, Number of chunks: {A.npartitions}")
print(f"B shape: {B.shape}, Number of chunks: {B.npartitions}")
print(f"C shape: {C.shape}, Number of chunks: {C.npartitions}")

# SVD (Singular Value Decomposition)
U, s, V = da.linalg.svd_compressed(A, k=50)

print(f"\nSingular Value Decomposition:")
print(f"U shape: {U.shape}")
print(f"Number of singular values: {len(s)}")
print(f"V shape: {V.shape}")

# Compute top 5 singular values
top_5_singular_values = s[:5].compute()
print(f"\nTop 5 singular values: {top_5_singular_values}")

Dask DataFrame: Large-Scale DataFrames

Reading CSV Files

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

"""
Example: Reading CSV Files

Purpose: Demonstrate data manipulation and preprocessing
Target: Intermediate
Execution time: 1-3 seconds
Dependencies: None
"""

import dask.dataframe as dd
import pandas as pd
import numpy as np

# Create sample CSV files (simulate large-scale data)
for i in range(5):
    df = pd.DataFrame({
        'id': range(i * 1000000, (i + 1) * 1000000),
        'value': np.random.randn(1000000),
        'category': np.random.choice(['A', 'B', 'C', 'D'], 1000000),
        'timestamp': pd.date_range('2024-01-01', periods=1000000, freq='s')
    })
    df.to_csv(f'data_part_{i}.csv', index=False)

# Read multiple CSV files in parallel with Dask
ddf = dd.read_csv('data_part_*.csv', parse_dates=['timestamp'])

print("=== Dask DataFrame ===")
print(f"Number of partitions: {ddf.npartitions}")
print(f"Columns: {ddf.columns.tolist()}")
print(f"Estimated rows: ~{len(ddf)} rows")  # Can estimate without compute()

# Check data types
print(f"\nData types:")
print(ddf.dtypes)

# Display first few rows (compute only part)
print(f"\nFirst 5 rows:")
print(ddf.head())

DataFrame Operations

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: DataFrame Operations

Purpose: Demonstrate data manipulation and preprocessing
Target: Beginner to Intermediate
Execution time: ~5 seconds
Dependencies: None
"""

import dask
import dask.dataframe as dd

# Group aggregation
category_stats = ddf.groupby('category')['value'].agg(['mean', 'std', 'count'])

print("=== Statistics by Category (Lazy Evaluation) ===")
print(category_stats)

# Execute with compute()
print("\n=== Statistics by Category (Execution Result) ===")
result = category_stats.compute()
print(result)

# Filtering and transformation
filtered = ddf[ddf['value'] > 0]
filtered['value_squared'] = filtered['value'] ** 2

# Time series aggregation
daily_stats = ddf.set_index('timestamp').resample('D')['value'].mean()

print("\n=== Daily Average (First 5 Days) ===")
print(daily_stats.head())

# Execute multiple computations at once (efficient)
mean_val, std_val, filtered_count = dask.compute(
    ddf['value'].mean(),
    ddf['value'].std(),
    len(filtered)
)

print(f"\nOverall Statistics:")
print(f"Mean: {mean_val:.6f}")
print(f"Standard deviation: {std_val:.6f}")
print(f"Count of positive values: {filtered_count:,}")

Partition Optimization

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Partition Optimization

Purpose: Demonstrate data manipulation and preprocessing
Target: Beginner
Execution time: 10-30 seconds
Dependencies: None
"""

import dask.dataframe as dd

# Rebalance partitions
print(f"Original partition count: {ddf.npartitions}")

# Adjust number of partitions (balance memory and CPU)
ddf_optimized = ddf.repartition(npartitions=20)
print(f"Optimized partition count: {ddf_optimized.npartitions}")

# Partition by index (set_index shuffles the data; pass sorted=True only when
# the column is already sorted, otherwise the divisions will be wrong)
ddf_indexed = ddf.set_index('category')
print(f"\nAfter setting index:")
print(f"Partition count: {ddf_indexed.npartitions}")
print(f"Known divisions: {ddf_indexed.known_divisions}")

# Check partition sizes
partition_sizes = ddf.map_partitions(len).compute()
print(f"\nSize of each partition: {partition_sizes.tolist()[:10]}")  # First 10

Best Practice: Ideal partition size is around 100MB-1GB.
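
One hedged way to act on this guideline: dask.dataframe's repartition also accepts a partition_size target, so instead of choosing npartitions by hand you can ask for roughly 100MB partitions. The DataFrame below is a small stand-in; with real data the same call applies unchanged.

import numpy as np
import pandas as pd
import dask.dataframe as dd

# Small stand-in DataFrame split into deliberately many partitions
ddf = dd.from_pandas(
    pd.DataFrame({'value': np.random.randn(1_000_000)}),
    npartitions=100
)

# Target roughly 100MB per partition instead of picking npartitions manually
# (Dask estimates in-memory partition sizes, which may trigger some computation)
ddf_sized = ddf.repartition(partition_size="100MB")

print(f"Partitions before: {ddf.npartitions}, after: {ddf_sized.npartitions}")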


3.3 Dask-ML: Scalable Machine Learning

What is Dask-ML

Dask-ML extends scikit-learn's API and enables machine learning on large-scale datasets.

Data Preprocessing

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - dask-ml>=2023.3.24

"""
Example: Data Preprocessing

Purpose: Demonstrate machine learning model training and evaluation
Target: Beginner
Execution time: 1-5 minutes
Dependencies: None
"""

import dask.dataframe as dd
import dask.array as da
from dask_ml.preprocessing import StandardScaler, LabelEncoder
from dask_ml.model_selection import train_test_split

# Create large dataset
ddf = dd.read_csv('data_part_*.csv', parse_dates=['timestamp'])

# Feature extraction
ddf['hour'] = ddf['timestamp'].dt.hour
ddf['day_of_week'] = ddf['timestamp'].dt.dayofweek

# Label encoding
le = LabelEncoder()
ddf['category_encoded'] = le.fit_transform(ddf['category'])

# Separate features and target
X = ddf[['value', 'hour', 'day_of_week', 'category_encoded']].to_dask_array(lengths=True)
y = (ddf['value'] > 0).astype(int).to_dask_array(lengths=True)

print("=== Features ===")
print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")

# Data splitting
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, shuffle=True
)

print(f"\nTraining data: {X_train.shape[0].compute():,} rows")
print(f"Test data: {X_test.shape[0].compute():,} rows")

# Standardization
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"\nData type after standardization: {type(X_train_scaled)}")

Incremental Learning

from dask_ml.linear_model import LogisticRegression
from dask_ml.metrics import accuracy_score, log_loss

# Logistic Regression (trained in parallel across the Dask array's blocks)
clf = LogisticRegression(max_iter=100, solver='lbfgs', random_state=42)

# Parallel training
clf.fit(X_train_scaled, y_train)

# Prediction
y_pred = clf.predict(X_test_scaled)
y_pred_proba = clf.predict_proba(X_test_scaled)

# Evaluation
accuracy = accuracy_score(y_test, y_pred)
loss = log_loss(y_test, y_pred_proba)

print("=== Model Performance ===")
print(f"Accuracy: {accuracy.compute():.4f}")
print(f"Log loss: {loss.compute():.4f}")

# Check coefficients
print(f"\nModel coefficients: {clf.coef_}")
print(f"Intercept: {clf.intercept_}")

Hyperparameter Tuning

# Requirements:
# - Python 3.9+
# - dask-ml>=2023.3.24
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

"""
Example: Hyperparameter Tuning

Purpose: Demonstrate machine learning model training and evaluation
Target: Beginner to Intermediate
Execution time: 1-5 minutes
Dependencies: None
"""

from dask_ml.model_selection import GridSearchCV
from dask_ml.linear_model import LogisticRegression
import numpy as np
import pandas as pd

# Parameter grid
param_grid = {
    'C': [0.001, 0.01, 0.1, 1, 10],
    'penalty': ['l1', 'l2'],
    'solver': ['saga']
}

# Grid search (parallel execution)
clf = LogisticRegression(max_iter=100, random_state=42)
grid_search = GridSearchCV(
    clf,
    param_grid,
    cv=3,
    scoring='accuracy',
    n_jobs=-1
)

print("=== Grid Search Started ===")
print(f"Number of parameter combinations: {len(param_grid['C']) * len(param_grid['penalty'])}")

# Training (execute with sampled data)
X_train_sample = X_train_scaled[:100000].compute()
y_train_sample = y_train[:100000].compute()

grid_search.fit(X_train_sample, y_train_sample)

print("\n=== Grid Search Results ===")
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_:.4f}")

# Detailed results
results_df = pd.DataFrame(grid_search.cv_results_)
print("\nTop 3 parameter combinations:")
print(results_df[['params', 'mean_test_score', 'std_test_score']].nlargest(3, 'mean_test_score'))

Random Forest with Dask-ML

# dask-ml does not provide a RandomForestClassifier of its own; a common
# pattern is to train scikit-learn's random forest with the Dask joblib
# backend (assumes a dask.distributed Client is running) after materializing
# the training data.
import dask
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Random Forest
rf_clf = RandomForestClassifier(
    n_estimators=10,
    max_depth=10,
    random_state=42,
    n_jobs=-1
)

print("=== Random Forest Training ===")
X_train_local, y_train_local = dask.compute(X_train_scaled, y_train)
with joblib.parallel_backend('dask'):  # distribute the tree fitting over the cluster
    rf_clf.fit(X_train_local, y_train_local)

# Prediction
X_test_local, y_test_local = dask.compute(X_test_scaled, y_test)
y_pred_rf = rf_clf.predict(X_test_local)

# Evaluation
accuracy_rf = accuracy_score(y_test_local, y_pred_rf)

print(f"Accuracy: {accuracy_rf:.4f}")

# Feature importance
feature_importance = rf_clf.feature_importances_
feature_names = ['value', 'hour', 'day_of_week', 'category_encoded']

print("\nFeature Importance:")
for name, importance in zip(feature_names, feature_importance):
    print(f"  {name}: {importance:.4f}")

Building Pipelines

from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import StandardScaler, OneHotEncoder
from dask_ml.linear_model import LogisticRegression
from dask_ml.metrics import accuracy_score
from sklearn.pipeline import Pipeline

# Preprocessing pipeline
numeric_features = ['value', 'hour', 'day_of_week']
categorical_features = ['category_encoded']

preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(), categorical_features)
    ]
)

# Complete pipeline
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=100, random_state=42))
])

print("=== Pipeline ===")
print(pipeline)

# Pipeline training and evaluation
# (ColumnTransformer selects columns by name, so X_train/X_test here should be
#  DataFrames containing these feature columns rather than the plain arrays above)
pipeline.fit(X_train, y_train)
y_pred_pipeline = pipeline.predict(X_test)
accuracy_pipeline = accuracy_score(y_test, y_pred_pipeline)

print(f"\nPipeline accuracy: {accuracy_pipeline.compute():.4f}")

3.4 Parallel Computing Patterns

dask.delayed: Delayed Function Execution

dask.delayed converts arbitrary Python functions to lazy evaluation.

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: dask.delayed converts arbitrary Python functions to lazy evaluation

Purpose: Demonstrate core concepts and implementation patterns
Target: Beginner to Intermediate
Execution time: 5-10 seconds
Dependencies: None
"""

import dask
from dask import delayed
import time

# Regular function
def process_data(x):
    time.sleep(1)  # Simulate time-consuming processing
    return x ** 2

def aggregate(results):
    return sum(results)

# Sequential execution
print("=== Sequential Execution ===")
start = time.time()
results = []
for i in range(8):
    results.append(process_data(i))
total = aggregate(results)
print(f"Result: {total}")
print(f"Execution time: {time.time() - start:.2f} seconds")

# Parallel execution (dask.delayed)
print("\n=== Parallel Execution (dask.delayed) ===")
start = time.time()
results_delayed = []
for i in range(8):
    result = delayed(process_data)(i)
    results_delayed.append(result)

total_delayed = delayed(aggregate)(results_delayed)
total = total_delayed.compute()

print(f"Result: {total}")
print(f"Execution time: {time.time() - start:.2f} seconds")

# Visualize task graph
total_delayed.visualize(filename='delayed_task_graph.png')
print("\nTask graph saved to delayed_task_graph.png")

Output:

=== Sequential Execution ===
Result: 140
Execution time: 8.02 seconds

=== Parallel Execution (dask.delayed) ===
Result: 140
Execution time: 2.03 seconds

Note: With 4 workers executing in parallel, we achieved approximately 4x speedup.

dask.bag: Unstructured Data Processing

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: dask.bag: Unstructured Data Processing

Purpose: Demonstrate core concepts and implementation patterns
Target: Intermediate
Execution time: ~5 seconds
Dependencies: None
"""

import dask.bag as db
import json

# Create JSON file (simulate log data)
logs = [
    {'timestamp': '2024-01-01 10:00:00', 'level': 'INFO', 'message': 'User login'},
    {'timestamp': '2024-01-01 10:01:00', 'level': 'ERROR', 'message': 'Connection failed'},
    {'timestamp': '2024-01-01 10:02:00', 'level': 'INFO', 'message': 'User logout'},
    {'timestamp': '2024-01-01 10:03:00', 'level': 'WARNING', 'message': 'Slow query'},
] * 1000

with open('logs.json', 'w') as f:
    for log in logs:
        f.write(json.dumps(log) + '\n')

# Read with Dask Bag
bag = db.read_text('logs.json').map(json.loads)

print("=== Dask Bag ===")
print(f"Number of partitions: {bag.npartitions}")

# Aggregate each log level
level_counts = bag.pluck('level').frequencies()
print(f"\nCount by log level:")
print(level_counts.compute())

# Filter error logs
errors = bag.filter(lambda x: x['level'] == 'ERROR')
print(f"\nNumber of error logs: {errors.count().compute():,}")

# Custom processing
def extract_hour(log):
    timestamp = log['timestamp']
    return timestamp.split()[1].split(':')[0]

hourly_distribution = bag.map(extract_hour).frequencies()
print(f"\nDistribution by hour:")
print(hourly_distribution.compute())

Custom Task Graphs

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Custom Task Graphs

Purpose: Demonstrate core concepts and implementation patterns
Target: Beginner to Intermediate
Execution time: ~5 seconds
Dependencies: None
"""

import dask
from dask.threaded import get

# Define custom task graph
# DAG (Directed Acyclic Graph) format
task_graph = {
    'x': 1,
    'y': 2,
    'z': (lambda a, b: a + b, 'x', 'y'),
    'w': (lambda a: a * 2, 'z'),
    'result': (lambda a, b: a ** b, 'w', 'y')
}

# Execute task graph
result = get(task_graph, 'result')
print(f"=== Custom Task Graph ===")
print(f"Result: {result}")

# Complex task graph
def load_data(source):
    print(f"Loading from {source}")
    return f"data_{source}"

def process(data):
    print(f"Processing {data}")
    return f"processed_{data}"

def merge(data1, data2):
    print(f"Merging {data1} and {data2}")
    return f"merged_{data1}_{data2}"

complex_graph = {
    'load_a': (load_data, 'source_A'),
    'load_b': (load_data, 'source_B'),
    'process_a': (process, 'load_a'),
    'process_b': (process, 'load_b'),
    'final': (merge, 'process_a', 'process_b')
}

final_result = get(complex_graph, 'final')
print(f"\nFinal result: {final_result}")

Parallel map/apply

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

"""
Example: Parallel map/apply

Purpose: Demonstrate data manipulation and preprocessing
Target: Intermediate
Execution time: ~5 seconds
Dependencies: None
"""

import dask.dataframe as dd
import pandas as pd
import numpy as np

# Sample data
ddf = dd.from_pandas(
    pd.DataFrame({
        'A': np.random.randn(10000),
        'B': np.random.randn(10000),
        'C': np.random.choice(['X', 'Y', 'Z'], 10000)
    }),
    npartitions=4
)

# map_partitions: Apply function to each partition
def custom_processing(partition):
    # Custom processing per partition
    partition['A_squared'] = partition['A'] ** 2
    partition['B_log'] = np.log1p(np.abs(partition['B']))
    return partition

ddf_processed = ddf.map_partitions(custom_processing)

print("=== map_partitions ===")
print(ddf_processed.head())

# apply: Apply function to each row
def row_function(row):
    return row['A'] * row['B']

ddf['A_times_B'] = ddf.apply(row_function, axis=1, meta=('A_times_B', 'f8'))

print("\n=== apply ===")
print(ddf.head())

# Customize aggregation function
def custom_agg(partition):
    return pd.Series({
        'mean': partition['A'].mean(),
        'std': partition['A'].std(),
        'min': partition['A'].min(),
        'max': partition['A'].max()
    })

stats = ddf.map_partitions(custom_agg).compute()
print("\n=== Custom Aggregation ===")
print(stats)

3.5 Dask Cluster Management

LocalCluster: Local Parallel Processing

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: LocalCluster: Local Parallel Processing

Purpose: Demonstrate core concepts and implementation patterns
Target: Advanced
Execution time: 5-10 seconds
Dependencies: None
"""

from dask.distributed import Client, LocalCluster
import dask.array as da

# Detailed LocalCluster configuration
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='2GB',
    dashboard_address=':8787'
)

client = Client(cluster)

print("=== LocalCluster Information ===")
print(f"Number of workers: {len(client.scheduler_info()['workers'])}")
print(f"Number of threads: {sum(w['nthreads'] for w in client.scheduler_info()['workers'].values())}")
print(f"Memory limit: {cluster.worker_spec[0]['options']['memory_limit']}")
print(f"Dashboard: {client.dashboard_link}")

# Worker information details
for worker_id, info in client.scheduler_info()['workers'].items():
    print(f"\nWorker {worker_id}:")
    print(f"  Threads: {info['nthreads']}")
    print(f"  Memory: {info['memory_limit'] / 1e9:.2f} GB")

# Execute computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = (x + x.T).mean().compute()

print(f"\nComputation result: {result:.6f}")

# Close cluster
client.close()
cluster.close()

Distributed Scheduler

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Distributed Scheduler

Purpose: Demonstrate core concepts and implementation patterns
Target: Advanced
Execution time: 5-10 seconds
Dependencies: None
"""

from dask.distributed import Client, progress
import dask.array as da
import time

# Start client
client = Client(n_workers=4, threads_per_worker=2)

# Schedule large-scale computation
x = da.random.random((50000, 50000), chunks=(5000, 5000))
y = da.random.random((50000, 50000), chunks=(5000, 5000))

# Schedule multiple computations simultaneously
results = []
for i in range(5):
    result = (x + y * i).sum()
    results.append(result)

# Display progress
futures = client.compute(results)
progress(futures)

# Get results
final_results = client.gather(futures)

print("\n=== Computation Results ===")
for i, result in enumerate(final_results):
    print(f"Computation {i + 1}: {result:.2f}")

# Scheduler statistics
print("\n=== Scheduler Statistics ===")
print(f"Completed tasks: {client.scheduler_info()['total_occupancy']}")
print(f"Active workers: {len(client.scheduler_info()['workers'])}")

Performance Optimization

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Performance Optimization

Purpose: Demonstrate performance profiling with a Dask performance report
Target: Advanced
Execution time: 1-3 seconds
Dependencies: None
"""

from dask.distributed import Client, performance_report
import dask.dataframe as dd
import dask.array as da

client = Client(n_workers=4)

# Generate performance report
with performance_report(filename="dask_performance.html"):
    # DataFrame operations
    ddf = dd.read_csv('data_part_*.csv')
    result1 = ddf.groupby('category')['value'].mean().compute()

    # Array operations
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    result2 = (x + x.T).mean().compute()

    print("Computation complete")

print("Performance report saved to dask_performance.html")

# Check per-worker memory limits from the scheduler's view
worker_info = client.scheduler_info()['workers']

print("\n=== Memory Usage ===")
for worker, info in worker_info.items():
    print(f"Worker {worker}: memory limit {info['memory_limit'] / 1e9:.2f} GB")

# Task execution statistics
print("\n=== Task Execution Statistics ===")
print(f"Bytes processed: {client.scheduler_info().get('total_occupancy', 'N/A')}")

Cluster Scaling

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

from dask.distributed import Client
# from dask_kubernetes import KubeCluster  # optional; requires the dask-kubernetes package

# Kubernetes cluster configuration (example)
"""
cluster = KubeCluster(
    name='dask-cluster',
    namespace='default',
    image='daskdev/dask:latest',
    n_workers=10,
    resources={
        'requests': {'memory': '4Gi', 'cpu': '2'},
        'limits': {'memory': '8Gi', 'cpu': '4'}
    }
)

client = Client(cluster)

# Dynamic scaling
cluster.adapt(minimum=2, maximum=20)

print(f"Cluster info: {cluster}")
"""

# Adaptive scaling locally
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

# Dynamically adjust number of workers
cluster.adapt(minimum=2, maximum=8)

print("=== Adaptive Scaling ===")
print(f"Minimum workers: 2")
print(f"Maximum workers: 8")
print(f"Current workers: {len(client.scheduler_info()['workers'])}")

# Load to verify scaling
import dask.array as da
x = da.random.random((50000, 50000), chunks=(1000, 1000))
result = x.sum().compute()

print(f"\nWorkers after computation: {len(client.scheduler_info()['workers'])}")

Monitoring and Debugging

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Monitoring and Debugging

Purpose: Demonstrate task monitoring and debugging on a Dask cluster
Target: Advanced
Execution time: ~5 seconds
Dependencies: None
"""

from dask.distributed import Client
import dask.array as da

client = Client(n_workers=4)

# Monitor tasks
x = da.random.random((10000, 10000), chunks=(1000, 1000))
future = client.compute(x.sum())

# Check task status
print("=== Task Status ===")
print(f"Status: {future.status}")
print(f"Key: {future.key}")

# Wait for result
result = future.result()
print(f"Result: {result:.6f}")

# Get worker logs
logs = client.get_worker_logs()
print("\n=== Worker Logs (First Worker) ===")
first_worker = list(logs.keys())[0]
print(f"Worker: {first_worker}")
print(str(logs[first_worker])[:500])  # First 500 characters

# Task graph statistics
print("\n=== Task Graph Statistics ===")
print(f"Number of tasks: {len(x.__dask_graph__())}")
print(f"Number of layers: {len(x.__dask_layers__())}")

3.6 Chapter Summary

What We Learned

  1. Dask Basics

    • Pandas/NumPy compatible API
    • Lazy evaluation and task graphs
    • Distributed parallel processing mechanism
  2. Dask Collections

    • Dask Array: Large-scale NumPy arrays
    • Dask DataFrame: Large-scale DataFrames
    • Dask Bag: Unstructured data processing
  3. Dask-ML

    • Scalable machine learning
    • Incremental learning and hyperparameter tuning
    • Preprocessing pipelines
  4. Parallel Computing Patterns

    • dask.delayed: Delayed function execution
    • dask.bag: Unstructured data
    • Custom task graphs
    • map_partitions/apply
  5. Cluster Management

    • LocalCluster: Local parallel processing
    • Distributed scheduler
    • Performance optimization
    • Dynamic scaling

Dask Best Practices

| Item | Recommendation |
|---|---|
| Chunk Size | Ideally around 100MB-1GB |
| Number of Partitions | 2-4 times the number of workers |
| Using compute() | Execute multiple computations at once with a single compute() |
| Utilizing persist() | Keep reused data in memory (see the sketch below) |
| Setting Index | Set an index for faster filtering; pass sorted=True only when the data is already sorted |
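
As a brief sketch of the persist() row (assuming a running Client and the data_part_*.csv files created earlier in this chapter), persist() materializes the collection in cluster memory once so that repeated computations don't re-read the source files:

import dask.dataframe as dd
from dask.distributed import Client

client = Client()  # assumes a local cluster; reuse an existing Client if one is running

ddf = dd.read_csv('data_part_*.csv')

# Materialize the partitions in worker memory once...
ddf = ddf.persist()

# ...so every later computation reuses the in-memory partitions
# instead of re-reading and re-parsing the CSV files.
mean_value = ddf['value'].mean().compute()
positive_rows = (ddf['value'] > 0).sum().compute()

print(f"Mean: {mean_value:.6f}, positive rows: {positive_rows:,}")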

Spark vs Dask Comparison

| Item | Spark | Dask |
|---|---|---|
| Language | Scala/Java/Python | Python only |
| API | Proprietary API | Pandas/NumPy compatible |
| Learning Curve | Steep | Gentle |
| Ecosystem | Large-scale, mature | Smaller, growing |
| Use Cases | Ultra-large batch processing | Medium-scale, interactive processing |
| Memory Management | JVM-based | Python native |

To the Next Chapter

In Chapter 4, we will learn about Database and Storage Optimization.


Exercises

Problem 1 (Difficulty: easy)

List three main differences between Dask and Pandas, and explain the characteristics of each.

Answer

Answer:

  1. Execution Model

    • Pandas: Immediate execution (Eager Evaluation)
    • Dask: Lazy Evaluation - executes with compute()
  2. Scalability

    • Pandas: Only data that fits in memory
    • Dask: Can process data that doesn't fit in memory
  3. Parallel Processing

    • Pandas: Single process
    • Dask: Parallel processing with multiple workers

When to Use:

  • Pandas: when the data fits comfortably in memory and a single core is fast enough
  • Dask: when the data exceeds memory or the workload benefits from parallel or distributed execution

Problem 2 (Difficulty: medium)

Execute the following code and verify the lazy evaluation mechanism. Explain why the two outputs are different.

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Execute the following code and verify the lazy evaluation mechanism

Purpose: Demonstrate core concepts and implementation patterns
Target: Beginner
Execution time: ~5 seconds
Dependencies: None
"""

import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
y = x + 1
z = y * 2

print("1.", z)
print("2.", z.compute())
Answer
# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Execute the following code and verify the lazy evaluation mechanism

Purpose: Demonstrate core concepts and implementation patterns
Target: Beginner
Execution time: ~5 seconds
Dependencies: None
"""

import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
y = x + 1
z = y * 2

print("1.", z)
print("2.", z.compute())

Output:

1. dask.array<mul, shape=(1000, 1000), dtype=float64, chunksize=(100, 100), chunktype=numpy.ndarray>
2. [[1.234 2.567 ...] [3.890 1.456 ...] ...]

Explanation:

  1. First output (lazy object):

    • z is a lazy evaluation object, computation not yet executed
    • Only the task graph has been constructed
    • Only metadata (shape, data type, chunk size) is displayed
  2. Second output (computation result):

    • Calling compute() executes the actual computation
    • Task graph is executed and result returned as NumPy array

Benefits of Lazy Evaluation:

  • The whole task graph is known before execution, so Dask can optimize it (e.g. fuse chunk-wise operations)
  • Unnecessary intermediate results are never materialized
  • Work is only performed when a result is actually needed

Problem 3 (Difficulty: medium)

Calculate the appropriate number of partitions when processing 100 million rows of data with Dask DataFrame. Aim for approximately 100MB per partition, assuming 50 bytes per row.

Answer

Answer:

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

"""
Example: Answer:

Purpose: Demonstrate data manipulation and preprocessing
Target: Intermediate
Execution time: ~5 seconds
Dependencies: None
"""

# Given information
total_rows = 100_000_000  # 100 million rows
bytes_per_row = 50  # 50 bytes per row
target_partition_size_mb = 100  # Target partition size (MB)

# Calculation
total_size_bytes = total_rows * bytes_per_row
total_size_mb = total_size_bytes / (1024 ** 2)

partition_count = total_size_mb / target_partition_size_mb

print("=== Partition Count Calculation ===")
print(f"Total data size: {total_size_mb:.2f} MB ({total_size_bytes / 1e9:.2f} GB)")
print(f"Target partition size: {target_partition_size_mb} MB")
print(f"Required partitions: {partition_count:.0f}")
print(f"Rows per partition: {total_rows / partition_count:,.0f} rows")

# Implementation example with Dask DataFrame
import dask.dataframe as dd
import pandas as pd
import numpy as np

# Sample data (1 million rows standing in for the full 100 million)
df = pd.DataFrame({
    'id': range(1000000),
    'value': np.random.randn(1000000)
})

# Split with calculated partition count (round up so each partition stays under the target size)
npartitions = int(np.ceil(partition_count))
ddf = dd.from_pandas(df, npartitions=npartitions)

print(f"\nDask DataFrame:")
print(f"  Number of partitions: {ddf.npartitions}")
print(f"  Estimated size per partition: {total_size_mb / npartitions:.2f} MB")

Output:

=== Partition Count Calculation ===
Total data size: 4768.37 MB (5.00 GB)
Target partition size: 100 MB
Required partitions: 48
Rows per partition: 2,083,333 rows

Dask DataFrame:
  Number of partitions: 48
  Estimated size per partition: 99.34 MB

Best Practices:

  • Aim for roughly 100MB-1GB of data per partition
  • Keep the partition count around 2-4 times the number of workers
  • Too many small partitions add scheduling overhead; too few limit parallelism

Problem 4 (Difficulty: hard)

Use dask.delayed to execute tasks with the following dependencies in parallel, and visualize the task graph:

  • Task A (2 seconds) and Task B (2 seconds) have no dependencies
  • Task C (1 second) depends on the results of A and B
  • Task D (1 second) depends on the result of C

Answer
# Requirements:
# - Python 3.9+
# - dask>=2023.5.0

"""
Example: Use dask.delayed to execute tasks with the given dependencies in parallel

Purpose: Demonstrate core concepts and implementation patterns
Target: Beginner to Intermediate
Execution time: ~5 seconds
Dependencies: None
"""

import dask
from dask import delayed
import time

# Task definitions
@delayed
def task_a():
    time.sleep(2)
    print("Task A complete")
    return 10

@delayed
def task_b():
    time.sleep(2)
    print("Task B complete")
    return 20

@delayed
def task_c(a_result, b_result):
    time.sleep(1)
    print("Task C complete")
    return a_result + b_result

@delayed
def task_d(c_result):
    time.sleep(1)
    print("Task D complete")
    return c_result * 2

# Build task graph
print("=== Building Task Graph ===")
a = task_a()
b = task_b()
c = task_c(a, b)
d = task_d(c)

print("Task graph construction complete (not yet executed)")

# Visualize task graph
d.visualize(filename='task_dependency_graph.png')
print("Task graph saved to task_dependency_graph.png")

# Execute
print("\n=== Task Execution Started ===")
start_time = time.time()
result = d.compute()
end_time = time.time()

print(f"\n=== Result ===")
print(f"Final result: {result}")
print(f"Execution time: {end_time - start_time:.2f} seconds")

# Expected execution time
print(f"\nExpected execution time:")
print(f"  Sequential: 2 + 2 + 1 + 1 = 6 seconds")
print(f"  Parallel: max(2, 2) + 1 + 1 = 4 seconds")

Output:

=== Building Task Graph ===
Task graph construction complete (not yet executed)
Task graph saved to task_dependency_graph.png

=== Task Execution Started ===
Task A complete
Task B complete
Task C complete
Task D complete

=== Result ===
Final result: 60
Execution time: 4.02 seconds

Expected execution time:
  Sequential: 2 + 2 + 1 + 1 = 6 seconds
  Parallel: max(2, 2) + 1 + 1 = 4 seconds

Task Graph Explanation:

  • Task A and Task B have no dependencies, so they run in parallel (2 seconds instead of 4)
  • Task C waits for both A and B; Task D then waits for C
  • The critical path is therefore max(A, B) + C + D = 4 seconds

Problem 5 (Difficulty: hard)

Read a large CSV file with Dask DataFrame and perform the following processing:

  1. Remove rows with missing values
  2. Group by specific column and calculate mean
  3. Save results to Parquet file
  4. Optimize performance
Answer
# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

"""
Example: Read a large CSV file with Dask DataFrame and perform the required processing

Purpose: Demonstrate data manipulation and preprocessing
Target: Advanced
Execution time: 10-30 seconds
Dependencies: None
"""

import dask.dataframe as dd
import pandas as pd
import numpy as np
import time
from dask.distributed import Client, performance_report

# Create sample data (simulate large-scale data)
print("=== Creating Sample Data ===")
for i in range(10):
    df = pd.DataFrame({
        'id': range(i * 100000, (i + 1) * 100000),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], 100000),
        'value1': np.random.randn(100000),
        'value2': np.random.randn(100000),
        'timestamp': pd.date_range('2024-01-01', periods=100000, freq='s')
    })
    # Intentionally add missing values
    df.loc[np.random.choice(100000, 1000, replace=False), 'value1'] = np.nan
    df.to_csv(f'large_data_{i}.csv', index=False)

print("Sample data creation complete")

# Start Dask cluster
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
print(f"\nDask client: {client}")

# Process with performance report
with performance_report(filename="processing_performance.html"):

    print("\n=== Step 1: Data Loading ===")
    start = time.time()

    # Read CSV files in parallel
    ddf = dd.read_csv(
        'large_data_*.csv',
        parse_dates=['timestamp'],
        assume_missing=True
    )

    print(f"Loading complete: {time.time() - start:.2f} seconds")
    print(f"Number of partitions: {ddf.npartitions}")
    print(f"Estimated rows: ~{len(ddf):,} rows")

    print("\n=== Step 2: Remove Missing Values ===")
    start = time.time()

    # Remove rows with missing values
    ddf_clean = ddf.dropna()

    print(f"Missing values removed: {time.time() - start:.2f} seconds")

    print("\n=== Step 3: Group Aggregation ===")
    start = time.time()

    # Calculate mean by category
    result = ddf_clean.groupby('category').agg({
        'value1': ['mean', 'std', 'count'],
        'value2': ['mean', 'std', 'count']
    })

    # Execute computation
    result_computed = result.compute()

    print(f"Aggregation complete: {time.time() - start:.2f} seconds")
    print("\nAggregation results:")
    print(result_computed)

    print("\n=== Step 4: Save to Parquet ===")
    start = time.time()

    # Save in Parquet format (partitioned)
    ddf_clean.to_parquet(
        'output_data.parquet',
        engine='pyarrow',
        partition_on=['category'],
        compression='snappy'
    )

    print(f"Save complete: {time.time() - start:.2f} seconds")

print("\n=== Optimization Points ===")
print("1. Adjust partition count (according to data size)")
print("2. Set index (for faster filtering)")
print("3. Keep intermediate results in memory with persist()")
print("4. Save with Parquet (columnar storage)")
print("5. Analyze with performance report")

# Example of partition optimization
print("\n=== Partition Optimization ===")

# Original partition count
print(f"Original partition count: {ddf.npartitions}")

# Optimize (2-4 times number of workers recommended)
n_workers = len(client.scheduler_info()['workers'])
optimal_partitions = n_workers * 3

ddf_optimized = ddf.repartition(npartitions=optimal_partitions)
print(f"Optimized partition count: {ddf_optimized.npartitions}")

# Speedup by setting index (shuffles by 'category'; only pass sorted=True when
# the data is already sorted by that column)
ddf_indexed = ddf_clean.set_index('category')
print(f"After setting index: {ddf_indexed.npartitions} partitions")

# Close cluster
client.close()

print("\nPerformance report: processing_performance.html")
print("Processing complete!")

Example Output:

=== Creating Sample Data ===
Sample data creation complete

Dask client: <Client: 'tcp://127.0.0.1:xxxxx' processes=4 threads=8>

=== Step 1: Data Loading ===
Loading complete: 0.15 seconds
Number of partitions: 10
Estimated rows: ~1,000,000 rows

=== Step 2: Remove Missing Values ===
Missing values removed: 0.01 seconds

=== Step 3: Group Aggregation ===
Aggregation complete: 1.23 seconds

Aggregation results:
          value1                    value2
            mean       std  count      mean       std  count
category
A        0.0012  0.999845 200145  -0.0008  1.000234 200145
B       -0.0023  1.001234 199876   0.0015  0.999876 199876
C        0.0034  0.998765 200234  -0.0021  1.001345 200234
D       -0.0011  1.000987 199987   0.0028  0.998654 199987
E        0.0019  0.999543 199758  -0.0013  1.000789 199758

=== Step 4: Save to Parquet ===
Save complete: 2.45 seconds

=== Optimization Points ===
1. Adjust partition count (according to data size)
2. Set index (for faster filtering)
3. Keep intermediate results in memory with persist()
4. Save with Parquet (columnar storage)
5. Analyze with performance report

=== Partition Optimization ===
Original partition count: 10
Optimized partition count: 12
After setting index: 5 partitions

Performance report: processing_performance.html
Processing complete!

