Modern cloud infrastructure relies on distributed storage systems
that can scale to petabytes, maintain high availability, and provide
consistent performance across global deployments. Understanding how
these systems work — from the theoretical foundations of the CAP theorem to
practical implementations like Amazon S3, HDFS, and Ceph — is essential
for architects and engineers building cloud-native applications.
This article explores the fundamental principles, architectural
patterns, and real-world implementations of distributed storage systems.
We'll examine object storage architectures, compare block and file
storage models, dive deep into HDFS and Ceph deployments, and discuss
strategies for consistency, replication, backup, performance
optimization, and cost management.
Distributed Storage Fundamentals
The CAP Theorem
The CAP theorem, proposed by Eric Brewer in 2000, states that a
distributed system can guarantee at most two out of three properties:
Consistency : All nodes see the same data simultaneously
Availability : Every request receives a response (success or failure)
Partition Tolerance : The system continues operating despite network failures
In practice, partition tolerance is non-negotiable for distributed
systems — network partitions will occur. This forces a choice between
consistency and availability.
CP Systems prioritize consistency over availability.
When a partition occurs, CP systems may refuse requests to maintain data
consistency. Examples include:
Traditional relational databases with synchronous replication
HBase (strong consistency model)
MongoDB with majority write concern
AP Systems prioritize availability over strict
consistency. They continue serving requests during partitions,
potentially returning stale data. Examples include:
Amazon DynamoDB (eventual consistency)
Cassandra (tunable consistency)
Riak (eventual consistency)
CA Systems are theoretically impossible in
distributed environments but exist in single-node systems or tightly
coupled clusters without network partitions.
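The CP/AP trade-off can be made concrete with a toy model — purely illustrative, not any real database. The sketch below simulates a two-replica register during a network partition: in CP mode it refuses requests rather than risk divergence, while in AP mode it keeps serving but may return stale data.

```python
class Replica:
    def __init__(self):
        self.value = None

class PartitionedStore:
    """Two replicas, A and B; 'partitioned' means A cannot reach B."""

    def __init__(self, mode):
        self.mode = mode                    # 'CP' or 'AP'
        self.a, self.b = Replica(), Replica()
        self.partitioned = False

    def write(self, value):
        if self.partitioned and self.mode == 'CP':
            # CP: better to fail the request than let replicas diverge
            raise RuntimeError("CP: refusing write, replica unreachable")
        self.a.value = value                # the write lands on replica A
        if not self.partitioned:
            self.b.value = value            # healthy network: replicate synchronously
        # AP under partition: accept locally; B converges after healing

    def read_from_b(self):
        if self.partitioned and self.mode == 'CP':
            raise RuntimeError("CP: refusing read, cannot confirm freshness")
        return self.b.value                 # AP: may return stale data

ap = PartitionedStore('AP')
ap.write('v1')
ap.partitioned = True
ap.write('v2')                              # accepted despite the partition
print(ap.read_from_b())                     # -> v1 (stale but available)

cp = PartitionedStore('CP')
cp.partitioned = True
try:
    cp.write('v1')
except RuntimeError as e:
    print(e)                                # consistent but unavailable
```

The point of the sketch: both behaviors are correct responses to the same partition — they just optimize for different guarantees.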
BASE Principles
BASE (Basically Available, Soft state, Eventual consistency)
complements ACID properties for distributed systems:
Basically Available : The system remains available most of the time, even during partial failures
Soft State : System state may change without input due to eventual consistency
Eventual Consistency : The system becomes consistent over time, given no new updates
BASE systems trade immediate consistency for improved availability
and performance, making them ideal for large-scale web applications.
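A minimal sketch of how eventual consistency plays out in practice, assuming a last-write-wins merge during anti-entropy sync (the classes and timestamps here are illustrative, not a real protocol): replicas can temporarily disagree, but pairwise syncing drives them all to the newest write.

```python
# Last-write-wins anti-entropy: each replica keeps (value, timestamp);
# syncing keeps the newer write on both sides, so all replicas converge
# once updates stop arriving.

class Replica:
    def __init__(self):
        self.value, self.ts = None, 0

    def write(self, value, ts):
        if ts > self.ts:
            self.value, self.ts = value, ts

    def sync_with(self, other):
        # Exchange state; both sides keep whichever write is newer.
        newer = max((self.ts, self.value), (other.ts, other.value))
        self.ts, self.value = newer
        other.ts, other.value = newer

r1, r2, r3 = Replica(), Replica(), Replica()
r1.write('a', ts=1)      # one update reaches only r1...
r3.write('b', ts=2)      # ...a later update reaches only r3

r1.sync_with(r2)         # replicas temporarily disagree ("soft state")
r2.sync_with(r3)
r1.sync_with(r2)

print(r1.value, r2.value, r3.value)   # -> b b b (converged)
```

Note how the intermediate states are inconsistent — that is exactly the window a BASE system accepts in exchange for staying available.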
Distributed Storage Architecture Patterns
Master-Slave Architecture
In master-slave (primary-secondary) architectures, one node
coordinates operations while others replicate data:
```
        ┌─────────────┐
        │   Master    │  (Coordinates writes, manages metadata)
        └──────┬──────┘
               │
       ┌───────┼────────┬────────┐
       │       │        │        │
    ┌──▼──┐ ┌──▼──┐ ┌──▼──┐ ┌──▼──┐
    │Slave│ │Slave│ │Slave│ │Slave│  (Store data replicas)
    └─────┘ └─────┘ └─────┘ └─────┘
```
Advantages :
Simple to implement and reason about
Strong consistency possible with synchronous replication
Clear failure semantics
Disadvantages :
Master becomes a bottleneck
Single point of failure (requires failover mechanisms)
Limited scalability
Peer-to-Peer Architecture
Peer-to-peer systems distribute coordination across all nodes:
```
            ┌─────┐
            │Node1│
            └──┬──┘
               │
       ┌───────┼────────┬────────┐
       │       │        │        │
    ┌──▼──┐ ┌──▼──┐ ┌──▼──┐ ┌──▼──┐
    │Node2│ │Node3│ │Node4│ │Node5│
    └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘
       └───────┴───────┴───────┘
```
Advantages :
No single point of failure
Horizontal scalability
Better fault tolerance
Disadvantages :
More complex to implement
Eventual consistency challenges
Conflict resolution required
Consistent Hashing
Consistent hashing enables efficient data distribution and minimizes
rebalancing when nodes join or leave:
```python
import hashlib
import bisect

class ConsistentHash:
    def __init__(self, nodes=None, replicas=3):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Generate hash for a key"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Add a node to the hash ring (with virtual nodes)"""
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            bisect.insort(self.sorted_keys, key)

    def remove_node(self, node):
        """Remove a node from the hash ring"""
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            del self.ring[key]
            self.sorted_keys.remove(key)

    def get_node(self, key):
        """Get the node responsible for a key"""
        if not self.ring:
            return None
        hash_key = self._hash(key)
        idx = bisect.bisect_right(self.sorted_keys, hash_key)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

ch = ConsistentHash(['node1', 'node2', 'node3', 'node4'])
print(ch.get_node('user:12345'))
```
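The "minimal rebalancing" claim can be checked empirically. The standalone sketch below rebuilds a small ring with the same MD5-based scheme (node names are hypothetical) and verifies that removing one node remaps only the keys that node owned — every other key stays where it was.

```python
import hashlib
import bisect

def _h(s):
    # Same hashing scheme as the ring above: MD5 digest as an integer.
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def build_ring(nodes, replicas=3):
    ring = {}
    for node in nodes:
        for i in range(replicas):
            ring[_h(f"{node}:{i}")] = node
    return ring, sorted(ring)

def lookup(ring, keys, item):
    # First virtual node clockwise from the item's hash (with wraparound).
    idx = bisect.bisect_right(keys, _h(item)) % len(keys)
    return ring[keys[idx]]

items = [f"user:{i}" for i in range(1000)]
ring4, keys4 = build_ring(['node1', 'node2', 'node3', 'node4'])
before = {item: lookup(ring4, keys4, item) for item in items}

ring3, keys3 = build_ring(['node1', 'node2', 'node3'])   # node4 removed
moved = sum(1 for item in items
            if lookup(ring3, keys3, item) != before[item])

# Only keys previously owned by node4 move; everything else stays put.
assert moved == sum(1 for n in before.values() if n == 'node4')
print(f"{moved} of {len(items)} keys remapped")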
Object Storage Deep Dive
Amazon S3 Architecture
Amazon S3 (Simple Storage Service) is the de facto standard for
object storage. Its architecture demonstrates key principles of scalable
object storage systems.
S3 Request Flow
```
     Client Request
           │
           ▼
 ┌─────────────────┐
 │  Load Balancer  │  (Distributes requests)
 └────────┬────────┘
          │
     ┌────┴────┐
     │         │
┌────▼────┐ ┌──▼──────┐
│Frontend │ │Frontend │  (Request routing)
└────┬────┘ └──┬──────┘
     │         │
     └────┬────┘
          │
 ┌────────▼────────┐
 │ Metadata Store  │  (DynamoDB - object metadata)
 └────────┬────────┘
          │
     ┌────┴────┐
     │         │
┌────▼────┐ ┌──▼──────┐
│ Storage │ │ Storage │  (Actual object data)
│  Node   │ │  Node   │
└─────────┘ └─────────┘
```
S3 Key Concepts
Buckets : Top-level containers for objects, organized
by region and namespace.
Objects : Immutable data units with:
Key (unique identifier)
Value (data)
Metadata (content-type, custom headers)
Version ID (for versioning)
Regions : Geographic locations for data storage,
enabling compliance and latency optimization.
S3 API Operations
```python
import boto3
from botocore.exceptions import ClientError

class S3StorageManager:
    def __init__(self, region_name='us-east-1'):
        self.s3_client = boto3.client('s3', region_name=region_name)

    def create_bucket(self, bucket_name):
        """Create a new S3 bucket"""
        try:
            self.s3_client.create_bucket(Bucket=bucket_name)
            print(f"Bucket {bucket_name} created successfully")
        except ClientError as e:
            print(f"Error creating bucket: {e}")

    def upload_object(self, bucket_name, object_key, file_path, metadata=None):
        """Upload an object to S3"""
        extra_args = {}
        if metadata:
            extra_args['Metadata'] = metadata
        try:
            self.s3_client.upload_file(
                file_path, bucket_name, object_key,
                ExtraArgs=extra_args
            )
            print(f"Object {object_key} uploaded successfully")
        except ClientError as e:
            print(f"Error uploading object: {e}")

    def download_object(self, bucket_name, object_key, download_path):
        """Download an object from S3"""
        try:
            self.s3_client.download_file(bucket_name, object_key, download_path)
            print(f"Object {object_key} downloaded successfully")
        except ClientError as e:
            print(f"Error downloading object: {e}")

    def list_objects(self, bucket_name, prefix=''):
        """List objects in a bucket"""
        try:
            response = self.s3_client.list_objects_v2(
                Bucket=bucket_name, Prefix=prefix
            )
            return response.get('Contents', [])
        except ClientError as e:
            print(f"Error listing objects: {e}")
            return []

    def delete_object(self, bucket_name, object_key):
        """Delete an object from S3"""
        try:
            self.s3_client.delete_object(Bucket=bucket_name, Key=object_key)
            print(f"Object {object_key} deleted successfully")
        except ClientError as e:
            print(f"Error deleting object: {e}")

manager = S3StorageManager()
manager.create_bucket('my-app-data')
manager.upload_object(
    'my-app-data',
    'users/profile.jpg',
    '/local/path/profile.jpg',
    metadata={'user-id': '12345', 'upload-time': '2025-01-01'}
)
```
S3 Storage Classes
S3 offers multiple storage classes optimized for different access
patterns:
| Storage Class              | Use Case                              | Durability    | Availability | Cost        |
|----------------------------|---------------------------------------|---------------|--------------|-------------|
| Standard                   | Frequently accessed data              | 99.999999999% | 99.99%       | Highest     |
| Standard-IA                | Infrequently accessed                 | 99.999999999% | 99.9%        | Lower       |
| One Zone-IA                | Non-critical infrequent access        | 99.999999999% | 99.5%        | Lower still |
| Glacier Instant Retrieval  | Archive with instant access           | 99.999999999% | 99.9%        | Very low    |
| Glacier Flexible Retrieval | Archive (minutes-to-hours retrieval)  | 99.999999999% | 99.99%       | Very low    |
| Glacier Deep Archive       | Long-term archive (12-48 h retrieval) | 99.999999999% | 99.99%       | Lowest      |
| Intelligent-Tiering        | Automatic optimization                | 99.999999999% | 99.9%        | Variable    |
S3 Consistency Model
Since December 2020, S3 provides strong read-after-write consistency
for all operations: a read issued after a successful PUT or DELETE
always reflects that change, with no performance penalty. Before then,
S3 offered read-after-write consistency only for PUTs of new objects,
with eventual consistency for overwrite PUTs and DELETEs — a
distinction that still matters when reading older documentation or
working with S3-compatible stores that kept the old model.
Alibaba Cloud OSS Architecture
Alibaba Cloud Object Storage Service (OSS) follows similar principles
to S3 but with some architectural differences:
OSS Architecture Components
```
┌─────────────────────────────────────────┐
│            OSS API Gateway              │
│    (Request routing, authentication)    │
└──────────────┬──────────────────────────┘
               │
    ┌──────────┴──────────┐
    │                     │
┌───▼────────┐      ┌─────▼────┐
│  Metadata  │      │ Storage  │
│  Service   │      │ Service  │
│(TableStore)│      │ (Pangu)  │
└────────────┘      └──────────┘
```
Key Features :
Multi-version support
Server-side encryption
Cross-region replication
Lifecycle management
CDN integration
OSS Python SDK Example
```python
import oss2

class OSSStorageManager:
    def __init__(self, access_key_id, access_key_secret, endpoint, bucket_name):
        auth = oss2.Auth(access_key_id, access_key_secret)
        self.bucket = oss2.Bucket(auth, endpoint, bucket_name)

    def upload_file(self, local_file, object_key):
        """Upload file to OSS"""
        result = self.bucket.put_object_from_file(object_key, local_file)
        print(f"Upload successful. ETag: {result.etag}")
        return result

    def download_file(self, object_key, local_file):
        """Download file from OSS"""
        self.bucket.get_object_to_file(object_key, local_file)
        print(f"Download successful: {local_file}")

    def list_objects(self, prefix='', max_keys=100):
        """List objects with prefix"""
        objects = []
        for obj in oss2.ObjectIterator(self.bucket, prefix=prefix, max_keys=max_keys):
            objects.append({
                'key': obj.key,
                'size': obj.size,
                'last_modified': obj.last_modified
            })
        return objects

    def delete_object(self, object_key):
        """Delete object from OSS"""
        self.bucket.delete_object(object_key)
        print(f"Deleted: {object_key}")

    def set_lifecycle_rule(self, prefix, expiration_days):
        """Set lifecycle rule for automatic deletion"""
        # Expiration is expressed via a LifecycleExpiration object
        rule = oss2.models.LifecycleRule(
            'auto-delete', prefix,
            status='Enabled',
            expiration=oss2.models.LifecycleExpiration(days=expiration_days)
        )
        self.bucket.put_bucket_lifecycle(oss2.models.BucketLifecycle([rule]))
        print(f"Lifecycle rule set for prefix: {prefix}")

manager = OSSStorageManager(
    access_key_id='your-key',
    access_key_secret='your-secret',
    endpoint='https://oss-cn-hangzhou.aliyuncs.com',
    bucket_name='my-bucket'
)
manager.upload_file('/local/file.txt', 'remote/file.txt')
```
Block vs File Storage Comparison
Understanding the differences between block, file, and object storage
is crucial for selecting the right storage solution.
Block Storage
Block storage treats storage as a sequence of fixed-size blocks,
typically 512 bytes to 4KB. The storage system doesn't understand file
semantics — it only manages blocks.
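To make the "blocks only" point concrete, here is a toy in-memory block device — illustrative only, not a real driver. It exposes nothing but fixed-size numbered blocks; any notion of files, names, or directories must be layered on top by a filesystem.

```python
BLOCK_SIZE = 512  # classic sector size; real devices also use 4 KB

class BlockDevice:
    """A flat array of fixed-size blocks, addressed by logical block address."""

    def __init__(self, num_blocks):
        self.blocks = bytearray(num_blocks * BLOCK_SIZE)

    def write_block(self, lba, data):
        """Write exactly one block at logical block address `lba`."""
        assert len(data) == BLOCK_SIZE, "writes are whole blocks only"
        offset = lba * BLOCK_SIZE
        self.blocks[offset:offset + BLOCK_SIZE] = data

    def read_block(self, lba):
        """Read one block; the device has no idea what the bytes mean."""
        offset = lba * BLOCK_SIZE
        return bytes(self.blocks[offset:offset + BLOCK_SIZE])

dev = BlockDevice(num_blocks=1024)            # a 512 KB "disk"
payload = b"hello".ljust(BLOCK_SIZE, b"\x00") # pad to a full block
dev.write_block(7, payload)
print(dev.read_block(7)[:5])                  # -> b'hello'
```

Everything a filesystem does — tracking which blocks belong to which file, permissions, timestamps — lives above this interface, which is why block storage itself carries no metadata.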
Characteristics
Low-level access : Direct block-level I/O
High performance : Minimal overhead, suitable for
databases
Flexible : Can be formatted with any filesystem
No metadata : Block storage doesn't track file
information
Use Cases
Database storage (MySQL, PostgreSQL, Oracle)
Virtual machine disk images
High-performance applications requiring low latency
RAID implementations
Block Storage Architecture
```
   Application
        │
        ▼
 ┌─────────────┐
 │ File System │  (ext4, XFS, NTFS)
 └──────┬──────┘
        │
        ▼
 ┌─────────────┐
 │ Block Layer │  (Logical Volume Manager)
 └──────┬──────┘
        │
        ▼
 ┌─────────────┐
 │  Physical   │  (Hard drives, SSDs)
 │  Storage    │
 └─────────────┘
```
AWS EBS Example
```python
import boto3

class EBSManager:
    def __init__(self, region_name='us-east-1'):
        self.ec2 = boto3.client('ec2', region_name=region_name)

    def create_volume(self, size_gb, volume_type='gp3',
                      availability_zone='us-east-1a'):
        """Create an EBS volume"""
        response = self.ec2.create_volume(
            AvailabilityZone=availability_zone,
            Size=size_gb,
            VolumeType=volume_type,
            Encrypted=True
        )
        volume_id = response['VolumeId']
        print(f"Created volume: {volume_id}")
        return volume_id

    def attach_volume(self, volume_id, instance_id, device='/dev/sdf'):
        """Attach volume to EC2 instance"""
        self.ec2.attach_volume(
            VolumeId=volume_id,
            InstanceId=instance_id,
            Device=device
        )
        print(f"Attached {volume_id} to {instance_id} as {device}")

    def create_snapshot(self, volume_id, description=''):
        """Create snapshot of volume"""
        response = self.ec2.create_snapshot(
            VolumeId=volume_id,
            Description=description
        )
        snapshot_id = response['SnapshotId']
        print(f"Created snapshot: {snapshot_id}")
        return snapshot_id

    def list_volumes(self):
        """List all EBS volumes"""
        response = self.ec2.describe_volumes()
        return response['Volumes']

ebs = EBSManager()
volume_id = ebs.create_volume(size_gb=100, volume_type='gp3')
ebs.attach_volume(volume_id, 'i-1234567890abcdef0')
snapshot_id = ebs.create_snapshot(volume_id, 'Daily backup')
```
File Storage
File storage organizes data in a hierarchical directory structure
with files and folders. It provides file-level semantics and
metadata.
Characteristics
Hierarchical structure : Directory trees with files
File semantics : Open, read, write, close operations
Metadata : File permissions, timestamps, ownership
Network protocols : NFS, SMB/CIFS
Use Cases
Shared file systems for multiple servers
Content management systems
Home directories
Application configuration files
File Storage Architecture
```
Client Application
        │
        ▼
┌───────────────┐
│ NFS/SMB Client│
└───────┬───────┘
        │  Network Protocol
        ▼
┌───────────────┐
│  File Server  │
│   (NFS/SMB)   │
└───────┬───────┘
        │
┌───────▼───────┐
│  File System  │  (ext4, ZFS, etc.)
└───────┬───────┘
        │
┌───────▼───────┐
│ Block Storage │
└───────────────┘
```
AWS EFS Example
```python
import boto3

class EFSManager:
    def __init__(self, region_name='us-east-1'):
        self.efs = boto3.client('efs', region_name=region_name)
        self.ec2 = boto3.client('ec2', region_name=region_name)

    def create_file_system(self, performance_mode='generalPurpose',
                           throughput_mode='bursting'):
        """Create EFS file system"""
        response = self.efs.create_file_system(
            PerformanceMode=performance_mode,
            ThroughputMode=throughput_mode,
            Encrypted=True
        )
        file_system_id = response['FileSystemId']
        print(f"Created EFS: {file_system_id}")
        return file_system_id

    def create_mount_target(self, file_system_id, subnet_id, security_groups):
        """Create mount target for EFS"""
        response = self.efs.create_mount_target(
            FileSystemId=file_system_id,
            SubnetId=subnet_id,
            SecurityGroups=security_groups
        )
        mount_target_id = response['MountTargetId']
        print(f"Created mount target: {mount_target_id}")
        return mount_target_id

    def list_file_systems(self):
        """List all EFS file systems"""
        response = self.efs.describe_file_systems()
        return response['FileSystems']

efs = EFSManager()
fs_id = efs.create_file_system(performance_mode='generalPurpose')
```
Comparison Table
| Feature           | Block Storage   | File Storage             | Object Storage       |
|-------------------|-----------------|--------------------------|----------------------|
| Access Method     | Block-level I/O | File system APIs         | REST APIs            |
| Data Organization | Blocks          | Hierarchical directories | Flat namespace       |
| Metadata          | None            | File metadata            | Object metadata      |
| Scalability       | Limited         | Moderate                 | Excellent            |
| Performance       | Very high       | High                     | Moderate to high     |
| Use Cases         | Databases, VMs  | Shared filesystems       | Web apps, archives   |
| Protocols         | iSCSI, FC       | NFS, SMB                 | HTTP/REST            |
| Consistency       | Strong          | Strong                   | Eventual (typically) |
| Cost              | High            | Moderate                 | Low                  |
When to Use Each Type
Block Storage :
Running databases requiring low latency
Virtual machine disk images
Applications needing raw disk access
High I/O operations per second (IOPS) requirements
File Storage :
Shared file systems across multiple servers
Content management systems
Legacy applications requiring POSIX semantics
Home directories and user data
Object Storage :
Web applications and static assets
Backup and archival data
Big data analytics
Content delivery and media storage
Unstructured data at scale
HDFS Distributed File System
Hadoop Distributed File System (HDFS) is designed for storing very
large files across clusters of commodity hardware.
HDFS Architecture
```
┌──────────────┐      ┌──────────────┐
│   NameNode   │◄────►│   NameNode   │  (Metadata, namespace;
│   (Active)   │      │  (Standby)   │   standby for HA)
└──────┬───────┘      └──────────────┘
       │
       ├──────────────────┬──────────────────┐
       │                  │                  │
┌──────▼───────┐  ┌───────▼──────┐  ┌───────▼──────┐
│  DataNode 1  │  │  DataNode 2  │  │  DataNode 3  │
│ (Replica 1)  │  │ (Replica 2)  │  │ (Replica 3)  │
└──────────────┘  └──────────────┘  └──────────────┘
```
Key Components
NameNode :
Manages file system namespace
Stores metadata (file names, permissions, block locations)
Coordinates file operations
Single point of failure (mitigated by High Availability)
DataNode :
Stores actual data blocks
Serves read/write requests
Reports block status to NameNode
Performs block replication
Secondary NameNode (legacy):
Performs checkpointing of NameNode metadata
Not a backup NameNode (common misconception)
HDFS Configuration
core-site.xml
```xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop/tmp</value>
  </property>
</configuration>
```
hdfs-site.xml
```xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/opt/hadoop/hdfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/opt/hadoop/hdfs/datanode</value>
  </property>
  <property>
    <name>dfs.namenode.checkpoint.dir</name>
    <value>/opt/hadoop/hdfs/checkpoint</value>
  </property>
  <property>
    <name>dfs.blocksize</name>
    <value>134217728</value> <!-- 128 MB -->
  </property>
  <property>
    <name>dfs.namenode.handler.count</name>
    <value>100</value>
  </property>
</configuration>
```
HDFS Operations
Java API Example
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSOperations {
    private FileSystem fs;

    public HDFSOperations() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:9000");
        this.fs = FileSystem.get(conf);
    }

    public void createDirectory(String path) throws IOException {
        Path dirPath = new Path(path);
        if (!fs.exists(dirPath)) {
            fs.mkdirs(dirPath);
            System.out.println("Directory created: " + path);
        }
    }

    public void uploadFile(String localPath, String hdfsPath) throws IOException {
        fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
        System.out.println("File uploaded: " + localPath + " -> " + hdfsPath);
    }

    public void downloadFile(String hdfsPath, String localPath) throws IOException {
        fs.copyToLocalFile(new Path(hdfsPath), new Path(localPath));
        System.out.println("File downloaded: " + hdfsPath + " -> " + localPath);
    }

    public void listFiles(String directory) throws IOException {
        FileStatus[] files = fs.listStatus(new Path(directory));
        for (FileStatus file : files) {
            System.out.println(file.getPath().getName()
                + " (Size: " + file.getLen() + " bytes)");
        }
    }

    public void deleteFile(String path) throws IOException {
        Path filePath = new Path(path);
        if (fs.exists(filePath)) {
            fs.delete(filePath, true);
            System.out.println("Deleted: " + path);
        }
    }

    public void close() throws IOException {
        fs.close();
    }

    public static void main(String[] args) throws IOException {
        HDFSOperations hdfs = new HDFSOperations();
        hdfs.createDirectory("/user/data");
        hdfs.uploadFile("/local/file.txt", "/user/data/file.txt");
        hdfs.listFiles("/user/data");
        hdfs.close();
    }
}
```
Python API with hdfs3
```python
from hdfs3 import HDFileSystem

class HDFSManager:
    def __init__(self, host='localhost', port=9000):
        self.hdfs = HDFileSystem(host=host, port=port)

    def create_directory(self, path):
        """Create directory in HDFS"""
        if not self.hdfs.exists(path):
            self.hdfs.mkdir(path)
            print(f"Created directory: {path}")

    def upload_file(self, local_path, hdfs_path):
        """Upload file to HDFS"""
        self.hdfs.put(local_path, hdfs_path)
        print(f"Uploaded: {local_path} -> {hdfs_path}")

    def download_file(self, hdfs_path, local_path):
        """Download file from HDFS"""
        self.hdfs.get(hdfs_path, local_path)
        print(f"Downloaded: {hdfs_path} -> {local_path}")

    def list_files(self, directory):
        """List files in directory"""
        files = self.hdfs.ls(directory, detail=True)
        for file_info in files:
            print(f"{file_info['name']} - Size: {file_info['size']} bytes")
        return files

    def delete_file(self, path):
        """Delete file or directory"""
        self.hdfs.rm(path, recursive=True)
        print(f"Deleted: {path}")

    def get_file_info(self, path):
        """Get file information"""
        info = self.hdfs.info(path)
        return {
            'path': info['name'],
            'size': info['size'],
            'type': info['kind'],
            'replication': info.get('replication', 'N/A')
        }

hdfs = HDFSManager(host='namenode', port=9000)
hdfs.create_directory('/user/data')
hdfs.upload_file('/local/data.csv', '/user/data/data.csv')
files = hdfs.list_files('/user/data')
```
HDFS Replication Strategy
HDFS replicates blocks across multiple DataNodes for fault
tolerance:
Default replication factor : 3
Replication placement :
First replica: same node as the writer (if the writer runs on a DataNode), otherwise a random node
Second replica: a different rack from the first
Third replica: the same rack as the second, on a different node
This strategy balances performance (rack-local reads) with fault
tolerance (rack failure resilience).
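The placement rules above can be sketched as a small function. To be clear about assumptions: this mimics the default policy's node-and-rack choices described in the text, not the actual HDFS implementation, and the topology, node, and rack names are made up for illustration.

```python
import random

def choose_replicas(topology, writer_node, rng=random):
    """topology: dict of rack -> list of DataNodes. Returns 3 (rack, node) picks
    following the default HDFS placement described above."""
    rack_of = {n: r for r, nodes in topology.items() for n in nodes}

    # First replica: the writer's own node (writer assumed to be a DataNode).
    first = writer_node

    # Second replica: any node on a rack other than the writer's.
    other_racks = [r for r in topology if r != rack_of[first]]
    second_rack = rng.choice(other_racks)
    second = rng.choice(topology[second_rack])

    # Third replica: same rack as the second, but a different node.
    candidates = [n for n in topology[second_rack] if n != second]
    third = rng.choice(candidates)

    return [(rack_of[first], first), (second_rack, second), (second_rack, third)]

topology = {
    'rack1': ['dn1', 'dn2', 'dn3'],
    'rack2': ['dn4', 'dn5', 'dn6'],
}
placement = choose_replicas(topology, writer_node='dn1')
print(placement)

racks = [r for r, _ in placement]
assert racks[0] != racks[1] and racks[1] == racks[2]   # spans exactly 2 racks
```

Spanning exactly two racks is the compromise the text describes: a whole-rack failure can destroy at most two replicas, while two of the three replicas stay rack-local for cheap reads and pipeline writes.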
HDFS High Availability
HA NameNode configuration eliminates single point of failure:
```xml
<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>namenode1:9000</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>namenode2:9000</value>
  </property>
  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://journal1:8485;journal2:8485;journal3:8485/mycluster</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```
Ceph Storage Cluster Deployment
Ceph is a unified distributed storage system providing object, block,
and file storage interfaces.
Ceph Architecture
```
┌─────────────────────────────────────────┐
│           Client Applications           │
└─────┬───────────┬───────────┬───────────┘
      │           │           │
┌─────▼────┐ ┌────▼────┐ ┌────▼────┐
│  Object  │ │  Block  │ │  File   │  (RADOS Gateway, RBD, CephFS)
│ Gateway  │ │  (RBD)  │ │(CephFS) │
└─────┬────┘ └────┬────┘ └────┬────┘
      │           │           │
      └───────────┴───────────┘
                  │
        ┌─────────▼─────────┐
        │       RADOS       │  (Reliable Autonomic Distributed Object Store)
        │  - OSDs           │  (Object Storage Daemons)
        │  - Monitors       │  (Cluster state)
        │  - MDS            │  (Metadata Server - for CephFS)
        └───────────────────┘
```
Core Components
Monitors (MON) :
Maintain cluster membership and state
Provide consensus for cluster configuration
Typically 3 or 5 nodes for quorum
Object Storage Daemons (OSD) :
Store actual data objects
Handle replication and recovery
Report to monitors
Metadata Servers (MDS) :
Manage metadata for CephFS
Not required for object or block storage
RADOS Gateway (RGW) :
Provides S3-compatible object storage API
Handles authentication and authorization
Ceph Deployment
Installation on Ubuntu
(Note: ceph-deploy is deprecated in recent Ceph releases in favor of cephadm; the commands below apply to older releases.)
```shell
# Add the Ceph repository and install packages
wget -q -O- 'https://download.ceph.com/keys/release.asc' | sudo apt-key add -
echo "deb https://download.ceph.com/debian-{ceph-stable-release}/ $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/ceph.list
sudo apt update
sudo apt install ceph-deploy ceph-common

# Create a working directory for the cluster
mkdir ceph-cluster
cd ceph-cluster

# Initialize the cluster with three monitor nodes
ceph-deploy new node1 node2 node3

# Install Ceph on all nodes
ceph-deploy install node1 node2 node3

# Deploy the initial monitors
ceph-deploy mon create-initial

# Create OSDs on each node's data disk
ceph-deploy osd create --data /dev/sdb node1
ceph-deploy osd create --data /dev/sdb node2
ceph-deploy osd create --data /dev/sdb node3

# Distribute admin keyrings
ceph-deploy admin node1 node2 node3
```
Ceph Configuration File
(ceph.conf)
```ini
[global]
fsid = {cluster-id}
mon initial members = node1, node2, node3
mon host = 10.0.0.1, 10.0.0.2, 10.0.0.3

auth cluster required = cephx
auth service required = cephx
auth client required = cephx

osd journal size = 10240
osd pool default pg num = 128
osd pool default pgp num = 128
osd pool default size = 3
osd pool default min size = 2

public network = 10.0.0.0/24
cluster network = 10.0.1.0/24

[mon.node1]
host = node1
mon addr = 10.0.0.1:6789

[mon.node2]
host = node2
mon addr = 10.0.0.2:6789

[mon.node3]
host = node3
mon addr = 10.0.0.3:6789

[osd]
osd data = /var/lib/ceph/osd/ceph-$id
osd journal = /var/lib/ceph/osd/ceph-$id/journal
osd mkfs type = xfs
osd mkfs options xfs = -f -i size=2048
```
Ceph Object Storage (RADOS
Gateway)
RGW Configuration
```shell
# Create the RADOS Gateway instance on node1
ceph-deploy rgw create node1

# Push the updated configuration
sudo ceph-deploy --overwrite-conf config push node1

# Start and enable the gateway service
sudo systemctl start ceph-radosgw@rgw.node1
sudo systemctl enable ceph-radosgw@rgw.node1
```
Python Client for RGW
```python
import boto3
from botocore.client import Config

class CephRadosGateway:
    def __init__(self, endpoint_url, access_key, secret_key):
        self.s3_client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version='s3v4')
        )

    def create_bucket(self, bucket_name):
        """Create bucket in Ceph RGW"""
        try:
            self.s3_client.create_bucket(Bucket=bucket_name)
            print(f"Bucket {bucket_name} created")
        except Exception as e:
            print(f"Error: {e}")

    def upload_object(self, bucket_name, object_key, data):
        """Upload object"""
        self.s3_client.put_object(
            Bucket=bucket_name,
            Key=object_key,
            Body=data
        )
        print(f"Uploaded {object_key}")

    def list_objects(self, bucket_name):
        """List objects in bucket"""
        response = self.s3_client.list_objects_v2(Bucket=bucket_name)
        return response.get('Contents', [])

rgw = CephRadosGateway(
    endpoint_url='http://rgw-node1:7480',
    access_key='your-access-key',
    secret_key='your-secret-key'
)
rgw.create_bucket('my-bucket')
rgw.upload_object('my-bucket', 'test.txt', b'Hello Ceph')
```
Ceph Block Storage (RBD)
RBD Operations
```shell
# Create and initialize an RBD pool
ceph osd pool create rbd 128 128
rbd pool init rbd

# Create a 10 GB image
rbd create --size 10G rbd/myimage

# Map the image to a local block device
sudo rbd map rbd/myimage

# Format and mount it
sudo mkfs.ext4 /dev/rbd0
sudo mount /dev/rbd0 /mnt/rbd

# Snapshot, protect, and clone
rbd snap create rbd/myimage@snapshot1
rbd snap protect rbd/myimage@snapshot1
rbd clone rbd/myimage@snapshot1 rbd/myclone
```
Python RBD Client
```python
import rados
import rbd

class CephRBDManager:
    def __init__(self, user_name='client.admin',
                 conf_file='/etc/ceph/ceph.conf'):
        self.cluster = rados.Rados(conffile=conf_file, name=user_name)
        self.cluster.connect()
        self.ioctx = None

    def open_pool(self, pool_name):
        """Open a pool"""
        self.ioctx = self.cluster.open_ioctx(pool_name)

    def create_image(self, image_name, size_mb):
        """Create RBD image"""
        rbd_inst = rbd.RBD()
        rbd_inst.create(self.ioctx, image_name, size_mb * 1024 * 1024)
        print(f"Created image: {image_name} ({size_mb} MB)")

    def list_images(self):
        """List all images in pool"""
        rbd_inst = rbd.RBD()
        return rbd_inst.list(self.ioctx)

    def create_snapshot(self, image_name, snapshot_name):
        """Create snapshot"""
        image = rbd.Image(self.ioctx, image_name)
        image.create_snap(snapshot_name)
        print(f"Created snapshot: {snapshot_name}")
        image.close()

    def close(self):
        """Close connections"""
        if self.ioctx:
            self.ioctx.close()
        self.cluster.shutdown()

rbd_mgr = CephRBDManager()
rbd_mgr.open_pool('rbd')
rbd_mgr.create_image('myimage', 10240)
images = rbd_mgr.list_images()
rbd_mgr.create_snapshot('myimage', 'snap1')
rbd_mgr.close()
```
OSD Configuration
```ini
[osd]
# Memory target per OSD daemon (4 GB)
osd memory target = 4294967296
osd journal size = 10240

# Messenger port range
ms bind port min = 6800
ms bind port max = 7300

# Recovery and backfill throttling
osd recovery max active = 3
osd recovery threads = 1
osd max backfills = 1
```
CRUSH Map Optimization
CRUSH (Controlled Replication Under Scalable Hashing) algorithm
determines data placement:
```shell
# Export the current CRUSH map
ceph osd getcrushmap -o crushmap.bin

# Decompile it to editable text
crushtool -d crushmap.bin -o crushmap.txt

# (edit crushmap.txt, then recompile)
crushtool -c crushmap.txt -o crushmap-new.bin

# Inject the new map into the cluster
ceph osd setcrushmap -i crushmap-new.bin
```
Data Consistency and Replication Strategies
Consistency Models
Strong Consistency
All reads return the most recent write. Achieved through:
Synchronous replication
Quorum-based reads/writes
Two-phase commit protocols
Trade-offs :
Higher latency (wait for all replicas)
Lower availability (fails if quorum unavailable)
Higher cost (more network round-trips)
Eventual Consistency
The system becomes consistent over time, provided no new updates
arrive. Characteristics:
Reads may return stale data temporarily
All replicas converge to same state eventually
High availability and performance
Use cases :
Social media feeds
DNS systems
CDN content distribution
Causal Consistency
Preserves causal relationships between operations. If operation A
causally precedes operation B, all nodes see A before B.
Read-Your-Writes Consistency
User always sees their own writes, even if others see stale data.
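One common way to provide this guarantee is client-side session tracking: the client remembers the version of its own last write and skips any replica that has not yet caught up. The `Replica` and `Session` classes below are an invented toy sketch of that idea, not any real system's API.

```python
class Replica:
    def __init__(self):
        self.data = {}                      # key -> (value, version)

    def get(self, key):
        return self.data.get(key, (None, 0))

class Session:
    """Client-side session enforcing read-your-writes."""

    def __init__(self, replicas, primary):
        self.replicas = replicas
        self.primary = primary
        self.last_written = {}              # key -> version this client wrote

    def write(self, key, value, version):
        self.primary.data[key] = (value, version)   # replicated async elsewhere
        self.last_written[key] = version

    def read(self, key):
        floor = self.last_written.get(key, 0)
        for replica in self.replicas:
            value, version = replica.get(key)
            if version >= floor:            # fresh enough for this session
                return value
        raise RuntimeError("no replica has caught up to our own write")

primary, lagging = Replica(), Replica()
s = Session(replicas=[lagging, primary], primary=primary)
s.write('profile', 'new-bio', version=2)
lagging.data['profile'] = ('old-bio', 1)    # stale async copy

print(s.read('profile'))    # -> new-bio (the stale replica is skipped)
```

Other clients reading the lagging replica directly would still see `old-bio` — which is exactly what read-your-writes permits: only the writer's own view is guaranteed fresh.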
Replication Strategies
Master-Slave Replication
```
 Write Request
      │
      ▼
 ┌─────────┐
 │ Master  │────┐
 └────┬────┘    │  Sync/Async
      │         │
 ┌────▼────┐    │
 │ Slave 1 │    │
 └─────────┘    │
         ┌──────┘
    ┌────▼────┐
    │ Slave 2 │
    └─────────┘
```
Synchronous Replication :
Write completes only after all replicas acknowledge
Strong consistency guaranteed
Higher latency, lower throughput
Asynchronous Replication :
Write completes after master acknowledges
Better performance
Risk of data loss if master fails
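The data-loss risk of asynchronous replication can be shown with a toy replicated log (illustrative only, not a real replication protocol): sync mode ships each entry to the replica before acknowledging, while async mode acknowledges first and ships in the background — so writes still in flight vanish with the master.

```python
class ReplicatedLog:
    def __init__(self, sync):
        self.sync = sync
        self.master, self.replica = [], []
        self.pending = []                   # acked but not yet shipped (async)

    def write(self, entry):
        self.master.append(entry)
        if self.sync:
            self.replica.append(entry)      # wait for the replica before acking
        else:
            self.pending.append(entry)      # ack now, ship later
        return "ack"

    def flush(self):
        """Background replication catching up."""
        self.replica.extend(self.pending)
        self.pending.clear()

    def master_crash(self):
        """Failover: the replica's contents are all that survive."""
        return list(self.replica)

sync_log = ReplicatedLog(sync=True)
for e in ('a', 'b', 'c'):
    sync_log.write(e)
print(sync_log.master_crash())      # -> ['a', 'b', 'c']  nothing lost

async_log = ReplicatedLog(sync=False)
async_log.write('a')
async_log.flush()                   # 'a' replicated in the background
async_log.write('b')                # acked, but master dies before shipping
print(async_log.master_crash())     # -> ['a']  the acked write 'b' is lost
```

The async variant acknowledged `'b'` to the client, yet `'b'` does not survive failover — that acknowledged-but-unreplicated window is the trade the bullet points above describe.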
Multi-Master Replication
Multiple nodes can accept writes:
```
Write Request 1 ──►  Node 1 ──┐
                              │
Write Request 2 ──►  Node 2 ──┼──►  Conflict Resolution
                              │
Write Request 3 ──►  Node 3 ──┘
```
Challenges :
Conflict resolution required
More complex consistency models
Vector clocks or timestamps for ordering
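The simplest timestamp-based approach is last-write-wins (LWW), sketched below. The tuple layout is illustrative; the key point is that ties on the timestamp are broken deterministically by node id, so every replica picks the same winner:

```python
# Last-write-wins conflict resolution, a minimal sketch: each replica tags
# writes with a (timestamp, node_id) pair; on merge, the highest pair wins.
def lww_merge(a, b):
    """a, b are (value, timestamp, node_id) tuples; return the winner."""
    return a if (a[1], a[2]) >= (b[1], b[2]) else b

v1 = ('cart=[book]', 100, 'node-1')
v2 = ('cart=[book,pen]', 105, 'node-2')
print(lww_merge(v1, v2)[0])   # 'cart=[book,pen]' — later timestamp wins
```

LWW is cheap but can silently drop concurrent updates; vector clocks (below) detect concurrency instead of guessing, at the cost of carrying more metadata.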
Quorum-Based Replication
Requires majority of replicas to agree:
class QuorumReplication:
    def __init__(self, replicas, read_quorum=None, write_quorum=None):
        self.replicas = replicas
        self.read_quorum = read_quorum or (len(replicas) // 2 + 1)
        self.write_quorum = write_quorum or (len(replicas) // 2 + 1)

    def write(self, key, value):
        """Write with quorum"""
        responses = []
        for replica in self.replicas:
            try:
                responses.append(replica.write(key, value))
                if len(responses) >= self.write_quorum:
                    return True
            except Exception as e:
                print(f"Replica {replica} failed: {e}")
        return len(responses) >= self.write_quorum

    def read(self, key):
        """Read with quorum"""
        responses = []
        for replica in self.replicas:
            try:
                responses.append(replica.read(key))
                if len(responses) >= self.read_quorum:
                    return max(responses, key=lambda x: x.timestamp)
            except Exception as e:
                print(f"Replica {replica} failed: {e}")
        return None
Vector Clocks
Vector clocks track causal relationships in distributed systems:
class VectorClock:
    def __init__(self, node_id, num_nodes):
        self.node_id = node_id
        self.clock = [0] * num_nodes

    def tick(self):
        """Increment own clock"""
        self.clock[self.node_id] += 1

    def update(self, other_clock):
        """Update with received clock"""
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], other_clock[i])
        self.tick()

    def happens_before(self, other):
        """Check if this event happens before other"""
        less_than = False
        for i in range(len(self.clock)):
            if self.clock[i] > other.clock[i]:
                return False
            if self.clock[i] < other.clock[i]:
                less_than = True
        return less_than

    def concurrent(self, other):
        """Check if events are concurrent"""
        return not self.happens_before(other) and not other.happens_before(self)
Anti-Entropy and Merkle Trees
Merkle trees efficiently detect inconsistencies:
import hashlib

class MerkleTree:
    def __init__(self, data):
        self.data = data
        self.root = self.build_tree(data)

    def hash_data(self, data):
        """Hash data"""
        return hashlib.sha256(str(data).encode()).hexdigest()

    def build_tree(self, data):
        """Build Merkle tree (returns the root hash)"""
        if len(data) == 1:
            return self.hash_data(data[0])
        mid = len(data) // 2
        left = self.build_tree(data[:mid])
        right = self.build_tree(data[mid:])
        return self.hash_data(left + right)

    def get_root_hash(self):
        """Get root hash"""
        return self.root

    def compare_trees(self, other_tree):
        """Compare with another tree"""
        return self.root == other_tree.root
Backup and Disaster Recovery
Backup Strategies
Full Backup
Complete copy of all data at a point in time.
Advantages :
Simple restore process
Complete data recovery
Independent of other backups
Disadvantages :
Time-consuming
High storage requirements
Network bandwidth intensive
Incremental Backup
Only backs up changes since last backup (full or incremental).
Advantages :
Faster backup process
Lower storage requirements
Less network bandwidth
Disadvantages :
Slower restore (requires full + all incrementals)
More complex restore process
Dependency chain
Differential Backup
Backs up changes since last full backup.
Advantages :
Faster restore than incremental (only full + latest
differential)
Moderate storage requirements
Disadvantages :
Slower than incremental backups
Storage grows over time
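The restore-time trade-off between these three schemes comes down to how many backup sets must be read back. This small arithmetic sketch (a weekly cycle with the full backup on day 0 is an assumption for illustration) makes the comparison concrete:

```python
# How many backup sets must be read to restore on day N of a weekly cycle
# (full backup taken on day 0, one incremental/differential per day after).
def restore_chain(day, scheme):
    if scheme == 'full':
        return 1                       # every day has a standalone full copy
    if scheme == 'incremental':
        return 1 + day                 # full + every incremental since day 0
    if scheme == 'differential':
        return 1 if day == 0 else 2    # full + only the latest differential
    raise ValueError(scheme)

for scheme in ('full', 'incremental', 'differential'):
    print(scheme, [restore_chain(d, scheme) for d in range(7)])
```

Incremental backups minimize backup time but grow the restore chain linearly; differentials cap the chain at two sets in exchange for re-copying all changes since the last full backup.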
Backup Implementation
S3 Lifecycle Policies
import boto3

def setup_backup_lifecycle(bucket_name):
    """Setup S3 lifecycle policy for automated backups"""
    s3_client = boto3.client('s3')
    lifecycle_config = {
        'Rules': [
            {
                'Id': 'DailyBackups',
                'Status': 'Enabled',
                'Prefix': 'backups/daily/',
                'Transitions': [
                    {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                    {'Days': 90, 'StorageClass': 'GLACIER'}
                ],
                'Expiration': {'Days': 365}
            },
            {
                'Id': 'ArchiveOldData',
                'Status': 'Enabled',
                'Prefix': 'data/',
                'Transitions': [
                    {'Days': 180, 'StorageClass': 'GLACIER'}
                ]
            }
        ]
    }
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=lifecycle_config
    )
    print(f"Lifecycle policy configured for {bucket_name}")
Automated Backup Script
import os
import tarfile
from datetime import datetime

import boto3

class BackupManager:
    def __init__(self, s3_bucket, backup_prefix='backups'):
        self.s3_client = boto3.client('s3')
        self.bucket = s3_bucket
        self.prefix = backup_prefix

    def create_backup(self, source_paths, backup_name=None):
        """Create compressed backup"""
        if backup_name is None:
            backup_name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.tar.gz"
        backup_path = f"/tmp/{backup_name}"
        with tarfile.open(backup_path, 'w:gz') as tar:
            for path in source_paths:
                if os.path.isfile(path) or os.path.isdir(path):
                    tar.add(path, arcname=os.path.basename(path))
        s3_key = f"{self.prefix}/{backup_name}"
        self.s3_client.upload_file(
            backup_path, self.bucket, s3_key,
            ExtraArgs={'StorageClass': 'STANDARD_IA'}
        )
        os.remove(backup_path)
        print(f"Backup created: {s3_key}")
        return s3_key

    def restore_backup(self, s3_key, restore_path):
        """Restore backup from S3"""
        backup_file = f"/tmp/{os.path.basename(s3_key)}"
        self.s3_client.download_file(self.bucket, s3_key, backup_file)
        with tarfile.open(backup_file, 'r:gz') as tar:
            tar.extractall(restore_path)
        os.remove(backup_file)
        print(f"Backup restored to: {restore_path}")

backup_mgr = BackupManager('my-backup-bucket')
backup_mgr.create_backup(['/var/www', '/etc/nginx'])
Disaster Recovery Strategies
RTO and RPO
RTO (Recovery Time Objective) : Maximum acceptable
downtime
RPO (Recovery Point Objective) : Maximum acceptable
data loss
Strategy     | RTO     | RPO     | Cost
Hot standby  | Minutes | Seconds | Very High
Warm standby | Hours   | Minutes | High
Cold standby | Days    | Hours   | Moderate
Backup only  | Days    | Days    | Low
Multi-Region Replication
import boto3

class MultiRegionReplication:
    def __init__(self, primary_region, replica_regions):
        self.primary_region = primary_region
        self.replica_regions = replica_regions
        self.primary_s3 = boto3.client('s3', region_name=primary_region)
        self.replica_clients = {
            region: boto3.client('s3', region_name=region)
            for region in replica_regions
        }

    def setup_cross_region_replication(self, bucket_name):
        """Setup cross-region replication"""
        self.primary_s3.put_bucket_versioning(
            Bucket=bucket_name,
            VersioningConfiguration={'Status': 'Enabled'}
        )

        rules = []
        for priority, region in enumerate(self.replica_regions, start=1):
            replica_bucket = f"{bucket_name}-{region}"
            self.replica_clients[region].create_bucket(
                Bucket=replica_bucket,
                CreateBucketConfiguration={'LocationConstraint': region}
            )
            self.replica_clients[region].put_bucket_versioning(
                Bucket=replica_bucket,
                VersioningConfiguration={'Status': 'Enabled'}
            )
            rules.append({
                'Status': 'Enabled',
                'Priority': priority,
                'Destination': {
                    'Bucket': f'arn:aws:s3:::{replica_bucket}',
                    'StorageClass': 'STANDARD'
                }
            })
            print(f"Replication configured: {bucket_name} -> {replica_bucket}")

        # One replication configuration carries all destination rules; calling
        # put_bucket_replication once per region would overwrite earlier rules.
        self.primary_s3.put_bucket_replication(
            Bucket=bucket_name,
            ReplicationConfiguration={
                'Role': 'arn:aws:iam::account:role/replication-role',
                'Rules': rules
            }
        )

replication = MultiRegionReplication(
    primary_region='us-east-1',
    replica_regions=['us-west-2', 'eu-west-1']
)
replication.setup_cross_region_replication('my-data-bucket')
Performance Optimization
Key metrics for storage systems:
IOPS (Input/Output Operations Per Second) : Number
of read/write operations per second
Throughput : Data transfer rate (MB/s, GB/s)
Latency : Time to complete a single operation
(ms)
Bandwidth : Maximum data transfer capacity
Optimization Techniques
Caching Strategies
Read Cache :
Frequently accessed data stored in faster storage (RAM, SSD)
Reduces latency for hot data
LRU (Least Recently Used) eviction policy
Write Cache :
Buffer writes before committing to persistent storage
Improves write throughput
Risk of data loss if cache fails
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key):
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.capacity:
            self.cache.popitem(last=False)   # evict least recently used
        self.cache[key] = value
Data Locality
Place data close to computation:
HDFS : Data stored on same nodes as compute
tasks
S3 Select : Push-down predicates to storage
layer
Edge caching : CDN placement near users
Compression
Reduce storage and transfer costs:
import gzip
import bz2
import lzma

class CompressionManager:
    @staticmethod
    def compress_gzip(data):
        """Compress with gzip"""
        return gzip.compress(data)

    @staticmethod
    def decompress_gzip(compressed_data):
        """Decompress gzip"""
        return gzip.decompress(compressed_data)

    @staticmethod
    def compress_bz2(data):
        """Compress with bzip2"""
        return bz2.compress(data)

    @staticmethod
    def compress_lzma(data):
        """Compress with LZMA"""
        return lzma.compress(data)
Parallel I/O
Distribute I/O across multiple streams:
import os
import concurrent.futures

import boto3

class ParallelS3Upload:
    def __init__(self, bucket_name, max_workers=10):
        self.s3_client = boto3.client('s3')
        self.bucket = bucket_name
        self.max_workers = max_workers

    def upload_file_part(self, file_path, object_key, part_number, part_size, offset):
        """Upload a single part"""
        with open(file_path, 'rb') as f:
            f.seek(offset)
            data = f.read(part_size)
        response = self.s3_client.upload_part(
            Bucket=self.bucket,
            Key=object_key,
            PartNumber=part_number,
            UploadId=self.upload_id,
            Body=data
        )
        return {'PartNumber': part_number, 'ETag': response['ETag']}

    def upload_large_file(self, file_path, object_key, part_size_mb=5):
        """Upload large file using multipart upload"""
        file_size = os.path.getsize(file_path)
        part_size = part_size_mb * 1024 * 1024
        num_parts = (file_size + part_size - 1) // part_size

        response = self.s3_client.create_multipart_upload(
            Bucket=self.bucket,
            Key=object_key
        )
        self.upload_id = response['UploadId']

        parts = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for i in range(num_parts):
                offset = i * part_size
                futures.append(executor.submit(
                    self.upload_file_part,
                    file_path, object_key, i + 1, part_size, offset
                ))
            for future in concurrent.futures.as_completed(futures):
                parts.append(future.result())

        parts.sort(key=lambda x: x['PartNumber'])
        self.s3_client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=object_key,
            UploadId=self.upload_id,
            MultipartUpload={'Parts': parts}
        )
        print(f"Uploaded {object_key} ({file_size} bytes)")
Performance Benchmarking

import time
import statistics

import boto3

class S3PerformanceBenchmark:
    def __init__(self, bucket_name):
        self.s3_client = boto3.client('s3')
        self.bucket = bucket_name

    def benchmark_write(self, object_key, data_size_kb, num_iterations=100):
        """Benchmark write performance"""
        data = b'x' * (data_size_kb * 1024)
        latencies = []
        for i in range(num_iterations):
            key = f"{object_key}_{i}"
            start = time.time()
            self.s3_client.put_object(Bucket=self.bucket, Key=key, Body=data)
            latencies.append((time.time() - start) * 1000)
        total_seconds = sum(latencies) / 1000
        return {
            'avg_latency_ms': statistics.mean(latencies),
            'p50_latency_ms': statistics.median(latencies),
            'p99_latency_ms': statistics.quantiles(latencies, n=100)[98],
            # KB -> megabits is * 8 / 1000; divide by total seconds for Mbps
            'throughput_mbps': (data_size_kb * num_iterations * 8 / 1000) / total_seconds
        }

    def benchmark_read(self, object_key_prefix, num_iterations=100):
        """Benchmark read performance"""
        latencies = []
        for i in range(num_iterations):
            key = f"{object_key_prefix}_{i}"
            start = time.time()
            response = self.s3_client.get_object(Bucket=self.bucket, Key=key)
            response['Body'].read()
            latencies.append((time.time() - start) * 1000)
        return {
            'avg_latency_ms': statistics.mean(latencies),
            'p50_latency_ms': statistics.median(latencies),
            'p99_latency_ms': statistics.quantiles(latencies, n=100)[98]
        }

benchmark = S3PerformanceBenchmark('test-bucket')
write_stats = benchmark.benchmark_write('test-object', data_size_kb=1024)
read_stats = benchmark.benchmark_read('test-object')
print(f"Write: {write_stats}")
print(f"Read: {read_stats}")
Cost Optimization Strategies
Storage Cost Factors
Storage volume : Amount of data stored
Storage class : Performance tier (Standard, IA,
Glacier)
Request costs : PUT, GET, LIST operations
Data transfer : Egress bandwidth
Lifecycle management : Automated tiering
Optimization Techniques
Lifecycle Management
Automatically move data to cheaper storage classes:
import boto3

def optimize_storage_lifecycle(bucket_name):
    """Setup cost-optimized lifecycle policy"""
    s3_client = boto3.client('s3')
    lifecycle_config = {
        'Rules': [
            {
                'Id': 'MoveToIA',
                'Status': 'Enabled',
                'Prefix': 'data/',
                'Transitions': [
                    {'Days': 30, 'StorageClass': 'STANDARD_IA'}
                ]
            },
            {
                'Id': 'MoveToGlacier',
                'Status': 'Enabled',
                'Prefix': 'archive/',
                'Transitions': [
                    {'Days': 90, 'StorageClass': 'GLACIER'}
                ]
            },
            {
                'Id': 'DeleteOldVersions',
                'Status': 'Enabled',
                'NoncurrentVersionTransitions': [
                    {'NoncurrentDays': 30, 'StorageClass': 'GLACIER'}
                ],
                'NoncurrentVersionExpiration': {'NoncurrentDays': 365}
            }
        ]
    }
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=lifecycle_config
    )
Deduplication
Eliminate duplicate data:
import hashlib

class DeduplicationManager:
    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.content_hash_index = {}

    def store_with_dedup(self, data, object_key):
        """Store object with deduplication"""
        content_hash = hashlib.sha256(data).hexdigest()
        if content_hash in self.content_hash_index:
            existing_key = self.content_hash_index[content_hash]
            self.storage.create_reference(object_key, existing_key)
            print(f"Deduplicated: {object_key} -> {existing_key}")
        else:
            self.storage.put_object(object_key, data)
            self.content_hash_index[content_hash] = object_key
            print(f"Stored: {object_key}")
Compression Analysis
import gzip

def analyze_compression_savings(file_paths):
    """Analyze potential savings from compression"""
    total_original = 0
    total_compressed = 0
    for file_path in file_paths:
        with open(file_path, 'rb') as f:
            original_data = f.read()
        compressed_data = gzip.compress(original_data)
        original_size = len(original_data)
        compressed_size = len(compressed_data)
        total_original += original_size
        total_compressed += compressed_size
        savings = (1 - compressed_size / original_size) * 100
        print(f"{file_path}: {original_size} -> {compressed_size} bytes ({savings:.1f}% savings)")
    total_savings = (1 - total_compressed / total_original) * 100
    print(f"\nTotal: {total_original} -> {total_compressed} bytes ({total_savings:.1f}% savings)")
Cost Calculator
class StorageCostCalculator:
    def __init__(self):
        # Illustrative prices: $/GB-month for storage, $/1000 requests
        self.pricing = {
            'STANDARD': {'storage': 0.023, 'requests': 0.0004},
            'STANDARD_IA': {'storage': 0.0125, 'requests': 0.001},
            'GLACIER': {'storage': 0.004, 'retrieval': 0.01},
            'data_transfer_out': 0.09
        }

    def calculate_monthly_cost(self, storage_gb, storage_class='STANDARD',
                               requests=0, data_transfer_gb=0):
        """Calculate monthly storage cost"""
        storage_cost = storage_gb * self.pricing[storage_class]['storage']
        # GLACIER has no per-request price in this table; default to 0
        request_cost = (requests / 1000) * self.pricing[storage_class].get('requests', 0.0)
        transfer_cost = data_transfer_gb * self.pricing['data_transfer_out']
        total = storage_cost + request_cost + transfer_cost
        return {
            'storage_cost': storage_cost,
            'request_cost': request_cost,
            'transfer_cost': transfer_cost,
            'total_cost': total
        }

    def compare_storage_classes(self, storage_gb, requests=0):
        """Compare costs across storage classes"""
        results = {}
        for storage_class in ['STANDARD', 'STANDARD_IA', 'GLACIER']:
            results[storage_class] = self.calculate_monthly_cost(
                storage_gb, storage_class, requests
            )
        return results

calculator = StorageCostCalculator()
costs = calculator.compare_storage_classes(storage_gb=1000, requests=100000)
for class_name, cost_info in costs.items():
    print(f"{class_name}: ${cost_info['total_cost']:.2f}/month")
Case Studies
Case Study 1: E-Commerce Platform Migration
Scenario : A large e-commerce platform needs to migrate from on-premises storage to cloud object storage.
Requirements :
50TB of product images and videos
10 million objects
Global CDN integration
99.99% availability
Cost optimization
Solution :
import os
import time

import boto3

class ECommerceStorageMigration:
    def __init__(self, source_path, s3_bucket, cdn_distribution_id):
        self.source_path = source_path
        self.s3_bucket = s3_bucket
        self.cdn_id = cdn_distribution_id
        self.s3_client = boto3.client('s3')
        self.cloudfront = boto3.client('cloudfront')

    def migrate_with_optimization(self):
        """Migrate with storage class optimization"""
        image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.webp']
        video_extensions = ['.mp4', '.webm', '.mov']

        for root, dirs, files in os.walk(self.source_path):
            for file in files:
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, self.source_path)
                ext = os.path.splitext(file)[1].lower()

                if ext in image_extensions:
                    storage_class = 'STANDARD'
                    prefix = 'images/'
                elif ext in video_extensions:
                    storage_class = 'STANDARD_IA'
                    prefix = 'videos/'
                else:
                    storage_class = 'STANDARD'
                    prefix = 'assets/'

                self.s3_client.upload_file(
                    file_path,
                    self.s3_bucket,
                    f"{prefix}{relative_path}",
                    ExtraArgs={'StorageClass': storage_class}
                )

        self.setup_lifecycle_policies()

        self.cloudfront.create_invalidation(
            DistributionId=self.cdn_id,
            InvalidationBatch={
                'Paths': {'Quantity': 1, 'Items': ['/*']},
                'CallerReference': str(time.time())
            }
        )

    def setup_lifecycle_policies(self):
        """Setup automated lifecycle management"""
        lifecycle_config = {
            'Rules': [
                {
                    'Id': 'ImageLifecycle',
                    'Status': 'Enabled',
                    'Prefix': 'images/',
                    'Transitions': [
                        {'Days': 90, 'StorageClass': 'STANDARD_IA'},
                        {'Days': 365, 'StorageClass': 'GLACIER'}
                    ]
                },
                {
                    'Id': 'VideoLifecycle',
                    'Status': 'Enabled',
                    'Prefix': 'videos/',
                    'Transitions': [
                        {'Days': 180, 'StorageClass': 'GLACIER'}
                    ]
                }
            ]
        }
        self.s3_client.put_bucket_lifecycle_configuration(
            Bucket=self.s3_bucket,
            LifecycleConfiguration=lifecycle_config
        )
Case Study 2: Big Data Analytics Storage
Scenario : A company needs distributed storage for Hadoop-based analytics workloads processing 100TB daily.
Requirements :
HDFS cluster for Hadoop workloads
High throughput for MapReduce jobs
Fault tolerance (3x replication)
Integration with S3 for archival
Solution :
class BigDataStorageArchitecture:
    def __init__(self):
        self.hdfs_cluster = None
        self.s3_archive_bucket = 'analytics-archive'

    def setup_hdfs_cluster(self, namenodes, datanodes):
        """Setup HDFS cluster configuration"""
        hdfs_config = {
            'dfs.nameservices': 'analytics-cluster',
            'dfs.ha.namenodes.analytics-cluster': 'nn1,nn2',
            'dfs.replication': 3,
            'dfs.blocksize': 268435456,          # 256 MB blocks
            'dfs.namenode.handler.count': 200,
            'dfs.datanode.max.xcievers': 4096
        }
        datanode_config = {
            'dfs.datanode.data.dir': '/data1,/data2,/data3',
            'dfs.datanode.balance.bandwidthPerSec': 104857600   # 100 MB/s
        }
        return {**hdfs_config, **datanode_config}

    def archive_to_s3(self, hdfs_path, retention_days=90):
        """Archive old data from HDFS to S3"""
        distcp_command = f"""
        hadoop distcp \\
            -D fs.s3a.access.key=$AWS_ACCESS_KEY \\
            -D fs.s3a.secret.key=$AWS_SECRET_KEY \\
            hdfs://namenode:9000{hdfs_path} \\
            s3a://{self.s3_archive_bucket}/archive/
        """
        return distcp_command
Case Study 3: Multi-Region Content Delivery
Scenario : A media streaming service needs to serve
content globally with low latency and high availability.
Requirements :
Content stored in multiple regions
Automatic failover
Low latency for global users
Cost-effective storage
Solution :
import boto3

class MultiRegionContentDelivery:
    def __init__(self, primary_region='us-east-1'):
        self.primary_region = primary_region
        self.replica_regions = ['eu-west-1', 'ap-southeast-1', 'us-west-2']
        self.regions_config = {}
        for region in [primary_region] + self.replica_regions:
            self.regions_config[region] = boto3.client('s3', region_name=region)

    def _bucket_for_region(self, bucket_name, region):
        # S3 bucket names are globally unique, so replicas need distinct names
        return bucket_name if region == self.primary_region else f"{bucket_name}-{region}"

    def setup_cross_region_replication(self, bucket_name):
        """Setup automatic cross-region replication"""
        for region, client in self.regions_config.items():
            regional_bucket = self._bucket_for_region(bucket_name, region)
            try:
                if region == self.primary_region:
                    client.create_bucket(Bucket=regional_bucket)
                else:
                    client.create_bucket(
                        Bucket=regional_bucket,
                        CreateBucketConfiguration={'LocationConstraint': region}
                    )
            except Exception as e:
                print(f"Bucket may already exist in {region}: {e}")
            client.put_bucket_versioning(
                Bucket=regional_bucket,
                VersioningConfiguration={'Status': 'Enabled'}
            )

        replication_role = 'arn:aws:iam::account:role/replication-role'
        rules = []
        for i, region in enumerate(self.replica_regions):
            rules.append({
                'Id': f'ReplicateTo{region}',
                'Status': 'Enabled',
                'Priority': i + 1,
                'Destination': {
                    'Bucket': f'arn:aws:s3:::{self._bucket_for_region(bucket_name, region)}',
                    'StorageClass': 'STANDARD'
                }
            })
        self.regions_config[self.primary_region].put_bucket_replication(
            Bucket=bucket_name,
            ReplicationConfiguration={
                'Role': replication_role,
                'Rules': rules
            }
        )

    def get_content_url(self, object_key, user_region):
        """Get optimal content URL based on user region"""
        region_mapping = {
            'US': 'us-east-1',
            'EU': 'eu-west-1',
            'AP': 'ap-southeast-1',
            'US-WEST': 'us-west-2'
        }
        optimal_region = region_mapping.get(user_region, self.primary_region)
        bucket_name = self._bucket_for_region('content-bucket', optimal_region)
        return f"https://{bucket_name}.s3.{optimal_region}.amazonaws.com/{object_key}"
Q&A Section
Q1: What's the difference between object storage and block storage?
A : Object storage stores data as objects with
metadata in a flat namespace, accessed via REST APIs. Block storage
provides raw block-level access, typically used by filesystems. Object
storage excels at scalability and web applications, while block storage
is better for databases and low-latency applications.
Q2: How does HDFS handle node failures?
A : HDFS replicates each block across multiple
DataNodes (default: 3 replicas). If a DataNode fails, the NameNode
detects the failure through heartbeat mechanisms and initiates
replication to restore the replication factor. The system continues
operating with reduced redundancy until replication completes.
Q3: What is eventual consistency, and when is it acceptable?
A : Eventual consistency means the system will become
consistent over time if no new updates occur. It's acceptable for:
Social media feeds (slight delays acceptable)
DNS systems (propagation delays expected)
Analytics data (not real-time critical)
Content delivery (caching acceptable)
It's NOT acceptable for:
Financial transactions
Inventory management
Medical records
Real-time collaborative editing
Q4: How do I choose between S3 storage classes?
A : Choose based on access patterns:
Standard : Frequently accessed data (< 30
days)
Standard-IA : Infrequently accessed (monthly or
less)
Glacier : Archive data (quarterly or less)
Intelligent-Tiering : Unknown or changing access
patterns
Consider retrieval costs: Glacier has low storage costs but charges
for retrieval.
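A quick back-of-the-envelope check makes this concrete. Using the illustrative per-GB-month prices from the cost calculator earlier in this article ($0.023 Standard, $0.0125 Standard-IA) plus an assumed $0.01/GB retrieval fee for IA (verify against current pricing), you can compute the break-even retrieval rate:

```python
# Hedged cost sketch: when does Standard-IA beat Standard?
# Prices are illustrative, not current AWS list prices.
STANDARD = 0.023       # $/GB-month
IA = 0.0125            # $/GB-month
IA_RETRIEVAL = 0.01    # $/GB retrieved (assumed)

def monthly_cost_ia(storage_gb, retrieved_gb):
    return storage_gb * IA + retrieved_gb * IA_RETRIEVAL

def monthly_cost_standard(storage_gb):
    return storage_gb * STANDARD

# break-even: fraction of stored data retrieved per month where costs match
break_even_fraction = (STANDARD - IA) / IA_RETRIEVAL
print(f"IA wins while < {break_even_fraction:.0%} of data is read monthly")
```

The same arithmetic applies to Glacier, where much lower storage prices are offset by higher retrieval fees and restore delays.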
Q5: What is a quorum in distributed systems?
A : A quorum is the minimum number of nodes that must
agree for an operation to succeed. For N replicas:
Read quorum: (N/2) + 1
Write quorum: (N/2) + 1
This ensures at least one node has the latest data and prevents
split-brain scenarios.
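The overlap guarantee can be stated as a one-line check: reads and writes intersect in at least one node whenever R + W > N, which is exactly what the (N/2) + 1 defaults ensure. A minimal sketch:

```python
# Quorum overlap check: any read quorum and write quorum intersect
# in at least one replica whenever R + W > N.
def quorums_overlap(n, r=None, w=None):
    r = r if r is not None else n // 2 + 1   # default majority read quorum
    w = w if w is not None else n // 2 + 1   # default majority write quorum
    return r + w > n

print(quorums_overlap(5))            # True: 3 + 3 > 5
print(quorums_overlap(5, r=1, w=5))  # True: read-one/write-all also overlaps
print(quorums_overlap(5, r=2, w=3))  # False: stale reads possible
```

Systems with tunable consistency (Cassandra, Riak) let you pick R and W per operation; any choice satisfying R + W > N gives read-after-write overlap, while smaller quorums trade that guarantee for latency.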
Q6: How does Ceph ensure data consistency?
A : Ceph uses:
1. CRUSH algorithm : Deterministic data placement
2. PG (Placement Groups) : Logical grouping of objects
3. OSD maps : Track cluster state and data location
4. Quorum-based operations : Monitors maintain consensus
5. Replication : Multiple OSDs store each object
The system maintains consistency through these mechanisms while
allowing tunable consistency levels.
Q7: What are the trade-offs of synchronous vs asynchronous replication?
A :
Synchronous :
✅ Strong consistency
✅ No data loss risk
❌ Higher latency
❌ Lower throughput
❌ Fails if replica unavailable
Asynchronous :
✅ Lower latency
✅ Higher throughput
✅ Better availability
❌ Risk of data loss
❌ Eventual consistency
Choose based on RPO requirements: zero data loss needs synchronous
replication.
Q8: How can I optimize storage costs for a large-scale application?
A : Strategies include:
1. Lifecycle policies : Automatically move to cheaper storage classes
2. Compression : Reduce storage requirements (2-6x typically)
3. Deduplication : Eliminate duplicate data
4. Right-sizing : Choose appropriate storage classes
5. Monitoring : Track usage and optimize based on patterns
6. Regional optimization : Store data in cheaper regions when possible
Q9: What is the CAP theorem, and why can't I have all three properties?
A : CAP theorem states you can guarantee at most two
of: Consistency, Availability, Partition tolerance. Partition tolerance
is mandatory in distributed systems (networks fail), forcing a choice
between C and A. This is a fundamental trade-off: strong consistency
requires coordination (reducing availability), while high availability
during partitions allows inconsistency.
Q10: How do I implement disaster recovery for cloud storage?
A : Implement a comprehensive DR strategy:
1. Backup strategy : Regular automated backups (full + incremental)
2. Multi-region replication : Geographic redundancy
3. Versioning : Enable object versioning for point-in-time recovery
4. Lifecycle policies : Automate backup retention and archival
5. Testing : Regularly test restore procedures
6. Documentation : Maintain runbooks for recovery scenarios
7. Monitoring : Alert on backup failures and replication lag
Define RTO and RPO requirements to guide your strategy.
Summary Cheat Sheet
Storage Types Comparison
Type   | Access           | Use Case           | Example
Block  | Block-level I/O  | Databases, VMs     | AWS EBS, Ceph RBD
File   | File system APIs | Shared filesystems | AWS EFS, NFS
Object | REST APIs        | Web apps, archives | S3, OSS, Ceph RGW
Consistency Models
Model            | Guarantee                  | Use Case
Strong           | All reads see latest write | Financial systems
Eventual         | Consistent over time       | Social media, CDN
Causal           | Preserves causality        | Collaborative systems
Read-your-writes | User sees own writes       | Web applications
Replication Strategies
Strategy           | Consistency | Performance | Use Case
Master-Slave Sync  | Strong      | Lower       | Critical data
Master-Slave Async | Eventual    | Higher      | High throughput
Multi-Master       | Eventual    | Highest     | Global systems
Quorum-based       | Tunable     | Moderate    | Distributed systems
Storage Optimization Checklist
Key Metrics to Monitor
Durability : 99.999999999% (11 nines) for object
storage
Availability : 99.99% (4 nines) for production
systems
Latency : P50, P99, P999 percentiles
Throughput : IOPS and bandwidth
Cost : Storage, requests, transfer costs
Replication lag : For async replication
Backup success rate : For DR readiness
Common Patterns
Write-Through Cache : Write to cache and storage simultaneously
Write-Back Cache : Write to cache, flush to storage later
Read-Through Cache : Check cache, read from storage if miss
Write-Around Cache : Write directly to storage, invalidate cache
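The durability difference between the first two cache patterns is worth seeing in code. This minimal write-through sketch (the dict-backed store is an illustrative stand-in for real persistent storage) persists every write before acknowledging it, so losing the cache never loses data — unlike write-back, which defers the store write:

```python
class WriteThroughCache:
    """Minimal write-through cache: every put hits both cache and store."""

    def __init__(self, store):
        self.cache = {}
        self.store = store            # backing store: any dict-like object

    def put(self, key, value):
        self.store[key] = value       # persist first
        self.cache[key] = value       # then populate the cache

    def get(self, key):
        if key in self.cache:         # hit: served from memory
            return self.cache[key]
        value = self.store.get(key)   # miss: read-through from the store
        if value is not None:
            self.cache[key] = value
        return value

store = {}
c = WriteThroughCache(store)
c.put('a', 1)
print(store['a'])   # 1 — already persisted, no flush needed
```

The trade-off: write-through pays the store's write latency on every put, which is why write-heavy workloads often accept write-back's data-loss window instead.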
Sharding : Distribute data across multiple storage nodes
Partitioning : Divide data by key ranges or hash
Replication : Copy data to multiple nodes for redundancy
This comprehensive guide covers the essential aspects of cloud
storage systems and distributed architecture. From theoretical
foundations to practical implementations, these concepts form the
backbone of modern cloud infrastructure. Whether you're designing a new
system or optimizing an existing one, understanding these principles is
crucial for building scalable, reliable, and cost-effective storage
solutions.