Lesson 10 of 105 13 minFlagship

System Design: Designing a Distributed BLOB Store (like S3/GCS)

How to build a distributed BLOB storage system capable of storing exabytes of data. A deep dive into erasure coding, metadata management, and consistent hashing.

Reading Mode

Hide the curriculum rail and keep the lesson centered for focused reading.

Key Takeaways

  • **Erasure Coding:** Breaking objects into K data chunks and M parity chunks, reducing storage overhead by 50% compared to 3x replication.
  • **Decoupled Planes:** Separating the high-throughput Data Plane (handling chunk streams) from the metadata Control Plane.
  • **Durability Audits:** Employing hierarchical MurmurHash3 checksum trees to dynamically detect and heal localized disk bit-rot.
Recommended Prerequisites
System Design Interview FrameworkSystem Design Module 7: Sharding & Partitioning

Premium outcome

From vague architecture answers to staff-level trade-off thinking.

Backend engineers preparing for senior, staff, and architecture rounds.

What you unlock

  • A reusable system design answer framework for ambiguous prompts
  • Clear language for consistency, scaling, and reliability trade-offs
  • Case-study depth across feeds, payments, storage, and messaging systems

Mental Model

A distributed BLOB (Binary Large Object) storage system is designed to store exabytes of unstructured data with virtually infinite horizontal scale, high availability, and extreme durability. Traditional file systems fail at this scale because hierarchical directory structures create metadata synchronization bottlenecks. A premium BLOB store decouples data storage from metadata coordination: physical files are chunked, transformed via Reed-Solomon Erasure Coding, and streamed directly to bare-metal storage nodes in the Data Plane. Object metadata—such as bucket schemes, chunk mapping locations, and access control lists—is managed independently by a linearizable, sharded metadata consensus cluster in the Control Plane.


Requirements and System Goals

When building an enterprise distributed BLOB store (similar to Amazon S3 or Google Cloud Storage), we must target extreme durability bounds and low latency budgets.

1. Functional Requirements

  • Object CRUD Operations: Allow clients to upload (PUT), download (GET), and delete (DELETE) binary files ranging from 1 byte to 5 terabytes.
  • Multipart Parallel Uploads: Enable clients to split large files into independent parts and upload them in parallel out-of-order, merging them atomically upon completion.
  • Bucket Coordination: Support multi-tenant isolation where users register unique buckets to host logical groupings of objects.

2. Non-Functional Requirements & Performance Budgets

  • Exabyte-Scale Storage capacity: Scale seamlessly to hold billions of objects across hundreds of thousands of physical storage nodes.
  • Eleven Nines Durability SLA: Achieve 99.999999999% (11-nines) durability, guaranteeing that even under massive parallel rack failures, data is never lost.
  • Low-Latency Time-To-First-Byte (TTFB): Deliver a TTFB latency of less than 50ms for small objects and initial chunk streams.
  • Highly Partition-Tolerant Availability: Expose 99.99% availability for write operations and 99.999% availability for read streams.

API Interfaces and Service Contracts

A distributed BLOB store utilizes standardized HTTP/REST APIs for data streams and multipart control.

1. Initialize Multipart Upload API Contract

For objects greater than 100MB, parallel multipart uploads are initiated to prevent network timeout failures.

POST /api/v1/buckets/{bucket_name}/objects/{object_key}?uploads

Response Payload (200 OK):

{
  "bucket": "assets-production",
  "key": "videos/sprint_masterclass.mp4",
  "upload_id": "mp_upload_a8b9c2d1-4433-2211-bb00-eeddccbbaa99",
  "chunk_size_bytes": 16777216,
  "initiated_at": "2026-06-01T11:06:00Z"
}

2. Upload Object Part API Contract

Clients execute parallel HTTP PUT requests to stream raw binary byte parts of the BLOB, referencing the upload_id.

PUT /api/v1/buckets/{bucket_name}/objects/{object_key}?uploadId=mp_upload_a8b9c2d1&partNumber=3

Request Headers:

Content-Length: 16777216
Content-MD5: rAn/VK6ZGuU4RLJIG19uCw==
Content-Type: application/octet-stream

Response Payload (200 OK):

{
  "upload_id": "mp_upload_a8b9c2d1-4433-2211-bb00-eeddccbbaa99",
  "part_number": 3,
  "etag": "etag_part_3_5f8a2c1d3e",
  "checksum_crc32c": "3b29c8e1",
  "bytes_written": 16777216
}

High-Level Design and Visualizations

Decoupling high-volume chunk streaming channels from metadata catalog directories is mandatory to eliminate routing bottlenecks.

1. Data-Plane and Control-Plane Decoupled Architecture

This flowchart demonstrates how client writes bypass metadata lookups once chunk coordinates are resolved, streaming data bytes directly to decentralized Storage Nodes.

graph TD
    subgraph Client Layer
        Client[Application Client] -->|1. PUT / GET Request| APIGateway[API Gateway Cluster]
    end

    subgraph Control Plane (Metadata and Mapping)
        APIGateway -->|2. Lookup Bucket / ACL| AuthSvc[Auth & Policy Engine]
        APIGateway -->|3. Route Shard Directory| MetaDB[(CockroachDB Metadata Store)]
    end

    subgraph Data Plane (High-Throughput Streams)
        APIGateway -->|4. Erasure Code & Stream Chunks| StorageProxy[Storage Coordinator Proxy]
        
        StorageProxy -->|5. Stream Data Chunk 1| Node1[Storage Node 1 (Rack A)]
        StorageProxy -->|5. Stream Data Chunk 2| Node2[Storage Node 2 (Rack B)]
        StorageProxy -->|5. Stream Parity Chunk 1| Node3[Storage Node 3 (Rack C)]
        StorageProxy -->|5. Stream Parity Chunk 2| Node4[Storage Node 4 (Rack D)]
    end

2. Parallel Multipart Upload Ingestion Flow

The sequence diagram below displays the end-to-end parallel execution pipeline for streaming chunked parts.

sequenceDiagram
    autonumber
    participant Client as Application Client
    participant GW as API Gateway
    participant Meta as Metadata Store
    participant ChunkSvc as Chunk Allocation Engine
    participant Store as Storage Nodes Cluster

    Client->>GW: POST /bucket/object?uploads (Init Multipart)
    GW->>Meta: Register active upload session
    Meta-->>GW: Return uploadId
    GW-->>Client: Return uploadId and Chunk Bounds
    
    rect rgb(240, 255, 240)
        Note over Client, Store: Parallel Part Upload Execution
        Par over Client, GW, Store
            Client->>GW: PUT part 1 (16MB Binary)
            GW->>ChunkSvc: Reserve chunk block addresses
            ChunkSvc-->>GW: Alloc Node Addresses [1, 2, 3]
            GW->>Store: Stream binary parts (Write data/parity)
            Store-->>GW: Return Write Ack + Checksums
            GW-->>Client: 200 OK (etag_part_1)
            
            Client->>GW: PUT part 2 (16MB Binary)
            GW->>ChunkSvc: Reserve chunk block addresses
            ChunkSvc-->>GW: Alloc Node Addresses [4, 5, 6]
            GW->>Store: Stream binary parts (Write data/parity)
            Store-->>GW: Return Write Ack + Checksums
            GW-->>Client: 200 OK (etag_part_2)
        end
    end

    Client->>GW: POST /bucket/object?complete (Merge Session)
    GW->>Meta: Finalize metadata state & map active chunks list
    Meta-->>GW: Commit Object Transaction
    GW-->>Client: 201 Created (Upload Completed)

Low-Level Design and Schema Strategies

To support quick spatial lookups, the metadata system coordinates active mapping tables, and the storage engines utilize localized flat-file logs.

1. Object Metadata Catalog Schema (CockroachDB)

This linearizable, horizontally sharded PostgreSQL-compatible DDL schema stores global object properties, directory path hierarchies, and structural chunk lists.

-- Logical Bucket Registry
CREATE TABLE storage_buckets (
    bucket_name VARCHAR(64) PRIMARY KEY,
    owner_id VARCHAR(64) NOT NULL,
    access_control_policy VARCHAR(32) NOT NULL DEFAULT 'PRIVATE',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Object Metadata Ledger
CREATE TABLE bucket_objects (
    object_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    bucket_name VARCHAR(64) REFERENCES storage_buckets(bucket_name),
    object_key VARCHAR(1024) NOT NULL, -- Logical file path: 'videos/sprint.mp4'
    size_bytes BIGINT NOT NULL,
    etag VARCHAR(64) NOT NULL,
    content_type VARCHAR(128) NOT NULL DEFAULT 'application/octet-stream',
    erasure_coding_scheme VARCHAR(16) NOT NULL DEFAULT 'RS_8_4', -- 8 data, 4 parity
    lifecycle_state VARCHAR(16) NOT NULL DEFAULT 'STANDARD', -- STANDARD, COLD, ARCHIVE
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    UNIQUE (bucket_name, object_key)
);

-- Physical Object Chunks Mappings
CREATE TABLE object_chunks_allocation (
    chunk_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    object_id UUID REFERENCES bucket_objects(object_id) ON DELETE CASCADE,
    part_index INT NOT NULL, -- Part position in the sequence
    chunk_index INT NOT NULL, -- Sub-chunk sequence (0 to 11 for RS_8_4)
    storage_node_id VARCHAR(64) NOT NULL,
    physical_block_offset BIGINT NOT NULL,
    checksum_crc32c VARCHAR(8) NOT NULL,
    is_parity BOOLEAN DEFAULT FALSE
);

-- Indexes for rapid GET path queries
CREATE INDEX idx_object_key ON bucket_objects(bucket_name, object_key);
CREATE INDEX idx_chunks_allocation ON object_chunks_allocation(object_id, part_index);

2. Flat-File Append-Only Storage Block State Machine

Rather than allocating a physical OS file for every user object (which degrades OS inode operations under millions of files), storage nodes merge object chunks into a massive, pre-allocated Append-Only Block Log File (e.g. data_block_01.bin).

Operation Action Block Allocation State
Ingress Part Stream appends chunk binary bytes to the end of the active open block file. Increments block write cursor pointer.
Write Complete Records node offset and block length, returning details to the metadata engine. Block offset maps into memory index.
GET Stream Reads a specific byte range directly from the block log without locking the file. Concurrent non-blocking disk reads.
Object Deleted Updates CockroachDB metadata (sets state to tombstone). Does NOT delete raw disk bytes instantly. Disk block space is bypassed.
Compaction Run Iterates through block log files, copies healthy (non-tombstoned) chunks to a new block, and deletes the old block. Reclaims disk space during low-traffic windows.

Scaling and Operational Challenges

1. Reed-Solomon Erasure Coding Mathematics

To survive high-frequency rack failures without incurring the massive cost of traditional $3\times$ replication (which introduces a 200% storage overhead), we utilize Reed-Solomon (RS) Erasure Coding.

  • The Mathematical Model:
    • We partition an object into $K$ independent data chunks.
    • We process the $K$ chunks through a Reed-Solomon generator matrix to produce $M$ parity chunks.
    • The total chunk set size is $N = K + M$.
    • All $N$ chunks are distributed across $N$ physical storage nodes situated on distinct power racks.
    • The Durability Guarantee: The system can survive the concurrent loss of any $M$ storage nodes without any data loss.
  • Storage Efficiency Math:
    • In a standard $RS(8, 4)$ setup ($K = 8$, $M = 4$): $$\text{Storage Efficiency} = \frac{K}{K + M} = \frac{8}{8 + 4} = \frac{8}{12} \approx 66.67%$$ $$\text{Storage Overhead} = 1.5\times \text{ of base object size}$$
    • Compared to $3\times$ Replication: $$\text{Replication Overhead} = 3.0\times \text{ of base object size}$$
    • The Impact: For a 100 Exabyte dataset, using $RS(8, 4)$ instead of replication saves 150 Exabytes of physical disk purchases, saving hundreds of millions of dollars in datacenter infrastructure while maintaining equal or greater durability guarantees.
  • Recovery Mathematics: If $M$ chunks are lost due to a physical disk crash, the coordinator uses Galois Field ($GF(2^8)$) matrix inversion mathematics to reconstruct the original data chunks in real-time, delivering seamless stream access to the client.

2. Garbage Collection of Orphan Multipart Uploads

When parallel multipart uploads are aborted or interrupted due to client network failures, the partially uploaded chunks remain on the storage nodes, consuming massive disk volumes.

  • Mitigation: The API Gateway coordinates an automated background garbage collector. The catalog database tracks active multipart sessions. If a multipart session has no write progress for more than 7 days, it is marked as orphaned. The collector deletes the chunk allocation records, triggers physical chunk tombstone markings on the storage nodes, and frees up petabytes of leaked capacity.

Distributed Blob Storage Trade-offs

Choosing a storage topology requires selecting between disk overheads and processing overheads.

Architectural Dimension Reed-Solomon Erasure Coding Traditional Multi-Region 3x Replication
Physical Storage Overhead Low (typically 1.3x to 1.5x of base data size) High (3.0x raw storage overhead)
CPU Processing Overhead High (Requires intense matrix multiplication encoding/decoding) Low (Simple TCP socket byte replication copies)
Network Rebuild Traffic High (Rebuilding a dead node requires fetching chunks from K nodes) Low (Simple clone copy from any healthy replica node)
Read/Write Latency High (Chunks must be gathered and decoded on the hot path) Low (Direct byte streaming from local storage node)
Durability Tier Extreme (Can survive M concurrent rack failures) Medium (Can survive 2 concurrent node failures)

Failure Modes and Fault Tolerance Strategies

1. Bit-Rot Detection and Self-Healing Checksum Trees

Over time, physical storage media suffers from magnetic degradation, causing silent Bit-Rot where bytes are corrupted without the disk controller throwing an error.

  • The Resilience Strategy: Every chunk is stored alongside a hierarchical MurmurHash3 or CRC32C checksum.
  • The storage nodes run active background Data Scrubber Services during low-traffic windows.
  • The scrubber reads raw chunk bytes, recalculates the checksum, and compares it against the catalog value.
  • If a checksum mismatch is detected (corrupt data), the scrubber isolates the node, alerts the controller, and uses the remaining healthy $K$ chunks from the other nodes to reconstruct the healthy chunk.
  • The reconstructed chunk is written to a new storage node, and the metadata directory is updated, guaranteeing absolute data integrity.

2. Cascading Rack Outage Node Reconstruction

If an entire datacenter power rack containing 40 storage nodes goes offline, the system must rebuild the lost chunks immediately to prevent subsequent node crashes from causing permanent data loss.

  • The Solution: We implement Decentralized Parallel Reconstruction.
  • Instead of routing all rebuild data through a single coordinator node (which creates a massive network bottleneck), the system triggers a decentralized task across the cluster.
  • Healthy storage nodes act as parallel matrix workers, pulling chunks from one another, performing local RS decoding calculations, and writing the restored blocks directly to new target nodes, reducing reconstruction time from days to less than 30 minutes.

Staff Engineer Perspective


Production Readiness Checklist

Ensure these checks are satisfied before putting your distributed BLOB store into active service:

  • Data Scrubbers Configured: Background scrubber runs must be scheduled to scan and verify CRC32C checksum trees on all storage blocks every 7 days.
  • Erasure Coding Distribution: Confirm that the RS(8,4) encoding engine distributes chunks across distinct electrical power domains and switch lines.
  • Multipart Expiry Policies: Configure automated cleanup rules to purge orphaned multipart parts after 7 days of inactivity.
  • LSM-Tree Metadata Indexing: Ensure the metadata keyspace indexes are sharded using consistent hashing to prevent hot metadata tablets.


Verbal Script

Interviewer: "How would you design a highly durable distributed BLOB storage system like Amazon S3 capable of storing exabytes of data, and how do you handle data persistence and node failures?"

Candidate: "To design a distributed BLOB storage system at this massive scale, I would build a highly decoupled architecture that separates the Control Plane from the Data Plane. The Control Plane coordinates metadata—such as bucket names, logical file paths, and ACLs—using a sharded Key-Value store like CockroachDB. The Data Plane streams, encodes, and persists the raw binary bytes directly to bare-metal Storage Nodes.

To prevent metadata scale bottlenecks, we avoid storing large binary files inside relational databases. Instead, we chunk our BLOBs. For large uploads, the client utilizes Multipart Parallel Uploads by calling POST /bucket/object?uploads. The system registers a session, returning an uploadId. The client then PUTs 16MB chunks in parallel, which bypasses metadata locking and maps chunks directly to storage nodes, yielding maximum ingestion throughput.

To achieve extreme durability without incurring the massive hardware costs of traditional $3\times$ replication, I would implement Reed-Solomon Erasure Coding like $RS(8, 4)$. When a chunk arrives at our storage coordinator, we partition it into 8 data chunks and process them through a generator matrix to create 4 parity chunks. These 12 chunks are distributed across 12 distinct storage nodes located on different power racks. This configuration yields a high storage efficiency of $66.7%$, keeping storage overhead at only $1.5\times$ (compared to the $3.0\times$ cost of replication) and guaranteeing that the system can survive the concurrent crash of any 4 racks without any data loss.

For localized data corruption, we protect against silent Bit-Rot by appending a CRC32C checksum tree to every chunk. Background Scrubber Services scan the storage blocks, recalculate checksums, and detect degradation. If a corrupted chunk is discovered, the scrubber initiates a rebuild: it pulls the remaining healthy chunks from the other nodes, runs Galois Field matrix calculations to reconstruct the lost bytes, and writes the healed chunk to a new node, maintaining our eleven-nines durability SLA."

Want to track your progress?

Sign in to save your progress, track completed lessons, and pick up where you left off.