Designing a messaging storage system capable of handling trillions of messages with sub-millisecond query latency is one of the most demanding challenges in modern data engineering. In a chat application like Discord, the database is subjected to append-heavy write patterns, irregular read bursts (e.g., users logging on and scrolling through historical logs), and extreme data skew.
In the early days, Discord successfully operated on MongoDB. However, as their scale exploded, the message data and its indexes grew larger than the physical RAM of their instances. This triggered heavy disk paging and cascading latency spikes.
To survive this growth, Discord migrated its message store to Apache Cassandra and, later, to ScyllaDB. In this case study, we deconstruct the low-level data models, sharding techniques, and C++ engine mechanics that power Discord's messaging infrastructure.
System Requirements and Goals
To scale a chat platform to trillions of messages, the database tier must be designed around strict operational parameters.
1. Functional Requirements
- Real-time Ingestion: Asynchronously ingest and persist messages sent by users in private channels (DMs) and massive public servers.
- Chronological Retrieval: Query historical messages sequentially by time, supporting infinite backward scrolling (pagination).
- High-Concurrency Reads: Support sudden spikes in read volume when a channel is mentioned or a celebrity posts.
- Message Lifecycle (CRUD): Support instant message editing and deletion with immediate consistency on the active channel feed.
2. Non-Functional Requirements
- Low Ingestion Latency: P99 write ingestion latency must be strictly less than $15\text{ ms}$.
- Neutralize Extreme Data Skew: The database must prevent noisy neighbors or spiky channels (e.g., public communities with millions of active members) from exhausting the hardware resources of individual storage nodes.
- Fault-Tolerant, Masterless Architecture: No Single Point of Failure (SPOF). The storage cluster must withstand the loss of physical nodes or entire availability zones without dropping writes.
- Eliminate Latency Spikes (Stop-the-World Pauses): Avoid garbage-collection pauses in the storage engine, which would otherwise propagate as unacceptable latency spikes to the stateless backend services that depend on it.
3. Sizing and Capacity Math
Let's conduct a capacity planning estimation for a large-scale messaging system:
- Average Write Ingestion Rate: $120,000$ messages per second (mps).
- Peak Write Ingestion Rate: $400,000$ mps.
- Average Message Size: $200\text{ Bytes}$ (uncompressed text + metadata).
- Daily Ingestion Storage Volume (a conservative upper bound that assumes the peak rate is sustained for a full 24 hours): $$\text{Daily Raw Storage} = 400,000 \text{ mps} \times 200 \text{ Bytes} \times 86,400 \text{ seconds} \approx 6.91 \text{ TB/day}$$
- High Availability Overhead (Replication Factor = 3): $$\text{Daily Replicated Storage} = 6.91 \text{ TB/day} \times 3 = 20.73 \text{ TB/day}$$
- LSM-Tree Space Amplification & Index Overhead (1.4x factor): $$\text{Total Storage Ingestion Rate} = 20.73 \text{ TB/day} \times 1.4 \approx 29 \text{ TB/day}$$
- Retention Policy: Indefinite storage retention. Over 5 years of operations, this cluster will accumulate: $$\text{5-Year Storage Size} = 29 \text{ TB/day} \times 365 \text{ days} \times 5 \text{ years} \approx 52.92 \text{ Petabytes}$$
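The sizing chain above can be reproduced with a small helper, which makes it easy to rerun the estimate under different assumptions (for example, the average rate instead of the peak). This is an illustrative sketch: the interface and function names are hypothetical, and it uses decimal units (1 TB = 10^12 bytes, 1 PB = 10^15 bytes) like the formulas.

```typescript
// Illustrative capacity estimator reproducing the sizing formulas above.
// Decimal units: 1 TB = 1e12 bytes, 1 PB = 1e15 bytes.
interface CapacityInputs {
  writesPerSecond: number;   // sustained ingestion rate (mps)
  avgMessageBytes: number;   // uncompressed text + metadata
  replicationFactor: number; // copies of every row
  overheadFactor: number;    // LSM space amplification + index overhead
  retentionYears: number;
}

interface CapacityEstimate {
  rawTbPerDay: number;
  replicatedTbPerDay: number;
  totalTbPerDay: number;
  retainedPb: number;
}

const SECONDS_PER_DAY = 86_400;

function estimateCapacity(i: CapacityInputs): CapacityEstimate {
  const rawTbPerDay = (i.writesPerSecond * i.avgMessageBytes * SECONDS_PER_DAY) / 1e12;
  const replicatedTbPerDay = rawTbPerDay * i.replicationFactor;
  const totalTbPerDay = replicatedTbPerDay * i.overheadFactor;
  const retainedPb = (totalTbPerDay * 365 * i.retentionYears) / 1000; // TB -> PB
  return { rawTbPerDay, replicatedTbPerDay, totalTbPerDay, retainedPb };
}

// Peak rate sustained all day, as in the formulas above.
const peak = estimateCapacity({
  writesPerSecond: 400_000,
  avgMessageBytes: 200,
  replicationFactor: 3,
  overheadFactor: 1.4,
  retentionYears: 5,
});
console.log(peak.rawTbPerDay.toFixed(2), peak.totalTbPerDay.toFixed(2), peak.retainedPb.toFixed(2));
// prints "6.91 29.03 52.98"
```

Carrying full precision through the chain gives about 52.98 PB, slightly above the ≈52.92 PB figure, which rounds the daily rate to 29 TB first. Plugging in the 120,000 mps average instead yields about 2.07 TB/day of raw data.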
API Design and Interface Contracts
Messaging interactions are driven by stateless API servers that communicate with ScyllaDB using low-level client drivers.
1. Send Message Endpoint
POST /v1/channels/{channel_id}/messages
Request Payload:
{
"content": "Distributed databases are fascinating!",
"nonce": "1234567890abcdef"
}
Response Payload (201 Created):
{
"id": "119852357400018944",
"channel_id": "8877665544332211",
"author_id": "4433",
"content": "Distributed databases are fascinating!",
"timestamp": "2026-05-23T08:06:14.123Z"
}
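The `id` values in these payloads are 64-bit Snowflake IDs, which is what lets the storage tier derive both chronological order and bucket placement from the ID alone. Discord's published Snowflake layout packs milliseconds since the Discord epoch (1420070400000, i.e. 2015-01-01T00:00:00Z) into the top 42 bits, followed by a 5-bit worker ID, a 5-bit process ID, and a 12-bit increment. A minimal encode/decode sketch assuming that layout; the helper names are illustrative.

```typescript
// Minimal Snowflake encode/decode, assuming Discord's published bit layout:
// [42 bits: ms since DISCORD_EPOCH][5 bits: worker][5 bits: process][12 bits: increment]
const DISCORD_EPOCH_MS = 1420070400000n; // 2015-01-01T00:00:00.000Z

interface SnowflakeParts {
  timestampMs: bigint; // Unix epoch milliseconds
  workerId: number;
  processId: number;
  increment: number;
}

function encodeSnowflake(p: SnowflakeParts): bigint {
  return ((p.timestampMs - DISCORD_EPOCH_MS) << 22n)
    | (BigInt(p.workerId & 0x1f) << 17n)
    | (BigInt(p.processId & 0x1f) << 12n)
    | BigInt(p.increment & 0xfff);
}

function decodeSnowflake(id: bigint): SnowflakeParts {
  return {
    timestampMs: (id >> 22n) + DISCORD_EPOCH_MS,
    workerId: Number((id >> 17n) & 0x1fn),
    processId: Number((id >> 12n) & 0x1fn),
    increment: Number(id & 0xfffn),
  };
}

// IDs travel as decimal strings in JSON because 64-bit integers exceed
// the 2^53 precision of JavaScript numbers.
const id = encodeSnowflake({
  timestampMs: BigInt(Date.parse("2026-05-23T08:06:14.123Z")),
  workerId: 1,
  processId: 0,
  increment: 7,
});
const parts = decodeSnowflake(BigInt(id.toString()));
console.log(id.toString(), new Date(Number(parts.timestampMs)).toISOString());
```

Because the timestamp occupies the most significant bits, sorting IDs numerically sorts messages by creation time, which is why a Snowflake can serve directly as the clustering key.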
2. Retrieve Historical Messages
GET /v1/channels/{channel_id}/messages?limit=50&before=119852357400018944
Response Payload (200 OK):
{
"messages": [
{
"id": "119852356800014321",
"author_id": "5566",
"content": "Yes, sharding is key at scale.",
"timestamp": "2026-05-23T08:06:12.100Z"
}
],
"has_more": true,
"next_cursor": "119852356800014321"
}
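One common way to fill `has_more` and `next_cursor` without a second query is to fetch one row more than the requested `limit`: if the extra row comes back, older history exists, and the cursor is the ID of the oldest message actually returned. A minimal sketch of that page-assembly step, assuming rows arrive newest-first as the clustering order produces them; the types and `buildHistoryPage` are hypothetical, not the endpoint's actual implementation.

```typescript
// Illustrative page assembly for GET /v1/channels/{channel_id}/messages.
interface ApiMessage {
  id: string;        // Snowflake as a decimal string
  author_id: string;
  content: string;
  timestamp: string;
}

interface HistoryPage {
  messages: ApiMessage[];
  has_more: boolean;
  next_cursor: string | null;
}

// `rows` must be newest-first and fetched with LIMIT = limit + 1.
function buildHistoryPage(rows: ApiMessage[], limit: number): HistoryPage {
  const hasMore = rows.length > limit;          // the extra row proves older data exists
  const messages = rows.slice(0, limit);        // never return the probe row itself
  const oldest = messages[messages.length - 1];
  return {
    messages,
    has_more: hasMore,
    // The client sends this back as ?before=... to fetch the next, older page.
    next_cursor: hasMore && oldest ? oldest.id : null,
  };
}
```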
High-Level Design Architecture
A messaging platform at this scale decouples three concerns: client WebSocket connections, API request routing, and physical data storage nodes.
1. End-to-End Message Delivery Pipeline
graph TD
Client[User Client] -->|Publish Message| Gateway[Websocket Gateway Fleet]
Gateway -->|Forward Write Request| IngestService[Stateless Ingestion API]
subgraph "Durable Storage Tier"
IngestService -->|1. Calculate Bucket ID| ClientDriver[ScyllaDB Token-Aware Driver]
ClientDriver -->|2. Hash Route Write| ScyllaCluster[ScyllaDB Masterless Ring]
ScyllaCluster -->|Commit log append + memory write| MemTable[Active MemTable in RAM]
MemTable -->|Async Flush| SSTable[SSTable disk files]
end
subgraph "Real-time Fan-out Pipeline"
IngestService -->|3. Publish Event| Kafka[Apache Kafka Broker]
Kafka -->|4. Push Real-time update| Gateway
Gateway -->|5. Delivery| TargetClient[Subscribed Channel Clients]
end
%% Styles
style ScyllaCluster fill:#1a1c23,stroke:#10b981,stroke-width:2px,color:#fff
style Kafka fill:#1a1c23,stroke:#f59e0b,stroke-width:2px,color:#fff
2. Hash Ring Partitioning: Static vs. Bucketized Sharding
If we shard messages purely by channel_id (Static sharding), a massive public server like the Fortnite channel will route its entire multi-terabyte message partition to a single physical node on the consistent hashing ring, causing immediate CPU and I/O starvation.
By adopting Bucketized Sharding, the composite partition key (channel_id, bucket_id) splits a single channel's messages into hundreds of distinct partition buckets. The cluster's partitioner hashes each composite key to a token, so the buckets land pseudo-randomly on different nodes of the ring, and the token-aware client driver computes the same token to route each request directly to a replica that owns it.
graph LR
subgraph "Static Sharding (Naive)"
ChanFortnite[Channel: Fortnite] -->|100% of data| Node1[(Scylla Node 1: Hotspots & Crash)]
ChanSmall[Channel: Small Chat] --> Node2[(Scylla Node 2)]
end
subgraph "Bucketized Sharding (Scylla Hashing)"
ChanFortniteBucket1[Fortnite, Bucket 1] -->|Hash key| NodeA[(Node A)]
ChanFortniteBucket2[Fortnite, Bucket 2] -->|Hash key| NodeB[(Node B)]
ChanFortniteBucket3[Fortnite, Bucket 3] -->|Hash key| NodeC[(Node C)]
end
style Node1 fill:#991b1b,stroke:#f87171,stroke-width:2px,color:#fff
style NodeA fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
style NodeB fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
style NodeC fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
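The scattering effect can be illustrated with a toy token ring. Cassandra and ScyllaDB hash partition keys with Murmur3; this sketch substitutes 32-bit FNV-1a followed by MurmurHash3's `fmix32` finalizer purely to stay dependency-free. The four node names, one evenly spaced token per node, and the `"channel:bucket"` key encoding are all assumptions, so it shows the mechanism rather than real placement (production rings use many virtual-node tokens per host).

```typescript
// Toy token ring illustrating static vs. bucketized partition placement.
// Stand-in hash: FNV-1a, then MurmurHash3's fmix32 finalizer for avalanche.
function tokenOf(key: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  // fmix32 spreads small input differences (e.g. ":1" vs ":2") across all 32 bits.
  h ^= h >>> 16;
  h = Math.imul(h, 0x85ebca6b);
  h ^= h >>> 13;
  h = Math.imul(h, 0xc2b2ae35);
  h ^= h >>> 16;
  return h >>> 0;
}

interface RingNode { name: string; token: number; }

// Hypothetical 4-node ring with evenly spaced tokens, sorted ascending.
const ring: RingNode[] = ["node-a", "node-b", "node-c", "node-d"].map((name, i) => ({
  name,
  token: Math.floor((i * 2 ** 32) / 4),
}));

// A key belongs to the first node whose token is >= the key's token, wrapping around.
function ownerOf(key: string): string {
  const t = tokenOf(key);
  const owner = ring.find((n) => n.token >= t) ?? ring[0];
  return owner.name;
}

const channelId = "8877665544332211";
// Static sharding: the partition key is channel_id alone, so all messages share one owner.
console.log("static:", ownerOf(channelId));
// Bucketized sharding: each (channel_id, bucket_id) key hashes independently.
for (let bucket = 0; bucket < 6; bucket++) {
  console.log(`bucket ${bucket}:`, ownerOf(`${channelId}:${bucket}`));
}
```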
Low-Level Design & Component Mechanics
To guarantee low latency and prevent unbounded partition growth, we define custom CQL structures and implement client-side bucket calculation logic.
1. Database Schema DDL (ScyllaDB / Cassandra CQL)
The messaging keyspace uses masterless replication across three availability zones. Tables enforce strict compound primary keys.
-- NetworkTopologyStrategy keys are data center names, not AZs. This assumes a
-- single data center named 'us-east' whose three availability zones are
-- configured as racks, so RF = 3 places one replica in each AZ.
CREATE KEYSPACE discord_messaging WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'us-east': 3
};
USE discord_messaging;
-- Messages Table with Bucketized Sharding
CREATE TABLE messages (
  channel_id bigint,
  bucket_id int,
  message_id bigint, -- Snowflake ID: time-ordered, so it also serves as the clustering key
  author_id bigint,
  content text,
  attachments list<text>,
  PRIMARY KEY ((channel_id, bucket_id), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)
  AND compaction = {
    'class': 'SizeTieredCompactionStrategy',
    'max_threshold': 32,
    'min_threshold': 4
  };
2. Client-Side Bucket Calculation & Sequential Query Code (TypeScript)
This TypeScript script computes the bucket_id from the timestamp embedded in a message's Snowflake ID and pages backward through history, stepping into older buckets when the current one runs out of rows.
import { Client } from 'cassandra-driver';

const scyllaClient = new Client({
  contactPoints: ['scylla-node1.internal', 'scylla-node2.internal'],
  localDataCenter: 'us-east', // must match the data center name used in the keyspace
  keyspace: 'discord_messaging'
});

// A bucket is a fixed time window derived from the timestamp embedded in each
// Snowflake ID (the top 42 bits hold milliseconds since the Snowflake epoch).
// Buckets are bounded by time, not by an exact message count. The 10-day window
// is an assumed value; size it so the busiest channels stay well below ~100 MB per partition.
const BUCKET_WINDOW_MS = 10n * 24n * 60n * 60n * 1000n;

export function calculateBucketId(snowflakeId: bigint): number {
  const msSinceEpoch = snowflakeId >> 22n; // drop the 22 worker/process/increment bits
  return Number(msSinceEpoch / BUCKET_WINDOW_MS);
}

interface MessageRow {
  channel_id: string;
  bucket_id: number;
  message_id: string;
  author_id: string;
  content: string;
}

// message_id is a bigint Snowflake, so the cursor is compared directly (no timeuuid functions).
const HISTORY_QUERY = `
  SELECT channel_id, bucket_id, message_id, author_id, content
  FROM messages
  WHERE channel_id = ? AND bucket_id = ? AND message_id < ?
  LIMIT ?
`;

// Sequentially read historical messages (newest first) across bucket boundaries.
// oldestBucket is a hypothetical parameter: callers can pass the bucket in which the
// channel was created to avoid probing empty buckets all the way down to 0.
export async function queryChannelHistory(
  channelId: bigint,
  beforeMessageId: bigint,
  limit: number = 50,
  oldestBucket: number = 0
): Promise<MessageRow[]> {
  const resultMessages: MessageRow[] = [];
  let activeBucket = calculateBucketId(beforeMessageId);

  while (resultMessages.length < limit && activeBucket >= oldestBucket) {
    // Compute the remaining count BEFORE appending rows so the short-page check is correct.
    const remaining = limit - resultMessages.length;
    // bigint columns are bound as decimal strings, which the driver encodes as CQL bigint.
    const params = [channelId.toString(), activeBucket, beforeMessageId.toString(), remaining];
    const rs = await scyllaClient.execute(HISTORY_QUERY, params, { prepare: true });

    for (const row of rs.rows) {
      resultMessages.push({
        channel_id: row.get('channel_id').toString(), // the driver returns bigint as Long
        bucket_id: row.get('bucket_id'),
        message_id: row.get('message_id').toString(),
        author_id: row.get('author_id').toString(),
        content: row.get('content')
      });
    }

    if (rs.rows.length < remaining) {
      // Short page: this bucket is exhausted, so step back one time window.
      // Every message in an older bucket predates beforeMessageId, so the cursor stays valid.
      activeBucket--;
    } else {
      break; // the page is full
    }
  }
  return resultMessages;
}
Scaling Challenges & Production Bottlenecks
Operating a masterless wide-column storage ring at a multi-terabyte daily scale exposes several core physical bottlenecks:
1. LSM Compaction Storms & Write Amplification
ScyllaDB and Cassandra utilize Log-Structured Merge (LSM) Trees. Under heavy write ingestion, Memtables in RAM are constantly flushed to disk as read-only SSTables (Sorted String Tables). Because SSTables are immutable, updating or deleting a row creates redundant entries across multiple files.
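To reclaim space, background compaction threads continuously merge SSTables: because each file is already sorted, they stream several files together into a larger sorted file, keeping only the newest version of each row and dropping obsolete entries.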
graph TD
subgraph "LSM Storage Engine Compaction Loop"
Mem[Active MemTable in RAM] -->|1. Flush to disk| SST1[SSTable File 1]
Mem -->|2. Flush to disk| SST2[SSTable File 2]
SST1 & SST2 -->|3. Read and Merge| Compactor[Scylla Compaction Thread]
Compactor -->|4. Write Compacted File| SSTCompacted[Consolidated SSTable 3]
Compactor -->|5. Evict obsolete pages| DiskSpace[Freed Disk Capacity]
end
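The merge step in the diagram can be sketched as a reconciliation over key-sorted files: for each key, keep the version with the newest write timestamp, and drop tombstones older than a grace cutoff. This is a simplified model under stated assumptions (one value per key, integer millisecond timestamps, an in-memory map instead of a streaming k-way merge, and every SSTable of the partition included in the compaction); real engines reconcile per cell.

```typescript
// Simplified compaction: merge immutable, key-sorted SSTables into one.
interface Cell {
  key: string;
  writeTimeMs: number;
  value: string | null; // null marks a tombstone (deletion)
}

type SSTable = readonly Cell[]; // sorted by key, at most one cell per key

function compact(tables: SSTable[], gcBeforeMs: number): Cell[] {
  const newest = new Map<string, Cell>();
  for (const table of tables) {
    for (const cell of table) {
      const current = newest.get(cell.key);
      // Last-write-wins; on a timestamp tie the tombstone wins, as in Cassandra.
      if (
        !current ||
        cell.writeTimeMs > current.writeTimeMs ||
        (cell.writeTimeMs === current.writeTimeMs && cell.value === null)
      ) {
        newest.set(cell.key, cell);
      }
    }
  }
  return [...newest.values()]
    // Purge tombstones older than the cutoff; younger ones must survive so the
    // deletion can still reach replicas that missed it.
    .filter((c) => !(c.value === null && c.writeTimeMs < gcBeforeMs))
    .sort((a, b) => (a.key < b.key ? -1 : a.key > b.key ? 1 : 0));
}
```

The output is a single sorted file with one version per key, which is why compaction both reclaims disk space and shortens later reads.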
The Bottleneck: If a partition grows to several gigabytes (for example, because the partition key has no bucket component), every compaction that touches it must rewrite that entire partition, consuming massive amounts of CPU and disk I/O. While this runs, the storage node's I/O queue saturates, write throughput drops, and P99 latencies can spike from $5\text{ ms}$ to over $5,000\text{ ms}$.
Mitigation (Size-Tiered Compaction & Bucketing):
- We configure Size-Tiered Compaction Strategy (STCS) and size the buckets so that each partition stays below $100\text{ MB}$. Because no single partition grows without bound, compaction never has to rewrite one multi-gigabyte partition; its work is spread across many small partitions and across every node, avoiding the I/O saturation described above.
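The bucket window follows from the target partition size. A back-of-the-envelope sketch, assuming the 200-byte average message from the sizing section and the 100 MB ceiling; it ignores compression and per-row overhead, so treat the result as an upper bound on a safe window. The function name is hypothetical.

```typescript
// Largest time window (in days) that keeps one (channel_id, bucket_id) partition
// under a size target, for a channel with a given daily message volume.
function maxBucketWindowDays(
  messagesPerDay: number,
  avgMessageBytes: number,
  targetPartitionBytes: number
): number {
  const messagesPerPartition = targetPartitionBytes / avgMessageBytes;
  return messagesPerPartition / messagesPerDay;
}

const TARGET_PARTITION_BYTES = 100e6; // 100 MB (decimal)

// 100 MB / 200 B = 500,000 messages per partition.
console.log(maxBucketWindowDays(50_000, 200, TARGET_PARTITION_BYTES));    // 10
console.log(maxBucketWindowDays(1_000_000, 200, TARGET_PARTITION_BYTES)); // 0.5
```

Because a single window applies to every channel, it has to be chosen for the busiest channels; quieter channels simply end up with smaller partitions.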
2. The Stop-the-World GC Pause (Why C++ Beats Java)
Apache Cassandra is written in Java. Under heavy write loads, millions of short-lived object allocations churn through the Java Virtual Machine (JVM) heap. When the heap fills, the garbage collector can trigger a Stop-The-World (STW) pause that freezes every database execution thread while memory is reclaimed.
The Bottleneck: During a $500\text{ ms}$ GC pause, stateless backend microservices experience connection timeouts. They respond by retrying their writes, which piles even more load onto the node as it resumes and can cascade into a service-wide collapse.
Mitigation (The C++ Thread-per-Core Model): ScyllaDB avoids this JVM limitation by reimplementing the Cassandra-compatible engine in C++ on the Seastar framework.
- Thread-per-Core Architecture: ScyllaDB runs one thread pinned to each CPU core, and each of these shards owns a slice of the data. Work inside a shard is scheduled cooperatively in user space, and shards communicate by message passing, which avoids most kernel context switches and lock contention on shared data.
- Per-Core Memory and Direct I/O: Memory is pre-allocated at startup and divided evenly across cores, so there is no shared heap and no lock contention between cores. Because C++ manages memory explicitly, there is no garbage collector and therefore no GC pause; disk access uses direct I/O that bypasses the Linux page cache in favor of ScyllaDB's own cache. The result is far flatter P99 write latency under heavy load.
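The shared-nothing idea can be modelled in a few lines: every partition key has exactly one owning shard, and other shards never touch that shard's data; they hand it work instead. The sketch below is a single-threaded simulation with hypothetical names (`Shard`, `ShardedEngine`) and a toy string hash. ScyllaDB's real shard selection derives the shard from the partition token, and Seastar passes cross-core work through its own inter-core queues.

```typescript
// Single-threaded simulation of a shared-nothing, shard-per-core engine.
type ShardTask = (data: Map<string, string>) => void;

class Shard {
  private readonly data = new Map<string, string>(); // touched only by this shard's loop
  private readonly inbox: ShardTask[] = [];

  // Callers never lock or read `data`; they enqueue a task for the owning shard.
  submit(task: ShardTask): void {
    this.inbox.push(task);
  }

  // Models the shard's core draining its own queue, one task at a time.
  runPending(): void {
    let task = this.inbox.shift();
    while (task !== undefined) {
      task(this.data);
      task = this.inbox.shift();
    }
  }
}

class ShardedEngine {
  private readonly shards: Shard[];

  constructor(cores: number) {
    this.shards = Array.from({ length: cores }, () => new Shard());
  }

  // Toy hash: each partition key maps to exactly one owning shard.
  shardFor(partitionKey: string): number {
    let h = 0;
    for (let i = 0; i < partitionKey.length; i++) {
      h = (Math.imul(h, 31) + partitionKey.charCodeAt(i)) >>> 0;
    }
    return h % this.shards.length;
  }

  write(partitionKey: string, value: string): void {
    this.shards[this.shardFor(partitionKey)].submit((d) => { d.set(partitionKey, value); });
  }

  read(partitionKey: string, onResult: (value: string | undefined) => void): void {
    this.shards[this.shardFor(partitionKey)].submit((d) => onResult(d.get(partitionKey)));
  }

  // One scheduling round: every simulated core processes only its own queue.
  tick(): void {
    for (const shard of this.shards) shard.runPending();
  }
}
```

Since a key's reads and writes always land in the same shard's FIFO queue, ordering is preserved without any mutex.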
Technical Trade-offs & Strategic Compromises
Selecting a distributed storage pattern for messaging requires balancing transactional guarantees, query complexity, and operational cost.
| Storage Architecture | Read Latency (P99) | Write Throughput | Schema Evolution | Transactional Guarantees | Operational Cost |
|---|---|---|---|---|---|
| Relational Sharding (MySQL/Postgres) | Low | Medium (Lock contention) | Rigid / DDL migration | Strong ACID (CP) | High (Requires database proxies) |
| Masterless Wide-Column (ScyllaDB) | Ultra-Low (<5ms) | Very High (LSM appends) | Flexible (add columns online; primary key fixed) | Tunable per request, eventual at low levels (AP) | Low (Flat storage utilization) |
| NewSQL (Google Spanner / CockroachDB) | Medium | Low-Medium (Distributed 2PC locks) | Rigid | Global Serializability (CP) | Extremely High (Network and compute overhead) |
The AP Posture Choice
For a chat application, we explicitly favor Availability and Partition Tolerance (AP) over Consistency (CP). During a network partition between availability zones, the ScyllaDB cluster keeps accepting writes on every side that can still reach the replicas required by the request's consistency level: at consistency level ONE even an isolated zone keeps accepting writes, while at QUORUM only the side holding a majority of replicas does. Divergent replicas converge later through read repair, hinted handoff, and anti-entropy repair. We compromise on strict real-time consistency so that users can keep sending messages.
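How much an AP cluster gives up depends on the consistency level chosen per request. A minimal sketch of the standard replica arithmetic for a replication factor `rf`: QUORUM needs a majority of replicas, and a read is guaranteed to observe the latest acknowledged write only when the read and write replica sets must overlap (R + W > RF). The function names are illustrative.

```typescript
// Replica arithmetic behind tunable consistency (Cassandra/ScyllaDB-style levels).
type ConsistencyLevel = "ONE" | "QUORUM" | "ALL";

function replicasRequired(level: ConsistencyLevel, rf: number): number {
  if (level === "ONE") return 1;
  if (level === "QUORUM") return Math.floor(rf / 2) + 1;
  return rf; // ALL
}

// Can a request at `level` succeed when only `reachable` replicas respond?
function canServe(level: ConsistencyLevel, rf: number, reachable: number): boolean {
  return reachable >= replicasRequired(level, rf);
}

// Reads are guaranteed to see the latest acknowledged write when R + W > RF.
function readsSeeLatestWrite(read: ConsistencyLevel, write: ConsistencyLevel, rf: number): boolean {
  return replicasRequired(read, rf) + replicasRequired(write, rf) > rf;
}

const RF = 3; // one replica per availability zone
console.log(canServe("QUORUM", RF, 1));                   // false: an isolated AZ holds 1 of 3 replicas
console.log(canServe("ONE", RF, 1));                      // true
console.log(readsSeeLatestWrite("QUORUM", "QUORUM", RF)); // true  (2 + 2 > 3)
console.log(readsSeeLatestWrite("ONE", "ONE", RF));       // false (1 + 1 <= 3)
```

With RF = 3 spread over three zones, QUORUM traffic survives the loss of any single zone but not inside an isolated minority zone; only ONE-level writes keep every side of a partition writable, at the cost of the overlap guarantee.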
Failure Scenarios and Fault Tolerance
Masterless ring storage is designed to survive hardware failures.
1. Partition Tombstone Saturation
In wide-column stores, deleting a message does not instantly erase it from disk. Instead, the database writes a marker called a Tombstone over the old record.
When a user scrolls through a channel's history, ScyllaDB must scan the index, reading through these tombstones. If a bot writes and deletes 100,000 messages in a channel, a query for "the last 50 messages" will force the database node to read 100,000 tombstones in memory.
Fault Tolerance Strategy:
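- We configure a strict threshold, tombstone_failure_threshold = 100000 (Cassandra's setting of this name also defaults to 100,000). If a query attempts to read more than 100,000 tombstones, the node aborts it immediately rather than risk exhausting memory (the JVM heap in Cassandra, per-shard memory in ScyllaDB).
- We rely on background compaction to purge tombstones once gc_grace_seconds has elapsed (10 days by default), which gives repairs time to propagate each deletion to every replica before its marker disappears.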
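The failure threshold behaves like a counter inside the read path: the scan skips tombstones while collecting live rows and aborts once it has crossed the limit. A simplified model of that behavior, not ScyllaDB's or Cassandra's actual code; the row shape, `readLatest`, and `TombstoneLimitError` are hypothetical.

```typescript
interface StoredRow {
  messageId: bigint;
  content: string | null; // null marks a tombstone
}

class TombstoneLimitError extends Error {}

// Return up to `limit` live rows in scan order (newest first), aborting
// if more than `failureThreshold` tombstones are scanned along the way.
function readLatest(rows: Iterable<StoredRow>, limit: number, failureThreshold = 100_000): StoredRow[] {
  const live: StoredRow[] = [];
  let tombstones = 0;
  for (const row of rows) {
    if (row.content === null) {
      tombstones++;
      if (tombstones > failureThreshold) {
        throw new TombstoneLimitError(`scanned ${tombstones} tombstones`);
      }
      continue; // deleted rows are skipped but still cost work
    }
    live.push(row);
    if (live.length === limit) break;
  }
  return live;
}
```

The cost of a "latest 50 messages" query is therefore driven by the number of deleted rows sitting in front of the live ones, not by the 50 rows actually returned.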
2. Node Failures and Active Anti-Entropy
If one of our ScyllaDB nodes crashes during peak write ingestion, the masterless write path remains operational as long as enough replicas acknowledge each write to satisfy its consistency level.
sequenceDiagram
participant Client as API Client Driver
participant NodeA as Storage Node A (Coordinator)
participant NodeB as Storage Node B (Replica)
participant NodeC as Storage Node C (Replica - Crashed!)
Client->>NodeA: 1. Write Message (Quorum)
NodeA->>NodeB: 2. Replicate Message
Note over NodeA,NodeC: Node C is offline!
NodeA--xNodeC: 2. Replicate Message (Fails)
NodeA->>NodeA: 3. Write Hinted Handoff to local disk
NodeA-->>Client: 4. Write Confirmed (Quorum Met 2/3)
Note over NodeC: Node C boots back online
NodeA->>NodeC: 5. Playback Hinted Handoff
NodeC->>NodeC: 6. Apply missed writes
Fault Tolerance Strategy:
- Hinted Handoffs: When a replica node is offline, the writing node stores the write payload locally on disk as a "Hint." Once the crashed node boots back online, the writing node plays back these hints, restoring consistency.
- Anti-Entropy Merkle Trees: During periodic repairs (typically scheduled off-peak), nodes exchange Merkle trees (hash trees built over their stored partition data) to identify divergent token ranges and stream only those ranges, without transferring full datasets over the network.
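The Merkle-tree comparison can be sketched with Node's built-in `crypto` module: each replica hashes its rows per token range into leaves, builds a binary tree of hashes, and the comparison descends only into subtrees whose hashes differ. Assumptions: both replicas use the same power-of-two number of leaf ranges, rows are already grouped by range, and SHA-256 is the hash; `buildTree` and `diffRanges` are hypothetical names, and real repair implementations differ in tree depth and hashing details.

```typescript
import { createHash } from "node:crypto";

const sha256 = (s: string): string => createHash("sha256").update(s).digest("hex");

// Build a Merkle tree bottom-up: levels[0] holds leaf hashes, the last level holds the root.
// Assumes leafRanges.length is a power of two.
function buildTree(leafRanges: string[][]): string[][] {
  let level = leafRanges.map((rows) => sha256(rows.join("\n")));
  const levels: string[][] = [level];
  while (level.length > 1) {
    const next: string[] = [];
    for (let i = 0; i < level.length; i += 2) next.push(sha256(level[i] + level[i + 1]));
    levels.push(next);
    level = next;
  }
  return levels;
}

// Return the indexes of leaf ranges that differ, skipping identical subtrees.
function diffRanges(a: string[][], b: string[][]): number[] {
  const out: number[] = [];
  const visit = (depth: number, index: number): void => {
    if (a[depth][index] === b[depth][index]) return; // identical subtree: nothing to sync
    if (depth === 0) {
      out.push(index);
      return;
    }
    visit(depth - 1, index * 2);
    visit(depth - 1, index * 2 + 1);
  };
  visit(a.length - 1, 0);
  return out;
}
```

Only the returned ranges need to be streamed between replicas; when the roots match, a single hash comparison proves the replicas agree.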
Staff Engineer Perspective
Verbal Script & Mock Interview
Mock Interview Dialogue
Interviewer: "Welcome! Let's jump into a classic distributed storage problem: How would you design the storage architecture for a messaging platform like Discord that handles trillions of messages? Specifically, how do you handle data skew caused by massive public channels without degrading query latencies?"
Candidate: *"To scale a messaging platform to trillions of messages while neutralizing the data skew of public channels, I would adopt a masterless, wide-column database architecture like ScyllaDB.
If we use a naive data model where the partition key is simply channel_id, a high-traffic community like the Fortnite channel will force its entire message history—potentially terabytes of data—onto a single physical storage node. This triggers severe write hotspots and catastrophic LSM compaction storms that exhaust host disk I/O.
To prevent this, I would implement Bucketized Sharding. I will design a composite partition key: ((channel_id, bucket_id), message_id). The bucket_id is derived from the timestamp embedded in each message's Snowflake ID, so each bucket covers a fixed time window (for example, 10 days) chosen so that even the busiest channels keep every partition bounded.
When the window rolls over, new messages land in a new partition. Because the partition token is a hash of both channel_id and bucket_id, consistent hashing scatters successive buckets pseudo-randomly across the nodes of our ScyllaDB ring. No partition grows without bound, and a massive channel's history is spread across the entire cluster instead of piling onto a single node."*
Interviewer: "Excellent. But by scattering a single channel's messages across multiple physical nodes, you've compromised on read latency. When a user opens a channel, how do you fetch their chat history without conducting a slow, expensive scatter-gather query across all nodes?"
Candidate: *"We explicitly avoid scatter-gather queries by keeping our query routing token-aware on the client-side. When a user opens a channel, our application service knows the current timestamp and Snowflake ID sequence. It calculates the active bucket_id for that channel and queries exactly that partition.
If the active partition contains fewer messages than the user requested (e.g., the user wants 50 messages, but the current bucket only has 10), the application client driver uses non-blocking asynchronous calls to query the previous sequential bucket_id partition (bucket_id - 1).
Because we calculate the exact bucket keys on the client side, every read is a targeted single-partition query that the token-aware driver sends directly to a replica owning that bucket, keeping read latency low and predictable."*
Interviewer: "That is a very clean read-path optimization. How do you handle message deletions in this wide-column setup? What operational issues can arise from high delete rates?"
Candidate: *"In LSM-Tree engines, deletions do not physically erase records immediately; they write a marker called a Tombstone. When our query parser scans a partition, it must load these tombstones into memory to determine if a message was deleted.
If a channel experiences a high volume of deletions—for example, due to spam cleanup bots—a query for the latest messages will scan thousands of tombstones, exhausting JVM/thread memory and causing write timeouts.
To prevent this, I would establish two safeguards:
First, we configure ScyllaDB's tombstone_failure_threshold to 100,000, aborting queries that attempt to scan beyond this limit to protect the node's memory.
Second, we optimize our delete workflows by isolating them into separate time-bounded buckets. Once every message in a bucket has been deleted, we can remove the whole partition with a single partition-level delete instead of accumulating one tombstone per message. One partition tombstone shadows all of the bucket's rows, so reads skip it cheaply and compaction has far less to merge."*
Interviewer: "Perfect. That shows an outstanding grasp of LSM engine mechanics and real-world system resilience. Let's proceed to the next phase!"