Skip to content

Event Sourcing


Definition

Event Sourcing Definition


Event Store

// EVENT: Immutable record of something that happened

public abstract class DomainEvent {
    private final String eventId = UUID.randomUUID().toString();
    private final Instant timestamp = Instant.now();
    private final String aggregateId;

    // Events are named in past tense (what happened)
}

public class AccountCreatedEvent extends DomainEvent {
    private final String accountId;
    private final String ownerName;
}

public class MoneyDepositedEvent extends DomainEvent {
    private final String accountId;
    private final BigDecimal amount;
}

public class MoneyWithdrawnEvent extends DomainEvent {
    private final String accountId;
    private final BigDecimal amount;
}

// EVENT STORE: Append-only log of events

interface EventStore {
    void append(String streamId, List<DomainEvent> events, long expectedVersion);
    List<DomainEvent> load(String streamId);
    List<DomainEvent> load(String streamId, long fromVersion);
}

@Repository
class JdbcEventStore implements EventStore {

    @Override
    public void append(String streamId, List<DomainEvent> events,
                       long expectedVersion) {
        // Optimistic concurrency check
        long currentVersion = getCurrentVersion(streamId);
        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException("Stream was modified");
        }

        for (DomainEvent event : events) {
            jdbcTemplate.update(
                "INSERT INTO events (stream_id, version, type, data, timestamp) " +
                "VALUES (?, ?, ?, ?, ?)",
                streamId, ++currentVersion, event.getClass().getName(),
                serialize(event), event.getTimestamp()
            );
        }
    }

    @Override
    public List<DomainEvent> load(String streamId) {
        return jdbcTemplate.query(
            "SELECT * FROM events WHERE stream_id = ? ORDER BY version",
            this::mapToEvent, streamId
        );
    }
}

Aggregates

// AGGREGATE: Domain object rebuilt from events

public class Account {
    private String id;
    private BigDecimal balance = BigDecimal.ZERO;
    private List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Private constructor - use factory methods
    private Account() {}

    // Factory method to create new account
    public static Account create(String id, String ownerName) {
        Account account = new Account();
        account.apply(new AccountCreatedEvent(id, ownerName));
        return account;
    }

    // Reconstitute from events
    public static Account fromEvents(List<DomainEvent> events) {
        Account account = new Account();
        for (DomainEvent event : events) {
            account.applyEvent(event);  // Apply without recording
        }
        return account;
    }

    // Business method
    public void deposit(BigDecimal amount) {
        if (amount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("Amount must be positive");
        }
        apply(new MoneyDepositedEvent(id, amount));
    }

    public void withdraw(BigDecimal amount) {
        if (amount.compareTo(balance) > 0) {
            throw new InsufficientFundsException();
        }
        apply(new MoneyWithdrawnEvent(id, amount));
    }

    // Apply event and record it
    private void apply(DomainEvent event) {
        applyEvent(event);
        uncommittedEvents.add(event);
    }

    // Event handlers (mutate state)
    private void applyEvent(DomainEvent event) {
        if (event instanceof AccountCreatedEvent e) {
            this.id = e.getAccountId();
        } else if (event instanceof MoneyDepositedEvent e) {
            this.balance = balance.add(e.getAmount());
        } else if (event instanceof MoneyWithdrawnEvent e) {
            this.balance = balance.subtract(e.getAmount());
        }
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }
}

Projections

Projections


Projections Implementation

// PROJECTION: Build read model from events

@Component
class AccountBalanceProjection {
    private final JdbcTemplate jdbcTemplate;

    @EventHandler
    void on(AccountCreatedEvent event) {
        jdbcTemplate.update(
            "INSERT INTO account_balances (account_id, balance, owner) VALUES (?, 0, ?)",
            event.getAccountId(), event.getOwnerName()
        );
    }

    @EventHandler
    void on(MoneyDepositedEvent event) {
        jdbcTemplate.update(
            "UPDATE account_balances SET balance = balance + ? WHERE account_id = ?",
            event.getAmount(), event.getAccountId()
        );
    }

    @EventHandler
    void on(MoneyWithdrawnEvent event) {
        jdbcTemplate.update(
            "UPDATE account_balances SET balance = balance - ? WHERE account_id = ?",
            event.getAmount(), event.getAccountId()
        );
    }
}

// TRANSACTION HISTORY PROJECTION
@Component
class TransactionHistoryProjection {

    @EventHandler
    void on(MoneyDepositedEvent event) {
        jdbcTemplate.update(
            "INSERT INTO transactions (account_id, type, amount, timestamp) VALUES (?, 'DEPOSIT', ?, ?)",
            event.getAccountId(), event.getAmount(), event.getTimestamp()
        );
    }

    @EventHandler
    void on(MoneyWithdrawnEvent event) {
        jdbcTemplate.update(
            "INSERT INTO transactions (account_id, type, amount, timestamp) VALUES (?, 'WITHDRAWAL', ?, ?)",
            event.getAccountId(), event.getAmount(), event.getTimestamp()
        );
    }
}

// REBUILDING A PROJECTION
class ProjectionRebuilder {
    void rebuild(String projectionName) {
        // 1. Delete current projection data
        projectionRepository.deleteAll(projectionName);

        // 2. Replay all events
        eventStore.loadAll().forEach(event -> {
            projectionHandler.handle(event);
        });
    }
}

Benefits & Trade-offs

Benefits and Trade-offs


Tips & Tricks

Tips and Tricks


  • *