
MapReduce

Introduction

MapReduce is a programming model and processing framework for distributed computing on large datasets across clusters of computers. Originally developed by Google, it became the foundation for Hadoop and influenced modern big data processing.

Core Concept

[Figure: MapReduce core concept flow]

Key Principles

  • Divide and conquer - Split large problems into smaller tasks
  • Data locality - Move computation to data, not data to computation
  • Fault tolerance - Automatically handle node failures
  • Horizontal scaling - Add more machines to process more data

How MapReduce Works

The Three Phases

[Figure: The three MapReduce phases]

Phase Details

| Phase   | Input                  | Output                   | Purpose              |
|---------|------------------------|--------------------------|----------------------|
| Map     | (key, value)           | list of (key', value')   | Transform and filter |
| Shuffle | All mapper outputs     | Values grouped by key    | Organize data        |
| Reduce  | (key, list of values)  | (key, aggregated value)  | Aggregate results    |

Classic Example: Word Count

Problem

Count occurrences of each word in a large text corpus.

Input

Document 1: "hello world hello"
Document 2: "world of code"
Document 3: "hello code"

Map Phase

def map(document_id, document_content):
    for word in document_content.split():
        emit(word, 1)

# Mapper 1 output: (hello, 1), (world, 1), (hello, 1)
# Mapper 2 output: (world, 1), (of, 1), (code, 1)
# Mapper 3 output: (hello, 1), (code, 1)

Shuffle & Sort Phase

hello → [1, 1, 1]
world → [1, 1]
code  → [1, 1]
of    → [1]

Reduce Phase

def reduce(word, counts):
    emit(word, sum(counts))

# Output:
# (hello, 3)
# (world, 2)
# (code, 2)
# (of, 1)
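
The emit calls above are pseudocode. For intuition, here is a minimal single-process simulation of all three phases in plain Python (the in-memory documents dict stands in for the distributed input splits):

# Minimal single-process simulation of the three phases
# (illustrative only; a real framework distributes this work)
from collections import defaultdict

documents = {
    "doc1": "hello world hello",
    "doc2": "world of code",
    "doc3": "hello code",
}

# Map: one (word, 1) pair per word occurrence
mapped = []
for doc_id, content in documents.items():
    for word in content.split():
        mapped.append((word, 1))

# Shuffle & sort: group all values by key
grouped = defaultdict(list)
for word, count in mapped:
    grouped[word].append(count)

# Reduce: aggregate the values for each key
result = {word: sum(counts) for word, counts in grouped.items()}
print(result)   # {'hello': 3, 'world': 2, 'of': 1, 'code': 2}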

MapReduce Patterns

1. Filtering

Select records matching criteria.

# Map: emit only matching records
def map(key, record):
    if record.status == "active":
        emit(record.id, record)

# Reduce: identity (or no reducer needed)
def reduce(key, records):
    for record in records:
        emit(key, record)

2. Aggregation (Sum, Count, Average)

# Count by category
def map(key, record):
    emit(record.category, 1)

def reduce(category, counts):
    emit(category, sum(counts))

3. Inverted Index

Build a search index from documents.

def map(doc_id, content):
    for word in content.split():
        emit(word, doc_id)

def reduce(word, doc_ids):
    emit(word, list(set(doc_ids)))

# Output: "hello" → [doc1, doc3, doc7]

4. Join (Reduce-Side Join)

# Join users with orders
def map(key, record):
    if record.type == "user":
        emit(record.user_id, ("user", record))
    elif record.type == "order":
        emit(record.user_id, ("order", record))

def reduce(user_id, records):
    users = [r for (t, r) in records if t == "user"]
    orders = [r for (t, r) in records if t == "order"]
    for user in users:
        for order in orders:
            emit(user_id, merge(user, order))

5. Top-K

# Map: maintain a bounded local heap of the top-k records
def map(key, record):
    local_top_k.push(record)          # heap capped at size k

# After the mapper has seen all its records, emit the local
# survivors under a single key so they all reach one reducer
def map_cleanup():
    for record in local_top_k:
        emit("top_k", record)

# Reduce: merge every mapper's local top-k into the global top-k
def reduce(key, records):
    for record in top_k(records, k):
        emit(key, record)

Combiners (Local Reduce)

What is a Combiner?

A mini-reducer that runs on each mapper's output before the shuffle phase.

[Figure: Combiner comparison]

Benefits

  • Reduces network I/O
  • Reduces shuffle data
  • Faster processing

When to Use

Only for associative and commutative operations:

  • Sum, Count, Max, Min: ✅
  • Average: ❌ (need sum AND count)
  • Median: ❌

# Combiner for word count (same as reducer)
def combiner(word, counts):
    emit(word, sum(counts))
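
To see the effect, the following sketch (single mapper, illustrative data) applies the combiner locally before the shuffle; two pairs cross the network instead of three, and the reducer logic is unchanged:

from collections import defaultdict

# Output of a single mapper before the shuffle
mapper_output = [("hello", 1), ("world", 1), ("hello", 1)]

# Combiner: locally sum counts per word on the mapper node
combined = defaultdict(int)
for word, count in mapper_output:
    combined[word] += count

print(list(combined.items()))   # [('hello', 2), ('world', 1)]
# 2 pairs are shuffled instead of 3; the reducer stays the same,
# because sum is associative and commutative.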

Handling Failures

Worker Failure

  • The master pings workers periodically
  • If a worker fails:
      • Re-execute its completed map tasks (their output lived on the failed node's local disk)
      • Re-execute its in-progress tasks
  • Completed reduce tasks don't need re-execution (their output is already in the distributed file system)

Master Failure

  • The master periodically checkpoints its state
  • A new master can resume from the last checkpoint
  • In practice, master failure is rare, so some implementations simply restart the whole job

Stragglers

  • Problem: A few slow machines (stragglers) can delay the entire job
  • Solution: Backup tasks: when the job is close to finishing, launch duplicate copies of the remaining in-progress tasks (see the sketch below)
  • The first copy to finish wins; the others are discarded
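
A toy illustration of "first to finish wins" using Python threads (task names and delays are made up; a real framework schedules the backup copy on a different machine and kills the loser):

import concurrent.futures
import time

def run_attempt(name, delay):
    # Simulate one attempt of the same task; one attempt is a straggler
    time.sleep(delay)
    return f"result from the {name} attempt"

with concurrent.futures.ThreadPoolExecutor() as pool:
    attempts = [
        pool.submit(run_attempt, "original", 5.0),   # straggler
        pool.submit(run_attempt, "backup", 0.5),     # speculative copy
    ]
    done, _ = concurrent.futures.wait(
        attempts, return_when=concurrent.futures.FIRST_COMPLETED
    )
    # Use whichever attempt finished first; a real framework would
    # also kill the losing attempt (here it simply runs to completion)
    print(next(iter(done)).result())   # result from the backup attempt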

Performance Optimization

Data Locality

Preference order (see the scheduling sketch after the list):
1. Same machine as data → No network transfer
2. Same rack as data → Low network cost
3. Different rack → High network cost
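
A toy scheduler illustrating this preference order (node and rack names are made up; real schedulers get block locations from the storage layer):

def pick_node(block_locations, rack_of, free_nodes):
    """Pick the best free node for a map task, preferring locality.

    block_locations: nodes holding a replica of the input split
    rack_of:         mapping from node to rack
    free_nodes:      nodes with a free task slot
    """
    # 1. Node-local: a free node that already stores the data
    for node in free_nodes:
        if node in block_locations:
            return node, "node-local"
    # 2. Rack-local: a free node in the same rack as a replica
    data_racks = {rack_of[n] for n in block_locations}
    for node in free_nodes:
        if rack_of[node] in data_racks:
            return node, "rack-local"
    # 3. Off-rack: any free node (pays the cross-rack transfer)
    return free_nodes[0], "off-rack"

# Example: the split is replicated on node1 and node4
print(pick_node(
    block_locations={"node1", "node4"},
    rack_of={"node1": "r1", "node2": "r1", "node3": "r2", "node4": "r2"},
    free_nodes=["node2", "node3"],
))   # ('node2', 'rack-local')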

Partitioning

# Default: Hash partitioning
partition = hash(key) % num_reducers

# Custom partitioner for skewed data: append a random salt to hot
# keys so their load spreads across several reducers (a second pass
# must then combine the partial results)
def partition(key):
    if key in hot_keys:
        salt = random.randint(0, num_reducers - 1)
        return hash(f"{key}_{salt}") % num_reducers
    return hash(key) % num_reducers

Handling Skew

  • Hot keys: Split across multiple reducers
  • Salting: Add random suffix to hot keys
  • Sampling: Identify skewed keys beforehand

MapReduce vs Modern Alternatives

| Feature     | MapReduce         | Spark                      | Flink           |
|-------------|-------------------|----------------------------|-----------------|
| Processing  | Batch             | Batch + interactive        | Stream + batch  |
| Speed       | Slower (disk I/O) | 10-100x faster (in-memory) | Real-time       |
| Iterations  | Poor (many jobs)  | Excellent (cached RDDs)    | Excellent       |
| Ease of use | Low-level         | High-level APIs            | High-level APIs |
| Use case    | Large batch ETL   | Interactive queries, ML    | Streaming       |
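
For comparison, the word count job from earlier written against Spark's high-level RDD API (a minimal sketch; assumes PySpark is installed, and input.txt is an illustrative path):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("wordcount").getOrCreate()

counts = (
    spark.sparkContext.textFile("input.txt")   # illustrative path
    .flatMap(lambda line: line.split())        # map: one word per record
    .map(lambda word: (word, 1))               # emit (word, 1)
    .reduceByKey(lambda a, b: a + b)           # shuffle + reduce (with map-side combining)
)
print(counts.collect())
spark.stop()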

When to Still Use MapReduce

  • Simple batch jobs
  • Extremely large datasets that don't fit in cluster memory (disk-based execution degrades more gracefully)
  • Existing Hadoop infrastructure
  • Cost-sensitive workloads (disk is much cheaper than memory)

Common Interview Questions

1. How would you implement distributed sort?

Map phase:
- Sample the input keys to determine range boundaries (split points)
- Range-partition the data so each reducer receives one contiguous key range

Reduce phase:
- Each reducer sorts its range locally
- Because the ranges themselves are ordered, concatenating the reducer
  outputs yields a globally sorted result (sketched below)
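
A sketch of the sampling and range-partitioning step (data and sample size are illustrative; this is the idea behind TeraSort-style sorting):

import bisect
import random

def sample_boundaries(keys, num_reducers, sample_size=1000):
    # Sample the keys and take evenly spaced quantiles as split points
    sample = sorted(random.sample(keys, min(sample_size, len(keys))))
    step = len(sample) // num_reducers
    return [sample[i * step] for i in range(1, num_reducers)]

def range_partition(key, boundaries):
    # Reducer i receives all keys between boundaries[i-1] and boundaries[i],
    # so the sorted reducer outputs can simply be concatenated
    return bisect.bisect_left(boundaries, key)

keys = [random.randint(0, 10_000) for _ in range(50_000)]
boundaries = sample_boundaries(keys, num_reducers=4)
print(boundaries)                         # e.g. [2490, 5013, 7534]
print(range_partition(6000, boundaries))  # reducer index for key 6000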

2. How to compute average with MapReduce?

# Wrong: a combiner that averages partial averages loses the counts
# and gives an incorrect result

# Correct: emit (sum, count) pairs and divide only in the final reduce
def map(key, value):
    emit("avg", (value, 1))

def combiner(key, values):
    total_sum = sum(v[0] for v in values)
    total_count = sum(v[1] for v in values)
    emit(key, (total_sum, total_count))

def reduce(key, values):
    total_sum = sum(v[0] for v in values)
    total_count = sum(v[1] for v in values)
    emit(key, total_sum / total_count)

3. How to handle hot keys?

1. Salting: Add random suffix to hot keys
   "hot_key" → "hot_key_0", "hot_key_1", ...

2. Two-phase aggregation (see the sketch after this list):
   Phase 1: Partial aggregate with salted keys
   Phase 2: Final aggregate combining salted results

3. Broadcast join: For small dimension tables
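
A runnable single-machine sketch of salting with two-phase aggregation (the salt factor, hot key, and record list are made up; in a real job each phase would be its own MapReduce round):

import random
from collections import defaultdict

SALT_FACTOR = 4                       # shards per hot key (illustrative)
hot_keys = {"hot_key"}

records = [("hot_key", 1)] * 10 + [("normal_key", 1)] * 3

# Phase 1: aggregate under salted keys so the hot key's load
# is split across several reducers
phase1 = defaultdict(int)
for key, value in records:
    if key in hot_keys:
        key = f"{key}#{random.randint(0, SALT_FACTOR - 1)}"
    phase1[key] += value              # e.g. hot_key#0, hot_key#3, ...

# Phase 2: strip the salt and combine the partial aggregates
phase2 = defaultdict(int)
for salted_key, partial in phase1.items():
    phase2[salted_key.split("#")[0]] += partial

print(dict(phase2))                   # {'hot_key': 10, 'normal_key': 3}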

4. Map-Side Join vs Reduce-Side Join?

Map-Side Join (see the sketch after this comparison):
- Small table fits in memory
- Load small table in each mapper
- No shuffle needed
- Very fast

Reduce-Side Join:
- Both tables are large
- Shuffle by join key
- Join in reducer
- Expensive but necessary
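
A minimal map-side join sketch (the tables are tiny in-memory stand-ins; in practice the small table is shipped to every mapper, e.g. via a distributed cache):

# Map-side (broadcast) join: the small users table is loaded into
# memory on every mapper, so no shuffle is needed
users = {                             # small dimension table
    1: {"name": "alice"},
    2: {"name": "bob"},
}

orders = [                            # large fact table, streamed by mappers
    {"user_id": 1, "amount": 30},
    {"user_id": 2, "amount": 15},
    {"user_id": 1, "amount": 7},
]

def map_join(order):
    # Look the user up locally instead of shuffling by join key
    user = users.get(order["user_id"])
    if user is not None:
        yield order["user_id"], {**user, **order}

for order in orders:
    for key, joined in map_join(order):
        print(key, joined)
# 1 {'name': 'alice', 'user_id': 1, 'amount': 30}
# ...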

Real-World Applications

| Application    | Map Function                    | Reduce Function       |
|----------------|---------------------------------|-----------------------|
| Web indexing   | Extract (word, URL)             | Build inverted index  |
| Log analysis   | Parse log, emit (error_type, 1) | Count errors          |
| Recommendation | Emit (user, item_viewed)        | Build user profiles   |
| Graph PageRank | Distribute rank to neighbors    | Sum incoming ranks    |
| ETL            | Transform records               | Aggregate/deduplicate |

Implementation Example (Python-like Pseudocode)

class MapReduceJob:
    def __init__(self, input_path, output_path, num_reducers):
        self.input_path = input_path
        self.output_path = output_path
        self.num_reducers = num_reducers

    def map(self, key, value):
        """Override this method"""
        raise NotImplementedError

    def reduce(self, key, values):
        """Override this method"""
        raise NotImplementedError

    def combiner(self, key, values):
        """Optional: Override for local aggregation"""
        return self.reduce(key, values)

    def partition(self, key):
        """Override for custom partitioning"""
        return hash(key) % self.num_reducers


class WordCount(MapReduceJob):
    def map(self, doc_id, content):
        for word in content.lower().split():
            yield (word, 1)

    def reduce(self, word, counts):
        yield (word, sum(counts))

    def combiner(self, word, counts):
        yield (word, sum(counts))
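
The classes above define no execution engine. A minimal local driver (illustrative, single process, ignoring the combiner and partitioner) that runs such a job on in-memory records might look like this:

from collections import defaultdict

def run_local(job, records):
    """Run a MapReduceJob subclass on in-memory (key, value) records."""
    # Map (a real runner would also apply job.combiner per mapper
    # and job.partition to route keys to reducers)
    grouped = defaultdict(list)
    for key, value in records:
        for out_key, out_value in job.map(key, value):
            grouped[out_key].append(out_value)
    # Reduce
    results = []
    for key, values in grouped.items():
        results.extend(job.reduce(key, values))
    return results

job = WordCount("in/", "out/", num_reducers=4)   # paths unused by this driver
docs = [("doc1", "hello world hello"), ("doc2", "hello code")]
print(run_local(job, docs))   # [('hello', 3), ('world', 1), ('code', 1)]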

Key Takeaways

  1. Map: Transform input into (key, value) pairs
  2. Shuffle: Group by key across the cluster
  3. Reduce: Aggregate values for each key
  4. Combiners: Local pre-aggregation to reduce network I/O
  5. Fault tolerance: Built-in through re-execution
  6. Scaling: Add more machines for more data
  7. Limitations: Not great for iterative algorithms, real-time