Event Sourcing and Command Query Responsibility Segregation (CQRS) are powerful architectural patterns for building highly auditable, high-throughput distributed systems. In typical state-based CRUD systems, the database stores only the current snapshot of the application state. The historical transitions, the business rationales, and the intermediate states are completely lost.
Event Sourcing reverses this paradigm. Every state transition is recorded as an immutable, append-only event in a specialized database called the Event Store. The current state of any entity (or aggregate) is computed dynamically by replaying its historical event log. CQRS pairs with Event Sourcing by decoupling the write-side command pipeline from the read-side query pipeline.
However, moving from conference-talk slide decks to a resilient, high-volume production environment introduces substantial engineering challenges around storage growth, replay latency, schema evolution, and projection lag. This guide details the structural requirements, low-level Java implementations, SQL DDL schemas, and operational runbooks required to deploy Event Sourcing and CQRS at platform scale.
1. Requirements & Core Constraints
Designing a production-grade event-sourced engine requires strict operational metrics and performance ceilings.
Functional Constraints
- Immutable Ingestion: The system must persist incoming domain events as immutable records, enforcing atomic, sequential writes per aggregate instance.
- State Reconstitution: The application must rebuild the active memory state of any aggregate by replaying its event history in under 15 milliseconds.
- Schema Evolution: The platform must support seamless, zero-downtime schema evolution, enabling old event structures to be loaded into new class formats on the fly.
- Dynamic Projections: Developers must have the ability to spin up new read-model projections and rebuild them from historical logs without affecting write path availability.
Non-Functional SLAs
- Global Data Footprint: Scale to support 100 Million distinct active aggregates, managing a historical log of over 10 Billion distinct domain events.
- Ingestion Scale: The write-side command gateway must sustain at least 20,000 writes/second at peak load.
- Precision Read Latencies: Read-side queries against denormalized projections must return p99 responses in under 5 milliseconds.
- Eventual Consistency Limit: Read-side projection models must catch up to write-side commits within 50 milliseconds of transaction confirmation.
Back-of-the-Envelope Estimates
Let us compute the storage footprint, memory requirements, and snapshotting thresholds for a system handling 10 Billion events:
- Event Storage Calculations: Assume each domain event payload (containing UUIDs, transaction fields, correlation IDs, and JSON data) averages 500 bytes of raw text. For 10 Billion events:
$$\text{Raw Event Storage} = 10,000,000,000 \times 500 \text{ bytes} = 5,000,000,000,000 \text{ bytes} \approx 5 \text{ Terabytes}$$
Index structures (the primary key on event_id and the composite index on aggregate_id + sequence_number) add approximately 40% overhead:
$$\text{Index Storage} = 5 \text{ TB} \times 0.40 = 2 \text{ TB}$$
$$\text{Total Write-Side Storage} = 5 \text{ TB} + 2 \text{ TB} = 7 \text{ Terabytes}$$
To run this efficiently, our Event Store database must reside on high-IOPS SSDs, and historical events older than 1 year must be continuously offloaded to S3 in compressed Apache Parquet format to control costs.
- Replay Latency and Snapshot Sizing: Replaying raw events in memory to reconstitute state consumes CPU and disk I/O. If a single aggregate has a long lifecycle (e.g., a high-volume trading account) and accumulates 20,000 events:
- Replaying 20,000 events without snapshots, assuming a deserialization and application speed of 5 microseconds per event:
$$\text{Reconstitute Latency} = 20,000 \times 5 \text{ microseconds} = 100 \text{ milliseconds}$$
This exceeds the 15-millisecond reconstitution budget set in the functional constraints and is unacceptable for standard user requests.
- By applying a Snapshot Strategy where we save a state snapshot after every 100 events, the engine only needs to load the latest snapshot and replay a maximum of 99 subsequent events:
$$\text{Maximum Replay Latency} = 99 \times 5 \text{ microseconds} = 495 \text{ microseconds} \approx 0.5 \text{ milliseconds}$$
This keeps the replay portion of state reconstitution sub-millisecond, regardless of aggregate lifespan.
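The arithmetic above can be reproduced with a few helper methods. This is a minimal sketch: the class name CapacityEstimates and its method names are hypothetical, and every input (500 bytes per event, 40% index overhead, 5 microseconds per event, a snapshot every 100 events) is an assumption taken from the estimates above, not a measurement.

```java
/** Back-of-the-envelope calculator. Every input is an assumption from the estimates above. */
final class CapacityEstimates {
    private CapacityEstimates() {}

    /** Raw event storage in bytes: event count times average event size. */
    static long rawStorageBytes(long eventCount, long avgEventBytes) {
        return Math.multiplyExact(eventCount, avgEventBytes);
    }

    /** Index storage in bytes for an overhead given in whole percent (40 means 40%). */
    static long indexStorageBytes(long rawBytes, int overheadPercent) {
        return rawBytes / 100 * overheadPercent;
    }

    /** Replay cost in microseconds at a fixed per-event cost. */
    static long replayMicros(long eventsToReplay, long microsPerEvent) {
        return Math.multiplyExact(eventsToReplay, microsPerEvent);
    }

    /** Worst-case number of events replayed on top of the latest snapshot. */
    static long maxEventsAfterSnapshot(long snapshotInterval) {
        return snapshotInterval - 1;
    }
}
```

Calling replayMicros(maxEventsAfterSnapshot(100), 5) gives the 495-microsecond worst case, and changing the interval argument shows how quickly the replay budget grows if snapshots are taken less often.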
2. API Design & Core Contracts
In an event-sourced architecture, API design is split into commands (state changes) and queries (state reads).
1. Write-Side Command Ingestion API
POST /api/v1/orders/commands
Invoked by the client frontend to dispatch an action. The gateway validates the command and returns the transaction metadata immediately.
Request Headers:
Content-Type: application/json
Authorization: Bearer <JWT_TOKEN>
X-Command-Type: PlaceOrderCommand
Request Payload:
{
"commandId": "cmd_b1c09988-d4e3-4f2a-b6c5-776655443322",
"aggregateId": "agg_df5a6b7c-88f9-44e2-a0b1-9c8d7e6f5a4b",
"customerId": "cust_88ff77ee-dd22-4a1b-bb33-998877665544",
"items": [
{
"productId": "prod_11aa22bb",
"quantity": 3,
"unitPrice": 45.00
}
],
"totalAmount": 135.00
}
Response Payload (202 Accepted):
{
"commandId": "cmd_b1c09988-d4e3-4f2a-b6c5-776655443322",
"aggregateId": "agg_df5a6b7c-88f9-44e2-a0b1-9c8d7e6f5a4b",
"status": "ACCEPTED",
"committedSequence": 42,
"telemetry": {
"consistenceDelayMs": 12,
"writeDurationMs": 4
}
}
2. Read-Side Query API
GET /api/v1/orders/agg_df5a6b7c-88f9-44e2-a0b1-9c8d7e6f5a4b
Invoked by the user dashboard to fetch the denormalized read-model. The server returns telemetry indicating how fresh the read-model is.
Response Payload (200 OK):
{
"orderId": "agg_df5a6b7c-88f9-44e2-a0b1-9c8d7e6f5a4b",
"customerId": "cust_88ff77ee-dd22-4a1b-bb33-998877665544",
"status": "SHIPPED",
"totalAmount": 135.00,
"lastUpdatedSequence": 42,
"lastProcessedEventTime": "2026-05-22T22:15:30.123Z",
"readConsistencyLagMs": 18
}
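Because the command response carries committedSequence and the query response carries lastUpdatedSequence, a caller can detect a stale read by comparing the two. The sketch below shows one possible bounded wait. The class ReadYourWrites, the method awaitSequence, and the timeout and poll-interval parameters are hypothetical names chosen for illustration; the LongSupplier stands in for whatever lookup returns the projection's current sequence.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Sketch of a bounded wait for the read model to reach a committed sequence. */
final class ReadYourWrites {
    private ReadYourWrites() {}

    /**
     * Polls the projection's last applied sequence until it reaches committedSequence
     * or the timeout elapses. Returns true when the read model is fresh enough.
     */
    static boolean awaitSequence(LongSupplier lastUpdatedSequence,
                                 long committedSequence,
                                 Duration timeout,
                                 Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (lastUpdatedSequence.getAsLong() >= committedSequence) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // caller should mark the response as potentially stale
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

A false return does not mean the read failed; it only means the projection had not caught up within the allowed wait, so the response can still be served with its lag reported.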
3. High-Level Design (HLD)
The write side appends immutable logs to the Event Store, while the read side processes those events asynchronously to build denormalized views.
1. Ingestion and CQRS Read/Write Pipelines
The following architecture details how commands execute against the Event Store, emit events to Kafka, and feed into the read-side projections:
graph TD
%% Write Command Path
Client[Client Device] -->|1. Dispatch Command| CmdGW[Command Gateway / Load Balancer]
CmdGW -->|2. Route to Worker| CmdApp[Stateless Write Service Fleet]
%% Write-side storage
CmdApp -->|3. Query Latest Snapshot| SnapDB[("PostgreSQL Snapshot Store")]
CmdApp -->|4. Replay Subsequent Events| EventStore[("PostgreSQL Event Store")]
CmdApp -->|5. Append New Events & Commit| EventStore
%% Event Publishing
EventStore -->|6. Capture Change Stream| CDC[Debezium Change Data Capture]
CDC -->|7. Publish Protobuf Events| KafkaBroker[("Apache Kafka Event Log")]
%% Read / Query Path
subgraph Read-side Projection Pipeline
KafkaBroker -->|8a. Stream Events| ProjWorker[Stateful Projection Workers]
ProjWorker -->|8b. Batch Insert| ReadDB[("Elasticsearch Read Models")]
ProjWorker -->|8c. Cache State| ReadCache[("Redis Query Cache")]
end
%% Dashboard Queries
ClientDashboard[Client Dashboard] -->|9. Read Query| APIQuery[Query API Gateway]
APIQuery -->|10. High-speed lookup| ReadCache
APIQuery -->|Cache Miss| ReadDB
classDef database fill:#0d3b66,stroke:#f4d35e,stroke-width:2px,color:#fff;
classDef cluster fill:#2e0f38,stroke:#f4d35e,stroke-width:2px,color:#fff;
classDef client fill:#3d5a80,stroke:#293241,stroke-width:2px,color:#fff;
classDef loadbalancer fill:#ee6c4d,stroke:#293241,stroke-width:2px,color:#fff;
class SnapDB,EventStore,KafkaBroker,ReadDB,ReadCache database;
class CmdApp,ProjWorker,CDC,APIQuery cluster;
class Client,ClientDashboard client;
class CmdGW loadbalancer;
2. E2E Command Execution and Reconstitution Sequence
The sequence below outlines how an incoming command is validated, loaded from the latest snapshot, applied against subsequent events, saved to the database, and propagated downstream:
sequenceDiagram
autonumber
actor Client as Client
participant GW as Command Gateway
participant Aggregate as OrderAggregate
participant DB as Event Store DB
participant Broker as Kafka Broker
participant Projection as Read-Model Projector
Client->>GW: POST /api/v1/orders/commands (CompleteOrderCommand)
GW->>DB: Fetch Latest Snapshot (aggregate_id)
DB-->>GW: Return Snapshot (Sequence: 100)
GW->>DB: Fetch subsequent events WHERE sequence > 100
DB-->>GW: Return events [OrderPlaced, OrderItemAdded]
GW->>Aggregate: Reconstitute state from snapshot + events
GW->>Aggregate: HandleCommand(CompleteOrderCommand)
alt Validation Failed
Aggregate-->>GW: Throw DomainException
GW-->>Client: 422 Unprocessable Entity
else Validation Passed
Aggregate->>Aggregate: ApplyEvent(OrderCompletedEvent)
GW->>DB: Append event (Sequence: 103) & Commit
Note over GW, DB: Optimistic Concurrency check (sequence = 103)
DB-->>GW: Commit Confirmed
GW-->>Client: 202 Accepted
DB->>Broker: Debezium CDC publishes OrderCompletedEvent
Broker->>Projection: Process OrderCompletedEvent
Projection->>Projection: Update Denormalized Query DB
end
4. Low-Level Design & Database Models
To handle high transactional throughput, the relational schema for our Event Store must avoid foreign keys and optimize index distributions.
1. Database Selection Rationale
| Layer | Storage Technology | Primary Role | Technical Justification |
|---|---|---|---|
| Write Store | PostgreSQL | Event Store Log | Demands strict ACID consistency for append operations. Highly optimized composite index support ensures rapid sequential query lookups. |
| Snapshot Store | PostgreSQL | Aggregate Snapshots | Stores JSON snapshots of aggregate state. Postgres JSONB indexing supports query acceleration for catalog auditing. |
| Messaging Bus | Apache Kafka | Event Propagation | Log-based partition scaling guarantees strict message order preservation per partition (using aggregate_id as the routing key). |
| Read Model | Elasticsearch | Query-side Search | High-performance full-text search and complex query aggregations. Sub-5ms search speeds offload primary databases completely. |
2. SQL DDL Database Schemas
PostgreSQL Event Store Schema
-- Production Event Store Log Table
CREATE TABLE domain_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(128) NOT NULL, -- 'OrderAggregate', 'AccountAggregate'
event_type VARCHAR(128) NOT NULL, -- 'OrderPlaced', 'OrderShipped'
event_version INT NOT NULL DEFAULT 1, -- Schema version (for upcasters)
sequence_number BIGINT NOT NULL, -- Continuous position within aggregate log
payload JSONB NOT NULL, -- Actual event properties
metadata JSONB, -- Trace ID, user credentials, correlation ID
occurred_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- Enforce absolute uniqueness of sequence number per aggregate instance
UNIQUE (aggregate_id, sequence_number)
);
-- Aggregate state loading (reconstitution path) is served by the unique index that
-- PostgreSQL creates for the UNIQUE (aggregate_id, sequence_number) constraint above.
-- A second index on the same columns would only add write amplification.
-- Optimize operational auditing queries
CREATE INDEX idx_events_audit_lookup
ON domain_events (event_type, occurred_at DESC);
Aggregate Snapshot Table
-- Aggregate Snapshots Cache Table
CREATE TABLE aggregate_snapshots (
aggregate_id UUID PRIMARY KEY,
aggregate_type VARCHAR(128) NOT NULL,
snapshot_version INT NOT NULL, -- Code snapshot version mapping
sequence_number BIGINT NOT NULL, -- Sequence number of last snapshotted event
state_payload JSONB NOT NULL, -- Fully denormalized JSON state representation
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_snapshots_audit_metrics
ON aggregate_snapshots (aggregate_type, updated_at DESC);
3. Compilable Java Event Store and Aggregate Reconstitution Class
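Below is a compact Java sketch illustrating aggregate reconstitution, optimistic concurrency checks on save, and event upcasting integration. The aggregate itself is not thread-safe; each command is expected to load its own instance, and concurrent writers are detected by the Event Store's sequence check. It requires Java 16 or later (records and arrow-form switch) and Jackson on the classpath: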
package com.codesprintpro.eventsourcing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.*;
/**
* High-performance aggregate engine designed for state reconstitution,
* optimistic concurrency handling, and basic JSON upcasting.
*/
public class AggregateEngine {
private final EventStoreRepository eventStore;
private final SnapshotRepository snapshotStore;
private final UpcasterRegistry upcasterRegistry;
public AggregateEngine(EventStoreRepository eventStore,
SnapshotRepository snapshotStore,
UpcasterRegistry upcasterRegistry) {
this.eventStore = eventStore;
this.snapshotStore = snapshotStore;
this.upcasterRegistry = upcasterRegistry;
}
/**
* Reconstitutes the state of an Order aggregate from the latest snapshot
* and subsequent event log entries.
*/
public OrderAggregate load(UUID aggregateId) {
// 1. Try to load the latest snapshot
Optional<Snapshot> snapshotOpt = snapshotStore.find(aggregateId);
long startSequence = 0L;
OrderAggregate aggregate;
if (snapshotOpt.isPresent()) {
Snapshot snapshot = snapshotOpt.get();
aggregate = OrderAggregate.fromSnapshot(snapshot);
startSequence = snapshot.sequenceNumber();
} else {
aggregate = new OrderAggregate(aggregateId);
}
// 2. Fetch all subsequent events
List<RawEvent> rawEvents = eventStore.fetchEventsAfter(aggregateId, startSequence);
// 3. Upcast and apply events sequentially
for (RawEvent raw : rawEvents) {
RawEvent upgraded = upcasterRegistry.upcast(raw);
aggregate.apply(upgraded);
}
return aggregate;
}
/**
* Saves uncommitted events to the Event Store, performing optimistic lock checks.
*/
public void save(OrderAggregate aggregate) {
List<RawEvent> newEvents = aggregate.getUncommittedEvents();
if (newEvents.isEmpty()) {
return;
}
long expectedSequence = aggregate.getSequenceNumber() - newEvents.size();
eventStore.appendEvents(aggregate.getId(), newEvents, expectedSequence);
aggregate.clearUncommittedEvents();
}
// --- Core Domain Entities ---
public static class OrderAggregate implements Serializable {
private final UUID id;
private long sequenceNumber = 0L;
private String status = "NEW";
private BigDecimal totalAmount = BigDecimal.ZERO;
private final List<RawEvent> uncommittedEvents = new ArrayList<>();
public OrderAggregate(UUID id) {
this.id = id;
}
public static OrderAggregate fromSnapshot(Snapshot snapshot) {
OrderAggregate aggregate = new OrderAggregate(snapshot.aggregateId());
aggregate.sequenceNumber = snapshot.sequenceNumber();
aggregate.status = snapshot.status();
aggregate.totalAmount = snapshot.totalAmount();
return aggregate;
}
public void apply(RawEvent event) {
this.sequenceNumber = event.sequenceNumber();
switch (event.eventType()) {
case "OrderPlaced" -> {
this.status = "PLACED";
this.totalAmount = new BigDecimal(event.payload().get("totalAmount").asText());
}
case "OrderCompleted" -> this.status = "COMPLETED";
case "OrderCancelled" -> this.status = "CANCELLED";
}
}
public void placeOrder(BigDecimal amount) {
if (!this.status.equals("NEW")) {
throw new IllegalStateException("Order already placed!");
}
long nextSeq = this.sequenceNumber + 1;
ObjectMapper mapper = new ObjectMapper();
ObjectNode payload = mapper.createObjectNode();
payload.put("totalAmount", amount.toString());
RawEvent event = new RawEvent(UUID.randomUUID(), id, "OrderPlaced", 1, nextSeq, payload);
uncommittedEvents.add(event);
apply(event);
}
public UUID getId() { return id; }
public long getSequenceNumber() { return sequenceNumber; }
public String getStatus() { return status; }
public BigDecimal getTotalAmount() { return totalAmount; }
public List<RawEvent> getUncommittedEvents() { return uncommittedEvents; }
public void clearUncommittedEvents() { uncommittedEvents.clear(); }
}
// --- Infrastructure Support Interfaces & Records ---
public record RawEvent(UUID eventId, UUID aggregateId, String eventType, int version, long sequenceNumber, ObjectNode payload) {}
public record Snapshot(UUID aggregateId, long sequenceNumber, String status, BigDecimal totalAmount) {}
public interface EventStoreRepository {
List<RawEvent> fetchEventsAfter(UUID aggregateId, long sequenceNumber);
void appendEvents(UUID aggregateId, List<RawEvent> events, long expectedSequence);
}
public interface SnapshotRepository {
Optional<Snapshot> find(UUID aggregateId);
void save(Snapshot snapshot);
}
public static class UpcasterRegistry {
public RawEvent upcast(RawEvent raw) {
if (raw.eventType().equals("OrderPlaced") && raw.version() == 1) {
// Example Upcaster: Rename customer_id to user_id in the payload
ObjectNode payload = raw.payload().deepCopy();
if (payload.has("customer_id")) {
payload.set("user_id", payload.get("customer_id"));
payload.remove("customer_id");
}
return new RawEvent(raw.eventId(), raw.aggregateId(), raw.eventType(), 2, raw.sequenceNumber(), payload);
}
return raw;
}
}
}
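The EventStoreRepository interface above leaves the optimistic concurrency check to the implementation. The following in-memory sketch shows one way that check could behave. It is illustrative only: InMemoryEventStore, StoredEvent (a simplified stand-in for RawEvent with a String payload), and ConcurrencyConflictException are hypothetical names, and a real deployment would rely on the UNIQUE (aggregate_id, sequence_number) constraint in PostgreSQL instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class InMemoryEventStore {
    /** Simplified stand-in for RawEvent; the payload is a plain String here. */
    record StoredEvent(UUID aggregateId, String eventType, long sequenceNumber, String payload) {}

    /** Thrown when another writer appended after the caller loaded the aggregate. */
    static final class ConcurrencyConflictException extends RuntimeException {
        ConcurrencyConflictException(String message) { super(message); }
    }

    private final Map<UUID, List<StoredEvent>> streams = new ConcurrentHashMap<>();

    /** Appends all events atomically, but only if the stream is still at expectedSequence. */
    void appendEvents(UUID aggregateId, List<StoredEvent> events, long expectedSequence) {
        streams.compute(aggregateId, (id, existing) -> {
            List<StoredEvent> stream = existing == null ? List.of() : existing;
            long current = stream.isEmpty() ? 0L : stream.get(stream.size() - 1).sequenceNumber();
            if (current != expectedSequence) {
                throw new ConcurrencyConflictException(
                        "expected sequence " + expectedSequence + " but stream is at " + current);
            }
            long next = expectedSequence;
            for (StoredEvent event : events) {
                if (event.sequenceNumber() != ++next) {
                    throw new IllegalArgumentException(
                            "non-contiguous sequence " + event.sequenceNumber());
                }
            }
            // Store an immutable copy so readers never observe a half-written stream.
            List<StoredEvent> updated = new ArrayList<>(stream);
            updated.addAll(events);
            return List.copyOf(updated);
        });
    }

    /** Returns events with a sequence number strictly greater than the given one. */
    List<StoredEvent> fetchEventsAfter(UUID aggregateId, long sequenceNumber) {
        List<StoredEvent> result = new ArrayList<>();
        for (StoredEvent event : streams.getOrDefault(aggregateId, List.of())) {
            if (event.sequenceNumber() > sequenceNumber) {
                result.add(event);
            }
        }
        return result;
    }
}
```

When two command handlers load the same aggregate at sequence 0 and both try to append sequence 1, the second call fails with the conflict exception; the usual response is to reload the aggregate and retry the command.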
5. Scaling Challenges & System Bottlenecks
Moving an event-sourced platform beyond thousands of commands per second reveals core distributed systems engineering problems.
1. Rebuilding Large Projections from 10 Billion Events with Zero Downtime
- The Bottleneck: When you introduce a new feature or fix a query-side bug, you must rebuild the read model projection by replaying historical events. Replaying 10 Billion events from scratch on a single database thread takes weeks, rendering the read path stale.
- The Mitigation: Blue-Green Projections:
- We never rebuild projections in place. Instead, we spin up a new, independent projection table (e.g. orders_projection_v2) on a separate cluster.
- We run the Projection Rebuilder Worker in parallel. It reads historical events from cold storage (AWS Athena over S3 Parquet format) or high-throughput Kafka topics, streaming writes to orders_projection_v2.
- During this rebuild, the application continues to serve user traffic from the old projection orders_projection_v1.
- Once the sequence number of orders_projection_v2 catches up to the current write-side sequence (monitored via watermark offsets), we switch the API routing pointer to the new table, then safely delete the old v1 structure.
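The cutover step of the blue-green approach can be sketched as an atomic pointer swap guarded by a watermark comparison. This is a minimal illustration, not a full rebuild pipeline: ProjectionRouter, tryCutOver, and the maxLag tolerance are hypothetical names, and the watermark values are assumed to be supplied by whatever monitoring tracks the rebuild.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Routes read queries to the currently active projection table. */
final class ProjectionRouter {
    private final AtomicReference<String> activeTable;

    ProjectionRouter(String initialTable) {
        this.activeTable = new AtomicReference<>(initialTable);
    }

    /** Table name that query handlers should read from right now. */
    String activeTable() {
        return activeTable.get();
    }

    /**
     * Switches reads to candidateTable only when its watermark is within maxLag
     * events of the write-side head. Returns true if the switch happened.
     */
    boolean tryCutOver(String candidateTable, long candidateWatermark,
                       long writeSideHead, long maxLag) {
        if (writeSideHead - candidateWatermark > maxLag) {
            return false; // rebuild is still behind; keep serving the old table
        }
        activeTable.set(candidateTable);
        return true;
    }
}
```

Keeping the old table in place until after a successful cutover means a failed or slow rebuild never affects live queries.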
2. Schema Evolution and Breaking API Changes (Upcaster Chains)
- The Bottleneck: Events are immutable; once written to disk, they cannot be updated. However, business features evolve, changing event models (e.g., merging two fields or splitting address records). Loading old, incompatible JSON structures into new Java classes crashes the deserializer.
- The Mitigation: Sequential Upcasting Chains:
- We insert an Upcaster Interceptor in the aggregate reconstitution pipeline, right between database retrieval and Jackson deserialization.
- The upcaster works purely on raw JSON node trees. If the database returns a V1 event, the V1ToV2Upcaster upgrades the JSON structure in memory. The upgraded JSON tree is then passed to the V2ToV3Upcaster, and so on.
- The application code only needs to know how to deserialize the latest version (V3). This isolates version translation logic, keeping the primary domain aggregates clean.
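A sequential upcasting chain can be sketched as a lookup of single-version steps applied until no further step exists. To keep the example dependency-free, a Map<String, Object> stands in for the Jackson node tree; UpcasterChain, Step, VersionedPayload, and exampleChain are hypothetical names, and the two example transformations (renaming customer_id to user_id, splitting name into first_name and last_name) are illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class UpcasterChain {
    /** Versioned raw payload; a Map stands in for a JSON node tree. */
    record VersionedPayload(int version, Map<String, Object> fields) {}

    /** One step that upgrades exactly one version to the next. */
    record Step(int fromVersion, UnaryOperator<Map<String, Object>> transform) {}

    private final Map<Integer, Step> stepsByFromVersion = new HashMap<>();

    UpcasterChain(List<Step> steps) {
        for (Step step : steps) {
            stepsByFromVersion.put(step.fromVersion(), step);
        }
    }

    /** Applies steps one version at a time until no step is registered for the current version. */
    VersionedPayload upcast(VersionedPayload input) {
        VersionedPayload current = input;
        Step step;
        while ((step = stepsByFromVersion.get(current.version())) != null) {
            // Work on a copy so the stored (immutable) event is never modified.
            Map<String, Object> copy = new HashMap<>(current.fields());
            current = new VersionedPayload(current.version() + 1, step.transform().apply(copy));
        }
        return current;
    }

    /** Example chain: V1 to V2 renames a field, V2 to V3 splits a name field. */
    static UpcasterChain exampleChain() {
        Step v1ToV2 = new Step(1, fields -> {
            Object id = fields.remove("customer_id");
            if (id != null) {
                fields.put("user_id", id);
            }
            return fields;
        });
        Step v2ToV3 = new Step(2, fields -> {
            Object name = fields.remove("name");
            if (name != null) {
                String[] parts = name.toString().split(" ", 2);
                fields.put("first_name", parts[0]);
                fields.put("last_name", parts.length > 1 ? parts[1] : "");
            }
            return fields;
        });
        return new UpcasterChain(List.of(v1ToV2, v2ToV3));
    }
}
```

Because each step only knows about one version boundary, adding a V4 later means registering one more step rather than touching the existing ones.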
3. Out-Of-Order Event Delivery in Projection Workers
- The Bottleneck: CQRS relies on asynchronous processing. If a user places an order and quickly completes it, the write side emits OrderPlaced then OrderCompleted. If Kafka partitions are misconfigured, a projection worker could consume OrderCompleted before OrderPlaced. This causes database key errors and corrupts the read model.
- The Mitigation: Partition Key Pinning and Watermarking:
- We ensure all events generated by a specific aggregate_id are pinned to the same Kafka partition by setting the Kafka record key to aggregate_id. Kafka preserves order within a partition, and each partition is consumed sequentially by a single worker thread, so events of one aggregate arrive in order, provided the producer itself does not reorder batches on retry (for example, by enabling the idempotent producer).
- Projection tables explicitly store a last_processed_sequence column. If a worker receives an event with sequence 10, but the database shows the last processed sequence is 8, the worker blocks the transaction and waits for sequence 9, preventing read-skew corruption.
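The last_processed_sequence check described above reduces to a three-way decision per incoming event. The sketch below is one possible encoding; SequenceGate and its Decision values are hypothetical names, and the SKIP_DUPLICATE branch is an added assumption to cover redelivery of an already applied event.

```java
/** Decides what a projection worker should do with an incoming event. */
final class SequenceGate {
    enum Decision { APPLY, SKIP_DUPLICATE, WAIT_FOR_GAP }

    private SequenceGate() {}

    static Decision decide(long lastProcessedSequence, long incomingSequence) {
        if (incomingSequence <= lastProcessedSequence) {
            return Decision.SKIP_DUPLICATE; // already applied; safe to ignore on redelivery
        }
        if (incomingSequence == lastProcessedSequence + 1) {
            return Decision.APPLY; // exactly the next event in the aggregate's log
        }
        return Decision.WAIT_FOR_GAP; // an earlier event is still missing
    }
}
```

With last_processed_sequence at 8, an event with sequence 10 yields WAIT_FOR_GAP, sequence 9 yields APPLY, and a repeated sequence 8 yields SKIP_DUPLICATE.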
6. Technical Trade-offs & Consistency Models
1. Dual-Write Anti-Pattern vs. CDC Log Mining
2. Optimistic Concurrency Checks
7. Failure Scenarios & Operational Resiliency
1. Projection Worker Lag & Cascading Backpressure
If our Elasticsearch read-model cluster slows down (due to CPU overload or garbage collection pauses), the Projection Worker cannot write read-model updates fast enough. Consumed events pile up in the worker's in-memory buffers. If unmitigated, the worker runs out of memory and the projection service crashes.
- Resiliency Plan:
- We configure the Kafka consumers with a Bounded Backpressure Strategy using a Pause/Resume mechanism.
- If the projection write queue exceeds 500 records, the worker calls KafkaConsumer.pause() on its assigned partitions. Kafka consumers pull rather than receive pushes, so subsequent poll() calls simply return no records for the paused partitions, letting the worker flush its buffer to Elasticsearch while it keeps polling to stay in the consumer group.
- Once the write queue has drained, the worker calls KafkaConsumer.resume(), resuming consumption safely.
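The pause/resume logic can be sketched independently of the Kafka client. In the example below, PausableSource is a hypothetical stand-in for the pause and resume calls on a Kafka consumer, BoundedBackpressure is an illustrative name, and the separate low watermark is an added assumption so the worker does not flip between paused and resumed on every record. Like the Kafka consumer itself, the class is meant to be driven from a single thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class BoundedBackpressure {
    /** Hypothetical stand-in for the pause/resume calls on a Kafka consumer. */
    interface PausableSource {
        void pause();
        void resume();
    }

    private final Deque<String> writeQueue = new ArrayDeque<>();
    private final PausableSource source;
    private final int highWatermark;
    private final int lowWatermark;
    private boolean paused = false;

    BoundedBackpressure(PausableSource source, int highWatermark, int lowWatermark) {
        if (lowWatermark > highWatermark) {
            throw new IllegalArgumentException("lowWatermark must not exceed highWatermark");
        }
        this.source = source;
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    /** Buffers a consumed record and pauses the source once the queue exceeds the high watermark. */
    void enqueue(String record) {
        writeQueue.addLast(record);
        if (!paused && writeQueue.size() > highWatermark) {
            source.pause();
            paused = true;
        }
    }

    /** Removes up to batchSize records (as if written to the read model) and resumes when drained enough. */
    int flush(int batchSize) {
        int flushed = 0;
        while (flushed < batchSize && !writeQueue.isEmpty()) {
            writeQueue.removeFirst();
            flushed++;
        }
        if (paused && writeQueue.size() <= lowWatermark) {
            source.resume();
            paused = false;
        }
        return flushed;
    }

    boolean isPaused() { return paused; }

    int queued() { return writeQueue.size(); }
}
```

With a high watermark of 500 as in the plan above, the low watermark could be set to 0 to resume only once the queue is fully drained, or higher to resume earlier.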
2. Event Store Disk Saturation
Because Event Sourcing is append-only, the database disk footprint grows continuously. If a database server runs out of disk space, all write commands fail, triggering a platform-wide outage.
- Resiliency Plan:
- We split data streams into Hot Storage and Cold Storage.
- Hot Store (PostgreSQL): Holds events for the last 90 days. Ensures rapid reconstitution speeds.
- Cold Store (AWS S3): A background archiver continuously runs an ETL process, packing events older than 90 days into highly compressed, columnar Apache Parquet files on S3.
- If the application needs to reconstitute an inactive aggregate whose events are older than 90 days, the engine queries S3, loads the archived Parquet log, and rebuilds the state from it, so the hot database never has to retain the full history.
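The hot/cold split comes down to comparing an event's timestamp with a retention cutoff. The sketch below shows that decision only; StorageTier and tierFor are hypothetical names, and the archiver and S3 access are deliberately left out.

```java
import java.time.Duration;
import java.time.Instant;

/** Classifies an event as hot (PostgreSQL) or cold (archived Parquet on S3). */
final class StorageTier {
    enum Tier { HOT, COLD }

    private StorageTier() {}

    /** An event is cold once it is strictly older than the hot retention window. */
    static Tier tierFor(Instant occurredAt, Instant now, Duration hotRetention) {
        Instant cutoff = now.minus(hotRetention);
        return occurredAt.isBefore(cutoff) ? Tier.COLD : Tier.HOT;
    }
}
```

Passing the current time in as a parameter keeps the rule deterministic, so the archiver and the reconstitution path can share it and be tested with fixed timestamps.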
8. Candidate Verbal Script (Interview Guide)
Below is a verbatim mock interview dialogue showcasing how a candidate navigates Event Sourcing and CQRS:
Interviewer: "How would you design a financial ledger system that handles millions of bank transactions daily, requiring absolute auditing and sub-10ms query times?"
Candidate: "I will design this platform utilizing an Event Sourced and CQRS Architecture with a Kappa streaming design. A financial ledger cannot tolerate state mutations; we must know exactly how every balance was computed.
On the Write Path, when a command (such as TransferFundsCommand) is received, the stateless command handler loads the latest state snapshot from the aggregate_snapshots database. It then loads subsequent events from the PostgreSQL Event Store and reconstitutes the active memory state of the aggregate. If validation passes, the domain aggregate generates a FundsTransferred event, which is appended to the Event Store using strict optimistic concurrency checks.
To ensure eventual consistency with the Read Path, I will deploy a Debezium Change Data Capture worker that tails the PostgreSQL Write-Ahead Log, publishing new events directly to Apache Kafka using the account_id as the partition key.
On the Read Path, stateful Projection Workers consume these events from Kafka and update high-performance denormalized query views in Elasticsearch and Redis. All user dashboard queries bypass the Event Store and read directly from this denormalized read-model cluster, returning responses in under 5 milliseconds."
Interviewer: "What happens if a projection crashes and falls behind? How do you prevent clients from reading stale data immediately after a write?"
Candidate: "This is the classic read-skew challenge in CQRS. I mitigate this at two levels:
- API Metadata Telemetry: When the write side successfully appends an event, it returns the committed sequence_number in the API response headers. When the client subsequently queries the read-side API, it passes this sequence number. If the Projection database shows its last processed sequence number is lower than the requested sequence, the query handler waits, polling the cache for up to 200 milliseconds until the projection catches up. If it times out, it returns the result with a header indicating stale data.
- Command Side Read Bypass: For critical operations where the user needs immediate confirmation (e.g. updating their profile name), the command gateway returns the modified entity structure directly in the command response. This allows the client UI to update instantly without waiting for asynchronous projection propagation."
Interviewer: "How do you handle schema migrations when you have 10 Billion historical events stored in your database?"
Candidate: "We never run database migrations to alter existing event records. Events are immutable domain facts. Instead, I implement Event Upcasting.
When a breaking change occurs, such as splitting a name field into first_name and last_name, I write a JSON upcaster class. When the aggregate loads historical events from the database, the upcaster interceptor captures the raw JSON structure. If it matches version 1, the V1-to-V2 upcaster transforms the JSON node tree in memory before passing it to the deserializer. The application codebase only needs to know how to map the latest model. For projection rebuilds, I deploy a green-field projection table and run parallel rebuilder workers that stream historical logs from AWS S3 cold storage, switching traffic to the new table only after the rebuild catches up to the live event watermark."