In a monolithic architecture, inspecting application logs is simple: you SSH into the single host machine and run tail -f /var/log/app.log.
In a modern, highly distributed system composed of thousands of microservices running across hundreds of containerized nodes, this approach is impractical. When a critical transaction fails across a distributed call graph, its logs are scattered across many ephemeral container filesystems that can disappear along with their containers. Centralized log collection is not merely a debugging convenience; it is a vital operational necessity.
Designing a platform to collect, parse, index, and store terabytes of log data every single day introduces significant engineering challenges. In this system design case study, we will architect a resilient, highly available, and cost-effective distributed logging platform capable of handling 10+ Terabytes of logs per day with near-real-time search capabilities.
System Requirements and Goals
To build a logging infrastructure at a multi-terabyte daily scale, we must define concrete boundaries for throughput, latency, reliability, and retention.
1. Functional Requirements
- Centralized Log Ingestion: Ingest logs seamlessly from containerized workloads, bare-metal servers, and serverless compute units.
- Structured & Unstructured Parsing: Automatically parse structured logs (e.g., JSON) and extract metadata from unstructured text (e.g., Syslog, Apache Access Logs) using regular expression grok patterns.
- Full-Text Search: Users can perform arbitrary text searches, field-specific filtering (e.g., service_name = "payment-service"), log-level filtering (severity = "ERROR"), and trace-based correlation (trace_id = "tr_abc123").
- Real-Time Analytics & Alerting: Power visual dashboards (e.g., Kibana, Grafana) and trigger alerts on specific error thresholds or log patterns (e.g., Alert if "Out of Memory" occurs > 5 times in 1 minute).
- Historical Replay & Auditing: Enable developers and auditors to retrieve historical log records from cold archives for up to 365 days.
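The alerting example above ("Out of Memory" more than 5 times in 1 minute) is a sliding-window count. The sketch below shows that rule as a minimal in-memory class. It assumes the alerting engine sees matching log lines in timestamp order. `SlidingWindowAlert` is a hypothetical name and is not a Kibana or Grafana API.

```typescript
// Hypothetical sliding-window threshold rule: fires while more than `threshold`
// matching events fall inside the trailing `windowMs` milliseconds.
class SlidingWindowAlert {
  private readonly events: number[] = [];
  private readonly pattern: string;
  private readonly threshold: number;
  private readonly windowMs: number;

  constructor(pattern: string, threshold: number, windowMs: number) {
    this.pattern = pattern;
    this.threshold = threshold;
    this.windowMs = windowMs;
  }

  // Feeds one log line with its timestamp (ms); returns true while the rule is firing.
  observe(message: string, timestampMs: number): boolean {
    if (message.includes(this.pattern)) this.events.push(timestampMs);
    // Evict matches that have fallen out of the trailing window.
    while (this.events.length > 0 && this.events[0] <= timestampMs - this.windowMs) {
      this.events.shift();
    }
    return this.events.length > this.threshold;
  }
}

// Alert if "Out of Memory" occurs more than 5 times in 1 minute.
const oomRule = new SlidingWindowAlert('Out of Memory', 5, 60_000);
const firing = [0, 5_000, 10_000, 15_000, 20_000, 25_000].map((t) =>
  oomRule.observe('Out of Memory: Java heap space', t)
);
// Only the sixth match within 60 seconds trips the rule.
```

A production alerting engine would evaluate this as a query over indexed data. The in-memory form just makes the windowing logic explicit.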
2. Non-Functional Requirements
- High Write Throughput: Reliably support an ingestion volume of 10 Terabytes of raw logs per day.
- Near Real-Time (NRT) Searchability: Newly ingested logs must be index-analyzed and fully searchable in the query console within 5 seconds of generation.
- Isolation (Zero Application Interference): Log collection must be non-blocking. If the central logging platform experiences an outage or latency spike, it must never degrade the performance of the core application runtimes.
- High Availability & Durability: Maintain at least $99.99\%$ ingestion availability. Once a log is committed, it must never be lost.
- Storage Cost-Effectiveness: Raw text logs are expensive to store long-term. The storage subsystem must use intelligent compression and lifecycle tiers to minimize costs.
3. Capacity Estimation & Scalability Math
Let's calculate the system limits for a $10 \text{ TB/day}$ log ingestion workload:
- Average Log Message Size: $500 \text{ Bytes}$ (uncompressed).
- Total Daily Log Lines: $$\text{Daily Logs} = \frac{10 \text{ TB}}{500 \text{ Bytes}} = \frac{10 \times 10^{12}}{500} = 20,000,000,000 \text{ lines (20 Billion / day)}$$
- Average Ingestion Throughput: $$\text{Throughput}_{\text{avg}} = \frac{20 \text{ Billion logs}}{86{,}400 \text{ seconds}} \approx 231{,}480 \text{ logs/sec}$$ $$\text{Data Rate}_{\text{avg}} = \frac{10 \text{ TB}}{86{,}400 \text{ seconds}} \approx 115.7 \text{ MB/sec}$$
- Peak Load Factor (3x spiky traffic): $$\text{Throughput}_{\text{peak}} \approx 3 \times 231{,}480 \approx 700{,}000 \text{ logs/sec}$$ $$\text{Data Rate}_{\text{peak}} \approx 3 \times 115.7 \approx 350 \text{ MB/sec}$$
- Elasticsearch Index Expansion Factor: Storing documents in a Lucene inverted index (plus stored fields and doc values) typically makes the on-disk index $1.3\times$ to $1.5\times$ the raw size. Thus, $10 \text{ TB}$ of raw logs requires up to $15 \text{ TB}$ of indexed storage.
- High Availability Storage Overhead (1 Primary + 1 Replica): $$\text{Daily Storage Space} = 15 \text{ TB} \times 2 = 30 \text{ TB/day}$$ To retain logs for 7 days in the Hot Tier, we require $210 \text{ TB}$ of high-speed NVMe SSDs.
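The estimates above can be reproduced with a small calculator. This sketch uses the same inputs: a 500-byte average message, 3× peak, 1.5× index expansion, one replica and 7 days of hot retention. It uses decimal units (1 TB = 10^12 bytes). `estimateCapacity` is a hypothetical helper. Unrounded, the peak figures come out at about 694,000 logs/sec and 347 MB/sec, which the text rounds up.

```typescript
// Back-of-the-envelope capacity model using the assumptions stated above.
interface CapacityEstimate {
  dailyLines: number;
  avgLogsPerSec: number;
  avgMBPerSec: number;
  peakLogsPerSec: number;
  peakMBPerSec: number;
  hotTierTB: number;
}

function estimateCapacity(
  rawTBPerDay: number,
  avgMessageBytes: number,
  peakFactor: number,
  indexExpansion: number,
  copies: number, // primary + replicas
  hotRetentionDays: number
): CapacityEstimate {
  const secondsPerDay = 86_400;
  const rawBytesPerDay = rawTBPerDay * 1e12;
  const dailyLines = rawBytesPerDay / avgMessageBytes;
  const avgLogsPerSec = dailyLines / secondsPerDay;
  const avgMBPerSec = rawBytesPerDay / secondsPerDay / 1e6;
  return {
    dailyLines,
    avgLogsPerSec,
    avgMBPerSec,
    peakLogsPerSec: avgLogsPerSec * peakFactor,
    peakMBPerSec: avgMBPerSec * peakFactor,
    // indexed size x number of copies x days kept on the hot tier
    hotTierTB: rawTBPerDay * indexExpansion * copies * hotRetentionDays,
  };
}

const estimate = estimateCapacity(10, 500, 3, 1.5, 2, 7);
console.log(Math.round(estimate.avgLogsPerSec), estimate.hotTierTB);
```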
API Design and Interface Contracts
Logging operates primarily on asynchronous streaming, but both shippers and users interact via standardized API structures.
1. Ingestion Agent Delivery Contract
Log collectors (e.g., Vector, Filebeat) deliver logs in highly compressed batches to the ingestion load balancers.
POST /v1/logs/ingest
Request Headers:
Content-Type: application/json
Content-Encoding: gzip
X-Log-Shipper-Token: token_infra_devops_99
Request Payload:
{
"shipper_metadata": {
"host_id": "i-09ab7d6f5e",
"cluster_name": "kubernetes-prod-us-east-1",
"agent_version": "vector-0.34.0"
},
"log_records": [
{
"timestamp": "2026-05-23T08:06:14.123Z",
"stream": "stdout",
"raw_message": "2026-05-23 08:06:14 [payment-service] ERROR tr_abc123 usr_456 - Database transaction timeout during capture.",
"container_id": "c_pay_88f9"
},
{
"timestamp": "2026-05-23T08:06:14.125Z",
"stream": "stderr",
"raw_message": "{\"level\":\"info\",\"msg\":\"Garbage collection cycle completed in 12ms\",\"service\":\"payment-service\"}",
"container_id": "c_pay_88f9"
}
]
}
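On the shipper side, a request matching this contract needs only Node's built-in `zlib`. The sketch below assumes Node.js. `buildIngestRequest` is a hypothetical helper, and the headers and body it returns can be passed to any HTTP client.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// Shapes mirror the JSON contract above.
interface LogRecord {
  timestamp: string;
  stream: 'stdout' | 'stderr';
  raw_message: string;
  container_id: string;
}
interface IngestPayload {
  shipper_metadata: { host_id: string; cluster_name: string; agent_version: string };
  log_records: LogRecord[];
}
interface IngestRequest {
  headers: Record<string, string>;
  body: Buffer;
}

// Builds headers and a gzip-compressed JSON body for POST /v1/logs/ingest.
function buildIngestRequest(payload: IngestPayload, shipperToken: string): IngestRequest {
  return {
    headers: {
      'Content-Type': 'application/json',
      'Content-Encoding': 'gzip',
      'X-Log-Shipper-Token': shipperToken,
    },
    body: gzipSync(Buffer.from(JSON.stringify(payload), 'utf8')),
  };
}

const req = buildIngestRequest(
  {
    shipper_metadata: { host_id: 'i-09ab7d6f5e', cluster_name: 'kubernetes-prod-us-east-1', agent_version: 'vector-0.34.0' },
    log_records: [
      { timestamp: '2026-05-23T08:06:14.123Z', stream: 'stdout', raw_message: 'Database transaction timeout during capture.', container_id: 'c_pay_88f9' },
    ],
  },
  'token_infra_devops_99'
);

// Receiver side: gunzip first, then parse the JSON.
const roundTrip = JSON.parse(gunzipSync(req.body).toString('utf8')) as IngestPayload;
```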
2. Search & Query API Contract
Internal systems, alerting engines, and UI platforms execute structured searches against the storage clusters.
POST /v1/logs/search
Request Payload:
{
"query": {
"full_text": "database transaction timeout",
"filters": [
{ "field": "service", "operator": "EQUALS", "value": "payment-service" },
{ "field": "level", "operator": "EQUALS", "value": "ERROR" }
],
"time_range": {
"start": "2026-05-23T07:00:00Z",
"end": "2026-05-23T08:06:00Z"
}
},
"pagination": {
"limit": 100,
"cursor": "c3Bvb2xfMTAwX2V2dF85OWE="
}
}
Response Payload (200 OK):
{
"total_hits": 12,
"next_cursor": "c3Bvb2xfMTIwX2V2dF85OWI=",
"logs": [
{
"id": "log_88a9d2c1",
"timestamp": "2026-05-23T08:06:14.123Z",
"level": "ERROR",
"service": "payment-service",
"trace_id": "tr_abc123",
"user_id": "usr_456",
"message": "Database transaction timeout during capture.",
"metadata": {
"host": "i-09ab7d6f5e",
"container_id": "c_pay_88f9"
}
}
]
}
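Behind this API, the search service has to translate the request into Elasticsearch's Query DSL. The sketch below shows one plausible mapping under stated assumptions:
- only the `EQUALS` operator is handled;
- the `end` bound is treated as inclusive;
- cursor handling is left out, because the cursor encoding is not specified.

`toElasticsearchQuery` is a hypothetical helper name.

```typescript
// Request shape from the /v1/logs/search contract above (cursor omitted).
interface SearchFilter {
  field: string;
  operator: 'EQUALS';
  value: string;
}
interface SearchRequest {
  query: {
    full_text?: string;
    filters: SearchFilter[];
    time_range: { start: string; end: string };
  };
  pagination: { limit: number };
}
interface EsSearchBody {
  size: number;
  sort: Array<Record<string, 'asc' | 'desc'>>;
  query: { bool: { must: object[]; filter: object[] } };
}

// Exact-match filters and the time range go in bool.filter (no scoring, cacheable);
// the free-text clause goes in bool.must against the analyzed `message` field.
function toElasticsearchQuery(req: SearchRequest): EsSearchBody {
  const filter: object[] = [
    { range: { timestamp: { gte: req.query.time_range.start, lte: req.query.time_range.end } } },
    ...req.query.filters.map((f) => ({ term: { [f.field]: f.value } })),
  ];
  const must: object[] = req.query.full_text
    ? [{ match: { message: { query: req.query.full_text, operator: 'and' } } }]
    : [];
  return { size: req.pagination.limit, sort: [{ timestamp: 'desc' }], query: { bool: { must, filter } } };
}

const dsl = toElasticsearchQuery({
  query: {
    full_text: 'database transaction timeout',
    filters: [
      { field: 'service', operator: 'EQUALS', value: 'payment-service' },
      { field: 'level', operator: 'EQUALS', value: 'ERROR' },
    ],
    time_range: { start: '2026-05-23T07:00:00Z', end: '2026-05-23T08:06:00Z' },
  },
  pagination: { limit: 100 },
});
console.log(JSON.stringify(dsl, null, 2));
```

Filters are `term` queries because `service` and `level` are `keyword` fields in the index template. A match on `message` uses the `standard` analyzer instead.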
High-Level Design Architecture
Centralized logging at TB/day scale must decouple log collection, message buffering, transformation/enrichment, indexing, and visualization.
graph TD
subgraph "Application & Node Level"
AppA[App Pod A] -->|stdout/stderr| LocalDiskA[Container Log File]
AppB[App Pod B] -->|stdout/stderr| LocalDiskB[Container Log File]
LogAgent[Vector / Fluentbit DaemonSet] -->|Tail File & Buffer on Disk| LocalDiskA
LogAgent -->|Tail File & Buffer on Disk| LocalDiskB
end
subgraph "Buffering & Streaming Tier"
LogAgent -->|1. Partitioned Ingestion| KafkaBroker[Apache Kafka Cluster]
KafkaDLQ[Kafka Dead-Letter Log Topic]
end
subgraph "Transformation & Indexing"
Indexer[Logstash / Vector Indexing Consumers] -->|2. Pull Logs| KafkaBroker
Indexer -->|3. Route Errors| KafkaDLQ
end
subgraph "Search & Tiered Storage Tier"
Indexer -->|4. Index Structured Docs| ESSearch[Elasticsearch Cluster]
subgraph "Hot Storage Tier"
ESHot[Hot Data Nodes NVMe SSDs <7 Days]
end
subgraph "Warm Storage Tier"
ESWarm[Warm Data Nodes HDDs 8-30 Days]
end
subgraph "Cold Archival Tier"
S3Archive[S3 / GCS Object Store 1 Year]
end
ESSearch --> ESHot
ESHot -->|5. ILM Shard Migration| ESWarm
ESWarm -->|6. Snapshot Archival| S3Archive
end
subgraph "Observability & Access"
Kibana[Kibana Query Engine] -->|7. Search API| ESSearch
OpsUser[SRE / Developer Console] -->|8. Visualise Logs| Kibana
end
%% Colors & Layout
style KafkaBroker fill:#1a1c23,stroke:#f59e0b,stroke-width:2px,color:#fff
style Indexer fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
style ESSearch fill:#0f172a,stroke:#10b981,stroke-width:2px,color:#fff
style S3Archive fill:#1e293b,stroke:#64748b,stroke-width:2px,color:#fff
Architectural Data Flow Steps
- DaemonSet Log Collection: Microservices are not configured to write logs directly to the network, because that would couple request latency to the logging backend. Instead, applications write logs to standard output (stdout/stderr), and the container runtime persists those streams to log files on the node. A lightweight log agent (e.g., Vector or Fluent Bit) runs as a DaemonSet on every node and mounts the host directory that contains the container log files.
- Buffer Tier (Apache Kafka): The log shipper writes raw log lines into an Apache Kafka cluster. Kafka acts as the backpressure absorber. If the indexing engine suffers a write spike or an outage, logs accumulate safely in Kafka's disk-backed, replicated partitions instead of crashing or blocking the application containers.
- Indexing Tier: Dedicated consumer pools (Logstash or purpose-built Go/Rust consumers) subscribe to the Kafka log topics. These workers parse raw text into structured JSON, perform geolocation lookups, extract trace correlation IDs, and assemble bulk payloads.
- Distributed Search & Indexing Engine (Elasticsearch / ClickHouse): The parsed and enriched JSON documents are sent to the Elasticsearch cluster in bulk requests (e.g., about 5 MB each). Each node adds incoming documents to an in-memory indexing buffer and appends them to its transaction log (translog). On the next refresh, the buffer is written out as a new Lucene segment and the documents become searchable.
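The isolation requirement means the node agent must never make an application wait. The sketch below illustrates that policy in memory: a bounded buffer between the file tailer and the Kafka producer that sheds the oldest lines, and counts them, when the backend falls behind. Production agents such as Vector and Fluent Bit ship their own configurable disk buffers. `BoundedLogBuffer` is a hypothetical class.

```typescript
// Hypothetical bounded buffer between a log tailer and the Kafka producer.
// push() never blocks or throws: when full, it evicts the oldest line and counts the drop.
class BoundedLogBuffer {
  private readonly items: string[] = [];
  private readonly capacity: number;
  dropped = 0;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  push(line: string): void {
    if (this.items.length >= this.capacity) {
      this.items.shift(); // O(n) here; a ring buffer would make eviction O(1)
      this.dropped++;
    }
    this.items.push(line);
  }

  // Removes and returns up to `max` lines for the next produce request.
  drain(max: number): string[] {
    return this.items.splice(0, max);
  }

  get size(): number {
    return this.items.length;
  }
}

const buffer = new BoundedLogBuffer(3);
['a', 'b', 'c', 'd', 'e'].forEach((line) => buffer.push(line));
const nextBatch = buffer.drain(10); // the three newest lines: c, d, e
```

Exporting `dropped` as a metric makes log loss visible instead of silent. The drop itself is the price of keeping applications unaffected.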
Low-Level Design & Component Mechanics
To scale database write capacity and prevent index bloat, we implement strict schema mappings and non-blocking worker processing.
1. Elasticsearch Index Mapping Template Configuration
Without an explicit mapping template, Elasticsearch automatically infers schema types. A developer logging a random key-value pair could trigger a "Mapping Explosion," causing the master nodes to run out of memory. We enforce strict templates with wildcard exceptions for user-custom attributes.
{
"index_patterns": ["logs-app-*"],
"template": {
"settings": {
"index": {
"number_of_shards": 12,
"number_of_replicas": 1,
"refresh_interval": "5s",
"codec": "best_compression"
}
},
"mappings": {
"dynamic": "strict",
"properties": {
"timestamp": { "type": "date" },
"service": { "type": "keyword" },
"level": { "type": "keyword" },
"trace_id": { "type": "keyword" },
"span_id": { "type": "keyword" },
"user_id": { "type": "keyword" },
"message": {
"type": "text",
"analyzer": "standard",
"norms": false
},
"host": {
"properties": {
"name": { "type": "keyword" },
"ip": { "type": "ip" }
}
},
"custom_metadata": {
"type": "flattened"
}
}
}
}
}
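With `"dynamic": "strict"`, Elasticsearch rejects any document that carries an unmapped field (with a `strict_dynamic_mapping_exception`). One way to avoid losing such logs is to fold unknown top-level keys into the `flattened` `custom_metadata` field before sending. The sketch below assumes that approach. The field list mirrors the template above, nested fields under `host` are not checked, and `conformToStrictMapping` is a hypothetical helper.

```typescript
// Top-level fields declared in the logs-app-* template above.
const MAPPED_FIELDS = new Set([
  'timestamp', 'service', 'level', 'trace_id', 'span_id', 'user_id', 'message', 'host', 'custom_metadata',
]);

// Moves unmapped top-level keys into custom_metadata so that "dynamic": "strict"
// does not reject the whole document. Values are stringified for the flattened field.
function conformToStrictMapping(doc: Record<string, unknown>): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  const extra: Record<string, string> = {};
  for (const [key, value] of Object.entries(doc)) {
    if (MAPPED_FIELDS.has(key)) {
      out[key] = value;
    } else if (value !== undefined) {
      extra[key] = typeof value === 'string' ? value : JSON.stringify(value);
    }
  }
  if (Object.keys(extra).length > 0) {
    const existing = (out.custom_metadata ?? {}) as Record<string, unknown>;
    out.custom_metadata = { ...existing, ...extra };
  }
  return out;
}

const safe = conformToStrictMapping({
  timestamp: '2026-05-23T08:06:14.125Z',
  service: 'payment-service',
  message: 'Garbage collection cycle completed in 12ms',
  gc_pause_ms: 12,
});
// safe.custom_metadata now holds { gc_pause_ms: '12' } and gc_pause_ms is gone from the top level.
```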
2. High-Performance Log Ingestion & Batching Agent (TypeScript)
This sketch shows the consumer-side worker logic. Given a batch of records already polled from Kafka, it parses each record (JSON first, then a grok-style regex fallback) and writes the whole batch to Elasticsearch with a single bulk request.
import { Client } from '@elastic/elasticsearch';
interface UnparsedKafkaLog {
offset: string;
partition: number;
value: string; // Raw unparsed log string
}
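interface StructuredLog {
  timestamp: string; // ISO-8601 UTC
  service: string;
  level: string;
  trace_id?: string;
  user_id?: string;
  message: string;
  custom_metadata: Record<string, string>;
}
const esClient = new Client({ node: 'https://es-cluster.internal:9200' });
// Normalizes a timestamp to ISO-8601 UTC so the index-date derivation in flushLogBatch works.
// Timestamps without an explicit zone (e.g. "2026-05-23 08:06:14") are assumed to be UTC.
function toIsoTimestamp(value: unknown, fallback: string): string {
  if (typeof value !== 'string' || value.length === 0) return fallback;
  const hasZone = /(Z|[+-]\d{2}:?\d{2})$/.test(value);
  const date = new Date(value.replace(' ', 'T') + (hasZone ? '' : 'Z'));
  return Number.isNaN(date.getTime()) ? fallback : date.toISOString();
}
// Unstructured format: TIMESTAMP [SERVICE] LEVEL TRACE_ID [USER_ID] - MESSAGE
const UNSTRUCTURED_LINE =
  /^(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?)\s+\[([^\]]+)\]\s+(INFO|WARN|ERROR|DEBUG)\s+(\S+)(?:\s+(\S+))?\s+-\s+(.*)$/;
// Non-blocking log parser and enrichment engine
export function parseAndFormatLog(rawLog: string, defaultService: string): StructuredLog {
  const now = new Date().toISOString();
  // 1. Attempt JSON parsing (structured app logs). JSON.parse also accepts scalars
  //    such as "42" or "null", so only plain objects are treated as structured logs.
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawLog);
  } catch {
    parsed = undefined;
  }
  if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
    const obj = parsed as Record<string, any>;
    return {
      timestamp: toIsoTimestamp(obj.timestamp, now),
      service: obj.service || defaultService,
      level: String(obj.level || 'INFO').toUpperCase(),
      trace_id: obj.traceId || obj.trace_id,
      user_id: obj.userId || obj.user_id,
      message: obj.message || obj.msg || '',
      custom_metadata: obj.metadata || {}
    };
  }
  // 2. Fallback to regex (grok-style) parsing for the unstructured standard format
  const match = rawLog.match(UNSTRUCTURED_LINE);
  if (match) {
    return {
      timestamp: toIsoTimestamp(match[1], now),
      service: match[2],
      level: match[3],
      trace_id: match[4] === 'null' ? undefined : match[4],
      user_id: match[5],
      message: match[6],
      custom_metadata: {}
    };
  }
  // 3. Last resort fallback: store as a raw unparsed message
  return {
    timestamp: now,
    service: defaultService,
    level: 'UNKNOWN',
    message: rawLog,
    custom_metadata: { parsing_failed: 'true' }
  };
}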
// Bulk flush for one polled batch. Node.js runs this on a single thread, so no locking is
// needed; just avoid overlapping flushes for the same Kafka partition.
export async function flushLogBatch(batch: UnparsedKafkaLog[], defaultService: string): Promise<void> {
  if (batch.length === 0) return;
  const esOperations = [];
  for (const rawRecord of batch) {
    const parsed = parseAndFormatLog(rawRecord.value, defaultService);
    // Time-based index target: logs-app-YYYY-MM-DD (derived from the ISO-8601 timestamp)
    const indexDate = parsed.timestamp.split('T')[0];
    const indexName = `logs-app-${indexDate}`;
    esOperations.push({ index: { _index: indexName } });
    esOperations.push(parsed);
  }
  try {
    const response = await esClient.bulk({
      refresh: false, // Never force a refresh per bulk request; let the index's refresh_interval handle visibility.
      operations: esOperations
    });
    if (response.errors) {
      // Bulk requests are not atomic: successful items are indexed; each failed item carries an `error`.
      // Items come back in request order, so item i corresponds to batch[i] (one action per record).
      const failedCount = response.items.filter((item) => item.index?.error).length;
      console.error(`Bulk insert completed with ${failedCount} of ${response.items.length} records failed.`);
      // Route the failed records to the dead-letter topic here...
    }
  } catch (err) {
    console.error('Failed to dispatch bulk insert to Elasticsearch cluster: ', err);
    throw err; // Propagate so the caller skips the offset commit and the batch is retried
  }
}
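The dead-letter routing placeholder in `flushLogBatch` can be filled in with a small helper. It relies on the bulk API returning one result per action, in request order. `BulkItem` is a minimal local stand-in for the client's response type. Retrying HTTP 429 rejections instead of dead-lettering them is a suggested policy, not part of the original design.

```typescript
// Minimal local shape of one bulk response item for an "index" action.
interface BulkItem {
  index?: { status: number; error?: { type: string; reason?: string } };
}
interface DlqEntry<T> {
  record: T;
  status: number;
  errorType: string;
  reason?: string;
}

// Item i belongs to record i when exactly one action was sent per record.
// 429s are transient back-pressure and are retried; other failures go to the DLQ.
function partitionBulkFailures<T>(records: T[], items: BulkItem[]): { retry: T[]; deadLetter: DlqEntry<T>[] } {
  const retry: T[] = [];
  const deadLetter: DlqEntry<T>[] = [];
  items.forEach((item, i) => {
    const result = item.index;
    if (!result || !result.error) return; // indexed successfully
    if (result.status === 429) {
      retry.push(records[i]);
    } else {
      deadLetter.push({ record: records[i], status: result.status, errorType: result.error.type, reason: result.error.reason });
    }
  });
  return { retry, deadLetter };
}

const outcome = partitionBulkFailures(
  ['ok-log', 'busy-log', 'bad-log'],
  [
    { index: { status: 201 } },
    { index: { status: 429, error: { type: 'es_rejected_execution_exception' } } },
    { index: { status: 400, error: { type: 'strict_dynamic_mapping_exception' } } },
  ]
);
```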
3. Data Tiering & Lifecycle Management
Logs are most valuable immediately after creation, and their value drops off quickly with age. Keeping months of logs on NVMe SSDs would multiply storage costs for data that is rarely queried, so we partition the data lifecycle into three tiers:
stateDiagram-v2
[*] --> HotTier : Ingestion (NVMe SSD)
note right of HotTier
- High Write Throughput
- Query Searchable <5s
- Retain: 1 to 7 Days
end note
HotTier --> WarmTier : Shard Relocation (HDD)
note right of WarmTier
- Read-Only Indices
- Query Searchable (Slower)
- Retain: 8 to 30 Days
end note
WarmTier --> ColdTier : Snapshot Archival (S3 Object Storage)
note right of ColdTier
- Frozen snapshot blocks
- Deep Search / Restore required
- Retain: 31 to 365 Days
end note
ColdTier --> [*] : Automated TTL Deletion
- Hot Tier (1-7 Days): All index operations write to NVMe SSD instances. Indexing refresh interval is set to 5 seconds. Each index is split into 12 shards, distributed evenly across hot data nodes.
- Warm Tier (8-30 Days): Indices are marked read-only, force-merged down to 1 segment per shard (reducing file-handle and memory overhead), and migrated to cheaper HDD nodes.
- Cold Tier (31-365 Days): Logs are snapshotted into compressed block formats and moved to AWS S3 or Google Cloud Storage. Searching these logs requires restoring the snapshot (or mounting it as a searchable snapshot), trading query latency for extremely low storage costs.
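The tier transitions above map onto an Elasticsearch Index Lifecycle Management (ILM) policy. The sketch below shows a policy body as a TypeScript object. It rests on assumptions that do not come from the original design:
- warm nodes are tagged with a custom attribute `node.attr.box_type: warm`;
- an S3 snapshot repository named `logs-s3-repo` is already registered;
- the S3-backed tier uses ILM's `frozen` phase, whose `searchable_snapshot` action requires an appropriate Elastic license.

The policy name is hypothetical.

```typescript
// Hypothetical ILM policy body for `PUT _ilm/policy/logs-app-policy`, mirroring the tiers above.
// Each min_age is measured from rollover (or from index creation when rollover is not used).
const logsIlmPolicy = {
  policy: {
    phases: {
      hot: {
        actions: {
          rollover: { max_primary_shard_size: '50gb', max_age: '1d' },
        },
      },
      warm: {
        min_age: '7d',
        actions: {
          readonly: {},
          forcemerge: { max_num_segments: 1 },
          allocate: { require: { box_type: 'warm' } }, // assumed custom node attribute
        },
      },
      frozen: {
        min_age: '30d',
        actions: {
          searchable_snapshot: { snapshot_repository: 'logs-s3-repo' }, // assumed repository name
        },
      },
      delete: {
        min_age: '365d',
        actions: { delete: {} },
      },
    },
  },
};

// Request body for the ILM API.
const ilmRequestBody = JSON.stringify(logsIlmPolicy);
```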
Scaling Challenges & Production Bottlenecks
Ingesting hundreds of thousands of events per second (roughly 700,000 logs/sec at peak, per the capacity estimate) inevitably hits physical computing bottlenecks. Below are the core real-world issues we must mitigate.
1. The Logging Infinite Echo Feedback Loop
A critical outage in our downstream indexing engine causes our log parser to throw connection errors. Suppose the parser ships those error logs into the same pipeline it consumes. Each failure then enqueues another error log, whose indexing attempt fails again and produces yet another error: an Infinite Logging Loop. Such a loop can generate millions of log messages in seconds, filling local disks and saturating network interfaces.
graph LR
Indexer[Log Indexer Worker] -->|1. Index Failure| ES[Elasticsearch Node]
Indexer -.->|2. Logs Error message| AppLogger[Logging Library]
AppLogger -->|3. Enqueue failure event| Kafka[Kafka Log Broker]
Kafka -->|4. Consume & Attempt rewrite| Indexer
Mitigation:
- Strict Routing Isolation: Logging agents must never log their own errors to the high-scale pipeline they manage.
- Local Dropping / Stdout Suppression: Ensure all logging engines running on logging components direct their logs only to local file destinations with strict rotation limits (e.g., maximum 50MB) or immediately drop logging statements below the `WARNING` severity level.
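The sketch below shows these mitigations for components inside the pipeline. It assumes the injected `sink` writes to a local, rotated file, and never to Kafka or to a stdout stream the agent collects. The byte budget is a simplified stand-in for size-based rotation. `PipelineSelfLogger` is a hypothetical class.

```typescript
type Level = 'DEBUG' | 'INFO' | 'WARN' | 'ERROR';
const LEVEL_RANK: Record<Level, number> = { DEBUG: 0, INFO: 1, WARN: 2, ERROR: 3 };

// Hypothetical logger for shippers and indexers: drops anything below WARN and stops
// writing once its byte budget is spent, so it can never feed the pipeline it runs in.
class PipelineSelfLogger {
  private bytesWritten = 0;
  suppressed = 0;
  private readonly maxBytes: number;
  private readonly sink: (line: string) => void;

  constructor(sink: (line: string) => void, maxBytes = 50 * 1024 * 1024) {
    this.sink = sink;
    this.maxBytes = maxBytes;
  }

  log(level: Level, message: string): void {
    if (LEVEL_RANK[level] < LEVEL_RANK.WARN) return; // below WARN: drop outright
    const line = `${new Date().toISOString()} ${level} ${message}\n`;
    // Approximate size: UTF-16 code units, not bytes.
    if (this.bytesWritten + line.length > this.maxBytes) {
      this.suppressed++;
      return;
    }
    this.bytesWritten += line.length;
    this.sink(line);
  }
}

const written: string[] = [];
const selfLog = new PipelineSelfLogger((line) => written.push(line), 200);
selfLog.log('INFO', 'bulk flush ok'); // dropped: below WARN
selfLog.log('ERROR', 'bulk flush failed: connection refused');
```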
2. Elasticsearch Mapping & Segment Merging Bottlenecks
Elasticsearch buffers incoming writes in memory and turns them into searchable segments on each refresh, so every refresh creates a new segment. Frequent refreshes combined with a high volume of small writes produce a large number of tiny Lucene segments. These segments consume file handles and memory, and keep the cluster busy with CPU- and I/O-heavy merge operations.
Mitigation:
- Large Bulk Batching: Enforce client-side buffering. Workers must batch writes until they hit either $5\text{ MB}$ in size or $2 \text{ seconds}$ in age.
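- Tuning Refresh Interval: Increase `index.refresh_interval` from the default `1s` to `5s`, or to `15s` for indices that do not need the 5-second searchability target. This trades a small search-latency window for a large boost in write throughput.
- Relaxing Translog Durability: Set `index.translog.durability` to `async`. By default (`request`), Elasticsearch fsyncs the translog before acknowledging every index or bulk request. With `async`, it fsyncs in the background every `index.translog.sync_interval` (default `5s`). The trade-off is that a node crash can lose up to that interval of acknowledged writes on the affected shard copy. This is acceptable here only because the same records can still be replayed from Kafka within its retention window.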
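The "5 MB or 2 seconds, whichever comes first" batching rule is a small state machine. The sketch below injects the clock so the policy can be tested deterministically. Sizes are approximated by string length, and `SizeOrAgeBatcher` is a hypothetical class.

```typescript
// Hypothetical client-side batcher: flushes when buffered data reaches maxBytes
// or the oldest buffered record reaches maxAgeMs, whichever comes first.
class SizeOrAgeBatcher {
  private batch: string[] = [];
  private batchBytes = 0;
  private oldestAt: number | null = null;
  private readonly onFlush: (batch: string[]) => void;
  private readonly maxBytes: number;
  private readonly maxAgeMs: number;

  constructor(onFlush: (batch: string[]) => void, maxBytes = 5_000_000, maxAgeMs = 2_000) {
    this.onFlush = onFlush;
    this.maxBytes = maxBytes;
    this.maxAgeMs = maxAgeMs;
  }

  add(record: string, nowMs: number): void {
    if (this.oldestAt === null) this.oldestAt = nowMs;
    this.batch.push(record);
    this.batchBytes += record.length; // approximate: UTF-16 code units, not bytes
    if (this.batchBytes >= this.maxBytes) this.flush();
    else this.tick(nowMs);
  }

  // Call periodically (e.g. from a timer) so quiet partitions still flush on age.
  tick(nowMs: number): void {
    if (this.oldestAt !== null && nowMs - this.oldestAt >= this.maxAgeMs) this.flush();
  }

  flush(): void {
    if (this.batch.length === 0) return;
    const out = this.batch;
    this.batch = [];
    this.batchBytes = 0;
    this.oldestAt = null;
    this.onFlush(out);
  }
}

const flushed: string[][] = [];
const batcher = new SizeOrAgeBatcher((b) => flushed.push(b), 10, 2_000);
batcher.add('aaaa', 0); // 4 units buffered
batcher.add('bbbbbbb', 100); // 11 >= 10: size-triggered flush
batcher.add('c', 500);
batcher.tick(2_600); // oldest record is 2.1 s old: age-triggered flush
```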
3. GC Pause Interruptions and JVM Heap Sizing
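Elasticsearch runs on the JVM and is susceptible to Stop-the-World garbage collection pauses. During a long pause, the node cannot answer the elected master's fault-detection checks. By default a check times out after 10 seconds, and the node is removed after 3 consecutive failures. The master then drops the node from the cluster and starts reallocating its shards, and the resulting recovery traffic can stress other nodes and cascade into a cluster-wide failure.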
Mitigation:
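- Heap Sizing Rule of Thumb: Allocate no more than $50\%$ of physical RAM to the JVM heap, and keep it just below $32\text{ GB}$ (around $31\text{ GB}$). The other half of RAM is deliberately left to the operating system's page cache, which Lucene relies on for fast segment reads. Why the cap? Beyond roughly 32 GB, the JVM can no longer use "Compressed Ordinary Object Pointers" (Compressed OOPs), so every object reference grows from 4 to 8 bytes. A heap just above the threshold therefore holds fewer objects than one just below it, and GC efficiency drops.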
- Garbage Collector Tuning: Use the G1 Garbage Collector (or ZGC on modern JVM runtimes) and explicitly set target pause times:
-XX:MaxGCPauseMillis=200
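The heap-sizing rule reduces to a one-line formula. The sketch below also emits matching JVM flags. The 31 GB cap is a conservative assumption rather than an exact threshold. The authoritative check is Elasticsearch's startup log, which reports whether compressed ordinary object pointers are in use. The function names are hypothetical.

```typescript
// Conservative cap below the compressed-oops threshold (assumption; verify in the startup log).
const COMPRESSED_OOPS_SAFE_CAP_GB = 31;

// Half of physical RAM, capped; the remainder is left to the OS page cache for Lucene.
function recommendedHeapGb(physicalRamGb: number): number {
  return Math.min(Math.floor(physicalRamGb / 2), COMPRESSED_OOPS_SAFE_CAP_GB);
}

function jvmHeapFlags(physicalRamGb: number): string[] {
  const heapGb = recommendedHeapGb(physicalRamGb);
  // Equal -Xms and -Xmx avoid heap resizing at runtime.
  return [`-Xms${heapGb}g`, `-Xmx${heapGb}g`, '-XX:+UseG1GC', '-XX:MaxGCPauseMillis=200'];
}

const flags64 = jvmHeapFlags(64);
// On a 64 GB node the heap is capped at 31g, leaving about 33 GB for the page cache.
```

Note that 200 ms is also G1's built-in default pause target. Setting it explicitly mainly documents the intent.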
Technical Trade-offs & Strategic Compromises
Choosing the storage engine for your log system changes your cost profile and query capability.
| Technology Choice | Index Overhead | Arbitrary Query Latency | Storage Cost Profile | Operational Complexity | Use-Case Suitability |
|---|---|---|---|---|---|
| Elasticsearch / OpenSearch | High (Up to 1.5x raw size) | Ultra-Low (<100ms full-text) | Expensive (Needs SSDs, high memory) | High (Requires active cluster management) | Deep ad-hoc searches, security analysis, debugging systems |
| Grafana Loki | Minimal (Indexes label metadata only) | High (Brute-force scan of log content) | Cheap (Stores raw chunks in S3/Object Storage) | Low | Simple trace correlations, host log tailing |
| ClickHouse (Columnar DB) | Medium (~0.3x raw size) | Low-Medium (Column-aggregate queries) | Low (Excellent native compression) | Medium | Analytical logging, metrics aggregation, high-throughput audits |
Strategic Decision: Loki vs Elasticsearch
For a payment platform, the engineering team opted for Elasticsearch. While Grafana Loki is significantly cheaper to operate because it does not create inverted indexes on log contents, it struggles when developers need to execute ad-hoc searches across millions of log records containing specific customer IDs or error variables. The high investment in Elasticsearch infrastructure pays for itself in faster incident resolution times.
Failure Scenarios and Fault Tolerance
Designing for resilience means assuming every component in the ingestion pipeline will experience an outage.
1. Consumer Group Rebalancing Storms in Kafka
When an indexer node crashes or is scaled out, Kafka pauses consumption to redistribute partitions across the remaining workers. During high ingestion, a slow node can trigger consecutive timeouts, causing a Rebalancing Storm that stalls the pipeline.
Fault Tolerance Strategy:
- Size each poll (via `max.poll.records`) so that processing and flushing a batch always finishes well within `max.poll.interval.ms` (Kafka's default is 300,000 ms / 5 minutes). Raise that limit only if large bulk flushes genuinely need more time; otherwise Kafka assumes the worker is dead and triggers another rebalance.
- If a bulk write fails due to temporary Elasticsearch congestion, the indexer must write the batch to a local disk spool and continue consuming from Kafka. This keeps the partition's consumption offset moving instead of stalling it.
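The spool-and-continue strategy can be sketched as follows. The sketch assumes:
- each batch comes from a single partition;
- the spool is an append-only local store, modeled as a hypothetical `Spool` interface;
- a separate process replays spooled records later;
- offsets fit safely in a JavaScript number.

Committing after a successful spool moves durability for those records onto the local disk, so the spool must be fsynced and its disk monitored.

```typescript
// Hypothetical shapes: a polled Kafka record and a local append-only spool.
interface PolledRecord {
  partition: number;
  offset: string;
  value: string;
}
interface Spool {
  append(records: PolledRecord[]): Promise<void>;
}

// Flushes a polled batch; if Elasticsearch is congested, spools it instead of blocking
// the poll loop. Returns the offset to commit: Kafka expects the *next* offset to read.
async function processBatch(
  batch: PolledRecord[],
  flush: (records: PolledRecord[]) => Promise<void>,
  spool: Spool
): Promise<string | null> {
  if (batch.length === 0) return null;
  try {
    await flush(batch);
  } catch {
    // If the spool write also fails, this rejects and the caller must NOT commit.
    await spool.append(batch);
  }
  const last = batch[batch.length - 1];
  return String(Number(last.offset) + 1); // assumes offsets < 2^53
}

const spooled: PolledRecord[] = [];
const memorySpool: Spool = { append: async (records) => { spooled.push(...records); } };
const congestedFlush = async (): Promise<void> => { throw new Error('HTTP 429 Too Many Requests'); };
processBatch(
  [{ partition: 3, offset: '41', value: 'a' }, { partition: 3, offset: '42', value: 'b' }],
  congestedFlush,
  memorySpool
).then((commitOffset) => console.log(`spooled ${spooled.length}, commit ${commitOffset}`)); // logs: spooled 2, commit 43
```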
2. Shard Allocation Failure & Split-Brain Resilience
In an active cluster, if networking between nodes is severed, two distinct partitions might both assume they are the master cluster, leading to divergent log states (split-brain).
Fault Tolerance Strategy:
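- We use a modern Elasticsearch (7.0+) or OpenSearch cluster. Its quorum-based, Raft-like cluster coordination replaced the legacy Zen Discovery and its error-prone `minimum_master_nodes` setting, and a master can only be elected with votes from a majority of master-eligible nodes.
- Spread master-eligible nodes evenly across exactly three separate physical Availability Zones (AZs), for example one per zone, monitored by active health checks. Losing any single zone still leaves a majority of master-eligible nodes, so the master quorum survives a single zone outage.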
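A quick check of the zone arithmetic helps here. The sketch below makes the simplifying assumption that every master-eligible node is a voting member; Elasticsearch manages the actual voting configuration automatically. The helper names are hypothetical.

```typescript
// Majority quorum for a set of voting (master-eligible) nodes.
function quorumSize(votingNodes: number): number {
  return Math.floor(votingNodes / 2) + 1;
}

// Can a master still be elected after losing `zonesLost` zones (worst case)?
function survivesZoneLoss(mastersPerZone: number[], zonesLost: number): boolean {
  const total = mastersPerZone.reduce((a, b) => a + b, 0);
  // Worst case: the zones holding the most master-eligible nodes fail.
  const sorted = [...mastersPerZone].sort((a, b) => b - a);
  const lost = sorted.slice(0, zonesLost).reduce((a, b) => a + b, 0);
  return total - lost >= quorumSize(total);
}

const threeZones = survivesZoneLoss([1, 1, 1], 1); // 2 of 3 remain; quorum is 2
const twoZones = survivesZoneLoss([2, 1], 1); // losing the 2-node zone leaves 1 of 3
```

The two-zone case shows why three zones are needed: with only two, one zone must hold the majority, and losing it stops elections.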
Staff Engineer Perspective
Verbal Script & Mock Interview
Mock Interview Dialogue
Interviewer: "We expect our logging system to ingest 10 Terabytes of logs daily. During peak traffic, how would you prevent Elasticsearch write saturation from delaying our real-time log ingestion?"
Candidate: *"To protect Elasticsearch from write saturation during traffic peaks, we decouple log collection from log indexing using Apache Kafka as a highly durable buffer. Shippers write log batches to Kafka partitions keyed by service_name. The indexing workers consume from Kafka using a pull model, pulling logs in batches of up to 5MB or 2,000 records.
If Elasticsearch becomes saturated, its indexing queues fill up, and it returns HTTP 429 Too Many Requests or slows down its responses. Instead of failing or blocking the applications, our indexing workers pause their Kafka consumption (the backlog stays safely in Kafka), spool any in-flight batch to local disk, and retry with exponential backoff. Once Elasticsearch catches up and recovers, the workers resume consumption, draining the Kafka backlog at a sustainable rate."*
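The candidate's "exponential backoff" can be made concrete. The sketch below uses the "full jitter" variant: a random delay between zero and an exponentially growing, capped ceiling. The 500 ms base and 30 s cap are illustrative values, not tuned recommendations, and the helper names are hypothetical.

```typescript
// Full-jitter backoff: random delay in [0, min(cap, base * 2^attempt)). attempt starts at 0.
function backoffDelayMs(attempt: number, baseMs = 500, capMs = 30_000, random: () => number = Math.random): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

// Retries an operation (e.g. a bulk request rejected with HTTP 429) until it succeeds
// or maxAttempts is exhausted, then rethrows the last error.
async function withBackoff<T>(op: () => Promise<T>, maxAttempts = 8): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      if (attempt + 1 >= maxAttempts) throw err;
      await new Promise((resolve) => setTimeout(resolve, backoffDelayMs(attempt)));
    }
  }
}
```

Jitter spreads the retries of many indexer workers over time, so they do not hit a recovering cluster in synchronized waves.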
Interviewer: "Excellent. How would you configure Elasticsearch sharding for this volume? How many shards do we need, and what sizing guidelines are you following?"
Candidate: *"For log indices, the industry standard guideline is to keep primary shard sizes between $30\text{ GB}$ and $50\text{ GB}$. Shards that are too small create high metadata overhead for the master nodes, while shards that are too large make node recovery and shard rebalancing slow.
At $10 \text{ TB}$ of raw logs per day, with an index expansion factor of $1.4\times$, we ingest about $14 \text{ TB}$ of indexed data daily. To keep shard sizes at around $40\text{ GB}$ each: $$\text{Number of Shards} = \frac{14{,}000 \text{ GB}}{40 \text{ GB}} = 350 \text{ primary shards per day}$$
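We write to ILM-managed indices matching logs-app-* with 35 primary shards and 1 replica per index. Elasticsearch Index Lifecycle Management (ILM) rolls over to a new index whenever a primary shard reaches $50\text{ GB}$ (ILM's max_primary_shard_size condition) or the index is 24 hours old. That works out to roughly 8 to 10 new indices per day, depending on whether shards fill to nearer 40 or 50 GB."*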
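The sizing arithmetic in this answer fits in a small helper. It assumes decimal units and that ILM rolls indices over at about the 40 GB target shard size. The function name is hypothetical.

```typescript
// Shard-count arithmetic: indexed volume / target shard size, then indices per day.
function shardPlan(rawTBPerDay: number, expansion: number, targetShardGb: number, primariesPerIndex: number) {
  // Round to whole GB to avoid floating-point noise (e.g. 10 * 1000 * 1.4).
  const indexedGbPerDay = Math.round(rawTBPerDay * 1_000 * expansion);
  const primaryShardsPerDay = Math.ceil(indexedGbPerDay / targetShardGb);
  // With rollover, each new index contributes `primariesPerIndex` primary shards.
  const indicesPerDay = Math.ceil(primaryShardsPerDay / primariesPerIndex);
  return { indexedGbPerDay, primaryShardsPerDay, indicesPerDay };
}

const plan = shardPlan(10, 1.4, 40, 35);
// plan: 14000 GB/day indexed, 350 primary shards/day, 10 indices/day
```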
Interviewer: "What happens if a developer logs a massive JSON object that exceeds the maximum document size limit of Elasticsearch?"
Candidate: *"Elasticsearch caps HTTP request bodies at $100\text{ MB}$ by default (http.max_content_length). That cap bounds both the size of any single document and the size of a whole bulk request. A log of that size is highly unusual and typically indicates a thread dump or an infinite loop printing data.
To prevent a single malformed log from blocking our indexing pipelines, the indexing consumer caps or truncates oversized messages before sending them, because one oversized document would make Elasticsearch reject the entire bulk request. It then inspects the per-item results of every bulk response. Bulk requests are not atomic, so the valid documents in a batch are indexed normally. If Elasticsearch rejects a specific document due to size or mapping conflicts, the consumer writes that record to a Dead-Letter Queue (DLQ) topic in Kafka. SREs can monitor the DLQ topic and inspect the payload without stalling the main log pipeline."*
Interviewer: "Perfect. That is a highly resilient design. I have no further questions!"