Data Pipelines

Overview

[Diagram: Data Pipeline Architecture]


ETL vs ELT

[Diagram: ETL vs ELT]

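A minimal ELT sketch in Python, using sqlite3 as a stand-in warehouse (file, table, and column names are illustrative): the raw extract is loaded unchanged, and the transformation runs as SQL inside the target.

# ELT sketch: land the data raw, then transform inside the "warehouse"
# (sqlite3 stands in for the warehouse; names are hypothetical)
import sqlite3
import pandas as pd

conn = sqlite3.connect("warehouse.db")

# Extract + Load: the raw extract goes in unchanged
raw = pd.read_csv("orders_raw.csv")
raw.to_sql("raw_orders", conn, if_exists="replace", index=False)

# Transform: SQL runs where the compute lives, inside the target
conn.executescript("""
    DROP TABLE IF EXISTS daily_revenue;
    CREATE TABLE daily_revenue AS
    SELECT order_date,
           SUM(total) AS revenue,
           COUNT(*)   AS order_count
    FROM raw_orders
    WHERE status = 'completed'
    GROUP BY order_date;
""")
conn.commit()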

Batch Processing

Apache Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count

# Initialize Spark
spark = SparkSession.builder \
    .appName("OrderAnalytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read from source
orders_df = spark.read \
    .format("parquet") \
    .load("s3://bucket/orders/")

customers_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "customers") \
    .load()

# Transformations
# Filter, join, aggregate
result_df = orders_df \
    .filter(col("status") == "completed") \
    .join(customers_df, "customer_id") \
    .groupBy("customer_segment", "order_date") \
    .agg(
        sum("total").alias("revenue"),
        avg("total").alias("avg_order_value"),
        count("*").alias("order_count")
    )

# Write to destination
result_df.write \
    .format("parquet") \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .save("s3://bucket/analytics/daily_revenue/")

Spark Architecture

[Diagram: Spark Architecture]


Stream Processing

// Flink streaming job
public class OrderProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source
        KafkaSource<Order> source = KafkaSource.<Order>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("orders")
            .setGroupId("order-processor")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new OrderDeserializer())
            .build();

        DataStream<Order> orders = env.fromSource(
            source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
            "Kafka Source"
        );

        // Processing
        DataStream<OrderMetrics> metrics = orders
            .filter(order -> order.getStatus().equals("completed"))
            .keyBy(Order::getCustomerSegment)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new OrderAggregator());

        // Sink to database
        metrics.addSink(JdbcSink.sink(
            "INSERT INTO metrics (segment, window_end, revenue) VALUES (?, ?, ?)",
            (ps, m) -> {
                ps.setString(1, m.getSegment());
                ps.setTimestamp(2, m.getWindowEnd());
                ps.setBigDecimal(3, m.getRevenue());
            },
            jdbcConnectionOptions  // JdbcConnectionOptions built elsewhere
        ));

        env.execute("Order Metrics Job");
    }
}

Windowing

[Diagram: Window Types]
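
The three window types can be sketched with PySpark Structured Streaming, assuming events is a streaming DataFrame with event_time, user_id, and amount columns (illustrative names):

# Window types in PySpark Structured Streaming (column names are illustrative)
from pyspark.sql.functions import col, window, session_window, sum

# Tumbling: fixed, non-overlapping 5-minute buckets
tumbling = events.groupBy(window(col("event_time"), "5 minutes")) \
    .agg(sum("amount").alias("revenue"))

# Sliding: 10-minute windows that advance every 5 minutes (overlapping)
sliding = events.groupBy(window(col("event_time"), "10 minutes", "5 minutes")) \
    .agg(sum("amount").alias("revenue"))

# Session: a window closes after 30 minutes of inactivity per key
sessions = events.groupBy(col("user_id"), session_window(col("event_time"), "30 minutes")) \
    .agg(sum("amount").alias("revenue"))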

Event Time vs Processing Time

[Diagram: Event Time vs Processing Time]

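A sketch of event-time processing in PySpark Structured Streaming, reusing the spark session from the batch example; the watermark plays the same role as the Flink WatermarkStrategy above (topic, schema, and column names are illustrative):

# Event-time aggregation with a watermark (Structured Streaming)
from pyspark.sql.functions import col, window, sum, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("customer_segment", StringType()),
    StructField("total", DoubleType()),
    StructField("event_time", TimestampType()),
])

stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .load()

parsed = stream \
    .select(from_json(col("value").cast("string"), schema).alias("o")) \
    .select("o.*")

# Group by the event time embedded in the record, not the arrival time;
# the watermark bounds how long Spark waits for late records (5 minutes here)
revenue = parsed \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(window(col("event_time"), "5 minutes"), "customer_segment") \
    .agg(sum("total").alias("revenue"))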

Change Data Capture (CDC)

[Diagram: Change Data Capture]

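A sketch of consuming Debezium-style change events from Kafka with the kafka-python client; the topic name, the payload envelope (JSON converter with schemas enabled), and the insert_row/update_row/delete_row helpers are assumptions for illustration:

# Apply Debezium-style change events from Kafka (sketch)
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "dbserver1.public.orders",            # hypothetical CDC topic
    bootstrap_servers="kafka:9092",
    value_deserializer=lambda v: json.loads(v) if v else None,
)

for message in consumer:
    event = message.value
    if event is None:                     # tombstone that follows a delete
        continue
    payload = event["payload"]            # assumes JSON converter with schemas enabled
    op = payload["op"]                    # c=create, u=update, d=delete, r=snapshot read
    if op in ("c", "r"):
        insert_row(payload["after"])      # hypothetical sink helpers
    elif op == "u":
        update_row(payload["before"], payload["after"])
    elif op == "d":
        delete_row(payload["before"])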

Data Quality

Data Validation

# Great Expectations example (GX 1.x API)
from datetime import datetime, timedelta

import great_expectations as gx
from great_expectations import expectations as gxe

# Define expectations
expectation_suite = gx.ExpectationSuite(name="orders_suite")

# Column existence
expectation_suite.add_expectation(
    gxe.ExpectColumnToExist(column="order_id")
)

# Null checks
expectation_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="customer_id")
)

# Value constraints
expectation_suite.add_expectation(
    gxe.ExpectColumnValuesToBeBetween(
        column="total",
        min_value=0,
        max_value=1000000
    )
)

# Uniqueness
expectation_suite.add_expectation(
    gxe.ExpectColumnValuesToBeUnique(column="order_id")
)

# Valid values: status must come from the allowed set
expectation_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["pending", "confirmed", "shipped", "delivered", "cancelled"]
    )
)

# Freshness: the newest record should be less than an hour old
expectation_suite.add_expectation(
    gxe.ExpectColumnMaxToBeBetween(
        column="created_at",
        min_value=(datetime.now() - timedelta(hours=1)).isoformat()
    )
)

# Run validation against a batch obtained from a Batch Definition (setup omitted)
results = batch.validate(expectation_suite)

Data Quality Dimensions

[Diagram: Data Quality Dimensions]

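These dimensions can be measured directly; a rough pandas sketch that scores a few of them for the orders data (column names and the freshness unit are illustrative):

# Rough per-dimension scores for an orders DataFrame (pandas)
import pandas as pd

def quality_report(orders: pd.DataFrame) -> dict:
    now = pd.Timestamp.now()  # assumes created_at is timezone-naive
    return {
        # Completeness: share of rows with a customer_id
        "completeness": orders["customer_id"].notna().mean(),
        # Uniqueness: share of distinct order_id values
        "uniqueness": orders["order_id"].nunique() / len(orders),
        # Validity: share of rows with an allowed status
        "validity": orders["status"].isin(
            ["pending", "confirmed", "shipped", "delivered", "cancelled"]
        ).mean(),
        # Timeliness: age of the newest record, in minutes
        "freshness_minutes": (now - orders["created_at"].max()).total_seconds() / 60,
    }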

Orchestration

Apache Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL for order analytics',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'orders'],
) as dag:

    # Sensor: Wait for source file
    wait_for_file = S3KeySensor(
        task_id='wait_for_source_file',
        bucket_name='source-bucket',
        bucket_key='orders/{{ ds }}/orders.parquet',
        timeout=3600,
    )

    # Extract
    extract_orders = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_func,
        op_kwargs={'date': '{{ ds }}'},
    )

    # Transform
    transform_orders = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders_func,
    )

    # Load
    load_to_warehouse = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse_func,
    )

    # Data quality check
    quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=run_quality_checks,
    )

    # Dependencies
    wait_for_file >> extract_orders >> transform_orders >> load_to_warehouse >> quality_check

DAG Best Practices

[Diagram: Airflow Best Practices]

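One practice worth showing in code is idempotency: a task takes the logical run date via templating and overwrites exactly that partition, so retries and backfills do not duplicate data. A sketch meant to sit inside a DAG like the one above (paths and the write_partition helper are hypothetical):

# Idempotent load: the logical date scopes the output partition, so a rerun
# overwrites the same data instead of appending duplicates
from airflow.operators.python import PythonOperator

def load_partition(date: str, **_):
    output_path = f"s3://warehouse-bucket/daily_revenue/order_date={date}/"
    write_partition(output_path, mode="overwrite")  # hypothetical loader

load_daily_partition = PythonOperator(
    task_id="load_daily_partition",
    python_callable=load_partition,
    op_kwargs={"date": "{{ ds }}"},  # logical run date injected by templating
)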

Data Lake Architecture

[Diagram: Data Lake Zones]

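A sketch of promoting data across lake zones with PySpark, assuming a raw/cleaned/curated layout (bucket paths and column names are illustrative); each hop adds structure and stronger quality guarantees:

# Promote data across zones: raw -> cleaned -> curated (paths illustrative)
from pyspark.sql.functions import col, to_date, sum

# Raw zone: data landed as-is from the source (JSON here)
raw = spark.read.json("s3://lake/raw/orders/2024-01-01/")

# Cleaned zone: enforce basic quality and a columnar format
cleaned = raw \
    .filter(col("order_id").isNotNull()) \
    .withColumn("order_date", to_date(col("created_at")))
cleaned.write.mode("overwrite").parquet("s3://lake/cleaned/orders/2024-01-01/")

# Curated zone: business-level aggregates ready for analysts
curated = cleaned \
    .filter(col("status") == "completed") \
    .groupBy("order_date", "customer_segment") \
    .agg(sum("total").alias("revenue"))
curated.write.mode("overwrite").partitionBy("order_date") \
    .parquet("s3://lake/curated/daily_revenue/")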

Common Interview Questions

  1. ETL vs ELT?
     - ETL: transform before loading, used when the target has limited transform capability
     - ELT: load raw data first, then transform inside a powerful warehouse

  2. Batch vs streaming?
     - Batch: high throughput, high latency
     - Streaming: low latency, continuous processing

  3. How do you handle late data?
     - Watermarks track event-time progress
     - An allowed-lateness window keeps state open for stragglers
     - Side outputs capture events that arrive too late

  4. CDC benefits?
     - Near real-time sync between systems
     - Low overhead on the source database
     - Captures every change, including deletes

  5. Data quality dimensions?
     - Completeness, accuracy, consistency
     - Timeliness, uniqueness, validity