Data Pipelines¶
Overview¶
ETL vs ELT¶
Batch Processing¶
Apache Spark¶
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count

# Initialize Spark
spark = SparkSession.builder \
    .appName("OrderAnalytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read from sources
orders_df = spark.read \
    .format("parquet") \
    .load("s3://bucket/orders/")

customers_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "customers") \
    .load()

# Transformations: filter, join, aggregate
result_df = orders_df \
    .filter(col("status") == "completed") \
    .join(customers_df, "customer_id") \
    .groupBy("customer_segment", "order_date") \
    .agg(
        sum("total").alias("revenue"),
        avg("total").alias("avg_order_value"),
        count("*").alias("order_count")
    )

# Write to destination, partitioned by date
result_df.write \
    .format("parquet") \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .save("s3://bucket/analytics/daily_revenue/")
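Partitioning the output by order_date lets downstream queries prune partitions when they filter on date, and spark.sql.shuffle.partitions (200 is Spark's default) controls how many tasks the join and aggregation shuffles produce; it is typically lowered for small jobs and raised for very large ones.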
Spark Architecture¶
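In short: the driver builds the query plan and schedules tasks, a cluster manager (standalone, YARN, or Kubernetes) allocates executors, and executors run tasks on partitions of the data. Wide operations like the join and groupBy above trigger shuffles between executors, which is where the shuffle-partition setting comes into play.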
Stream Processing¶
Apache Kafka + Flink¶
// Flink streaming job. The Order, OrderMetrics, OrderAggregator and
// OrderDeserializer classes, the imports, and a pre-built jdbcConnectionOptions
// are assumed to be defined elsewhere in the project.
public class OrderProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source
        KafkaSource<Order> source = KafkaSource.<Order>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("orders")
            .setGroupId("order-processor")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new OrderDeserializer())
            .build();

        // Event-time stream with a 5-second bounded-out-of-orderness watermark
        DataStream<Order> orders = env.fromSource(
            source,
            WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
            "Kafka Source"
        );

        // Processing: 5-minute tumbling windows per customer segment
        DataStream<OrderMetrics> metrics = orders
            .filter(order -> order.getStatus().equals("completed"))
            .keyBy(Order::getCustomerSegment)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new OrderAggregator());

        // Sink to database: JdbcSink.sink(...) builds the SinkFunction
        metrics.addSink(JdbcSink.sink(
            "INSERT INTO metrics (segment, window, revenue) VALUES (?, ?, ?)",
            (ps, m) -> {
                ps.setString(1, m.getSegment());
                ps.setTimestamp(2, m.getWindowEnd());
                ps.setBigDecimal(3, m.getRevenue());
            },
            jdbcConnectionOptions
        ));

        env.execute("Order Metrics Job");
    }
}
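With a 5-second bounded-out-of-orderness watermark, a record counts as late once the watermark has passed its event time; a tumbling event-time window that has already fired drops such records unless allowed lateness is configured or late events are routed to a side output.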
Windowing¶
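A minimal sketch of the common window types in PySpark Structured Streaming (Python to match the batch example above; the durations and the use of the built-in rate source, which emits timestamp/value rows, are illustrative only):

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count

spark = SparkSession.builder.appName("WindowingSketch").getOrCreate()

# The built-in "rate" source generates (timestamp, value) rows,
# so this runs without any external infrastructure.
events = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

# Tumbling window: fixed, non-overlapping 5-minute buckets.
tumbling = events.groupBy(window("timestamp", "5 minutes")) \
    .agg(count("*").alias("events"))

# Sliding window: 5-minute buckets that advance every minute, so they overlap.
sliding = events.groupBy(window("timestamp", "5 minutes", "1 minute")) \
    .agg(count("*").alias("events"))

# A watermark bounds how long Spark keeps window state around for late rows.
watermarked = events.withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .agg(count("*").alias("events"))

query = watermarked.writeStream.outputMode("update").format("console").start()
query.awaitTermination()

Session windows (session_window in pyspark.sql.functions) are also available when gaps in activity, rather than fixed durations, should close a window.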
Event Time vs Processing Time¶
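Event time is when an event actually happened at the source (a timestamp carried in the record); processing time is when the pipeline happens to observe it. Event-time windows give reproducible results under replay and out-of-order delivery but need watermarks to decide when a window can close, while processing-time windows are simpler and lower latency at the cost of results that depend on delivery timing.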
Change Data Capture (CDC)¶
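To make the CDC flow concrete, here is a hedged Python sketch that consumes Debezium-style change events (the op / before / after envelope) from Kafka and applies them to an in-memory replica. The topic name, broker address, and the id primary-key column are assumptions for the example, not part of any particular setup.

import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "cdc-applier",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["dbserver1.public.customers"])  # hypothetical Debezium topic

# In-memory replica keyed by primary key; a real consumer would upsert
# into a warehouse or lake table instead.
replica = {}

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    event = json.loads(msg.value())
    payload = event.get("payload", event)  # envelope may or may not be unwrapped
    op = payload["op"]                     # c=create, u=update, d=delete, r=snapshot read
    if op in ("c", "u", "r"):
        row = payload["after"]
        replica[row["id"]] = row           # upsert the new row image
    elif op == "d":
        row = payload["before"]
        replica.pop(row["id"], None)       # deletes are captured too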
Data Quality¶
Data Validation¶
# Great Expectations (GX 1.x style) example
from datetime import datetime, timedelta

import great_expectations as gx

# Define expectations
expectation_suite = gx.ExpectationSuite(name="orders_suite")

# Column existence
expectation_suite.add_expectation(
    gx.expectations.ExpectColumnToExist(column="order_id")
)

# Null checks
expectation_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)

# Value constraints
expectation_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total",
        min_value=0,
        max_value=1000000,
    )
)

# Uniqueness
expectation_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Referential integrity / allowed values
expectation_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["pending", "confirmed", "shipped", "delivered", "cancelled"],
    )
)

# Freshness: newest record should be no older than one hour
expectation_suite.add_expectation(
    gx.expectations.ExpectColumnMaxToBeBetween(
        column="created_at",
        min_value=(datetime.now() - timedelta(hours=1)).isoformat(),
    )
)

# Run validation against a batch obtained from a data source / batch definition
results = batch.validate(expectation_suite)
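In a pipeline, a suite like this usually acts as a gate: if the validation result comes back unsuccessful (its success flag is false), the orchestrator task fails and downstream loads are skipped rather than publishing bad data.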
Data Quality Dimensions¶
Orchestration¶
Apache Airflow¶
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# extract_orders_func, transform_orders_func, load_to_warehouse_func and
# run_quality_checks are assumed to be defined elsewhere in the project.

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL for order analytics',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'orders'],
) as dag:
    # Sensor: wait for the source file to land
    wait_for_file = S3KeySensor(
        task_id='wait_for_source_file',
        bucket_name='source-bucket',
        bucket_key='orders/{{ ds }}/orders.parquet',
        timeout=3600,
    )

    # Extract
    extract_orders = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_func,
        op_kwargs={'date': '{{ ds }}'},
    )

    # Transform
    transform_orders = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders_func,
    )

    # Load
    load_to_warehouse = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse_func,
    )

    # Data quality check
    quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=run_quality_checks,
    )

    # Dependencies
    wait_for_file >> extract_orders >> transform_orders >> load_to_warehouse >> quality_check
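Because catchup=False, Airflow does not backfill every missed interval between start_date and now, and each run works on its own {{ ds }} partition via the sensor key. A single run can be exercised locally with `airflow dags test daily_etl_pipeline 2024-01-01` before deploying.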
DAG Best Practices¶
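- Keep tasks idempotent so retries and backfills are safe to re-run
- Use templated values like {{ ds }} instead of datetime.now() so runs are reproducible
- Keep heavy work inside callables, not in top-level DAG code that the scheduler parses repeatedly
- Prefer small, atomic tasks with clear boundaries over one monolithic task
- Set retries, timeouts, and failure alerts in default_args rather than per task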
Data Lake Architecture¶
Common Interview Questions¶
- ETL vs ELT?
  - ETL: transform before loading, typically when the target system has limited compute
  - ELT: load raw data first, then transform inside a powerful warehouse
- Batch vs Streaming?
  - Batch: high throughput, high latency
  - Streaming: low latency, continuous processing
- How to handle late data?
  - Watermarks define event-time progress
  - Allowed lateness keeps windows open a little longer
  - Side outputs capture events that arrive too late
- CDC benefits?
  - Near real-time sync
  - Low overhead on the source system
  - Captures all changes, including deletes
- Data quality dimensions?
  - Completeness, accuracy, consistency
  - Timeliness, uniqueness, validity