# MapReduce

## Introduction
MapReduce is a programming model and processing framework for distributed computing on large datasets across clusters of computers. Originally developed by Google, it became the foundation for Hadoop and influenced modern big data processing.
## Core Concept

### Key Principles

- **Divide and conquer**: split large problems into smaller tasks
- **Data locality**: move computation to the data, not data to the computation
- **Fault tolerance**: automatically handle node failures
- **Horizontal scaling**: add more machines to process more data
## How MapReduce Works

### The Three Phases

Every job moves through three phases: **Map** turns input records into intermediate (key, value) pairs, **Shuffle & Sort** groups those pairs by key across the cluster, and **Reduce** aggregates each key's values into the final output.

### Phase Details
| Phase | Input | Output | Purpose |
|---|---|---|---|
| Map | (key, value) | list of (key', value') | Transform and filter |
| Shuffle | All mapper outputs | Grouped by key | Organize data |
| Reduce | (key, list of values) | (key, aggregated value) | Aggregate results |
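To make the dataflow concrete, here is a minimal single-process sketch of the three phases. The `run_mapreduce` helper and its list-based shuffle are illustrative only, not a real distributed runtime:

```python
from collections import defaultdict

def run_mapreduce(records, map_fn, reduce_fn):
    """Simulate Map -> Shuffle & Sort -> Reduce in one process."""
    # Map: each input record yields zero or more (key, value) pairs
    intermediate = []
    for key, value in records:
        intermediate.extend(map_fn(key, value))

    # Shuffle & Sort: group all intermediate values by key
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)

    # Reduce: aggregate the value list for each key
    output = []
    for key in sorted(groups):
        output.extend(reduce_fn(key, groups[key]))
    return output
```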
## Classic Example: Word Count

### Problem

Count occurrences of each word in a large text corpus.

### Input

Three documents, split across three mappers (reconstructed from the mapper outputs below):

- doc1: "hello world hello"
- doc2: "world of code"
- doc3: "hello code"

### Map Phase
```python
def map(document_id, document_content):
    for word in document_content.split():
        emit(word, 1)

# Mapper 1 output: (hello, 1), (world, 1), (hello, 1)
# Mapper 2 output: (world, 1), (of, 1), (code, 1)
# Mapper 3 output: (hello, 1), (code, 1)
```
### Shuffle & Sort Phase

The framework groups all mapper outputs by key:

- (hello, [1, 1, 1])
- (world, [1, 1])
- (code, [1, 1])
- (of, [1])
### Reduce Phase

```python
def reduce(word, counts):
    emit(word, sum(counts))

# Output:
# (hello, 3)
# (world, 2)
# (code, 2)
# (of, 1)
```
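The whole example can be checked locally with the `run_mapreduce` sketch from earlier; `emit` becomes `yield` because the sketch collects pairs from generators:

```python
def wc_map(doc_id, content):
    for word in content.split():
        yield (word, 1)

def wc_reduce(word, counts):
    yield (word, sum(counts))

docs = [("doc1", "hello world hello"),
        ("doc2", "world of code"),
        ("doc3", "hello code")]

print(run_mapreduce(docs, wc_map, wc_reduce))
# [('code', 2), ('hello', 3), ('of', 1), ('world', 2)]
```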
## MapReduce Patterns

### 1. Filtering

Select records matching criteria.

```python
# Map: emit only matching records
def map(key, record):
    if record.status == "active":
        emit(record.id, record)

# Reduce: identity (or no reducer needed)
def reduce(key, records):
    for record in records:
        emit(key, record)
```
### 2. Aggregation (Sum, Count, Average)

```python
# Count by category
def map(key, record):
    emit(record.category, 1)

def reduce(category, counts):
    emit(category, sum(counts))
```
### 3. Inverted Index

Build a search index from documents.

```python
def map(doc_id, content):
    for word in content.split():
        emit(word, doc_id)

def reduce(word, doc_ids):
    emit(word, list(set(doc_ids)))

# Output: "hello" → [doc1, doc3, doc7]
```
### 4. Join (Reduce-Side Join)

```python
# Join users with orders on user_id
def map(key, record):
    if record.type == "user":
        emit(record.user_id, ("user", record))
    elif record.type == "order":
        emit(record.user_id, ("order", record))

def reduce(user_id, records):
    users = [r for (t, r) in records if t == "user"]
    orders = [r for (t, r) in records if t == "order"]
    for user in users:
        for order in orders:
            emit(user_id, merge(user, order))
```
### 5. Top-K

```python
# Map: each mapper maintains a bounded heap of its local top-k
def map(key, record):
    local_top_k.add(record)

# When a mapper finishes, it emits its local top-k under a single
# constant key so that one reducer sees all candidates
def map_cleanup():
    for record in local_top_k:
        emit("top_k", record)

# Reduce: compute the global top-k from all local top-k lists
def reduce(key, records):
    global_top_k = heap_merge(records, k)
    emit(key, global_top_k)
```
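A runnable local sketch of the same idea using `heapq`; the hard-coded per-mapper inputs and `k=2` are just for illustration:

```python
import heapq

def local_top_k(records, k):
    # Each "mapper" keeps only its k largest records
    return heapq.nlargest(k, records)

def global_top_k(mapper_outputs, k):
    # The single "reducer" merges the local winners
    candidates = [r for output in mapper_outputs for r in output]
    return heapq.nlargest(k, candidates)

mappers = [[5, 1, 9, 3], [8, 2, 7], [6, 4, 10]]
local_winners = [local_top_k(m, k=2) for m in mappers]
print(global_top_k(local_winners, k=2))  # [10, 9]
```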
## Combiners (Local Reduce)

### What is a Combiner?

A mini-reducer that runs on each mapper's output before the shuffle (see the word-count sketch below).

### Benefits

- Reduces network I/O
- Reduces shuffle data
- Faster processing

### When to Use

Only for associative and commutative operations:

- Sum, Count, Max, Min: ✅
- Average: ❌ (averaging averages is wrong; emit sum AND count instead, as in interview question 2 below)
- Median: ❌
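For word count, the combiner applies the same sum as the reducer, just locally. A minimal sketch of what it saves (the `Counter`-based helper is illustrative):

```python
from collections import Counter

def combine(mapper_pairs):
    # Local reduce: collapse this mapper's (word, 1) pairs
    # before they cross the network in the shuffle
    counts = Counter()
    for word, n in mapper_pairs:
        counts[word] += n
    return list(counts.items())

pairs = [("hello", 1), ("world", 1), ("hello", 1)]
print(combine(pairs))  # [('hello', 2), ('world', 1)]: three pairs shrink to two
```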
## Handling Failures

### Worker Failure

- The master pings workers periodically
- If a worker fails:
  - Completed map tasks are re-executed (their output lived on the failed worker's local disk)
  - In-progress tasks are re-executed
  - Completed reduce tasks don't need re-execution (their output is already in the distributed FS)

### Master Failure

- The master checkpoints its state periodically
- A new master resumes from the checkpoint
- Or the entire job restarts (master failure is rare in practice)

### Stragglers

- **Problem**: a few slow machines delay the entire job
- **Solution**: backup tasks; near the end of the job, schedule duplicate copies of still-running tasks
- Whichever copy finishes first wins
## Performance Optimization

### Data Locality

The scheduler prefers, in order:

1. Same machine as the data → no network transfer
2. Same rack as the data → low network cost
3. Different rack → high network cost
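A toy sketch of locality-aware task assignment; the `Worker` and `Block` shapes here are hypothetical (real schedulers such as Hadoop's are far more involved):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Worker:
    host: str
    rack: str

@dataclass(frozen=True)
class Block:
    replica_hosts: frozenset  # hosts holding a replica of this block
    replica_racks: frozenset  # racks containing those hosts

def locality_cost(worker, block):
    # 0 = data-local, 1 = rack-local, 2 = off-rack
    if worker.host in block.replica_hosts:
        return 0
    if worker.rack in block.replica_racks:
        return 1
    return 2

def pick_worker(idle_workers, block):
    # Assign the map task to the most local idle worker
    return min(idle_workers, key=lambda w: locality_cost(w, block))
```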
### Partitioning

```python
import random

# Default: hash partitioning
def partition(key):
    return hash(key) % num_reducers

# Custom partitioner for skewed data: salt hot keys so they
# spread across several reducers (a later pass must merge the
# per-salt partial results)
def partition_with_salting(key):
    if key in hot_keys:
        salt = random.randrange(NUM_SALTS)
        return hash((key, salt)) % num_reducers
    return hash(key) % num_reducers
```
### Handling Skew

- **Hot keys**: split across multiple reducers
- **Salting**: add a random suffix to hot keys (see interview question 3 below)
- **Sampling**: identify skewed keys beforehand
## MapReduce vs Modern Alternatives
| Feature | MapReduce | Spark | Flink |
|---|---|---|---|
| Processing | Batch | Batch + Interactive | Stream + Batch |
| Speed | Slower (disk I/O) | 10-100x faster (in-memory) | Real-time |
| Iterations | Poor (many jobs) | Excellent (cache RDDs) | Excellent |
| Ease of use | Low-level | High-level APIs | High-level APIs |
| Use case | Large batch ETL | Interactive, ML | Streaming |
## When to Still Use MapReduce

- Simple batch jobs
- Extremely large datasets (disk-based execution is more stable at that scale)
- Existing Hadoop infrastructure
- Cost-sensitive workloads (memory is expensive)
## Common Interview Questions

### 1. How would you implement distributed sort?

Map phase:

- Sample the data to determine range boundaries
- Range-partition records so that reducer i receives only keys in range i

Reduce phase:

- Each reducer sorts its own range
- Because the ranges are ordered, concatenating the reducer outputs in range order yields a globally sorted result
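A local sketch of the sampling-based range partitioner; the sample size and reducer count are arbitrary:

```python
import bisect
import random

def choose_boundaries(sample, num_reducers):
    # Pick num_reducers - 1 split points from a sorted sample
    sample = sorted(sample)
    step = len(sample) // num_reducers
    return [sample[i * step] for i in range(1, num_reducers)]

def range_partition(key, boundaries):
    # Reducer i gets boundaries[i-1] < key <= boundaries[i], so
    # sorted reducer outputs concatenate into one sorted sequence
    return bisect.bisect_left(boundaries, key)

data = [random.randrange(1000) for _ in range(10_000)]
boundaries = choose_boundaries(random.sample(data, 100), num_reducers=4)
partitions = [[] for _ in range(4)]
for key in data:
    partitions[range_partition(key, boundaries)].append(key)
# sorting each partition and concatenating them yields sorted(data)
```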
### 2. How to compute average with MapReduce?

```python
# Wrong: can't use a combiner with a simple average
# Correct: emit (sum, count) pairs; divide only at the end

def map(key, value):
    emit("avg", (value, 1))

def combiner(key, values):
    total_sum = sum(v[0] for v in values)
    total_count = sum(v[1] for v in values)
    emit(key, (total_sum, total_count))

def reduce(key, values):
    total_sum = sum(v[0] for v in values)
    total_count = sum(v[1] for v in values)
    emit(key, total_sum / total_count)
```
### 3. How to handle hot keys?

1. **Salting**: add a random suffix to hot keys
   (`"hot_key"` → `"hot_key_0"`, `"hot_key_1"`, ...)
2. **Two-phase aggregation** (see the sketch after this list):
   - Phase 1: partial aggregation with salted keys
   - Phase 2: final aggregation combining the salted partials
3. **Broadcast join**: for small dimension tables
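A minimal local sketch of salting plus two-phase aggregation for a hot-key count; the salt width of 4 is arbitrary:

```python
import random
from collections import Counter

NUM_SALTS = 4

def salted(key):
    # Phase 1 key: append a random salt so no single reducer
    # receives all occurrences of a hot key
    return f"{key}_{random.randrange(NUM_SALTS)}"

records = ["hot_key"] * 1_000_000 + ["cold_key"] * 10
phase1 = Counter(salted(k) for k in records)  # partial counts per salted key

# Phase 2: strip the salt and combine the partial counts
phase2 = Counter()
for salted_key, count in phase1.items():
    original = salted_key.rsplit("_", 1)[0]
    phase2[original] += count

print(phase2["hot_key"])  # 1000000
```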
### 4. Map-Side Join vs Reduce-Side Join?

**Map-Side Join**:

- Small table fits in memory
- Load the small table in each mapper
- No shuffle needed
- Very fast

**Reduce-Side Join**:

- Both tables are large
- Shuffle both by the join key
- Join in the reducer
- Expensive but necessary
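A local sketch of a map-side (broadcast) join; the `users` dict stands in for the small table every mapper would load into memory:

```python
# Small dimension table, loaded by every mapper
users = {1: {"name": "Ada"}, 2: {"name": "Lin"}}

# Large fact table, streamed through the mappers
orders = [
    {"user_id": 1, "amount": 30},
    {"user_id": 2, "amount": 5},
    {"user_id": 1, "amount": 12},
]

def map_side_join(order):
    # Join inside the mapper: no shuffle phase required
    user = users.get(order["user_id"])
    if user is not None:
        return {**user, **order}

for order in orders:
    print(map_side_join(order))
# {'name': 'Ada', 'user_id': 1, 'amount': 30} ...
```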
## Real-World Applications
| Application | Map Function | Reduce Function |
|---|---|---|
| Web indexing | Extract (word, URL) | Build inverted index |
| Log analysis | Parse log, emit (error_type, 1) | Count errors |
| Recommendation | (user, item_viewed) | Build user profiles |
| Graph PageRank | Distribute rank to neighbors | Sum incoming ranks |
| ETL | Transform records | Aggregate/deduplicate |
## Implementation Example (Python-like Pseudocode)

```python
class MapReduceJob:
    def __init__(self, input_path, output_path, num_reducers):
        self.input_path = input_path
        self.output_path = output_path
        self.num_reducers = num_reducers

    def map(self, key, value):
        """Override this method."""
        raise NotImplementedError

    def reduce(self, key, values):
        """Override this method."""
        raise NotImplementedError

    def combiner(self, key, values):
        """Optional: override for local aggregation.

        Defaulting to reduce() is safe only when the reduce
        operation is associative and commutative.
        """
        return self.reduce(key, values)

    def partition(self, key):
        """Override for custom partitioning."""
        return hash(key) % self.num_reducers


class WordCount(MapReduceJob):
    def map(self, doc_id, content):
        for word in content.lower().split():
            yield (word, 1)

    def reduce(self, word, counts):
        yield (word, sum(counts))

    def combiner(self, word, counts):
        yield (word, sum(counts))
```
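To make the skeleton executable end to end, a toy local runner might look like this; it is purely illustrative (real frameworks split inputs, run the shuffle across machines, and handle I/O and retries):

```python
from collections import defaultdict

class LocalRunner:
    """Run a MapReduceJob over an in-memory list of (key, value) records."""

    def run(self, job, records):
        # Map (combiner omitted for brevity)
        pairs = []
        for key, value in records:
            pairs.extend(job.map(key, value))

        # Shuffle: route each key to a reducer partition, group by key
        partitions = [defaultdict(list) for _ in range(job.num_reducers)]
        for key, value in pairs:
            partitions[job.partition(key)][key].append(value)

        # Reduce each partition independently
        output = []
        for groups in partitions:
            for key, values in groups.items():
                output.extend(job.reduce(key, values))
        return output

job = WordCount("in/", "out/", num_reducers=2)
print(LocalRunner().run(job, [("doc1", "Hello hello world")]))
# [('hello', 2), ('world', 1)] (order may vary across partitions)
```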
## Key Takeaways

- **Map**: transform input into (key, value) pairs
- **Shuffle**: group by key across the cluster
- **Reduce**: aggregate the values for each key
- **Combiners**: local pre-aggregation to cut network I/O
- **Fault tolerance**: built in through task re-execution
- **Scaling**: add more machines for more data
- **Limitations**: a poor fit for iterative algorithms and real-time workloads