Lesson 68 of 70 18 minPractice Bank

System Design: Solving the Top K Problem (Heavy Hitters)

How does YouTube track trending videos or Twitter find trending hashtags in real-time? Learn about the Top K problem, Count-Min Sketch, and heavy hitters at scale.

Reading Mode

Hide the curriculum rail and keep the lesson centered for focused reading.

Key Takeaways

  • **YouTube:** The top 10 trending videos in the last hour.
  • **Twitter:** The top 50 trending hashtags globally.
  • **E-commerce:** The top selling products across all categories.

Premium outcome

A curated practice bank for turning patterns into repetition.

Candidates who already know the fundamentals and need sharper repetition.

You leave with

  • A deeper bench of solved problems organized by core interview patterns
  • Faster recall under time pressure from deliberate, categorized practice
  • A stronger bridge between theory lessons and real question execution

Tracking trending hashtags globally in real-time on Twitter, displaying the top 10 trending videos on YouTube, or calculating the most frequently purchased items on Amazon requires solving a classic distributed systems problem: The Top K Problem (also known as the Heavy Hitters problem).

At a small scale, finding the most frequent items is trivial: you maintain a hash map of item_id -> count in memory and sort it. However, when you scale this to millions of events per second and billions of distinct items (e.g., search queries, video clicks, or tweets), this naive approach collapses. A hash map of that size cannot fit into a single server's RAM, and writing every event to a relational database to run SQL COUNT and ORDER BY queries creates catastrophic write saturation.

To design a truly resilient distributed Top K system, engineers must transition from exact-count paradigms to probabilistic approximations. This guide architectures a highly scalable, real-time, and memory-bounded distributed Top K pipeline. We will explore the mathematics of the Count-Min Sketch probabilistic data structure, Flink stream partitioning, and time-decayed sliding aggregation windows.


System Requirements

To solve the Heavy Hitters problem at a multi-million-events-per-second scale, we must establish strict functional and non-functional engineering boundaries.

Functional Requirements

  • Real-Time Trending List: Dynamically output the top $K$ (e.g., top 100) most frequent items over a sliding time window (e.g., last 60 minutes, updated every 10 seconds).
  • Frequency Query API: Expose a high-throughput endpoint for users or internal services to query the estimated frequency of any specific item in the current window.
  • Support Time-Decayed Scaling: Ensure newer events contribute more to the "trending" score than older events, preventing old viral videos from permanently blocking new trends.
  • Multi-Category Filtering: Support tracking top $K$ items within specific categories (e.g., top gaming videos vs. top news videos) without duplicating the ingestion tier.

Non-Functional Requirements

  • High Scale & Throughput: Ingest and process a stream of up to $500,000$ events per second with sub-second processing latency.
  • Bounded Memory Footprint: Enforce a strict memory boundary (e.g., less than 2GB RAM) for the core processing nodes, regardless of how many billions of distinct items flow through the system.
  • Controlled Approximation Error: Accept a small, mathematically bounded margin of error in exchange for massive storage savings. The estimation error must be within a configurable tolerance $\epsilon$ with a confidence probability $1 - \delta$.
  • High Availability & Fault Tolerance: The system must survive partition crashes and process events with at-least-once or exactly-once semantics.

API Design and Interface Contracts

The platform exposes external REST endpoints for clients to query trends, and internal streaming events for ingest.

1. Retrieve Current Top K Hashtags

  • Endpoint: GET /v1/trends/hashtags?limit=10&category=sports

Response Payload (JSON - 200 OK):

{
  "window_end": "2026-05-23T08:06:14Z",
  "window_size_seconds": 3600,
  "category": "sports",
  "top_k": [
    { "hashtag": "superbowl", "estimated_count": 85430, "rank": 1 },
    { "hashtag": "championsleague", "estimated_count": 72100, "rank": 2 },
    { "hashtag": "f1", "estimated_count": 54200, "rank": 3 }
  ]
}

2. Query Specific Item Frequency

  • Endpoint: GET /v1/trends/hashtags/estimate?hashtag=systemdesign

Response Payload (JSON - 200 OK):

{
  "hashtag": "systemdesign",
  "estimated_count": 4820,
  "confidence": 0.99,
  "error_bound_epsilon": 0.0001,
  "timestamp": "2026-05-23T08:06:20Z"
}

3. Error Contract

If the client provides an invalid limit or unsupported category:

  • Response Payload (JSON - 400 Bad Request):
{
  "error": "INVALID_PARAMETER",
  "message": "Limit must be between 1 and 100, and category must be a registered alphanumeric slug.",
  "timestamp": "2026-05-23T08:06:22Z"
}

High-Level Architecture

To process millions of events per second in real-time, the pipeline decouples data collection, partition-based streaming aggregation, and global merging.

The ingestion layer acts as a buffer. Web application servers capture events (e.g., a user publishing a tweet) and write them to Apache Kafka. Kafka partitions the events by a hashed version of the hashtag to ensure high parallelization. Flink worker nodes ingest these streams, perform local Count-Min Sketch updates, and maintain their own local Min-Heaps.

Every 10 seconds, instead of pushing millions of raw events across the network, the workers flush only their local Top K lists to the Global Merging Service. This service sums the occurrences across workers and persists the final Top K list into a highly optimized Redis Sorted Set (ZSET) cache for instant API Gateway retrieval.

graph TD
    %% Ingest Flow
    Client[Client Tweets] -->|1. Ingest Event| Kafka[Apache Kafka Broker]
    
    subgraph "Flink Partitioned Processing Tier"
        Kafka -->|2. Route by hash key| WorkerA[Flink Worker Node A]
        Kafka -->|2. Route by hash key| WorkerB[Flink Worker Node B]
        
        WorkerA -->|3. Local sketch update| SketchA[Local Count-Min Sketch A]
        WorkerA -->|4. Local top list| HeapA[Local Min-Heap Priority Queue]
        
        WorkerB -->|3. Local sketch update| SketchB[Local Count-Min Sketch B]
        WorkerB -->|4. Local top list| HeapB[Local Min-Heap Priority Queue]
    end
 
    subgraph "Global Aggregator Tier"
        WorkerA -->|5. Push local Top K every 10s| CentralAggregator[Global Merging Service]
        WorkerB -->|5. Push local Top K every 10s| CentralAggregator
        
        CentralAggregator -->|6. Write final Top K| RedisCache[(Redis Sorted Set)]
    end
 
    %% Visualisation
    API[API Gateway] -->|Query| RedisCache
    SRE[SRE Dashboard] --> API
 
    %% Styles
    style Kafka fill:#1a1c23,stroke:#f59e0b,stroke-width:2px,color:#fff
    style WorkerA fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
    style WorkerB fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
    style RedisCache fill:#1a1c23,stroke:#ef4444,stroke-width:2px,color:#fff

Count-Min Sketch Hash Matrix Mechanics

A Count-Min Sketch uses a fixed 2D array of width $w$ and depth $d$. When a hashtag (e.g. #systemdesign) arrives in the stream:

  1. It is passed through $d$ distinct, independent hash functions.
  2. Each hash function maps the hashtag to a specific column index inside its corresponding row.
  3. The counter at the resulting coordinate [row][column] is incremented by 1.

To query the frequency of a hashtag, we hash it again, look up all $d$ coordinates, and take the minimum value among them. Taking the minimum neutralizes the influence of hash collisions, delivering an extremely accurate upper-bound frequency.

graph TD
    Input[Hashtag: #systemdesign] -->|Hash Function 1| H1[Row 0, Col 4]
    Input -->|Hash Function 2| H2[Row 1, Col 182]
    Input -->|Hash Function 3| H3[Row 2, Col 99]
 
    subgraph "Count-Min Sketch Matrix (d x w)"
        Row0[Row 0: [0][1][2][3] +++ [4: Val=85435] +++ [w]]
        Row1[Row 1: [0] +++ [182: Val=85600] +++ [w]]
        Row2[Row 2: [0] +++ [99: Val=85430] +++ [w]]
    end
 
    H1 --> Row0
    H2 --> Row1
    H3 --> Row2
 
    Note over Row0,Row2: Query: min(85435, 85600, 85430) = 85430 (Accurate frequency)

Low-Level Design and Schema

To ensure thread-safety and sub-millisecond execution, we implement the Count-Min Sketch and local Min-Heap tracking in memory.

1. Count-Min Sketch & Min-Heap Controller (TypeScript)

This class defines a thread-safe, high-performance Count-Min Sketch coupled to an active Min-Heap (Priority Queue) to track the Top K items on the fly.

import md5 from 'md5';
 
export class CountMinSketch {
  private width: number;
  private depth: number;
  private matrix: Int32Array[];
 
  constructor(width: number, depth: number) {
    this.width = width;
    this.depth = depth;
    // Pre-allocate continuous memory blocks for the matrix counters
    this.matrix = Array.from({ length: depth }, () => new Int32Array(width));
  }
 
  // Generate d independent hash values using salt prefixes
  private getHash(item: string, rowIndex: number): number {
    const rawHash = md5(`${rowIndex}_${item}`); // MD5 salt hashing
    const numericHash = parseInt(rawHash.slice(0, 8), 16);
    return numericHash % this.width;
  }
 
  // Increment counters for incoming item
  public increment(item: string): void {
    for (let d = 0; d < this.depth; d++) {
      const col = this.getHash(item, d);
      this.matrix[d][col]++;
    }
  }
 
  // Fetch estimated frequency (Take the minimum to bypass collisions)
  public estimate(item: string): number {
    let minEstimate = Infinity;
    for (let d = 0; d < this.depth; d++) {
      const col = this.getHash(item, d);
      const val = this.matrix[d][col];
      if (val < minEstimate) {
        minEstimate = val;
      }
    }
    return minEstimate;
  }
}
 
// Active Top K Tracker incorporating a Min-Heap priority queue
export class TopKTracker {
  private k: number;
  private sketch: CountMinSketch;
  private minHeap: { item: string; count: number }[];
  private heapKeys: Set<string>;
 
  constructor(k: number, sketchWidth: number, sketchDepth: number) {
    this.k = k;
    this.sketch = new CountMinSketch(sketchWidth, sketchDepth);
    this.minHeap = [];
    this.heapKeys = new Set();
  }
 
  public processItem(item: string): void {
    // 1. Update Count-Min Sketch
    this.sketch.increment(item);
    const updatedCount = this.sketch.estimate(item);
 
    // 2. Evaluate Heap Placement
    if (this.heapKeys.has(item)) {
      // If the item is already in our heap, update its value and re-heapify
      const index = this.minHeap.findIndex((x) => x.item === item);
      this.minHeap[index].count = updatedCount;
      this.minHeapify();
    } else if (this.minHeap.length < this.k) {
      // If the heap is not full, add it directly
      this.minHeap.push({ item, count: updatedCount });
      this.heapKeys.add(item);
      this.minHeapify();
    } else if (updatedCount > this.minHeap[0].count) {
      // If the item count is larger than the minimum top item, eject the min and insert the new item
      const ejected = this.minHeap.shift();
      if (ejected) this.heapKeys.delete(ejected.item);
      
      this.minHeap.push({ item, count: updatedCount });
      this.heapKeys.add(item);
      this.minHeapify();
    }
  }
 
  private minHeapify(): void {
    // Sort ascending so the item with the minimum frequency is always at index 0
    this.minHeap.sort((a, b) => a.count - b.count);
  }
 
  public getTopK(): { item: string; count: number }[] {
    // Return descending (most frequent first)
    return [...this.minHeap].reverse();
  }
}

2. Time-Decayed Weight Formulas

To prevent historical viral hashtags from permanently dominating the trending board, we apply a sliding time-decay weight formula. When an event occurs, rather than treating it as a raw integer increment of $1$, we calculate its weighted contribution. The formula is defined as: $$W = e^{-\lambda (T_{\text{current}} - T_{\text{event}})}$$

Where $\lambda$ represents the decay constant (determining the speed of signal fading), and the exponential term naturally dampens older signals. In stream processing architectures like Apache Flink, this weight calculation is performed inside a custom mapper function prior to updating the Count-Min Sketch matrix. If a hashtag was posted 1 hour ago, its increment score might only evaluate to $0.05$, whereas a hashtag posted 1 second ago contributes $0.99$. This mathematical decaying forces the global trending charts to remain highly reactive to instantaneous surges rather than historical volume.


Scaling Challenges and Capacity Estimation

Ingesting millions of events per second in real-time exposes several physical bottlenecks:

1. Capacity and Memory Math

Let's conduct a capacity estimation for a global trending hashtag pipeline:

  • Event Ingestion Rate: $300,000$ tweets/second.
  • Average Hashtag Size: $15\text{ Bytes}$ (uncompressed string).
  • Total Daily Raw Events: $$\text{Daily Events} = 300,000 \text{ eps} \times 86,400 \text{ seconds} \approx 25.92 \text{ Billion events/day}$$
  • Naive Hash Map Memory Cost (Unique Keys): If we track $100\text{ Million}$ unique hashtags over a 24-hour period using a standard hash map: $$\text{Key Size} = 15 \text{ Bytes} + 8 \text{ Bytes counter} = 23 \text{ Bytes/item}$$ $$\text{Raw Memory} = 100,000,000 \text{ keys} \times 23 \text{ Bytes} \approx 2.3 \text{ GB}$$ With JVM object headers, hash table bucket pointers, and memory fragmentation, this map will actually consume over $10\text{ GB}$ of RAM, leading to severe Garbage Collection pauses and eventual OutOfMemory (OOM) crashes.
  • Probabilistic Count-Min Sketch Memory Cost: By allocating a fixed $d \times w$ matrix where $d = 5$ (hash rows) and $w = 200,000$ (columns of 32-bit integer counters): $$\text{Total Counters} = 5 \times 200,000 = 1,000,000 \text{ counters}$$ $$\text{Fixed Memory Footprint} = 1,000,000 \times 4 \text{ Bytes} = 4 \text{ MB}$$ The Count-Min Sketch scales down memory requirements by over 99.9% while keeping a mathematically bounded error margin! Even if the unique hashtag count grows to 10 Billion, the memory footprint remains absolutely unchanged at 4 MB.

2. Count-Min Sketch Collision Error Bounds

Because a Count-Min Sketch has a fixed array width $w$, different hashtags will hash to the same column index, leading to overestimation errors. If the width $w$ is too small, the estimation error will exceed acceptable business boundaries, leading to incorrect trending boards.

  • Mitigation: We mathematically size our matrix width $w$ and depth $d$ based on our error tolerance $\epsilon$ and confidence limit $\delta$ using the standard Count-Min Sketch proof equations: $$w = \lceil \frac{e}{\epsilon} \rceil, \quad d = \lceil \ln(\frac{1}{\delta}) \rceil$$ To guarantee that our estimation error is within $\epsilon = 0.01%$ of the total stream volume with a confidence probability of $99%$ ($\delta = 0.01$), we set $w \approx 27,180$ columns and $d \approx 5$ hash rows, bounding the error strictly.

If all users are tweeting about the same trending topic (e.g., #superbowl), standard hash partitioning by hashtag will route 90% of the stream to a single Flink worker node, saturating its memory, causing backpressure to propagate up to Kafka, and crashing the pod due to CPU saturation.

  • Mitigation (Two-Stage Aggregation):
    1. Local Pre-Aggregation Stage: Before sending events across the network, we append a random salt prefix (e.g., a random integer from 1 to $M$, such as 3_superbowl or 8_superbowl) to the hashtags at the ingestion layer. Flink keys the stream by this salted string. Since the hash keys are salted, they are evenly distributed across all parallel Flink worker nodes, neutralizing hotspots.
    2. Global Consolidation Stage: Each local Flink worker aggregates the salted items over a 10-second window. At the end of the window, the workers strip the salt prefix, leaving the pure hashtag names, and forward only their consolidated local Top K summaries to the Global Merging Service. This compresses the network traffic by a factor of 10,000, allowing the centralized coordinator to perform the final merge effortlessly.

4. Redis Sorted Set Skip-List Memory Scale

In our Global Aggregator, we store the consolidated global Top K list inside a Redis Sorted Set (ZSET). A Redis ZSET internally implements a Skip List paired with a Hash Map to guarantee logarithmic $O(\log N)$ insert and search time complexities.

  • Mitigation: We configure a strict pipeline pruning step. Every time Flink writes a bulk aggregation batch to the ZSET, we run a trailing command to prune the sorted set size: ZREMRANGEBYRANK global_trends:hashtags 0 -101 This removes all items beyond the top 100, bounding the memory footprint of the ZSET to less than $20\text{ KB}$ per set namespace.

Failure Scenarios and Resilience

Distributed streaming systems must guarantee continuous operations under broker failures.

Flink workers store their local Count-Min Sketch matrices in volatile memory. If a node crashes, the local state is lost.

  • Resilience Pattern: We configure a RocksDB State Backend with incremental checkpointing. Flink periodically snapshots the Count-Min Sketch state to a durable object store (e.g. AWS S3). If a worker node crashes, the Flink controller schedules a new pod, restores the matrix state from the last checkpoint, and replays the Kafka log offsets, ensuring exactly-once processing guarantees.

2. Kafka Broker Partition Offline

If a Kafka broker hosting critical hashtag partitions fails, the ingestion pipeline will stall, causing event dropouts or client request backpressure.

  • Resilience Pattern: We establish a Kafka partition replication factor of 3 ($ISR = 2$), ensuring that even if one broker crashes, secondary replicas take over within milliseconds without data loss. Ingestion clients utilize internal retry queues with exponential backoff and jitter to queue events locally on the application servers during transient network dropouts.

3. Sliding Time Window Watermarking

Trending hashtags change dynamically over time. In stream processing, the system must process events based on when they occurred physically (Event Time) rather than when they arrived at the server (Processing Time) to handle out-of-order network logs correctly. If a mobile client goes offline for 5 minutes and then reconnects, sending a burst of accumulated tweets, a naive sliding window will create severe data drift.

  • Resilience Pattern: We implement Bounded-Out-of-Orderness Watermarks inside Apache Flink. Flink sets a watermark threshold (e.g. 10 seconds latency). Flink allows late-arriving events within this 10-second threshold to update the active Count-Min Sketch matrix, while dropping events arriving later, maintaining strict state integrity and avoiding infinite window accumulation.

Architectural Trade-offs

Selecting an algorithm to solve the Heavy Hitters problem requires balancing memory usage, accuracy guarantees, and compute complexity.

Algorithm Choice Memory Complexity Write Throughput Exact Count Accuracy Delete Support Computational Complexity
Exact Hash Map $O(N)$ (Unbounded) Low 100% (Strictly Exact) Yes $O(1)$
Count-Min Sketch $O(d \cdot w)$ (Strictly Bounded) Max (Single hash writes) Probabilistic Upper Bound No $O(d)$
Space-Saving Algorithm $O(K)$ (Extremely Low) Medium Probabilistic No $O(\log K)$
Lossy Counting $O(1/\epsilon \cdot \log(\epsilon N))$ Medium Probabilistic No $O(1)$

Exact Hash Map vs. Count-Min Sketch

An Exact Hash Map tracks every single item with absolute precision. This is perfect for accounting systems where financial accuracy is required. However, for a real-time hashtag tracking system handling $300,000$ events per second, the unbounded memory footprint $O(N)$ (where $N$ is the cardinality of unique hashtags) will crash the servers.

The Count-Min Sketch trades absolute precision for a strictly bounded memory footprint $O(d \cdot w)$. By allocating a fixed-size matrix upfront, we completely eliminate memory churn and JVM garbage collection issues, while bounding our estimation error mathematically. This is an optimal trade-off for trending charts where a $0.01%$ frequency error on non-trending items is completely negligible.


Staff Engineer Perspective


Verbal Script

Interviewer: "Welcome! Today, we want to design a system capable of tracking the top 100 trending hashtags globally on Twitter in real-time. The system must support an ingestion rate of 300,000 tweets per second. How would you architect this to maintain sub-second update latencies with bounded memory?"

Candidate: "To track the top 100 trending hashtags under a 300,000 requests-per-second workload, a naive hash-map approach is completely unfeasible due to memory bloat and JVM garbage collection stalls. Instead, I would design a distributed, real-time pipeline utilizing a two-stage streaming architecture powered by Apache Kafka, Apache Flink, and the Count-Min Sketch probabilistic data structure. At the ingestion layer, clients write events to a partitioned Kafka cluster. To prevent hot-key partition bottlenecks, we append a random salt prefix from 1 to $M$ to each hashtag. This distributes the writes evenly across all Kafka partitions and Flink worker nodes. Each Flink worker node maintains a local, highly optimized Count-Min Sketch matrix and a local Min-Heap of size $K$. When a hashtag arrives, the Flink worker hashes it, updates the corresponding cells in the matrix, estimates its current frequency by taking the minimum value, and evaluates it for insertion into the Min-Heap. Because the Count-Min Sketch matrix has a fixed dimension, our local memory footprint is strictly bounded to less than 5MB per worker, completely eliminating GC pauses."

Interviewer: "Excellent. But now you have multiple local Top K lists scattered across Flink workers. How do you merge them into a single, accurate global Top 100 list?"

Candidate: "To merge these local lists without saturating our network, we avoid sending raw event streams to our central coordinator. Instead, we implement a Periodic Flush Model. Every 10 seconds, each Flink worker node strips the random salt prefix from its local Min-Heap items and flushes only its top-100 list to a central Global Merging Service. The Global Merging Service aggregates the duplicate keys across the worker lists, sums their local estimated counts, and sorts them. Because we are only merging $N \times 100$ items instead of millions of raw tweets, this global consolidation runs in milliseconds. The final Global Top 100 list is written to a Redis Sorted Set (ZSET), where the API Gateway can execute sub-millisecond lookups."

Interviewer: "That is a very clean aggregation pipeline. How do you prevent old trending topics from permanently blocking new trends?"

Candidate: "We prevent historical trending dominance by applying a Time-Decay Exponential Formula. Instead of treating every event as a static count of 1, we compute a decay weight $W = e^{-\lambda \cdot \Delta t}$, where $\Delta t$ is the time elapsed since our system epoch. When a Flink worker processes a hashtag, it increments the Count-Min Sketch cells by $W$ instead of 1. Consequently, older hashtag counts naturally fade away exponentially over time, while new, high-velocity hashtags quickly accumulate large weight scores and rise to the top of our priority queues, providing a highly reactive and accurate trending experience."

Want to track your progress?

Sign in to save your progress, track completed lessons, and pick up where you left off.