
Chapter 1: Fundamentals of Large-Scale Data Processing

Understanding the Principles of Scalability and Distributed Processing

📖 Reading Time: 25-30 minutes 📊 Difficulty: Beginner to Intermediate 💻 Code Examples: 7 📝 Exercises: 5

This chapter covers the fundamentals of large-scale data processing. You will learn the main parallelization strategies, the challenges that arise in distributed systems, and the major distributed processing tools.

Learning Objectives

By reading this chapter, you will master the following:


1.1 Scalability Challenges

Data Size Growth

In modern machine learning projects, data volumes are growing explosively.

"Facing data volumes that cannot be processed on a single machine is no longer an exception but the norm."

| Data Scale | Size Range | Processing Method |
|---|---|---|
| Small-scale | ~1 GB | Single-machine in-memory processing |
| Medium-scale | 1 GB ~ 100 GB | Memory optimization, chunk processing |
| Large-scale | 100 GB ~ 1 TB | Distributed processing, parallelization |
| Ultra-large-scale | 1 TB+ | Clusters, distributed file systems |
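
A quick back-of-the-envelope estimate (rows × columns × bytes per element) tells you which tier a dataset falls into before you try to load it. A minimal sketch (assuming 8 bytes per value, i.e. float64):

# Requirements:
# - Python 3.9+

def estimate_memory_gb(n_rows, n_cols, bytes_per_value=8):
    """Rough in-memory size of a dense numeric table (float64 by default)."""
    return n_rows * n_cols * bytes_per_value / (1024 ** 3)

# Example: 10 million rows x 100 features
print(f"Estimated size: {estimate_memory_gb(10_000_000, 100):.2f} GB")  # ~7.45 GB -> chunking or distribution needed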

Memory Constraints

The most common problem is that data doesn't fit in memory.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

import numpy as np
import sys

# Check memory usage
def memory_usage_mb(data):
    """Return memory usage of data in MB"""
    return sys.getsizeof(data) / (1024 ** 2)

# Large-scale data example
n_samples = 10_000_000  # 10 million samples
n_features = 100

# Normal array (loading all data into memory)
# This consumes approximately 7.5GB of memory
# data = np.random.random((n_samples, n_features))  # Potential out-of-memory error

# Check with smaller data
n_samples_small = 1_000_000  # 1 million samples
data_small = np.random.random((n_samples_small, n_features))

print("=== Memory Usage Check ===")
print(f"Data shape: {data_small.shape}")
print(f"Memory usage: {memory_usage_mb(data_small):.2f} MB")
print(f"\nEstimate: For {n_samples:,} samples")
print(f"Memory usage: {memory_usage_mb(data_small) * 10:.2f} MB ({memory_usage_mb(data_small) * 10 / 1024:.2f} GB)")

Output:

=== Memory Usage Check ===
Data shape: (1000000, 100)
Memory usage: 762.94 MB

Estimate: For 10,000,000 samples
Memory usage: 7629.40 MB (7.45 GB)

Computation Time Issues

As data size increases, computation time increases non-linearly.

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0

"""
Example: Demonstrate that computation time grows non-linearly with data size

Purpose: Demonstrate data visualization techniques
Target: Intermediate
Execution time: 2-5 seconds
Dependencies: None
"""

import time
import numpy as np
import matplotlib.pyplot as plt

# Measure computation time for different sizes
# Note: X @ X.T allocates a size x size result, so memory grows quadratically;
# keep the sizes modest to avoid running out of RAM.
sizes = [1000, 2000, 4000, 8000]
times = []

print("=== Computation Time Measurement ===")
for size in sizes:
    X = np.random.random((size, 100))

    start = time.time()
    # Simple matrix operation
    result = X @ X.T
    elapsed = time.time() - start

    times.append(elapsed)
    print(f"Size {size:6d}: {elapsed:.4f}s")

# Visualization
plt.figure(figsize=(10, 6))
plt.plot(sizes, times, marker='o', linewidth=2, markersize=8)
plt.xlabel('Data Size (Number of Samples)', fontsize=12)
plt.ylabel('Computation Time (seconds)', fontsize=12)
plt.title('Relationship Between Data Size and Computation Time', fontsize=14)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Estimate time complexity
print(f"\nTime increase ratio for 10x size increase: {times[-1] / times[0]:.1f}x")

I/O Bottlenecks

Disk I/O is a major bottleneck in large-scale data processing.

| Storage Type | Read Speed (approx.) | Relative to HDD |
|---|---|---|
| Memory (RAM) | ~50 GB/s | ~500x |
| SSD | ~500 MB/s | ~5x |
| HDD | ~100 MB/s | 1x (baseline) |
| Network (1 Gbps) | ~125 MB/s | ~1.25x |
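
To get a feel for these numbers on your own hardware, you can time a file read against an in-memory copy of the same data. A minimal sketch (the file name is arbitrary, and the disk figure is often inflated by the OS page cache because the file was just written):

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

import os
import time
import numpy as np

# ~160 MB of data written to a temporary file
data = np.random.random((2_000_000, 10))
np.save("io_benchmark.npy", data)

# Read back from disk
start = time.perf_counter()
loaded = np.load("io_benchmark.npy")
disk_time = time.perf_counter() - start

# Copy the same data within memory
start = time.perf_counter()
_ = loaded.copy()
mem_time = time.perf_counter() - start

size_mb = data.nbytes / (1024 ** 2)
print(f"Disk read:   {size_mb / disk_time:.0f} MB/s")
print(f"Memory copy: {size_mb / mem_time:.0f} MB/s")

os.remove("io_benchmark.npy")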

1.2 Distributed Processing Concepts

Horizontal Scaling vs Vertical Scaling

There are two approaches to achieving scalability.

| Type | Description | Advantages | Disadvantages |
|---|---|---|---|
| Vertical Scaling (Scale Up) | Improve the performance of a single machine (more CPU, memory, storage) | Simple implementation; no communication overhead | Physical limits exist; cost increases non-linearly |
| Horizontal Scaling (Scale Out) | Distribute processing across multiple machines (add nodes) | Scales in principle without limit; improved fault tolerance | Complex implementation; communication costs |
Practical Selection: Typically, vertical scaling is pursued to its limit before transitioning to horizontal scaling.

Master-Worker Architecture

The most common pattern in distributed processing is the Master-Worker architecture.

graph TD
    M[Master Node<br/>Task Distribution & Result Aggregation] --> W1[Worker 1<br/>Partial Computation]
    M --> W2[Worker 2<br/>Partial Computation]
    M --> W3[Worker 3<br/>Partial Computation]
    M --> W4[Worker 4<br/>Partial Computation]
    W1 --> R[Result Integration]
    W2 --> R
    W3 --> R
    W4 --> R
    style M fill:#9d4edd
    style W1 fill:#e3f2fd
    style W2 fill:#e3f2fd
    style W3 fill:#e3f2fd
    style W4 fill:#e3f2fd
    style R fill:#c8e6c9
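
The role split is easy to mimic on a single machine: the main process plays the master (it partitions the work and aggregates results) while a pool of worker processes computes partial results. A minimal sketch using only the standard library (worker_task and the chunking scheme are illustrative choices, not a fixed API):

# Requirements:
# - Python 3.9+

from concurrent.futures import ProcessPoolExecutor

def worker_task(chunk):
    """Worker role: compute a partial result (here, a partial sum)."""
    return sum(chunk)

if __name__ == '__main__':
    # Master role: split the work into tasks
    numbers = list(range(1_000_000))
    n_workers = 4
    chunk_size = len(numbers) // n_workers
    chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)]

    # Master role: distribute tasks to workers and aggregate their results
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        partial_sums = list(executor.map(worker_task, chunks))

    print(f"Partial results: {partial_sums}")
    print(f"Total: {sum(partial_sums)}")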

Role Division

Data Partitioning and Sharding

Sharding is the technique of dividing data into multiple partitions.

Partitioning Strategies

| Strategy | Description | Use Cases |
|---|---|---|
| Horizontal Partitioning | Split by rows | Time-series data, user data |
| Vertical Partitioning | Split by columns | Datasets with many features |
| Hash-based | Split by the hash value of a key | When a uniform distribution is needed |
| Range-based | Split by value ranges | Sorted data |

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0
# - pandas>=2.0.0, <2.2.0

"""
Example: Partitioning Strategies

Purpose: Demonstrate data manipulation and preprocessing
Target: Intermediate
Execution time: 10-30 seconds
Dependencies: None
"""

import numpy as np
import pandas as pd

# Sample data
n_samples = 1000
data = pd.DataFrame({
    'user_id': np.random.randint(1, 100, n_samples),
    'timestamp': pd.date_range('2024-01-01', periods=n_samples, freq='1min'),
    'value': np.random.random(n_samples)
})

# 1. Horizontal partitioning (by rows)
n_partitions = 4
partition_size = len(data) // n_partitions

horizontal_shards = []
for i in range(n_partitions):
    start = i * partition_size
    end = start + partition_size if i < n_partitions - 1 else len(data)
    shard = data.iloc[start:end]
    horizontal_shards.append(shard)
    print(f"Shard {i+1}: {len(shard)} rows")

# 2. Hash-based partitioning
def hash_partition(user_id, n_partitions):
    return hash(user_id) % n_partitions

data['partition'] = data['user_id'].apply(lambda x: hash_partition(x, n_partitions))

hash_shards = []
for i in range(n_partitions):
    shard = data[data['partition'] == i]
    hash_shards.append(shard)
    print(f"Hash shard {i+1}: {len(shard)} rows")

# Check distribution
print("\n=== Check Partitioning Balance ===")
hash_sizes = [len(shard) for shard in hash_shards]
print(f"Min: {min(hash_sizes)}, Max: {max(hash_sizes)}, Mean: {np.mean(hash_sizes):.1f}")

Distributed Processing Architecture Diagram

graph LR
    subgraph "Input Data"
        D[Large Dataset<br/>1TB]
    end
    subgraph "Distributed Storage"
        S1[Shard 1<br/>250GB]
        S2[Shard 2<br/>250GB]
        S3[Shard 3<br/>250GB]
        S4[Shard 4<br/>250GB]
    end
    subgraph "Parallel Processing"
        P1[Process 1]
        P2[Process 2]
        P3[Process 3]
        P4[Process 4]
    end
    subgraph "Result Aggregation"
        A[Aggregation & Integration]
    end
    D --> S1
    D --> S2
    D --> S3
    D --> S4
    S1 --> P1
    S2 --> P2
    S3 --> P3
    S4 --> P4
    P1 --> A
    P2 --> A
    P3 --> A
    P4 --> A
    style D fill:#ffebee
    style S1 fill:#e3f2fd
    style S2 fill:#e3f2fd
    style S3 fill:#e3f2fd
    style S4 fill:#e3f2fd
    style P1 fill:#fff3e0
    style P2 fill:#fff3e0
    style P3 fill:#fff3e0
    style P4 fill:#fff3e0
    style A fill:#c8e6c9

1.3 Parallelization Strategies

Data Parallelism

Data Parallelism involves splitting data and executing the same processing on each partition in parallel.

This is the most common and easiest-to-implement parallelization technique.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

import numpy as np
import multiprocessing as mp
import time

# Processing function
def process_chunk(data_chunk):
    """Process data chunk (example: mean calculation)"""
    return np.mean(data_chunk, axis=0)

# Single-process version
def single_process_compute(data):
    start = time.time()
    result = np.mean(data, axis=0)
    elapsed = time.time() - start
    return result, elapsed

# Multi-process version (data parallelism)
def multi_process_compute(data, n_workers=4):
    start = time.time()

    # Split data
    chunks = np.array_split(data, n_workers)

    # Parallel processing
    with mp.Pool(n_workers) as pool:
        results = pool.map(process_chunk, chunks)

    # Integrate results (mean of per-chunk means; the chunks are nearly
    # equal-sized, so this matches the global mean up to a negligible error)
    final_result = np.mean(results, axis=0)
    elapsed = time.time() - start

    return final_result, elapsed

# Test
if __name__ == '__main__':
    # Sample data
    data = np.random.random((10_000_000, 10))

    print("=== Data Parallelism Comparison ===")
    print(f"Data size: {data.shape}")

    # Single-process
    result_single, time_single = single_process_compute(data)
    print(f"\nSingle-process: {time_single:.4f}s")

    # Multi-process
    n_workers = mp.cpu_count()
    result_multi, time_multi = multi_process_compute(data, n_workers)
    print(f"Multi-process ({n_workers} workers): {time_multi:.4f}s")

    # Speedup
    speedup = time_single / time_multi
    print(f"\nSpeedup: {speedup:.2f}x")
    print(f"Efficiency: {speedup / n_workers * 100:.1f}%")

Model Parallelism

Model Parallelism involves splitting the model itself and distributing it across multiple devices.

Used for large-scale neural networks:

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

import numpy as np

# Conceptual example: splitting large models
class DistributedModel:
    """Conceptual implementation of model parallelism"""

    def __init__(self, layer_sizes):
        self.layers = []
        for i in range(len(layer_sizes) - 1):
            # Place each layer on different devices (here, arrays)
            weight = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.01
            self.layers.append({
                'weight': weight,
                'device': f'GPU_{i % 4}'  # Distributed across 4 GPUs
            })

    def forward(self, x):
        """Forward propagation (each layer executed on different device)"""
        activation = x
        for i, layer in enumerate(self.layers):
            print(f"Layer {i+1} executing on {layer['device']}")
            activation = np.dot(activation, layer['weight'])
            activation = np.maximum(0, activation)  # ReLU
        return activation

# Usage example
print("=== Model Parallelism Example ===")
layer_sizes = [1000, 2000, 2000, 2000, 100]  # Large model
model = DistributedModel(layer_sizes)

# Input data
x = np.random.randn(1, 1000)
output = model.forward(x)

print(f"\nInput shape: {x.shape}")
print(f"Output shape: {output.shape}")
print(f"Total parameters: {sum(layer['weight'].size for layer in model.layers):,}")

Pipeline Parallelism

Pipeline Parallelism involves splitting processing into multiple stages and executing each stage in parallel.

# Note: the three stage functions below are wired together with
# multiprocessing queues in the sketch that follows them.
import multiprocessing as mp

# Pipeline stages
def stage1_preprocess(input_queue, output_queue):
    """Stage 1: Preprocessing"""
    while True:
        item = input_queue.get()
        if item is None:
            output_queue.put(None)
            break
        # Preprocessing (example: normalization)
        processed = item / 255.0
        output_queue.put(processed)

def stage2_feature_extract(input_queue, output_queue):
    """Stage 2: Feature extraction"""
    while True:
        item = input_queue.get()
        if item is None:
            output_queue.put(None)
            break
        # Feature extraction (example: calculate statistics)
        features = [item.mean(), item.std(), item.max(), item.min()]
        output_queue.put(features)

def stage3_predict(input_queue, results):
    """Stage 3: Prediction"""
    while True:
        item = input_queue.get()
        if item is None:
            break
        # Prediction (simplified version)
        prediction = sum(item) > 2.0
        results.append(prediction)

# Pipeline parallelization implementation example (conceptual)
print("=== Pipeline Parallelism Concept ===")
print("Stage 1: Preprocessing → Stage 2: Feature Extraction → Stage 3: Prediction")
print("\nBy executing each stage in different processes in parallel,")
print("throughput improves.")

Comparison of Parallelization Strategies

| Strategy | Application Scenarios | Advantages | Disadvantages |
|---|---|---|---|
| Data Parallelism | Large-scale data, same processing | Easy implementation, scalable | Communication cost, memory duplication |
| Model Parallelism | Large-scale models, GPU constraints | Avoids memory constraints | Complex implementation, inter-device communication |
| Pipeline Parallelism | Multi-stage processing, ETL | Improved throughput | Increased latency, balance adjustment |

1.4 Challenges in Distributed Systems

Communication Costs

The largest overhead in distributed processing is inter-node communication.

Amdahl's Law: The non-parallelizable portions (such as communication) limit overall performance.

$$ \text{Speedup} = \frac{1}{(1-P) + \frac{P}{N}} $$

where $P$ is the fraction of the work that can be parallelized and $N$ is the number of processors.

# Requirements:
# - Python 3.9+
# - matplotlib>=3.7.0
# - numpy>=1.24.0, <2.0.0

import numpy as np
import matplotlib.pyplot as plt

# Visualize Amdahl's Law
def amdahl_speedup(P, N):
    """Calculate speedup according to Amdahl's Law"""
    return 1 / ((1 - P) + P / N)

# Cases with different parallelization rates
P_values = [0.5, 0.75, 0.9, 0.95, 0.99]
N_range = np.arange(1, 65)

plt.figure(figsize=(10, 6))
for P in P_values:
    speedups = [amdahl_speedup(P, N) for N in N_range]
    plt.plot(N_range, speedups, label=f'P = {P:.0%}', linewidth=2)

plt.xlabel('Number of Processors', fontsize=12)
plt.ylabel('Speedup', fontsize=12)
plt.title("Amdahl's Law: Parallelization Rate and Performance", fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print("=== Implications of Amdahl's Law ===")
print("With low parallelization rate, performance improvement is limited even with more processors")
print(f"Example: P=90%, 64 processors gives max {amdahl_speedup(0.9, 64):.1f}x speedup")

Synchronous vs Asynchronous

| Method | Description | Advantages | Disadvantages |
|---|---|---|---|
| Synchronous Processing | Wait for all workers to complete before proceeding | Simple implementation; consistency guaranteed | Limited by the slowest worker |
| Asynchronous Processing | Proceed without waiting for all workers | Improved throughput | Consistency is harder to manage |
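
On a single machine the same trade-off shows up with concurrent.futures: executor.map waits and returns results in submission order (a synchronous-style gather), while as_completed hands back each result as soon as its worker finishes. A minimal sketch with simulated workers of different speeds (slow_task and the sleep times are illustrative):

# Requirements:
# - Python 3.9+

import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def slow_task(task_id):
    """Simulated worker whose runtime depends on the task."""
    time.sleep(0.2 * task_id)
    return task_id

if __name__ == '__main__':
    task_ids = [3, 1, 2]

    # Synchronous-style gathering: results come back in submission order,
    # so everything waits for the slow task submitted first
    with ProcessPoolExecutor(max_workers=3) as executor:
        for result in executor.map(slow_task, task_ids):
            print(f"map (ordered):        task {result}")

    # Asynchronous-style gathering: results arrive in completion order
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(slow_task, t) for t in task_ids]
        for future in as_completed(futures):
            print(f"as_completed (eager): task {future.result()}")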

Fault Tolerance

In distributed systems, node and network failures are unavoidable.

Fault Handling Techniques
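
Typical techniques include retrying failed calls, checkpointing intermediate results, and replicating data across nodes. As a small illustration, a minimal retry-with-exponential-backoff sketch (unreliable_fetch is a hypothetical stand-in for a call to a remote node):

# Requirements:
# - Python 3.9+

import random
import time

def unreliable_fetch():
    """Hypothetical remote call that fails about 60% of the time."""
    if random.random() < 0.6:
        raise ConnectionError("simulated node/network failure")
    return "payload"

def fetch_with_retry(max_retries=5, base_delay=0.1):
    """Retry with exponential backoff: wait longer after each failure."""
    for attempt in range(max_retries):
        try:
            return unreliable_fetch()
        except ConnectionError as e:
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({e}); retrying in {delay:.2f}s")
            time.sleep(delay)
    raise RuntimeError("All retries failed")

print(fetch_with_retry())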

Debugging Difficulties

Debugging distributed systems is difficult for the following reasons:


1.5 Tools and Ecosystem

Apache Hadoop / Spark

Apache Hadoop and Apache Spark are de facto standards for large-scale data processing.

| Tool | Features | Use Cases |
|---|---|---|
| Hadoop | MapReduce-based; disk-centric processing; suited to batch processing | Large-scale ETL, log analysis |
| Spark | In-memory processing; up to ~100x faster than Hadoop MapReduce on some in-memory workloads; machine learning library (MLlib) | Iterative computation, machine learning |

# Conceptual usage example of Apache Spark (PySpark)
# Note: Requires Spark installation

"""
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("LargeScaleProcessing") \
    .getOrCreate()

# Load large-scale data
df = spark.read.parquet("hdfs://path/to/large/data")

# Distributed processing
result = df.groupBy("category") \
    .agg({"value": "mean"}) \
    .orderBy("category")

# Save results
result.write.parquet("hdfs://path/to/output")

spark.stop()
"""

print("=== Apache Spark Features ===")
print("1. Lazy Evaluation: Automatically generates optimal execution plan")
print("2. In-memory processing: Cache intermediate results in memory")
print("3. Fault tolerance: Automatic recovery via RDD (Resilient Distributed Dataset)")
print("4. Unified API: Handle SQL, machine learning, graph processing uniformly")

Dask

Dask is a Python-native parallel processing library.

# Requirements:
# - Python 3.9+
# - dask>=2023.5.0
# - numpy>=1.24.0, <2.0.0

import numpy as np

# Conceptual usage example of Dask
"""
import dask.array as da
import dask.dataframe as dd

# Dask array (NumPy-like API)
x = da.random.random((100000, 10000), chunks=(1000, 1000))
result = x.mean(axis=0).compute()  # Lazy evaluation → execution

# Dask DataFrame (pandas-like API)
df = dd.read_csv('large_file_*.csv')
result = df.groupby('category').value.mean().compute()
"""

print("=== Dask Features ===")
print("1. NumPy/pandas compatible API: Easy migration of existing code")
print("2. Task graph: Automatically manages processing dependencies")
print("3. Scalable: Seamlessly scale from single machine → cluster")
print("4. Integration with Python ecosystem: scikit-learn, XGBoost, etc.")

# Simple Dask-style processing example (conceptual)
print("\n=== Chunk Processing Example ===")
data = np.random.random((10000, 100))
chunk_size = 1000
results = []

for i in range(0, len(data), chunk_size):
    chunk = data[i:i+chunk_size]
    result = np.mean(chunk, axis=0)
    results.append(result)

final_result = np.mean(results, axis=0)
print(f"Number of chunks: {len(results)}")
print(f"Final result shape: {final_result.shape}")

Ray

Ray is a unified framework for distributed applications.

# Requirements:
# - Python 3.9+
# - ray>=2.5.0

# Conceptual usage example of Ray
"""
import ray

# Initialize Ray
ray.init()

# Remote function
@ray.remote
def process_data(data):
    return data.sum()

# Parallel execution
data_chunks = [np.random.random(1000) for _ in range(10)]
futures = [process_data.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)  # Get results

ray.shutdown()
"""

print("=== Ray Features ===")
print("1. Low-level control: Flexible parallelization with task/actor model")
print("2. High performance: Distributed scheduling and shared memory")
print("3. Ecosystem: Ray Tune (hyperparameter tuning), RLlib (reinforcement learning)")
print("4. Ease of use: Easy parallelization with Python decorators")

Selection Criteria

| Situation | Recommended Tool | Reason |
|---|---|---|
| Large-scale batch processing (TB-scale) | Apache Spark | Mature ecosystem, fault tolerance |
| Python-centric development | Dask | NumPy/pandas compatibility, low learning curve |
| Complex distributed apps | Ray | Flexible control, high performance |
| Single-machine acceleration | multiprocessing, joblib | Simple; multiprocessing needs no extra installation |
| Machine learning pipelines | Spark MLlib, Ray Tune | Integrated machine learning tools |
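
For the single-machine row above, joblib offers a one-line parallel map over chunks. A minimal sketch, assuming joblib is installed (the version pin is only illustrative):

# Requirements:
# - Python 3.9+
# - joblib>=1.3.0
# - numpy>=1.24.0, <2.0.0

import numpy as np
from joblib import Parallel, delayed

def chunk_mean(chunk):
    """Statistic computed by one worker for one chunk."""
    return chunk.mean(axis=0)

data = np.random.random((1_000_000, 10))
chunks = np.array_split(data, 8)  # 8 equal-sized chunks

# Run chunk_mean on every chunk across 4 worker processes
partial_means = Parallel(n_jobs=4)(delayed(chunk_mean)(c) for c in chunks)
print(f"Number of partial results: {len(partial_means)}")
print(f"Combined mean (first 3 dims): {np.mean(partial_means, axis=0)[:3]}")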

1.6 Chapter Summary

What We Learned

  1. Scalability Challenges

    • Constraints in data size, memory, computation time, and I/O
    • Single-machine limitations and the need for distributed processing
  2. Distributed Processing Concepts

    • Horizontal scaling vs vertical scaling
    • Master-worker architecture
    • Data partitioning and sharding strategies
  3. Parallelization Strategies

    • Data parallelism: Most common, easy to implement
    • Model parallelism: For large-scale models
    • Pipeline parallelism: Effective for multi-stage processing
  4. Distributed System Challenges

    • Communication costs and Amdahl's Law
    • Synchronous vs asynchronous trade-offs
    • Fault tolerance and debugging difficulties
  5. Tools and Ecosystem

    • Hadoop/Spark: Large-scale batch processing
    • Dask: Python-centric parallelization
    • Ray: Flexible distributed applications

Important Principles

| Principle | Description |
|---|---|
| Optimization Order | First the algorithm, then the implementation, and only then parallelization |
| Minimize Communication | Reducing inter-node communication is key to performance |
| Appropriate Granularity | Splitting tasks too finely increases overhead |
| Measurement-Driven | Don't guess; measure actual performance and decide |
| Gradual Scaling | Validate at small scale before scaling up |

Next Chapter

In Chapter 2, we will learn MapReduce and Spark Fundamentals:


Exercises

Problem 1 (Difficulty: easy)

Explain the differences between horizontal scaling and vertical scaling, and describe their respective advantages and disadvantages.

Sample Answer

Answer:

Vertical Scaling (Scale Up): Improve the performance of a single machine by adding CPU, memory, or storage. Advantages: simple to implement, no communication overhead. Disadvantages: physical limits exist, and cost increases non-linearly.

Horizontal Scaling (Scale Out): Distribute processing across multiple machines by adding nodes. Advantages: scales in principle without limit, improved fault tolerance. Disadvantages: more complex implementation, communication costs between nodes.

Practical Selection: Typically, vertical scaling is pursued to its limit before transitioning to horizontal scaling.

Problem 2 (Difficulty: medium)

Using Amdahl's Law, calculate the speedup when running a program with 80% parallelization rate on 16 processors.

Sample Answer
def amdahl_speedup(P, N):
    """
    Calculate speedup according to Amdahl's Law

    Parameters:
    P: Proportion of parallelizable portion (0~1)
    N: Number of processors

    Returns:
    Speedup factor
    """
    return 1 / ((1 - P) + P / N)

# Problem calculation
P = 0.8  # 80%
N = 16   # 16 processors

speedup = amdahl_speedup(P, N)

print("=== Calculation by Amdahl's Law ===")
print(f"Parallelization rate: {P:.0%}")
print(f"Number of processors: {N}")
print(f"Speedup: {speedup:.2f}x")
print(f"\nExplanation:")
print(f"Theoretical maximum speedup (infinite processors): {1/(1-P):.2f}x")
print(f"Efficiency: {speedup/N*100:.1f}%")

Output:

=== Calculation by Amdahl's Law ===
Parallelization rate: 80%
Number of processors: 16
Speedup: 4.00x

Explanation:
Theoretical maximum speedup (infinite processors): 5.00x
Efficiency: 25.0%

Formula:

$$ \text{Speedup} = \frac{1}{(1-0.8) + \frac{0.8}{16}} = \frac{1}{0.2 + 0.05} = \frac{1}{0.25} = 4 $$

Explanation: Even with 16 processors, the 20% serial portion caps the speedup at 4x. With infinitely many processors the limit is 1/(1-P) = 5x, so adding processors beyond a certain point yields rapidly diminishing returns.

Problem 3 (Difficulty: medium)

Parallelize the following code using multiprocessing for data parallelism.

# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

"""
Example: Parallelize the given code using multiprocessing for data parallelism

Purpose: Demonstrate core concepts and implementation patterns
Target: Beginner to Intermediate
Execution time: 5-10 seconds
Dependencies: None
"""

import numpy as np

# Data to process
data = np.random.random((1_000_000, 10))

# Calculate statistics for each row
result = []
for row in data:
    stats = {
        'mean': row.mean(),
        'std': row.std(),
        'max': row.max(),
        'min': row.min()
    }
    result.append(stats)
Sample Answer
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

import numpy as np
import multiprocessing as mp
import time

# Processing function
def compute_stats(chunk):
    """Calculate statistics for data chunk"""
    result = []
    for row in chunk:
        stats = {
            'mean': row.mean(),
            'std': row.std(),
            'max': row.max(),
            'min': row.min()
        }
        result.append(stats)
    return result

# Single-process version
def single_process_version(data):
    start = time.time()
    result = []
    for row in data:
        stats = {
            'mean': row.mean(),
            'std': row.std(),
            'max': row.max(),
            'min': row.min()
        }
        result.append(stats)
    elapsed = time.time() - start
    return result, elapsed

# Multi-process version (data parallelism)
def multi_process_version(data, n_workers=4):
    start = time.time()

    # Split data into chunks
    chunks = np.array_split(data, n_workers)

    # Parallel processing
    with mp.Pool(n_workers) as pool:
        results = pool.map(compute_stats, chunks)

    # Integrate results
    final_result = []
    for chunk_result in results:
        final_result.extend(chunk_result)

    elapsed = time.time() - start
    return final_result, elapsed

if __name__ == '__main__':
    # Test data
    data = np.random.random((100_000, 10))  # Adjusted size

    print("=== Data Parallelism Implementation ===")
    print(f"Data shape: {data.shape}")

    # Single-process
    result_single, time_single = single_process_version(data)
    print(f"\nSingle-process: {time_single:.4f}s")

    # Multi-process
    n_workers = mp.cpu_count()
    result_multi, time_multi = multi_process_version(data, n_workers)
    print(f"Multi-process ({n_workers} workers): {time_multi:.4f}s")

    # Validation
    assert len(result_single) == len(result_multi), "Result lengths differ"
    print(f"\nSpeedup: {time_single / time_multi:.2f}x")
    print(f"✓ Parallelization successful")

Key Points:

Problem 4 (Difficulty: hard)

When processing a dataset of 10 million rows, implement chunk processing that considers memory constraints. Set each chunk to 1 million rows and aggregate the results.

Sample Answer
# Requirements:
# - Python 3.9+
# - numpy>=1.24.0, <2.0.0

import numpy as np
import time

def process_chunk(chunk):
    """Process each chunk (example: mean, std, sum)"""
    stats = {
        'mean': chunk.mean(axis=0),
        'std': chunk.std(axis=0),
        'sum': chunk.sum(axis=0),
        'count': len(chunk)
    }
    return stats

def aggregate_results(chunk_results):
    """Final aggregation of chunk results"""
    # Calculate total sum
    total_sum = sum(r['sum'] for r in chunk_results)
    total_count = sum(r['count'] for r in chunk_results)

    # Global mean
    global_mean = total_sum / total_count

    # Global standard deviation (weighted average)
    # More accurately, square root of weighted average of variances
    weighted_var = sum(
        r['count'] * (r['std']**2 + (r['mean'] - global_mean)**2)
        for r in chunk_results
    ) / total_count
    global_std = np.sqrt(weighted_var)

    return {
        'global_mean': global_mean,
        'global_std': global_std,
        'total_count': total_count
    }

def chunked_processing(n_samples=10_000_000, n_features=10, chunk_size=1_000_000):
    """Memory-efficient chunk processing"""
    print(f"=== Chunk Processing ===")
    print(f"Total samples: {n_samples:,}")
    print(f"Chunk size: {chunk_size:,}")
    print(f"Number of chunks: {n_samples // chunk_size}")

    start = time.time()
    chunk_results = []

    # Process by chunks
    for i in range(0, n_samples, chunk_size):
        # Determine chunk size
        current_chunk_size = min(chunk_size, n_samples - i)

        # Generate chunk data (in practice, read from file)
        chunk = np.random.random((current_chunk_size, n_features))

        # Process chunk
        result = process_chunk(chunk)
        chunk_results.append(result)

        print(f"Chunk {len(chunk_results)}: {current_chunk_size:,} rows processed")

    # Aggregate results
    final_result = aggregate_results(chunk_results)

    elapsed = time.time() - start

    print(f"\n=== Processing Results ===")
    print(f"Processing time: {elapsed:.2f}s")
    print(f"Global mean (first 3 dimensions): {final_result['global_mean'][:3]}")
    print(f"Global std (first 3 dimensions): {final_result['global_std'][:3]}")
    print(f"Total samples: {final_result['total_count']:,}")

    # Check memory efficiency
    import sys
    chunk_memory = sys.getsizeof(np.random.random((chunk_size, n_features))) / (1024**2)
    print(f"\nMemory usage per chunk: {chunk_memory:.2f} MB")
    print(f"(Processing with 1/{n_samples//chunk_size} of memory compared to loading all data at once)")

if __name__ == '__main__':
    # Execute (reduced size since actual size takes time)
    chunked_processing(n_samples=1_000_000, n_features=10, chunk_size=100_000)

Sample Output:

=== Chunk Processing ===
Total samples: 1,000,000
Chunk size: 100,000
Number of chunks: 10
Chunk 1: 100,000 rows processed
Chunk 2: 100,000 rows processed
...
Chunk 10: 100,000 rows processed

=== Processing Results ===
Processing time: 2.45s
Global mean (first 3 dimensions): [0.500 0.499 0.501]
Global std (first 3 dimensions): [0.289 0.288 0.290]
Total samples: 1,000,000

Memory usage per chunk: 7.63 MB
(Processing with 1/10 of memory compared to loading all data at once)

Key Points:

Problem 5 (Difficulty: hard)

Regarding the three strategies of data parallelism, model parallelism, and pipeline parallelism, explain their respective application scenarios and considerations when combining them.

Sample Answer

Answer:

1. Data Parallelism

Application Scenarios: Large datasets where the same operation is applied to every partition.

Examples: Splitting an array into chunks and computing per-chunk means with multiprocessing, as in Section 1.3.

2. Model Parallelism

Application Scenarios: Models too large to fit in the memory of a single device (GPU constraints).

Examples: Distributing the layers of a large neural network across several GPUs, as in the DistributedModel sketch in Section 1.3.

3. Pipeline Parallelism

Application Scenarios: Multi-stage processing such as ETL or preprocessing → feature extraction → prediction.

Examples: The three-stage pipeline (preprocessing, feature extraction, prediction) in Section 1.3.

Considerations for Combined Use

1. Data Parallelism + Model Parallelism:

"""
Ultra-large-scale model training

Example: GPT-3 training
- Model parallelism: Split each layer across multiple GPUs
- Data parallelism: Process different mini-batches on multiple model replicas
- Result: Can efficiently train on thousands of GPUs
"""

Considerations:

2. Data Parallelism + Pipeline Parallelism:

"""
Large-scale ETL and machine learning pipeline

Example: Streaming prediction system
- Pipeline: Data acquisition → Preprocessing → Inference → Postprocessing
- Data parallel: Multiple workers execute in parallel at each stage
- Result: High-throughput prediction system
"""

Considerations:

3. Combining All Three:

"""
Ultra-large-scale distributed training system

Example: Large-scale recommendation system
- Data parallel: Process different user segments in parallel
- Model parallel: Distribute embedding layers across multiple GPUs
- Pipeline: Stage-wise feature extraction → Model training → Evaluation
"""

Considerations:

Selection Guidelines

| Bottleneck | Recommended Strategy | Priority |
|---|---|---|
| Large data size | Data parallelism | 1st |
| Large model size | Model parallelism | 1st |
| Many processing stages | Pipeline parallelism | 2nd |
| Both data and model are large | Data + model parallelism | Introduce gradually |
| Real-time requirements | Pipeline parallelism | 1st |

Implementation Principles:

  1. First optimize with single strategy
  2. Measure and identify next bottleneck
  3. Introduce additional strategies as needed
  4. Always measure overall system efficiency

References

  1. Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified data processing on large clusters. Communications of the ACM, 51(1), 107-113.
  2. Zaharia, M., et al. (2016). Apache Spark: A unified engine for big data processing. Communications of the ACM, 59(11), 56-65.
  3. Moritz, P., et al. (2018). Ray: A distributed framework for emerging AI applications. OSDI, 561-577.
  4. Rocklin, M. (2015). Dask: Parallel computation with blocked algorithms and task scheduling. SciPy, 126-132.
  5. Barroso, L. A., Clidaras, J., & Hölzle, U. (2013). The datacenter as a computer (2nd ed.). Morgan & Claypool Publishers.
