System Design: Building a Search Indexing Platform
Search is often presented to users as a simple input box that instantly returns relevant results. Under the hood, however, search is a complex distributed data synchronization problem.
The primary engineering challenge is not writing queries against Elasticsearch or OpenSearch. The real difficulty lies in getting the right data from your primary transactional databases into the search index, in the right shape, at the right time, without overloading the source systems or serving stale results to your users.
When a search indexing platform is poorly designed, critical synchronization bugs occur: deleted items continue to appear in search results, newly published inventory takes hours to index, and massive index rebuilds saturate database CPUs, resulting in production outages. This guide details the architecture of a resilient, high-throughput search indexing platform designed to operate at scale.
Requirements and System Goals
A robust, enterprise-grade search indexing platform must satisfy the following functional and operational boundaries:
Functional Requirements
- Real-Time CDC Ingestion: Automatically capture database changes (inserts, updates, deletes) from primary datastores and stream them into the search index in near-real-time.
- Batch Backfilling and Reindexing: Support complete or selective index backfills to rebuild index schemas without interrupting live search traffic or causing downtime.
- Tombstone Delete Support: Safely capture database deletions and propagate them as document removals or logical tombstones in the search cluster.
- Zero-Downtime Schema Evolution: Enable structural schema modifications (e.g., adding analyzers, changing fields) using blue-green index alias cutovers.
Non-Functional Requirements
- Bounded Freshness Lag: The synchronization delay between a database commit and its visibility in the search index must be strictly bounded and observable (ideally less than 2.0 seconds under normal load).
- Decoupled Write Path: A total outage of the search cluster must never block or degrade writes to the primary transactional application database.
- Database Isolation and Rate Throttling: Large-scale indexing backfills must be strictly throttled to prevent CPU/IOPS starvation on primary transactional databases.
- Safe Retries and Concurrency Order: Ensure that events are processed in strict chronological order per-document, allowing out-of-order retries to be skipped safely using version comparison.
API Interfaces and Service Contracts
An elegant search indexing pipeline is driven by explicit data contracts at both the ingestion layer (Kafka outbox events) and the index storage layer (Elasticsearch mappings).
graph LR
Outbox[Postgres Outbox Table] -->|CDC / Debezium| Kafka[Kafka Event Bus]
Kafka -->|Consume & Process| Worker[Indexing Worker Fleet]
Worker -->|Bulk Ingest| ES[Elasticsearch Cluster]
1. Ingestion Outbox Schema (Kafka Payload)
Every change event captured from the source-of-truth database is serialized as a structured Kafka event. Below is the JSON schema contract for a product catalog update event:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SearchIndexEvent",
"type": "object",
"properties": {
"event_id": { "type": "string", "format": "uuid" },
"event_type": { "type": "string", "enum": ["PRODUCT_UPSERT", "PRODUCT_DELETE"] },
"entity_id": { "type": "string" },
"tenant_id": { "type": "string" },
"version": { "type": "integer" },
"timestamp": { "type": "string", "format": "date-time" },
"payload": {
"type": "object",
"properties": {
"title": { "type": "string" },
"description": { "type": "string" },
"brand_name": { "type": "string" },
"price_minor": { "type": "integer" },
"available_qty": { "type": "integer" }
},
"required": ["title", "price_minor", "available_qty"]
}
},
"required": ["event_id", "event_type", "entity_id", "tenant_id", "version", "timestamp"]
}
2. Elasticsearch Index Mapping Schema
Rather than relying on dynamic type inference (which leads to mapping conflicts), we explicitly define our Elasticsearch mapping contracts, configuring optimized tokenizers and analyzers:
{
"settings": {
"analysis": {
"analyzer": {
"autocomplete_analyzer": {
"type": "custom",
"tokenizer": "edge_ngram_tokenizer",
"filter": ["lowercase", "asciifolding"]
}
},
"tokenizer": {
"edge_ngram_tokenizer": {
"type": "edge_ngram",
"min_gram": 2,
"max_gram": 15,
"token_chars": ["letter", "digit"]
}
}
}
},
"mappings": {
"dynamic": "strict",
"properties": {
"id": { "type": "keyword" },
"tenant_id": { "type": "keyword" },
"title": {
"type": "text",
"analyzer": "autocomplete_analyzer",
"fields": {
"keyword": { "type": "keyword" }
}
},
"description": { "type": "text", "analyzer": "standard" },
"brand": { "type": "keyword" },
"price_minor": { "type": "long" },
"available_qty": { "type": "integer" },
"version": { "type": "long" },
"updated_at": { "type": "date" }
}
}
}
High-Level Design and Visualizations
Maintaining synchronization between transactional systems and search indices requires a decoupled, event-driven streaming pipeline.
CDC-Driven Real-Time Search Sync Pipeline
This architecture leverages Change Data Capture (CDC) to stream commits directly from the transaction log (WAL) to Kafka, so the application makes no synchronous call to Kafka or Elasticsearch on its write path.
sequenceDiagram
autonumber
participant App as Application Service
participant DB as Postgres (Primary Database)
participant Debezium as Debezium (CDC Engine)
participant Kafka as Kafka Cluster (Buffered Log)
participant Workers as Indexing Workers Fleet
participant ES as Elasticsearch Cluster
App->>DB: UPDATE products SET price_minor = 7900 WHERE id = 'p1'
Note over DB: Commit to transaction log (WAL)
Debezium->>DB: Stream WAL changes asynchronously
Debezium->>Kafka: Publish event to 'cdc.products' (Partition Key: 'p1')
Note over Kafka: Event buffered, ordered within its partition (keyed by product ID)
Workers->>Kafka: Consume batch of 500 events
Note over Workers: Deduplicate batch,<br/>resolve latest version per ID,<br/>build bulk payload
Workers->>ES: POST /_bulk (Update products_v2 index)
Note over ES: Index refreshed (Lucene segment written)
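The worker step in the diagram (deduplicate, resolve the latest version per ID, build the bulk payload) can be sketched as follows. This is a minimal illustration rather than the platform's actual worker: the function name `build_bulk_body` is hypothetical, the event shape follows the SearchIndexEvent contract above, and renaming `brand_name` to the mapped `brand` field is an assumption about how the two contracts line up.

```python
import json
from typing import Iterable


def build_bulk_body(events: Iterable[dict], index: str) -> str:
    """Collapse a consumed batch to the newest event per entity, then render NDJSON."""
    latest = {}
    for event in events:
        current = latest.get(event["entity_id"])
        # Keep only the highest version seen for each entity in this batch.
        if current is None or event["version"] > current["version"]:
            latest[event["entity_id"]] = event

    lines = []
    for event in latest.values():
        meta = {
            "_index": index,
            "_id": event["entity_id"],
            # External versioning lets Elasticsearch reject stale writes with a 409.
            "version": event["version"],
            "version_type": "external",
        }
        if event["event_type"] == "PRODUCT_DELETE":
            lines.append(json.dumps({"delete": meta}))
            continue
        payload = event["payload"]
        doc = {
            "id": event["entity_id"],
            "tenant_id": event["tenant_id"],
            "title": payload["title"],
            "price_minor": payload["price_minor"],
            "available_qty": payload["available_qty"],
            "version": event["version"],
            "updated_at": event["timestamp"],
        }
        if "description" in payload:
            doc["description"] = payload["description"]
        if "brand_name" in payload:
            # Assumption: the event's brand_name feeds the mapped "brand" field.
            doc["brand"] = payload["brand_name"]
        lines.append(json.dumps({"index": meta}))
        lines.append(json.dumps(doc))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n" if lines else ""
```

Collapsing the batch before sending keeps each document ID at most once per bulk request, so a burst of updates to one product costs a single index operation.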
Blue-Green Index Alias Cutover
When structural index mappings or custom analyzers need to be updated in production, we must avoid in-place rebuilds. Instead, we use versioned indices managed by logical aliases.
sequenceDiagram
autonumber
actor Admin as Platform Operations
participant ES as Elasticsearch Cluster
participant Workers as Backfill Fleet
participant Search as Search API / Client
Note over Search: Live searches point to alias: 'products_current' (resolves to 'products_v1')
Admin->>ES: CREATE INDEX products_v2 (Apply new mappings)
Admin->>Workers: Trigger throttled backfill from primary database
Workers->>ES: Ingest historical records into 'products_v2'
Note over Admin: Verify document counts & test search quality on 'products_v2'
Admin->>ES: Atomic Alias Switch: Remove 'products_v1', Add 'products_v2' to 'products_current'
Note over Search: Instant zero-downtime transition to new index schema!
Admin->>ES: DELETE INDEX products_v1 (After safety validation window)
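The atomic switch in the diagram corresponds to a single request to the Elasticsearch aliases endpoint (POST /_aliases) that carries both the remove and the add action. A minimal sketch of the request body follows; the helper name `alias_swap_actions` is hypothetical, and sending the request is left to whichever HTTP client the platform uses.

```python
def alias_swap_actions(alias: str, old_index: str, new_index: str) -> dict:
    """Build the body for POST /_aliases that repoints an alias in one step."""
    return {
        "actions": [
            # Both actions travel in one request, so searches never see
            # the alias resolving to zero or two indices.
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }
```

For the cutover above, the body would come from `alias_swap_actions("products_current", "products_v1", "products_v2")`.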
Low-Level Design and Schema Strategies
To avoid losing events when the application or the network fails mid-request, we combine the Transactional Outbox Pattern with Kafka partitioning keyed by entity ID.
Transactional Outbox Schema (Postgres)
Instead of the dual-write pattern (which suffers from data inconsistency if the application crashes mid-request), we write events to an outbox table in the same transaction as the core entity change:
-- Transactional Outbox table definition
CREATE TABLE transactional_outbox (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Index for ordered reads by a polling outbox relay (only needed if the table is polled rather than tailed through CDC)
CREATE INDEX idx_outbox_polling ON transactional_outbox (created_at, event_id);
Application Code Transaction Example
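// Assumes Spring's @Transactional and Jackson's ObjectMapper.
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.transaction.annotation.Transactional;

// rollbackFor is required: by default Spring does not roll back on checked exceptions.
@Transactional(rollbackFor = JsonProcessingException.class)
public void updateProductPrice(String productId, long priceMinor) throws JsonProcessingException {
    // 1. Update the core product entity
    productRepository.updatePrice(productId, priceMinor);
    // 2. Write the change event to the outbox table in the SAME database transaction
    ProductUpdateEvent event = new ProductUpdateEvent(productId, priceMinor);
    outboxRepository.save(new OutboxRecord(
        "Product",
        productId,
        "PRODUCT_UPSERT", // matches the event_type enum in the Kafka contract
        objectMapper.writeValueAsString(event),
        event.getVersion()
    ));
} // Postgres commits both writes atomically: the outbox row exists if and only if the update does.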
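The all-or-nothing behavior the outbox relies on can be demonstrated in a few lines. The sketch below uses Python's built-in sqlite3 as a stand-in for Postgres, with a trimmed-down version of the outbox table; the function names and the idea of deriving the event version from a row-level counter are assumptions made for illustration.

```python
import json
import sqlite3
import uuid


def open_db() -> sqlite3.Connection:
    """Create an in-memory database with a product row and an empty outbox."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE products (id TEXT PRIMARY KEY, price_minor INTEGER NOT NULL,"
        " version INTEGER NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE transactional_outbox (event_id TEXT PRIMARY KEY,"
        " aggregate_type TEXT NOT NULL, aggregate_id TEXT NOT NULL,"
        " event_type TEXT NOT NULL, payload TEXT NOT NULL, version INTEGER NOT NULL)"
    )
    conn.execute("INSERT INTO products VALUES ('p1', 9900, 1)")
    conn.commit()
    return conn


def update_price(conn, product_id, price_minor, fail_before_commit=False):
    """Update the product and append the outbox event in one transaction."""
    # `with conn` commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.execute(
            "UPDATE products SET price_minor = ?, version = version + 1 WHERE id = ?",
            (price_minor, product_id),
        )
        (version,) = conn.execute(
            "SELECT version FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        conn.execute(
            "INSERT INTO transactional_outbox VALUES (?, ?, ?, ?, ?, ?)",
            (
                str(uuid.uuid4()),
                "Product",
                product_id,
                "PRODUCT_UPSERT",
                json.dumps({"price_minor": price_minor}),
                version,
            ),
        )
        if fail_before_commit:
            # Stand-in for a process crash between the writes and the commit.
            raise RuntimeError("simulated crash before commit")
```

When the simulated crash fires, neither the price change nor the outbox row survives, which is exactly the property that the dual-write approach cannot offer.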
Resume and Offset Tracking Schema (Kafka Consumer)
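So that indexing workers resume from their last committed position after a crash or deploy, consumer offsets are committed to Kafka itself (the internal __consumer_offsets topic). Events consumed after the last commit are redelivered, which is safe because index writes are versioned. Custom batch backfills read from the database rather than from Kafka, so they maintain their own progress tracking: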
CREATE TABLE backfill_progress (
backfill_id UUID PRIMARY KEY,
index_name VARCHAR(255) NOT NULL,
start_id BIGINT NOT NULL,
last_processed_id BIGINT NOT NULL,
end_id BIGINT NOT NULL,
status VARCHAR(64) NOT NULL, -- 'RUNNING', 'COMPLETED', 'FAILED'
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
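A resumable backfill built on this table might look like the sketch below. It pages by primary key (keyset pagination) and checkpoints only after a page has been indexed, so a crash repeats at most one page. The `BackfillProgress` class mirrors the table's columns in memory; `fetch_page`, `index_page`, and the `max_docs_per_sec` throttle are hypothetical hooks, and `last_processed_id` is assumed to start at `start_id - 1`.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class BackfillProgress:
    start_id: int
    end_id: int
    last_processed_id: int
    status: str = "RUNNING"


def run_backfill(
    progress: BackfillProgress,
    fetch_page: Callable[[int, int, int], list],
    index_page: Callable[[list], None],
    page_size: int = 1000,
    max_docs_per_sec: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> BackfillProgress:
    """Index rows with last_processed_id < id <= end_id, one page at a time."""
    while progress.last_processed_id < progress.end_id:
        started = time.monotonic()
        # fetch_page(after_id, end_id, limit) must return rows ordered by id.
        rows = fetch_page(progress.last_processed_id, progress.end_id, page_size)
        if not rows:
            break
        index_page(rows)
        # Checkpoint only after the page is indexed, so a crash re-reads
        # this page instead of skipping it.
        progress.last_processed_id = rows[-1]["id"]
        if max_docs_per_sec:
            min_duration = len(rows) / max_docs_per_sec
            elapsed = time.monotonic() - started
            if elapsed < min_duration:
                # Throttle: never read faster than the configured rate.
                sleep(min_duration - elapsed)
    progress.status = "COMPLETED"
    return progress
```

Because a failed page is replayed on restart, the index writes need to be idempotent, which the external versioning described later provides.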
Scaling and Operational Challenges
At massive throughput ranges, scaling a search indexing platform requires mitigating database load during backfills and optimizing worker bulk processing.
Bulk Ingestion and Throughput Calculations
Suppose we need to backfill a historical catalog containing 100,000,000 (100 Million) product documents into a new Elasticsearch index. Let us calculate the optimum batch sizes and time bounds.
Let:
- $N$ = Total documents = $100,000,000$ documents.
- $B$ = Bulk batch size = $5,000$ documents per bulk request.
- $S$ = Average document size = $1.2 \text{ KB}$ (serialized JSON).
- $T_{\text{bulk}}$ = Average roundtrip time of a single Elasticsearch _bulk write = $250 \text{ milliseconds}$ (under load).
- $W$ = Number of parallel indexing worker threads = $16$ workers.
First, calculate the byte size of a single bulk batch:
$$\text{Size}_{\text{batch}} = B \times S = 5,000 \times 1.2 \text{ KB} = 6,000 \text{ KB} \approx 6.0 \text{ MB}$$
This is well within Elasticsearch's optimal 5 MB to 15 MB bulk size recommendations.
Next, calculate the total bulk batches required:
$$\text{Batches}_{\text{total}} = \frac{N}{B} = \frac{100,000,000}{5,000} = 20,000 \text{ bulk batches}$$
Calculate the per-second throughput capacity of our indexing worker fleet, assuming each worker issues one bulk request at a time:
$$\text{Throughput}_{\text{worker}} = \frac{1 \text{ batch}}{250 \text{ ms}} = 4 \text{ batches/second}$$
$$\text{Throughput}_{\text{fleet}} = 4 \times 16 \text{ workers} = 64 \text{ batches/second}$$
$$\text{Throughput}_{\text{fleet}} = 64 \times 5,000 \text{ docs} = 320,000 \text{ documents/second}$$
Finally, calculate the total duration required to complete the 100M document backfill:
$$\text{Time}_{\text{seconds}} = \frac{100,000,000}{320,000} = 312.5 \text{ seconds}$$
$$\text{Time}_{\text{minutes}} = \frac{312.5}{60} \approx 5.2 \text{ minutes}$$
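This figure is a theoretical upper bound: it assumes the Elasticsearch cluster sustains 64 bulk requests per second (roughly 384 MB/s of JSON) at a constant 250 ms roundtrip, and that the source database can supply 320,000 rows per second. In practice the backfill is throttled well below this rate. To avoid crashing the primary transactional database, we must stream range queries page by page from read replicas, keeping primary node CPU usage close to zero.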
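The arithmetic above can be reproduced with a small calculator, which makes it easy to see how the estimate moves when a parameter changes. The function name and the returned keys are hypothetical; like the calculation above, it assumes that every worker keeps exactly one bulk request in flight and that throughput scales linearly with the worker count.

```python
def backfill_estimate(total_docs, batch_size, doc_kb, bulk_rtt_ms, workers):
    """Estimate best-case backfill duration from the parameters N, B, S, T_bulk, W."""
    batch_mb = batch_size * doc_kb / 1000          # size of one bulk request
    total_batches = total_docs / batch_size
    batches_per_sec = workers * (1000 / bulk_rtt_ms)
    docs_per_sec = batches_per_sec * batch_size
    seconds = total_docs / docs_per_sec
    return {
        "batch_mb": batch_mb,
        "total_batches": total_batches,
        "docs_per_sec": docs_per_sec,
        # Sustained ingest bandwidth the cluster would have to absorb.
        "ingest_mb_per_sec": docs_per_sec * doc_kb / 1000,
        "seconds": seconds,
        "minutes": seconds / 60,
    }


estimate = backfill_estimate(
    total_docs=100_000_000, batch_size=5_000, doc_kb=1.2, bulk_rtt_ms=250, workers=16
)
```

Halving the worker count or doubling the bulk roundtrip time doubles the estimate, which is a quick way to size a throttled backfill against a time budget.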
Trade-offs and Architectural Alternatives
Designing a search synchronization pipeline requires weighing engineering trade-offs across three primary paradigms:
| Dimension | Change Data Capture (CDC / Debezium) | Transactional Outbox Pattern | Application-level Double-Writes |
|---|---|---|---|
| Data Consistency | Perfect (Reads directly from database write-ahead log, ensuring every committed state is captured). | Perfect (Relies on a single, ACID-compliant local database transaction). | Poor (High risk of silent divergence; writing to DB succeeds while writing to ES fails due to timeouts). |
| Database Performance | Excellent (Executes no table queries; reads the WAL asynchronously through a logical replication slot). | Medium (Outbox inserts add lock and write IOPS overhead to database transactions). | Low (Adds network hop delays and transaction blocking overhead directly inside the request thread). |
| Implementation Cost | High (Requires deploying Kafka Connect clusters, managing Debezium connectors, and handling raw log bytes). | Medium (Requires implementing an outbox relay engine and creating transactional hooks). | Very Low (Simply involves firing two client requests inside application controllers). |
| Schema Evolution Safety | Medium (Database DDL schema migrations can break CDC deserializer schemas if not mapped carefully). | High (The application explicitly controls the payload schema serializations). | High (The application manages and validates the mappings directly). |
Failure Modes and Fault Tolerance Strategies
1. Partial Failures in Bulk Ingestion Requests
Elasticsearch's _bulk API is not transactional: each item succeeds or fails independently. If a batch of 5,000 documents contains 3 documents with bad datatype formats, the other 4,997 succeed while the 3 fail with individual error blocks, and the request as a whole still returns HTTP 200.
- Resolution Strategy: The indexing worker must inspect the JSON response body of the bulk request. It must parse the items array, check for any status greater than or equal to 400, and extract the failed document IDs. Items rejected with 429 (write queue full) should be retried with backoff, while permanent failures such as mapping errors are routed to a Dead Letter Queue (DLQ) for manual mapping triage. Once every item has been indexed or handed off, the worker commits the batch's offsets to Kafka.
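The response handling described above might be sketched as follows. The bucket names and the function `triage_bulk_response` are hypothetical, and the routing policy (treat 409 as a stale write, retry 429 and 5xx, dead-letter other 4xx) is one reasonable reading of the strategy rather than the only one.

```python
def triage_bulk_response(response: dict) -> dict:
    """Sort the items of a _bulk response into per-outcome lists of document IDs."""
    outcome = {"ok": [], "stale": [], "retry": [], "dead_letter": []}
    for item in response.get("items", []):
        # Each item has exactly one key naming the action (index, create,
        # update, or delete) whose value carries the per-document result.
        ((action, result),) = item.items()
        status, doc_id = result["status"], result["_id"]
        if status < 400:
            outcome["ok"].append(doc_id)
        elif status == 404 and action == "delete":
            # Deleting a document that is already gone is the desired end state.
            outcome["ok"].append(doc_id)
        elif status == 409:
            # Version conflict: a newer version is already indexed.
            outcome["stale"].append(doc_id)
        elif status == 429 or status >= 500:
            # Transient: the cluster is overloaded or temporarily unavailable.
            outcome["retry"].append(doc_id)
        else:
            # Permanent, for example a mapping error: needs manual triage.
            outcome["dead_letter"].append(doc_id)
    return outcome
```

Only the `retry` bucket should block the offset commit; everything else has reached a final state for this batch.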
2. Out-of-Order Version Overwrites (The Race Condition)
Under heavy network retries, a delayed old event (v1) arrives at the indexing worker after a newer update (v2) has already been processed. Writing the old event would silently corrupt the index.
- Resolution Strategy: Use Optimistic Concurrency Control. We map our database entity version to Elasticsearch's version parameter using external versioning:
# Pass the entity version to Elasticsearch.
# Only apply update if incoming version is strictly greater than the existing index version.
PUT /products_v2/_doc/p_123?version=42&version_type=external
{
"title": "Ergonomic Office Chair",
"price_minor": 29900
}
If an older update tries to write, Elasticsearch rejects it with an HTTP 409 Version Conflict exception, which the worker safely ignores.
3. Schema Mapping Exceptions
An upstream field change (for example, a string arriving where the mapping expects a long) causes Elasticsearch to reject the document with a mapping exception. If the worker retries that document indefinitely, the indexing queue stalls behind it.
- Resolution Strategy: Implement a strict validation interceptor in the worker fleet. Any JSON payload that fails structural validation must bypass the active index path and be written to an S3 quarantine bucket, rather than causing a queue-blocking failure loop.
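One way to picture the interceptor is a pre-flight check against the payload contract from the SearchIndexEvent schema. In this sketch the two sinks are plain lists standing in for the bulk queue and the S3 quarantine bucket, and the names `validate_payload` and `route` are hypothetical.

```python
# Field name -> (expected Python type, required?), taken from the event payload contract.
PAYLOAD_FIELDS = {
    "title": (str, True),
    "description": (str, False),
    "brand_name": (str, False),
    "price_minor": (int, True),
    "available_qty": (int, True),
}


def validate_payload(payload: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    for name, (expected, required) in PAYLOAD_FIELDS.items():
        if name not in payload:
            if required:
                problems.append(f"missing required field: {name}")
            continue
        value = payload[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            problems.append(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return problems


def route(event: dict, index_sink: list, quarantine_sink: list) -> bool:
    """Send valid events to the index path and invalid ones to quarantine."""
    if event.get("event_type") == "PRODUCT_DELETE":
        # Deletes carry no payload to validate.
        index_sink.append(event)
        return True
    problems = validate_payload(event.get("payload") or {})
    if problems:
        quarantine_sink.append({"event": event, "problems": problems})
        return False
    index_sink.append(event)
    return True
```

Quarantined events count as handled, so the consumer can commit their offsets and keep the partition moving.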
Staff Engineer Perspective
The Perils of Dynamic Field Mapping
A major mistake made in early system design is leaving dynamic field mapping enabled in production. Elasticsearch then infers each field's type from the first value it sees. If the first product title arrives as the JSON number 12345 (or as the string "12345" with numeric_detection enabled), the field is mapped as a long.
Subsequent documents containing standard text titles will be rejected with mapping exceptions, and an ingestion pipeline that keeps retrying them will stall.
To operate at a principal engineer standard, always set dynamic: strict inside your index blueprints. Every field type, tokenizer, and analyzer must be explicitly declared and version-controlled. Under strict mode Elasticsearch rejects the whole document if it contains an unmapped field, so the worker must strip unknown fields before sending the document and log them to a monitoring alert registry.
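Stripping unmapped fields before the write can be as small as the sketch below. The allowlist is copied from the products mapping shown earlier; the function name and the use of the standard logging module as the "alert registry" are assumptions, and only top-level fields are checked.

```python
import logging

logger = logging.getLogger("indexer")

# Top-level fields declared in the products index mapping.
MAPPED_FIELDS = frozenset({
    "id", "tenant_id", "title", "description", "brand",
    "price_minor", "available_qty", "version", "updated_at",
})


def strip_unmapped_fields(doc: dict, mapped=MAPPED_FIELDS):
    """Return (clean_doc, dropped_field_names) without mutating the input."""
    unknown = sorted(key for key in doc if key not in mapped)
    if unknown:
        # Surface the drift so the mapping can be updated deliberately.
        logger.warning(
            "dropping unmapped fields %s from document %s", unknown, doc.get("id")
        )
    clean = {key: value for key, value in doc.items() if key in mapped}
    return clean, unknown
```

Keeping the allowlist in version control next to the mapping means a new field is a reviewed change to both, not an accident of whichever document arrives first.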
Managing Backpressure in the Ingestion Fleet
During massive traffic spikes (e.g., Black Friday), database commits skyrocket, saturating the Kafka buffer. If indexing workers blindly pull maximum batches, they will overwhelm the Elasticsearch cluster, triggering EsRejectedExecutionException failures.
- Mitigation: Implement consumer-side adaptive polling. The worker fleet must monitor the cluster queue size using the Elasticsearch nodes stats API (GET /_nodes/stats/thread_pool). If write queue saturation exceeds 80%, the consumers must dynamically reduce their batch sizes and inject backpressure sleep intervals, allowing the search cluster to recover.
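A possible shape for the adaptive policy is sketched below. It reads the write queue depth per node from a nodes stats response and compares it with a queue capacity that the caller supplies, since the stats response reports depth but not the configured limit. The function names, the halve-on-pressure and add-250-on-recovery steps, and the linear sleep curve are all illustrative choices.

```python
def write_queue_saturation(nodes_stats: dict, queue_capacity: int) -> float:
    """Return the fill ratio of the busiest node's write queue."""
    depths = [
        node["thread_pool"]["write"]["queue"]
        for node in nodes_stats["nodes"].values()
    ]
    return max(depths, default=0) / queue_capacity


def next_batch_size(current, saturation, min_size=100, max_size=5000, threshold=0.8):
    """Halve the batch under pressure; grow it back slowly when healthy."""
    if saturation > threshold:
        return max(min_size, current // 2)
    return min(max_size, current + 250)


def backoff_seconds(saturation, threshold=0.8, max_sleep=5.0):
    """Sleep nothing below the threshold, scaling up to max_sleep at a full queue."""
    if saturation <= threshold:
        return 0.0
    return min(max_sleep, max_sleep * (saturation - threshold) / (1 - threshold))
```

Cutting the batch multiplicatively while restoring it additively lets the fleet back off quickly during a spike and ramp up cautiously afterwards.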
Verbal Script
Interviewer: "How would you design a highly reliable search indexing platform that keeps search data in sync with a primary SQL database under heavy traffic?"
Candidate:
"To design a highly reliable search indexing platform, I would implement an asynchronous, event-driven architecture using Change Data Capture (CDC) to completely decouple our transactional database write path from our search cluster write path.
When changes are committed to our primary Postgres database, a Change Data Capture connector like Debezium streams these raw transaction log bytes from the Write-Ahead Log (WAL) and publishes them as structured change events to a partitioned Kafka topic. We use the product ID as the Kafka partition key to ensure that all events for a specific product are processed in strict sequential order by a single consumer worker thread.
Our indexing workers consume these events in optimized bulk batches—typically 5,000 documents per batch. Instead of sending single indexing requests, they execute bulk operations against Elasticsearch using the _bulk API. To protect our indexing pipeline against out-of-order retries, we enforce Optimistic Concurrency Control by mapping the database's version column to Elasticsearch’s external versioning (version_type=external). This ensures that delayed older updates are automatically rejected with a 409 Conflict if a newer version has already been indexed.
For schema changes, we avoid rebuilding active indices in place. We configure versioned indices behind a logical Elasticsearch alias (e.g., products_current). When a mapping needs to evolve, we create a new index version, trigger a throttled backfill using read-replicas to avoid CPU starvation on the primary database, verify data integrity, and then execute an atomic, zero-downtime alias swap.
If a batch contains corrupt documents, our worker fleet parses the bulk response, routes the specific failed IDs to a Dead Letter Queue for isolation, and then commits the batch's offsets, ensuring our ingestion queue never stalls."