Event Driven¶
What is Event-Driven Architecture?¶
A software design pattern where the flow of the program is determined by events - significant changes in state.
Event Types¶
Domain Events¶
// Something that happened in the business domain
public record OrderPlaced(
String orderId,
String customerId,
List<OrderItem> items,
BigDecimal totalAmount,
Instant occurredAt
) implements DomainEvent {}
public record PaymentReceived(
String paymentId,
String orderId,
BigDecimal amount,
String paymentMethod,
Instant occurredAt
) implements DomainEvent {}
public record OrderShipped(
String orderId,
String trackingNumber,
String carrier,
Instant occurredAt
) implements DomainEvent {}
Event Categories¶
Event Sourcing¶
Store all changes as a sequence of events, not just current state.
Event Store Implementation¶
public interface EventStore {
void append(String aggregateId, List<Event> events, long expectedVersion);
List<Event> getEvents(String aggregateId);
List<Event> getEvents(String aggregateId, long fromVersion);
}
// Aggregate reconstruction
public class OrderAggregate {
private String id;
private OrderStatus status;
private List<OrderItem> items = new ArrayList<>();
private BigDecimal total;
// Reconstruct from events
public static OrderAggregate fromEvents(List<Event> events) {
OrderAggregate order = new OrderAggregate();
events.forEach(order::apply);
return order;
}
private void apply(Event event) {
switch (event) {
case OrderCreated e -> {
this.id = e.orderId();
this.status = OrderStatus.CREATED;
}
case ItemAdded e -> {
this.items.add(new OrderItem(e.productId(), e.quantity()));
this.total = calculateTotal();
}
case OrderConfirmed e -> {
this.status = OrderStatus.CONFIRMED;
}
case OrderShipped e -> {
this.status = OrderStatus.SHIPPED;
}
default -> throw new IllegalArgumentException("Unknown event");
}
}
// Command handling produces new events
public List<Event> addItem(String productId, int quantity) {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("Cannot modify confirmed order");
}
var event = new ItemAdded(id, productId, quantity, Instant.now());
apply(event);
return List.of(event);
}
}
Snapshots¶
Optimize reconstruction by periodically saving state.
Benefits & Challenges¶
CQRS (Command Query Responsibility Segregation)¶
Separate read and write models.
Implementation Example¶
// Command side
@Service
public class OrderCommandService {
private final EventStore eventStore;
public void placeOrder(PlaceOrderCommand cmd) {
// Validate
validateOrder(cmd);
// Create events
var events = List.of(
new OrderPlaced(cmd.orderId(), cmd.customerId(), cmd.items(), Instant.now())
);
// Persist events
eventStore.append(cmd.orderId(), events, 0);
}
}
// Query side - Projection
@Component
public class OrderProjection {
private final OrderReadRepository readRepo;
@EventHandler
public void on(OrderPlaced event) {
OrderView view = new OrderView(
event.orderId(),
event.customerId(),
"PLACED",
event.items(),
event.occurredAt()
);
readRepo.save(view);
}
@EventHandler
public void on(OrderShipped event) {
readRepo.updateStatus(event.orderId(), "SHIPPED");
}
}
// Query side - Read service
@Service
public class OrderQueryService {
private final OrderReadRepository readRepo;
public OrderView getOrder(String orderId) {
return readRepo.findById(orderId);
}
public List<OrderView> getOrdersByCustomer(String customerId) {
return readRepo.findByCustomerId(customerId);
}
}
Event Processing Patterns¶
Event Choreography¶
Services react to events independently.
Event Orchestration¶
Central coordinator manages event flow.
Idempotency¶
Ensure processing an event multiple times has same effect as once.
// Using idempotency key
@Service
public class PaymentEventHandler {
private final ProcessedEventRepository processedEvents;
private final PaymentService paymentService;
@Transactional
public void handle(PaymentRequested event) {
// Check if already processed
if (processedEvents.exists(event.eventId())) {
log.info("Event already processed: {}", event.eventId());
return;
}
// Process event
paymentService.processPayment(event);
// Mark as processed
processedEvents.save(new ProcessedEvent(event.eventId(), Instant.now()));
}
}
// Using natural idempotency
@Service
public class InventoryEventHandler {
@Transactional
public void handle(InventoryReserved event) {
// Idempotent operation - reserving same items twice is safe
// if we use "reservation ID" as unique key
inventoryRepo.createReservation(
event.reservationId(), // Unique per order
event.items()
);
// ON CONFLICT DO NOTHING
}
}
Delivery Guarantees¶
Achieving Exactly-Once Semantics¶
// Transactional outbox pattern
@Transactional
public void placeOrder(CreateOrderCommand cmd) {
// 1. Write to main table
Order order = orderRepository.save(new Order(cmd));
// 2. Write event to outbox table (same transaction)
outboxRepository.save(new OutboxEvent(
UUID.randomUUID(),
"OrderPlaced",
objectMapper.writeValueAsString(new OrderPlacedEvent(order)),
Instant.now()
));
}
// Separate process reads outbox and publishes
@Scheduled(fixedDelay = 1000)
public void publishOutboxEvents() {
List<OutboxEvent> events = outboxRepository.findUnpublished();
for (OutboxEvent event : events) {
try {
kafka.send(event.getTopic(), event.getPayload());
outboxRepository.markPublished(event.getId());
} catch (Exception e) {
log.error("Failed to publish event", e);
// Will retry on next run
}
}
}
Event Schema Evolution¶
Handling changes to event structure over time.
// Upcasting example
public class OrderPlacedUpcaster implements Upcaster<OrderPlaced> {
@Override
public OrderPlaced upcast(JsonNode oldEvent, int fromVersion) {
return switch (fromVersion) {
case 1 -> new OrderPlaced(
oldEvent.get("orderId").asText(),
oldEvent.get("customerId").asText(),
parseItems(oldEvent.get("items")),
new BigDecimal(oldEvent.get("amount").asText()),
"USD", // Default currency for v1 events
Instant.parse(oldEvent.get("occurredAt").asText())
);
case 2 -> objectMapper.convertValue(oldEvent, OrderPlaced.class);
default -> throw new IllegalArgumentException("Unknown version");
};
}
}
Messaging Patterns¶
Publish-Subscribe¶
Competing Consumers¶
Consumer Groups (Kafka)¶
Dead Letter Queue¶
Handle messages that can't be processed.
// Kafka example with retry and DLQ
@KafkaListener(topics = "orders")
@RetryableTopic(
attempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2),
dltTopicSuffix = ".DLT"
)
public void processOrder(OrderEvent event) {
// Processing logic
// After 3 failures, message goes to orders.DLT
}
@DltHandler
public void handleDlt(OrderEvent event) {
log.error("Failed to process order after retries: {}", event);
alertService.notify("Order processing failed", event);
}
Common Interview Questions¶
- Event Sourcing vs Event-Driven Architecture?
- Event Sourcing: Store state as events (persistence pattern)
-
EDA: System architecture based on event communication
-
When to use CQRS?
- Complex domains with different read/write patterns
- Need to scale reads independently
-
Multiple read models needed
-
How to handle event ordering?
- Partition by aggregate ID (Kafka)
- Sequence numbers per aggregate
-
Vector clocks for distributed ordering
-
Choreography vs Orchestration?
- Choreography: Decentralized, loose coupling
-
Orchestration: Centralized control, easier to track
-
How to achieve exactly-once processing?
- At-least-once delivery + idempotent consumers
- Transactional outbox pattern
- Kafka transactions (producer + consumer)