Cloud Computing (3): Storage Systems and Distributed Architecture
Chen Kai

Modern cloud infrastructure relies on distributed storage systems that can scale to petabytes, maintain high availability, and provide consistent performance across global deployments. Understanding how these systems work — from the theoretical foundations of CAP theorem to practical implementations like Amazon S3, HDFS, and Ceph — is essential for architects and engineers building cloud-native applications.

This article explores the fundamental principles, architectural patterns, and real-world implementations of distributed storage systems. We'll examine object storage architectures, compare block and file storage models, dive deep into HDFS and Ceph deployments, and discuss strategies for consistency, replication, backup, performance optimization, and cost management.

Distributed Storage Fundamentals

The CAP Theorem

The CAP theorem, proposed by Eric Brewer in 2000, states that a distributed system can guarantee at most two out of three properties:

  • Consistency: All nodes see the same data simultaneously
  • Availability: Every request receives a response (success or failure)
  • Partition Tolerance: System continues operating despite network failures

In practice, partition tolerance is non-negotiable for distributed systems — network partitions will occur. This forces a choice between consistency and availability.

CP Systems prioritize consistency over availability. When a partition occurs, CP systems may refuse requests to maintain data consistency. Examples include:

  • Traditional relational databases with synchronous replication
  • HBase (strong consistency model)
  • MongoDB with majority write concern

AP Systems prioritize availability over strict consistency. They continue serving requests during partitions, potentially returning stale data. Examples include:

  • Amazon DynamoDB (eventual consistency)
  • Cassandra (tunable consistency)
  • Riak (eventual consistency)

CA Systems are theoretically impossible in distributed environments but exist in single-node systems or tightly coupled clusters without network partitions.

BASE Principles

BASE (Basically Available, Soft state, Eventual consistency) complements ACID properties for distributed systems:

  • Basically Available: System remains available most of the time, even during partial failures
  • Soft State: System state may change without input due to eventual consistency
  • Eventual Consistency: System will become consistent over time, given no new updates

BASE systems trade immediate consistency for improved availability and performance, making them ideal for large-scale web applications.
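
The convergence behind eventual consistency can be illustrated with a last-write-wins (LWW) merge, one common (if lossy) reconciliation rule; the replica names and timestamps below are illustrative:

```python
# Sketch of eventual consistency via last-write-wins (LWW) merge.
# Each replica holds (value, timestamp); pairwise anti-entropy exchanges
# converge every replica to the value with the highest timestamp.

def lww_merge(local, remote):
    """Return the winning (value, timestamp) pair."""
    return local if local[1] >= remote[1] else remote

# Three replicas diverge during a partition...
replicas = {
    "r1": ("v1", 100),
    "r2": ("v2", 150),   # latest write
    "r3": ("v1", 100),
}

# ...then gossip pairwise until no replica changes (anti-entropy).
changed = True
while changed:
    changed = False
    for a in list(replicas):
        for b in list(replicas):
            merged = lww_merge(replicas[a], replicas[b])
            if merged != replicas[a]:
                replicas[a] = merged
                changed = True

print(replicas)  # every replica now holds ('v2', 150)
```

LWW silently discards concurrent writes with older timestamps; systems that cannot tolerate that loss use richer mechanisms such as vector clocks, discussed later in this article's replication section.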

Distributed Storage Architecture Patterns

Master-Slave Architecture

In master-slave (primary-secondary) architectures, one node coordinates operations while others replicate data:

┌─────────────┐
│   Master    │   (Coordinates writes, manages metadata)
└──────┬──────┘
       │
  ┌────┴───┬────────┬────────┐
  │        │        │        │
┌─▼───┐ ┌──▼──┐  ┌──▼──┐  ┌──▼──┐
│Slave│ │Slave│  │Slave│  │Slave│   (Store data replicas)
└─────┘ └─────┘  └─────┘  └─────┘

Advantages:

  • Simple to implement and reason about
  • Strong consistency possible with synchronous replication
  • Clear failure semantics

Disadvantages:

  • Master becomes a bottleneck
  • Single point of failure (requires failover mechanisms)
  • Limited scalability

Peer-to-Peer Architecture

Peer-to-peer systems distribute coordination across all nodes:

           ┌─────┐
           │Node1│
           └──┬──┘
              │
  ┌───────┬───┴────┬────────┐
  │       │        │        │
┌─▼───┐ ┌─▼───┐ ┌──▼──┐ ┌──▼──┐
│Node2│ │Node3│ │Node4│ │Node5│
└──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘
   └───────┴───────┴───────┘

Advantages:

  • No single point of failure
  • Horizontal scalability
  • Better fault tolerance

Disadvantages:

  • More complex to implement
  • Eventual consistency challenges
  • Conflict resolution required

Consistent Hashing

Consistent hashing enables efficient data distribution and minimizes rebalancing when nodes join or leave:

import hashlib
import bisect

class ConsistentHash:
    def __init__(self, nodes=None, replicas=3):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []

        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Generate hash for a key"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Add a node (with virtual nodes) to the hash ring"""
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            bisect.insort(self.sorted_keys, key)

    def remove_node(self, node):
        """Remove a node from the hash ring"""
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            del self.ring[key]
            self.sorted_keys.remove(key)

    def get_node(self, key):
        """Get the node responsible for a key"""
        if not self.ring:
            return None

        hash_key = self._hash(key)
        idx = bisect.bisect_right(self.sorted_keys, hash_key)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

# Example usage
ch = ConsistentHash(['node1', 'node2', 'node3', 'node4'])
print(ch.get_node('user:12345'))  # Returns one of the nodes

Object Storage Deep Dive

Amazon S3 Architecture

Amazon S3 (Simple Storage Service) is the de facto standard for object storage. Its architecture demonstrates key principles of scalable object storage systems.

S3 Request Flow

Client Request
      │
      ▼
┌─────────────────┐
│  Load Balancer  │  (Distributes requests)
└────────┬────────┘
         │
    ┌────┴────┐
    │         │
┌───▼────┐ ┌──▼─────┐
│Frontend│ │Frontend│  (Request routing)
└───┬────┘ └──┬─────┘
    │         │
    └────┬────┘
         │
┌────────▼────────┐
│ Metadata Store  │  (Object metadata)
└────────┬────────┘
         │
    ┌────┴────┐
    │         │
┌───▼────┐ ┌──▼─────┐
│Storage │ │Storage │  (Actual object data)
│  Node  │ │  Node  │
└────────┘ └────────┘

S3 Key Concepts

Buckets: Top-level containers for objects, organized by region and namespace.

Objects: Immutable data units with:

  • Key (unique identifier)
  • Value (data)
  • Metadata (content-type, custom headers)
  • Version ID (for versioning)

Regions: Geographic locations for data storage, enabling compliance and latency optimization.

S3 API Operations

import boto3
from botocore.exceptions import ClientError

class S3StorageManager:
    def __init__(self, region_name='us-east-1'):
        self.s3_client = boto3.client('s3', region_name=region_name)

    def create_bucket(self, bucket_name):
        """Create a new S3 bucket"""
        try:
            self.s3_client.create_bucket(Bucket=bucket_name)
            print(f"Bucket {bucket_name} created successfully")
        except ClientError as e:
            print(f"Error creating bucket: {e}")

    def upload_object(self, bucket_name, object_key, file_path, metadata=None):
        """Upload an object to S3"""
        extra_args = {}
        if metadata:
            extra_args['Metadata'] = metadata

        try:
            self.s3_client.upload_file(
                file_path,
                bucket_name,
                object_key,
                ExtraArgs=extra_args
            )
            print(f"Object {object_key} uploaded successfully")
        except ClientError as e:
            print(f"Error uploading object: {e}")

    def download_object(self, bucket_name, object_key, download_path):
        """Download an object from S3"""
        try:
            self.s3_client.download_file(
                bucket_name,
                object_key,
                download_path
            )
            print(f"Object {object_key} downloaded successfully")
        except ClientError as e:
            print(f"Error downloading object: {e}")

    def list_objects(self, bucket_name, prefix=''):
        """List objects in a bucket"""
        try:
            response = self.s3_client.list_objects_v2(
                Bucket=bucket_name,
                Prefix=prefix
            )
            return response.get('Contents', [])
        except ClientError as e:
            print(f"Error listing objects: {e}")
            return []

    def delete_object(self, bucket_name, object_key):
        """Delete an object from S3"""
        try:
            self.s3_client.delete_object(Bucket=bucket_name, Key=object_key)
            print(f"Object {object_key} deleted successfully")
        except ClientError as e:
            print(f"Error deleting object: {e}")

# Usage example
manager = S3StorageManager()
manager.create_bucket('my-app-data')
manager.upload_object(
    'my-app-data',
    'users/profile.jpg',
    '/local/path/profile.jpg',
    metadata={'user-id': '12345', 'upload-time': '2025-01-01'}
)

S3 Storage Classes

S3 offers multiple storage classes optimized for different access patterns:

Storage Class                Use Case                         Durability       Availability   Cost
Standard                     Frequently accessed data         99.999999999%    99.99%         Highest
Standard-IA                  Infrequently accessed            99.999999999%    99.9%          Lower
One Zone-IA                  Non-critical infrequent access   99.999999999%    99.5%          Lower still
Glacier Instant Retrieval    Archive with instant access      99.999999999%    99.9%          Very low
Glacier Flexible Retrieval   Archive (minutes to hours)       99.999999999%    99.99%         Very low
Glacier Deep Archive         Long-term archive (12-48h)       99.999999999%    99.99%         Lowest
Intelligent-Tiering          Automatic optimization           99.999999999%    99.9%          Variable

S3 Consistency Model

S3's consistency model has evolved:

  • Before December 2020: read-after-write consistency for PUTs of new objects, but only eventual consistency for overwrite PUTs and DELETEs
  • Since December 2020: strong read-after-write consistency for all operations, including overwrite PUTs, DELETEs, and list operations

This model balances performance with consistency guarantees.

Alibaba Cloud OSS Architecture

Alibaba Cloud Object Storage Service (OSS) follows similar principles to S3 but with some architectural differences:

OSS Architecture Components

┌─────────────────────────────────────────┐
│            OSS API Gateway              │
│    (Request routing, authentication)    │
└──────────────┬──────────────────────────┘
               │
     ┌─────────┴─────────┐
     │                   │
┌────▼────────┐    ┌─────▼────┐
│  Metadata   │    │ Storage  │
│  Service    │    │ Service  │
│ (TableStore)│    │ (Pangu)  │
└─────────────┘    └──────────┘

Key Features:

  • Multi-version support
  • Server-side encryption
  • Cross-region replication
  • Lifecycle management
  • CDN integration

OSS Python SDK Example

import oss2

class OSSStorageManager:
    def __init__(self, access_key_id, access_key_secret, endpoint, bucket_name):
        auth = oss2.Auth(access_key_id, access_key_secret)
        self.bucket = oss2.Bucket(auth, endpoint, bucket_name)

    def upload_file(self, local_file, object_key):
        """Upload file to OSS"""
        result = self.bucket.put_object_from_file(object_key, local_file)
        print(f"Upload successful. ETag: {result.etag}")
        return result

    def download_file(self, object_key, local_file):
        """Download file from OSS"""
        self.bucket.get_object_to_file(object_key, local_file)
        print(f"Download successful: {local_file}")

    def list_objects(self, prefix='', max_keys=100):
        """List objects with prefix"""
        objects = []
        for obj in oss2.ObjectIterator(self.bucket, prefix=prefix, max_keys=max_keys):
            objects.append({
                'key': obj.key,
                'size': obj.size,
                'last_modified': obj.last_modified
            })
        return objects

    def delete_object(self, object_key):
        """Delete object from OSS"""
        self.bucket.delete_object(object_key)
        print(f"Deleted: {object_key}")

    def set_lifecycle_rule(self, prefix, expiration_days):
        """Set lifecycle rule for automatic deletion"""
        rule = oss2.models.LifecycleRule(
            id='auto-delete',
            prefix=prefix,
            status='Enabled',
            expiration=oss2.models.LifecycleExpiration(days=expiration_days)
        )
        self.bucket.put_bucket_lifecycle(oss2.models.BucketLifecycle([rule]))
        print(f"Lifecycle rule set for prefix: {prefix}")

# Usage
manager = OSSStorageManager(
    access_key_id='your-key',
    access_key_secret='your-secret',
    endpoint='https://oss-cn-hangzhou.aliyuncs.com',
    bucket_name='my-bucket'
)
manager.upload_file('/local/file.txt', 'remote/file.txt')

Block vs File Storage Comparison

Understanding the differences between block, file, and object storage is crucial for selecting the right storage solution.

Block Storage

Block storage treats storage as a sequence of fixed-size blocks, typically 512 bytes to 4KB. The storage system doesn't understand file semantics — it only manages blocks.

Characteristics

  • Low-level access: Direct block-level I/O
  • High performance: Minimal overhead, suitable for databases
  • Flexible: Can be formatted with any filesystem
  • No metadata: Block storage doesn't track file information

Use Cases

  • Database storage (MySQL, PostgreSQL, Oracle)
  • Virtual machine disk images
  • High-performance applications requiring low latency
  • RAID implementations

Block Storage Architecture

Application
     │
     ▼
┌─────────────┐
│ File System │  (ext4, XFS, NTFS)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Block Layer │  (Logical Volume Manager)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Physical   │  (Hard drives, SSDs)
│  Storage    │
└─────────────┘

AWS EBS Example

import boto3

class EBSManager:
    def __init__(self, region_name='us-east-1'):
        self.ec2 = boto3.client('ec2', region_name=region_name)

    def create_volume(self, size_gb, volume_type='gp3', availability_zone='us-east-1a'):
        """Create an EBS volume"""
        response = self.ec2.create_volume(
            AvailabilityZone=availability_zone,
            Size=size_gb,
            VolumeType=volume_type,
            Encrypted=True
        )
        volume_id = response['VolumeId']
        print(f"Created volume: {volume_id}")
        return volume_id

    def attach_volume(self, volume_id, instance_id, device='/dev/sdf'):
        """Attach volume to EC2 instance"""
        self.ec2.attach_volume(
            VolumeId=volume_id,
            InstanceId=instance_id,
            Device=device
        )
        print(f"Attached {volume_id} to {instance_id} as {device}")

    def create_snapshot(self, volume_id, description=''):
        """Create snapshot of volume"""
        response = self.ec2.create_snapshot(
            VolumeId=volume_id,
            Description=description
        )
        snapshot_id = response['SnapshotId']
        print(f"Created snapshot: {snapshot_id}")
        return snapshot_id

    def list_volumes(self):
        """List all EBS volumes"""
        response = self.ec2.describe_volumes()
        return response['Volumes']

# Usage
ebs = EBSManager()
volume_id = ebs.create_volume(size_gb=100, volume_type='gp3')
ebs.attach_volume(volume_id, 'i-1234567890abcdef0')
snapshot_id = ebs.create_snapshot(volume_id, 'Daily backup')

File Storage

File storage organizes data in a hierarchical directory structure with files and folders. It provides file-level semantics and metadata.

Characteristics

  • Hierarchical structure: Directory trees with files
  • File semantics: Open, read, write, close operations
  • Metadata: File permissions, timestamps, ownership
  • Network protocols: NFS, SMB/CIFS

Use Cases

  • Shared file systems for multiple servers
  • Content management systems
  • Home directories
  • Application configuration files

File Storage Architecture

Client Application
        │
        ▼
┌────────────────┐
│ NFS/SMB Client │
└───────┬────────┘
        │  Network Protocol
        ▼
┌────────────────┐
│  File Server   │
│   (NFS/SMB)    │
└───────┬────────┘
        │
┌───────▼────────┐
│  File System   │  (ext4, ZFS, etc.)
└───────┬────────┘
        │
┌───────▼────────┐
│ Block Storage  │
└────────────────┘

AWS EFS Example

import boto3

class EFSManager:
    def __init__(self, region_name='us-east-1'):
        self.efs = boto3.client('efs', region_name=region_name)
        self.ec2 = boto3.client('ec2', region_name=region_name)

    def create_file_system(self, performance_mode='generalPurpose', throughput_mode='bursting'):
        """Create EFS file system"""
        response = self.efs.create_file_system(
            PerformanceMode=performance_mode,
            ThroughputMode=throughput_mode,
            Encrypted=True
        )
        file_system_id = response['FileSystemId']
        print(f"Created EFS: {file_system_id}")
        return file_system_id

    def create_mount_target(self, file_system_id, subnet_id, security_groups):
        """Create mount target for EFS"""
        response = self.efs.create_mount_target(
            FileSystemId=file_system_id,
            SubnetId=subnet_id,
            SecurityGroups=security_groups
        )
        mount_target_id = response['MountTargetId']
        print(f"Created mount target: {mount_target_id}")
        return mount_target_id

    def list_file_systems(self):
        """List all EFS file systems"""
        response = self.efs.describe_file_systems()
        return response['FileSystems']

# Usage
efs = EFSManager()
fs_id = efs.create_file_system(performance_mode='generalPurpose')
# Mount target requires subnet and security group IDs

Comparison Table

Feature              Block Storage      File Storage              Object Storage
Access Method        Block-level I/O    File system APIs          REST APIs
Data Organization    Blocks             Hierarchical directories  Flat namespace
Metadata             None               File metadata             Object metadata
Scalability          Limited            Moderate                  Excellent
Performance          Very high          High                      Moderate to high
Use Cases            Databases, VMs     Shared filesystems        Web apps, archives
Protocols            iSCSI, FC          NFS, SMB                  HTTP/REST
Consistency          Strong             Strong                    Eventual (typically)
Cost                 High               Moderate                  Low

When to Use Each Type

Block Storage:

  • Running databases requiring low latency
  • Virtual machine disk images
  • Applications needing raw disk access
  • High I/O operations per second (IOPS) requirements

File Storage:

  • Shared file systems across multiple servers
  • Content management systems
  • Legacy applications requiring POSIX semantics
  • Home directories and user data

Object Storage:

  • Web applications and static assets
  • Backup and archival data
  • Big data analytics
  • Content delivery and media storage
  • Unstructured data at scale

HDFS Distributed File System

Hadoop Distributed File System (HDFS) is designed for storing very large files across clusters of commodity hardware.

HDFS Architecture

                 ┌──────────────┐
                 │   NameNode   │  (Metadata, namespace)
                 │   (Active)   │
                 └──────┬───────┘
                        │
                 ┌──────▼───────┐
                 │   NameNode   │  (Standby - for HA)
                 │  (Standby)   │
                 └──────────────┘

  ┌──────────────────┬──────────────────┐
  │                  │                  │
┌─▼────────────┐ ┌───▼───────────┐ ┌───▼──────────┐
│  DataNode 1  │ │  DataNode 2   │ │  DataNode 3  │
│ (Replica 1)  │ │ (Replica 2)   │ │ (Replica 3)  │
└──────────────┘ └───────────────┘ └──────────────┘

Key Components

NameNode:

  • Manages file system namespace
  • Stores metadata (file names, permissions, block locations)
  • Coordinates file operations
  • Single point of failure (mitigated by High Availability)

DataNode:

  • Stores actual data blocks
  • Serves read/write requests
  • Reports block status to NameNode
  • Performs block replication

Secondary NameNode (legacy):

  • Performs checkpointing of NameNode metadata
  • Not a backup NameNode (common misconception)

HDFS Configuration

core-site.xml

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop/tmp</value>
  </property>
</configuration>

hdfs-site.xml

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/opt/hadoop/hdfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/opt/hadoop/hdfs/datanode</value>
  </property>
  <property>
    <name>dfs.namenode.checkpoint.dir</name>
    <value>/opt/hadoop/hdfs/checkpoint</value>
  </property>
  <property>
    <name>dfs.blocksize</name>
    <value>134217728</value> <!-- 128MB -->
  </property>
  <property>
    <name>dfs.namenode.handler.count</name>
    <value>100</value>
  </property>
</configuration>

HDFS Operations

Java API Example

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSOperations {
    private FileSystem fs;

    public HDFSOperations() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:9000");
        this.fs = FileSystem.get(conf);
    }

    public void createDirectory(String path) throws IOException {
        Path dirPath = new Path(path);
        if (!fs.exists(dirPath)) {
            fs.mkdirs(dirPath);
            System.out.println("Directory created: " + path);
        }
    }

    public void uploadFile(String localPath, String hdfsPath) throws IOException {
        Path localFilePath = new Path(localPath);
        Path hdfsFilePath = new Path(hdfsPath);

        fs.copyFromLocalFile(localFilePath, hdfsFilePath);
        System.out.println("File uploaded: " + localPath + " -> " + hdfsPath);
    }

    public void downloadFile(String hdfsPath, String localPath) throws IOException {
        Path hdfsFilePath = new Path(hdfsPath);
        Path localFilePath = new Path(localPath);

        fs.copyToLocalFile(hdfsFilePath, localFilePath);
        System.out.println("File downloaded: " + hdfsPath + " -> " + localPath);
    }

    public void listFiles(String directory) throws IOException {
        Path dirPath = new Path(directory);
        FileStatus[] files = fs.listStatus(dirPath);

        for (FileStatus file : files) {
            System.out.println(file.getPath().getName() +
                    " (Size: " + file.getLen() + " bytes)");
        }
    }

    public void deleteFile(String path) throws IOException {
        Path filePath = new Path(path);
        if (fs.exists(filePath)) {
            fs.delete(filePath, true);
            System.out.println("Deleted: " + path);
        }
    }

    public void close() throws IOException {
        fs.close();
    }

    public static void main(String[] args) throws IOException {
        HDFSOperations hdfs = new HDFSOperations();
        hdfs.createDirectory("/user/data");
        hdfs.uploadFile("/local/file.txt", "/user/data/file.txt");
        hdfs.listFiles("/user/data");
        hdfs.close();
    }
}

Python API with hdfs3

from hdfs3 import HDFileSystem

class HDFSManager:
    def __init__(self, host='localhost', port=9000):
        self.hdfs = HDFileSystem(host=host, port=port)

    def create_directory(self, path):
        """Create directory in HDFS"""
        if not self.hdfs.exists(path):
            self.hdfs.mkdir(path)
            print(f"Created directory: {path}")

    def upload_file(self, local_path, hdfs_path):
        """Upload file to HDFS"""
        self.hdfs.put(local_path, hdfs_path)
        print(f"Uploaded: {local_path} -> {hdfs_path}")

    def download_file(self, hdfs_path, local_path):
        """Download file from HDFS"""
        self.hdfs.get(hdfs_path, local_path)
        print(f"Downloaded: {hdfs_path} -> {local_path}")

    def list_files(self, directory):
        """List files in directory"""
        files = self.hdfs.ls(directory, detail=True)
        for file_info in files:
            print(f"{file_info['name']} - Size: {file_info['size']} bytes")
        return files

    def delete_file(self, path):
        """Delete file or directory"""
        self.hdfs.rm(path, recursive=True)
        print(f"Deleted: {path}")

    def get_file_info(self, path):
        """Get file information"""
        info = self.hdfs.info(path)
        return {
            'path': info['name'],
            'size': info['size'],
            'type': info['kind'],
            'replication': info.get('replication', 'N/A')
        }

# Usage
hdfs = HDFSManager(host='namenode', port=9000)
hdfs.create_directory('/user/data')
hdfs.upload_file('/local/data.csv', '/user/data/data.csv')
files = hdfs.list_files('/user/data')

HDFS Replication Strategy

HDFS replicates blocks across multiple DataNodes for fault tolerance:

  1. Default replication factor: 3

  2. Replication placement:

    • First replica: Same node as writer (if on DataNode) or random node
    • Second replica: Different rack from first
    • Third replica: Same rack as second, different node

This strategy balances performance (rack-local reads) with fault tolerance (rack failure resilience).
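
The placement policy above can be sketched in a few lines. This is an illustrative model, not the actual NameNode implementation; the topology and node names are made up:

```python
import random

# Sketch of HDFS's default rack-aware placement for replication factor 3.
# topology maps rack -> list of DataNodes (illustrative names).
topology = {
    "rack1": ["dn1", "dn2", "dn3"],
    "rack2": ["dn4", "dn5", "dn6"],
}

def rack_of(node, topology):
    return next(r for r, nodes in topology.items() if node in nodes)

def place_replicas(writer_node, topology):
    """Return three DataNodes following the default placement policy."""
    # First replica: the writer's node (assumed to be a DataNode here).
    first = writer_node
    first_rack = rack_of(first, topology)
    # Second replica: a node on a different rack.
    other_racks = [r for r in topology if r != first_rack]
    second_rack = random.choice(other_racks)
    second = random.choice(topology[second_rack])
    # Third replica: a different node on the same rack as the second.
    third = random.choice([n for n in topology[second_rack] if n != second])
    return [first, second, third]

print(place_replicas("dn1", topology))  # e.g. ['dn1', 'dn5', 'dn4']
```

Any single-rack failure leaves at least one replica reachable, while the second and third replicas sharing a rack keep the replication pipeline cheap.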

HDFS High Availability

HA NameNode configuration eliminates single point of failure:

<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>namenode1:9000</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>namenode2:9000</value>
  </property>
  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://journal1:8485;journal2:8485;journal3:8485/mycluster</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>

Ceph Storage Cluster Deployment

Ceph is a unified distributed storage system providing object, block, and file storage interfaces.

Ceph Architecture

┌─────────────────────────────────────────┐
│           Client Applications           │
└─────┬───────────┬───────────┬───────────┘
      │           │           │
┌─────▼────┐ ┌────▼────┐ ┌────▼────┐
│  Object  │ │  Block  │ │  File   │  (RADOS Gateway, RBD, CephFS)
│ Gateway  │ │  (RBD)  │ │ (CephFS)│
└─────┬────┘ └────┬────┘ └────┬────┘
      │           │           │
      └───────────┴───────────┘
                  │
        ┌─────────▼─────────┐
        │       RADOS       │  (Reliable Autonomic Distributed Object Store)
        │  - OSDs           │  (Object Storage Daemons)
        │  - Monitors       │  (Cluster state)
        │  - MDS            │  (Metadata Server - for CephFS)
        └───────────────────┘

Core Components

Monitors (MON):

  • Maintain cluster membership and state
  • Provide consensus for cluster configuration
  • Typically 3 or 5 nodes for quorum

Object Storage Daemons (OSD):

  • Store actual data objects
  • Handle replication and recovery
  • Report to monitors

Metadata Servers (MDS):

  • Manage metadata for CephFS
  • Not required for object or block storage

RADOS Gateway (RGW):

  • Provides S3-compatible object storage API
  • Handles authentication and authorization

Ceph Deployment

Installation on Ubuntu

# Add Ceph repository
wget -q -O- 'https://download.ceph.com/keys/release.asc' | sudo apt-key add -
echo "deb https://download.ceph.com/debian-{ceph-stable-release}/ $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/ceph.list

# Install Ceph
sudo apt update
sudo apt install ceph-deploy ceph-common

# Create cluster
mkdir ceph-cluster
cd ceph-cluster
ceph-deploy new node1 node2 node3

# Install on nodes
ceph-deploy install node1 node2 node3

# Create monitor
ceph-deploy mon create-initial

# Create OSDs
ceph-deploy osd create --data /dev/sdb node1
ceph-deploy osd create --data /dev/sdb node2
ceph-deploy osd create --data /dev/sdb node3

# Deploy admin keys
ceph-deploy admin node1 node2 node3

Ceph Configuration File (ceph.conf)

[global]
fsid = {cluster-id}
mon initial members = node1, node2, node3
mon host = 10.0.0.1, 10.0.0.2, 10.0.0.3
auth cluster required = cephx
auth service required = cephx
auth client required = cephx
osd journal size = 10240
osd pool default pg num = 128
osd pool default pgp num = 128
osd pool default size = 3
osd pool default min size = 2
public network = 10.0.0.0/24
cluster network = 10.0.1.0/24

[mon.node1]
host = node1
mon addr = 10.0.0.1:6789

[mon.node2]
host = node2
mon addr = 10.0.0.2:6789

[mon.node3]
host = node3
mon addr = 10.0.0.3:6789

[osd]
osd data = /var/lib/ceph/osd/ceph-$id
osd journal = /var/lib/ceph/osd/ceph-$id/journal
osd mkfs type = xfs
osd mkfs options xfs = -f -i size=2048
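
The pg num values above follow a common rule of thumb from the Ceph documentation: target roughly 100 placement groups per OSD, divide by the pool's replica count, and round up to a power of two. A quick sketch of that calculation:

```python
# Rough placement-group sizing per the common Ceph rule of thumb:
# total_pgs ≈ (OSDs * pgs_per_osd) / pool_size, rounded up to a power of two.

def recommended_pg_num(num_osds, pool_size=3, pgs_per_osd=100):
    target = (num_osds * pgs_per_osd) / pool_size
    pg = 1
    while pg < target:
        pg *= 2
    return pg

print(recommended_pg_num(4))   # 4 OSDs, 3x replication -> 256
print(recommended_pg_num(10))  # -> 512
```

Too few PGs leads to uneven data distribution; too many inflates per-OSD memory and peering overhead, which is why the default of 128 above suits only a small cluster.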

Ceph Object Storage (RADOS Gateway)

RGW Configuration

# Create RGW instance
ceph-deploy rgw create node1

# Or manually configure
sudo ceph-deploy --overwrite-conf config push node1
sudo systemctl start ceph-radosgw@rgw.node1
sudo systemctl enable ceph-radosgw@rgw.node1

Python Client for RGW

import boto3
from botocore.client import Config

class CephRadosGateway:
    def __init__(self, endpoint_url, access_key, secret_key):
        self.s3_client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version='s3v4')
        )

    def create_bucket(self, bucket_name):
        """Create bucket in Ceph RGW"""
        try:
            self.s3_client.create_bucket(Bucket=bucket_name)
            print(f"Bucket {bucket_name} created")
        except Exception as e:
            print(f"Error: {e}")

    def upload_object(self, bucket_name, object_key, data):
        """Upload object"""
        self.s3_client.put_object(
            Bucket=bucket_name,
            Key=object_key,
            Body=data
        )
        print(f"Uploaded {object_key}")

    def list_objects(self, bucket_name):
        """List objects in bucket"""
        response = self.s3_client.list_objects_v2(Bucket=bucket_name)
        return response.get('Contents', [])

# Usage
rgw = CephRadosGateway(
    endpoint_url='http://rgw-node1:7480',
    access_key='your-access-key',
    secret_key='your-secret-key'
)
rgw.create_bucket('my-bucket')
rgw.upload_object('my-bucket', 'test.txt', b'Hello Ceph')

Ceph Block Storage (RBD)

RBD Operations

# Create pool for RBD
ceph osd pool create rbd 128 128

# Initialize pool for RBD
rbd pool init rbd

# Create RBD image
rbd create --size 10G rbd/myimage

# Map image to kernel
sudo rbd map rbd/myimage

# Format and mount
sudo mkfs.ext4 /dev/rbd0
sudo mount /dev/rbd0 /mnt/rbd

# Create snapshot
rbd snap create rbd/myimage@snapshot1

# Clone from snapshot
rbd snap protect rbd/myimage@snapshot1
rbd clone rbd/myimage@snapshot1 rbd/myclone

Python RBD Client

import rados
import rbd

class CephRBDManager:
    def __init__(self, conf_file='/etc/ceph/ceph.conf', user_name='client.admin'):
        self.cluster = rados.Rados(conffile=conf_file, name=user_name)
        self.cluster.connect()
        self.ioctx = None

    def open_pool(self, pool_name):
        """Open a pool"""
        self.ioctx = self.cluster.open_ioctx(pool_name)

    def create_image(self, image_name, size_mb):
        """Create RBD image"""
        rbd_inst = rbd.RBD()
        rbd_inst.create(self.ioctx, image_name, size_mb * 1024 * 1024)
        print(f"Created image: {image_name} ({size_mb}MB)")

    def list_images(self):
        """List all images in pool"""
        rbd_inst = rbd.RBD()
        return rbd_inst.list(self.ioctx)

    def create_snapshot(self, image_name, snapshot_name):
        """Create snapshot"""
        image = rbd.Image(self.ioctx, image_name)
        image.create_snap(snapshot_name)
        print(f"Created snapshot: {snapshot_name}")
        image.close()

    def close(self):
        """Close connections"""
        if self.ioctx:
            self.ioctx.close()
        self.cluster.shutdown()

# Usage
rbd_mgr = CephRBDManager()
rbd_mgr.open_pool('rbd')
rbd_mgr.create_image('myimage', 10240)  # 10GB
images = rbd_mgr.list_images()
rbd_mgr.create_snapshot('myimage', 'snap1')
rbd_mgr.close()

Ceph Performance Tuning

OSD Configuration

[osd]
# Increase OSD memory
osd memory target = 4294967296 # 4GB

# Optimize journal
osd journal size = 10240 # 10GB

# Network optimization
ms bind port min = 6800
ms bind port max = 7300

# Recovery settings
osd recovery max active = 3
osd recovery threads = 1
osd max backfills = 1

CRUSH Map Optimization

CRUSH (Controlled Replication Under Scalable Hashing) algorithm determines data placement:

# Get current CRUSH map
ceph osd getcrushmap -o crushmap.bin
crushtool -d crushmap.bin -o crushmap.txt

# Edit crushmap.txt to optimize placement

# Compile and set
crushtool -c crushmap.txt -o crushmap-new.bin
ceph osd setcrushmap -i crushmap-new.bin

Data Consistency and Replication Strategies

Consistency Models

Strong Consistency

All reads return the most recent write. Achieved through:

  • Synchronous replication
  • Quorum-based reads/writes
  • Two-phase commit protocols

Trade-offs:

  • Higher latency (wait for all replicas)
  • Lower availability (fails if quorum unavailable)
  • Higher cost (more network round-trips)
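
The quorum approach listed above yields strong consistency only when the read and write quorums are guaranteed to overlap, i.e. R + W > N: every read set then intersects the most recent write set. A small sketch of the check:

```python
# With N replicas, a read quorum R and write quorum W give strong
# consistency iff R + W > N (every read overlaps the latest write);
# W > N/2 additionally prevents two conflicting writes from both succeeding.

def quorum_is_consistent(n, r, w):
    return r + w > n and w > n / 2

print(quorum_is_consistent(3, 2, 2))  # True  (typical majority quorum)
print(quorum_is_consistent(3, 1, 1))  # False (reads may miss writes)
```

Tuning R down (and W up, or vice versa) trades read latency against write latency while keeping the overlap invariant.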

Eventual Consistency

System will become consistent over time if no new updates. Characteristics:

  • Reads may return stale data temporarily
  • All replicas converge to same state eventually
  • High availability and performance

Use cases:

  • Social media feeds
  • DNS systems
  • CDN content distribution

Causal Consistency

Preserves causal relationships between operations. If operation A causally precedes operation B, all nodes see A before B.

Read-Your-Writes Consistency

User always sees their own writes, even if others see stale data.
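A minimal sketch of how a client session can enforce this guarantee: the session remembers the highest version it has written and skips replicas that have not yet caught up. Class and field names here are illustrative, not from any particular system.

```python
class Replica:
    def __init__(self):
        self.version = 0
        self.value = None

    def apply(self, value, version):
        # Replication delivers writes tagged with a monotonically increasing version
        self.version = version
        self.value = value

class Session:
    """Client session enforcing read-your-writes."""
    def __init__(self, replicas):
        self.replicas = replicas
        self.last_written = 0  # highest version this client has written

    def write(self, value):
        self.last_written += 1
        self.replicas[0].apply(value, self.last_written)  # primary replica
        return self.last_written

    def read(self):
        # Only read from a replica that has seen our latest write
        for r in self.replicas:
            if r.version >= self.last_written:
                return r.value
        return None  # no suitable replica yet; caller may retry elsewhere

replicas = [Replica(), Replica()]
session = Session(replicas)
session.write('v1')
print(session.read())   # primary has the write -> 'v1'
replicas.pop(0)         # primary gone; only a stale replica remains
print(session.read())   # stale replica rejected -> None
```

Other clients reading the stale replica would still see old data; only the writing session's guarantee is enforced.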

Replication Strategies

Master-Slave Replication

              Write Request
                    │
                    ▼
              ┌──────────┐
              │  Master  │
              └────┬─────┘
         Sync / Async replication
               ┌───┴───┐
               ▼       ▼
         ┌─────────┐ ┌─────────┐
         │ Slave 1 │ │ Slave 2 │
         └─────────┘ └─────────┘

Synchronous Replication:

  • Write completes only after all replicas acknowledge
  • Strong consistency guaranteed
  • Higher latency, lower throughput

Asynchronous Replication:

  • Write completes after master acknowledges
  • Better performance
  • Risk of data loss if master fails
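The latency difference can be sketched in a few lines: assuming replicas are written in parallel, a synchronous write is as slow as the slowest replica, while an asynchronous write returns once the master acknowledges. The latency figures are made up for illustration.

```python
def replicate(ack_latencies_ms, mode):
    """Client-visible latency for one write.
    ack_latencies_ms: per-replica ack latency in ms, master first;
    replicas are assumed to be written in parallel."""
    if mode == 'sync':
        # Client waits for every replica to acknowledge
        return max(ack_latencies_ms)
    # Async: client returns as soon as the master acknowledges
    return ack_latencies_ms[0]

latencies = [2, 15, 40]  # master, slave 1, slave 2 (ms)
print(replicate(latencies, 'sync'))   # 40 — slowest replica dominates
print(replicate(latencies, 'async'))  # 2 — master only
```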

Multi-Master Replication

Multiple nodes can accept writes:

Write Request 1 ──► Node 1 ──┐
Write Request 2 ──► Node 2 ──┼──► Conflict Resolution ──► Converged state
Write Request 3 ──► Node 3 ──┘

Challenges:

  • Conflict resolution required
  • More complex consistency models
  • Vector clocks or timestamps for ordering
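One common (if lossy) resolution strategy is last-write-wins with a deterministic tie-break on node id, sketched here with hypothetical shopping-cart updates:

```python
def last_write_wins(versions):
    """Resolve concurrent writes by (timestamp, node_id).
    A common but lossy multi-master strategy: the losing writes are
    silently discarded. Ties are broken deterministically by node id."""
    return max(versions, key=lambda v: (v['ts'], v['node']))

concurrent = [
    {'node': 'A', 'ts': 100, 'value': 'cart=[book]'},
    {'node': 'B', 'ts': 100, 'value': 'cart=[pen]'},
    {'node': 'C', 'ts': 99,  'value': 'cart=[]'},
]
winner = last_write_wins(concurrent)
print(winner['value'])  # 'cart=[pen]' — B wins the timestamp tie
```

Systems that cannot tolerate losing writes keep siblings instead (as Riak does) and let the application merge them.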

Quorum-Based Replication

Requires a majority of replicas to acknowledge each operation:

class QuorumReplication:
    def __init__(self, replicas, read_quorum=None, write_quorum=None):
        self.replicas = replicas
        self.read_quorum = read_quorum or (len(replicas) // 2 + 1)
        self.write_quorum = write_quorum or (len(replicas) // 2 + 1)

    def write(self, key, value):
        """Write with quorum"""
        responses = []
        for replica in self.replicas:
            try:
                result = replica.write(key, value)
                responses.append(result)
                if len(responses) >= self.write_quorum:
                    return True
            except Exception as e:
                print(f"Replica {replica} failed: {e}")

        return len(responses) >= self.write_quorum

    def read(self, key):
        """Read with quorum"""
        responses = []
        for replica in self.replicas:
            try:
                value = replica.read(key)
                responses.append(value)
                if len(responses) >= self.read_quorum:
                    # Return most recent value (by timestamp)
                    return max(responses, key=lambda x: x.timestamp)
            except Exception as e:
                print(f"Replica {replica} failed: {e}")

        return None

Vector Clocks

Vector clocks track causal relationships in distributed systems:

class VectorClock:
    def __init__(self, node_id, num_nodes):
        self.node_id = node_id
        self.clock = [0] * num_nodes

    def tick(self):
        """Increment own clock"""
        self.clock[self.node_id] += 1

    def update(self, other_clock):
        """Update with received clock"""
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], other_clock[i])
        self.tick()

    def happens_before(self, other):
        """Check if this event happens before other"""
        less_than = False
        for i in range(len(self.clock)):
            if self.clock[i] > other.clock[i]:
                return False
            if self.clock[i] < other.clock[i]:
                less_than = True
        return less_than

    def concurrent(self, other):
        """Check if events are concurrent"""
        return not self.happens_before(other) and not other.happens_before(self)
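As a standalone sanity check of the comparison rule, two clocks that each lead in a different component are concurrent (plain lists stand in for the clock state):

```python
def happens_before(a, b):
    """a -> b iff a <= b component-wise and a != b."""
    return all(x <= y for x, y in zip(a, b)) and a != b

a = [2, 0, 0]  # node 0 has ticked twice
b = [1, 1, 0]  # node 1 ticked after seeing node 0's first event
print(happens_before(a, b))  # False — a leads in component 0
print(happens_before(b, a))  # False — b leads in component 1
print(not happens_before(a, b) and not happens_before(b, a))  # True — concurrent
```

Concurrent events are exactly the ones a multi-master system must hand to conflict resolution.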

Anti-Entropy and Merkle Trees

Merkle trees efficiently detect inconsistencies:

import hashlib

class MerkleTree:
    def __init__(self, data):
        self.data = data
        self.root = self.build_tree(data)

    def hash_data(self, data):
        """Hash a single item"""
        return hashlib.sha256(str(data).encode()).hexdigest()

    def build_tree(self, data):
        """Recursively build the tree and return the root hash"""
        if not data:
            return self.hash_data('')  # guard against empty input
        if len(data) == 1:
            return self.hash_data(data[0])

        mid = len(data) // 2
        left = self.build_tree(data[:mid])
        right = self.build_tree(data[mid:])
        return self.hash_data(left + right)

    def get_root_hash(self):
        """Get root hash"""
        return self.root

    def compare_trees(self, other_tree):
        """Trees match iff their root hashes match"""
        return self.root == other_tree.root
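For intuition, here is a compact standalone root-hash computation over two replicas' key ranges; matching roots mean the ranges are in sync, and a mismatch tells the anti-entropy process to descend into subtrees to locate the divergent keys. Helper and data names are illustrative.

```python
import hashlib

def merkle_root(chunks):
    """Root hash of a list of data chunks (bottom-up pairwise hashing)."""
    level = [hashlib.sha256(c.encode()).hexdigest() for c in chunks]
    while len(level) > 1:
        if len(level) % 2:  # duplicate the last node on odd-sized levels
            level.append(level[-1])
        level = [hashlib.sha256((level[i] + level[i + 1]).encode()).hexdigest()
                 for i in range(0, len(level), 2)]
    return level[0]

replica_a = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4']
replica_b = ['k1=v1', 'k2=v2', 'k3=STALE', 'k4=v4']
print(merkle_root(replica_a) == merkle_root(replica_a))  # True  — in sync
print(merkle_root(replica_a) == merkle_root(replica_b))  # False — repair needed
```

The payoff is bandwidth: replicas exchange O(log n) hashes instead of the full key range to find what differs.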

Backup and Disaster Recovery

Backup Strategies

Full Backup

Complete copy of all data at a point in time.

Advantages:

  • Simple restore process
  • Complete data recovery
  • Independent of other backups

Disadvantages:

  • Time-consuming
  • High storage requirements
  • Network bandwidth intensive

Incremental Backup

Only backs up changes since last backup (full or incremental).

Advantages:

  • Faster backup process
  • Lower storage requirements
  • Less network bandwidth

Disadvantages:

  • Slower restore (requires full + all incrementals)
  • More complex restore process
  • Dependency chain

Differential Backup

Backs up changes since last full backup.

Advantages:

  • Faster restore than incremental (only full + latest differential)
  • Moderate storage requirements

Disadvantages:

  • Slower than incremental backups
  • Storage grows over time
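The difference in restore chains can be made concrete with a small sketch (backup names are illustrative):

```python
def restore_chain(backups, strategy):
    """Which backup files are needed to restore the latest state?
    backups: chronological list, full backup first."""
    full = backups[0]
    if strategy == 'incremental':
        # Full backup plus every incremental after it, applied in order
        return backups
    # Differential: full backup plus only the most recent differential
    return [full, backups[-1]]

week = ['sun_full', 'mon', 'tue', 'wed', 'thu']
print(restore_chain(week, 'incremental'))   # all five files, in order
print(restore_chain(week, 'differential'))  # ['sun_full', 'thu']
```

The trade-off is symmetric: incrementals are smaller to take but longer to replay; differentials grow each day but restore in two steps.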

Backup Implementation

S3 Lifecycle Policies

import boto3

def setup_backup_lifecycle(bucket_name):
    """Setup S3 lifecycle policy for automated backups"""
    s3_client = boto3.client('s3')

    lifecycle_config = {
        'Rules': [
            {
                'Id': 'DailyBackups',
                'Status': 'Enabled',
                'Prefix': 'backups/daily/',
                'Transitions': [
                    {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                    {'Days': 90, 'StorageClass': 'GLACIER'}
                ],
                'Expiration': {'Days': 365}
            },
            {
                'Id': 'ArchiveOldData',
                'Status': 'Enabled',
                'Prefix': 'data/',
                'Transitions': [
                    {'Days': 180, 'StorageClass': 'GLACIER'}
                ]
            }
        ]
    }

    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=lifecycle_config
    )
    print(f"Lifecycle policy configured for {bucket_name}")

Automated Backup Script

import os
import tarfile
import boto3
from datetime import datetime

class BackupManager:
    def __init__(self, s3_bucket, backup_prefix='backups'):
        self.s3_client = boto3.client('s3')
        self.bucket = s3_bucket
        self.prefix = backup_prefix

    def create_backup(self, source_paths, backup_name=None):
        """Create compressed backup"""
        if backup_name is None:
            backup_name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.tar.gz"

        backup_path = f"/tmp/{backup_name}"

        # Create tar archive (tar.add handles files and directories alike)
        with tarfile.open(backup_path, 'w:gz') as tar:
            for path in source_paths:
                if os.path.isfile(path) or os.path.isdir(path):
                    tar.add(path, arcname=os.path.basename(path))

        # Upload to S3
        s3_key = f"{self.prefix}/{backup_name}"
        self.s3_client.upload_file(
            backup_path,
            self.bucket,
            s3_key,
            ExtraArgs={'StorageClass': 'STANDARD_IA'}
        )

        # Cleanup
        os.remove(backup_path)

        print(f"Backup created: {s3_key}")
        return s3_key

    def restore_backup(self, s3_key, restore_path):
        """Restore backup from S3"""
        backup_file = f"/tmp/{os.path.basename(s3_key)}"

        # Download from S3
        self.s3_client.download_file(self.bucket, s3_key, backup_file)

        # Extract
        with tarfile.open(backup_file, 'r:gz') as tar:
            tar.extractall(restore_path)

        # Cleanup
        os.remove(backup_file)

        print(f"Backup restored to: {restore_path}")

# Usage
backup_mgr = BackupManager('my-backup-bucket')
backup_mgr.create_backup(['/var/www', '/etc/nginx'])

Disaster Recovery Strategies

RTO and RPO

  • RTO (Recovery Time Objective): Maximum acceptable downtime
  • RPO (Recovery Point Objective): Maximum acceptable data loss

  Strategy       RTO      RPO      Cost
  ------------   ------   ------   ---------
  Hot standby    Minutes  Seconds  Very High
  Warm standby   Hours    Minutes  High
  Cold standby   Days     Hours    Moderate
  Backup only    Days     Days     Low

Multi-Region Replication

import boto3

class MultiRegionReplication:
    def __init__(self, primary_region, replica_regions):
        self.primary_region = primary_region
        self.replica_regions = replica_regions
        self.primary_s3 = boto3.client('s3', region_name=primary_region)
        self.replica_clients = {
            region: boto3.client('s3', region_name=region)
            for region in replica_regions
        }

    def setup_cross_region_replication(self, bucket_name):
        """Setup cross-region replication"""
        for region in self.replica_regions:
            replica_bucket = f"{bucket_name}-{region}"

            # Create replica bucket
            self.replica_clients[region].create_bucket(
                Bucket=replica_bucket,
                CreateBucketConfiguration={'LocationConstraint': region}
            )

            # Enable versioning (required for replication)
            self.primary_s3.put_bucket_versioning(
                Bucket=bucket_name,
                VersioningConfiguration={'Status': 'Enabled'}
            )

            self.replica_clients[region].put_bucket_versioning(
                Bucket=replica_bucket,
                VersioningConfiguration={'Status': 'Enabled'}
            )

            # Configure replication
            replication_config = {
                'Role': 'arn:aws:iam::account:role/replication-role',
                'Rules': [{
                    'Status': 'Enabled',
                    'Priority': 1,
                    'Destination': {
                        'Bucket': f'arn:aws:s3:::{replica_bucket}',
                        'StorageClass': 'STANDARD'
                    }
                }]
            }

            self.primary_s3.put_bucket_replication(
                Bucket=bucket_name,
                ReplicationConfiguration=replication_config
            )

            print(f"Replication configured: {bucket_name} -> {replica_bucket}")

# Usage
replication = MultiRegionReplication(
    primary_region='us-east-1',
    replica_regions=['us-west-2', 'eu-west-1']
)
replication.setup_cross_region_replication('my-data-bucket')

Storage Performance Optimization

Performance Metrics

Key metrics for storage systems:

  • IOPS (Input/Output Operations Per Second): Number of read/write operations per second
  • Throughput: Data transfer rate (MB/s, GB/s)
  • Latency: Time to complete a single operation (ms)
  • Bandwidth: Maximum data transfer capacity
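These metrics are linked: throughput is IOPS times I/O size, and Little's law relates sustainable IOPS to queue depth and latency. A small sketch:

```python
def throughput_mbps(iops, io_size_kb):
    """Throughput follows directly from IOPS and I/O size."""
    return iops * io_size_kb / 1024

def iops_from_littles_law(queue_depth, latency_ms):
    """Little's law: concurrency = arrival_rate * latency,
    so sustainable IOPS = queue_depth / latency_in_seconds."""
    return queue_depth * 1000 / latency_ms

print(throughput_mbps(4000, 128))       # 500.0 MB/s at 4000 IOPS of 128KB I/Os
print(iops_from_littles_law(32, 0.5))   # 64000.0 IOPS at queue depth 32, 0.5 ms latency
```

The same relationship explains why large-block workloads saturate bandwidth long before they saturate IOPS, and vice versa for small random I/O.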

Optimization Techniques

Caching Strategies

Read Cache:

  • Frequently accessed data stored in faster storage (RAM, SSD)
  • Reduces latency for hot data
  • LRU (Least Recently Used) eviction policy

Write Cache:

  • Buffer writes before committing to persistent storage
  • Improves write throughput
  • Risk of data loss if cache fails
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key):
        if key not in self.cache:
            return None
        # Move to end (most recently used)
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        else:
            if len(self.cache) >= self.capacity:
                # Remove least recently used
                self.cache.popitem(last=False)
        self.cache[key] = value

Data Locality

Place data close to computation:

  • HDFS: Data stored on same nodes as compute tasks
  • S3 Select: Push-down predicates to storage layer
  • Edge caching: CDN placement near users

Compression

Reduce storage and transfer costs:

import gzip
import bz2
import lzma

class CompressionManager:
    @staticmethod
    def compress_gzip(data):
        """Compress with gzip"""
        return gzip.compress(data)

    @staticmethod
    def decompress_gzip(compressed_data):
        """Decompress gzip"""
        return gzip.decompress(compressed_data)

    @staticmethod
    def compress_bz2(data):
        """Compress with bzip2"""
        return bz2.compress(data)

    @staticmethod
    def compress_lzma(data):
        """Compress with LZMA"""
        return lzma.compress(data)

# Compression ratios (typical)
# gzip:  2-4x
# bzip2: 3-5x
# lzma:  4-6x

Parallel I/O

Distribute I/O across multiple streams:

61
62
import os
import concurrent.futures
import boto3

class ParallelS3Upload:
    def __init__(self, bucket_name, max_workers=10):
        self.s3_client = boto3.client('s3')
        self.bucket = bucket_name
        self.max_workers = max_workers

    def upload_file_part(self, file_path, object_key, part_number, part_size, offset):
        """Upload a single part"""
        with open(file_path, 'rb') as f:
            f.seek(offset)
            data = f.read(part_size)

        response = self.s3_client.upload_part(
            Bucket=self.bucket,
            Key=object_key,
            PartNumber=part_number,
            UploadId=self.upload_id,
            Body=data
        )
        return {'PartNumber': part_number, 'ETag': response['ETag']}

    def upload_large_file(self, file_path, object_key, part_size_mb=5):
        """Upload large file using multipart upload"""
        file_size = os.path.getsize(file_path)
        part_size = part_size_mb * 1024 * 1024
        num_parts = (file_size + part_size - 1) // part_size

        # Initiate multipart upload
        response = self.s3_client.create_multipart_upload(
            Bucket=self.bucket,
            Key=object_key
        )
        self.upload_id = response['UploadId']

        # Upload parts in parallel
        parts = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for i in range(num_parts):
                offset = i * part_size
                future = executor.submit(
                    self.upload_file_part,
                    file_path, object_key, i + 1, part_size, offset
                )
                futures.append(future)

            for future in concurrent.futures.as_completed(futures):
                parts.append(future.result())

        # Complete multipart upload (parts must be listed in order)
        parts.sort(key=lambda x: x['PartNumber'])
        self.s3_client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=object_key,
            UploadId=self.upload_id,
            MultipartUpload={'Parts': parts}
        )

        print(f"Uploaded {object_key} ({file_size} bytes)")

Benchmarking Storage Performance

S3 Performance Test

import time
import boto3
import statistics

class S3PerformanceBenchmark:
    def __init__(self, bucket_name):
        self.s3_client = boto3.client('s3')
        self.bucket = bucket_name

    def benchmark_write(self, object_key, data_size_kb, num_iterations=100):
        """Benchmark write performance"""
        data = b'x' * (data_size_kb * 1024)
        latencies = []

        for i in range(num_iterations):
            key = f"{object_key}_{i}"
            start = time.time()
            self.s3_client.put_object(Bucket=self.bucket, Key=key, Body=data)
            latency = (time.time() - start) * 1000  # ms
            latencies.append(latency)

        return {
            'avg_latency_ms': statistics.mean(latencies),
            'p50_latency_ms': statistics.median(latencies),
            'p99_latency_ms': statistics.quantiles(latencies, n=100)[98],
            # kilobits -> megabits, divided by total elapsed seconds
            'throughput_mbps': (data_size_kb * num_iterations * 8 / 1024) / (sum(latencies) / 1000)
        }

    def benchmark_read(self, object_key_prefix, num_iterations=100):
        """Benchmark read performance"""
        latencies = []

        for i in range(num_iterations):
            key = f"{object_key_prefix}_{i}"
            start = time.time()
            response = self.s3_client.get_object(Bucket=self.bucket, Key=key)
            data = response['Body'].read()
            latency = (time.time() - start) * 1000  # ms
            latencies.append(latency)

        return {
            'avg_latency_ms': statistics.mean(latencies),
            'p50_latency_ms': statistics.median(latencies),
            'p99_latency_ms': statistics.quantiles(latencies, n=100)[98]
        }

# Usage
benchmark = S3PerformanceBenchmark('test-bucket')
write_stats = benchmark.benchmark_write('test-object', data_size_kb=1024)
read_stats = benchmark.benchmark_read('test-object')
print(f"Write: {write_stats}")
print(f"Read: {read_stats}")

Cost Optimization Strategies

Storage Cost Factors

  1. Storage volume: Amount of data stored
  2. Storage class: Performance tier (Standard, IA, Glacier)
  3. Request costs: PUT, GET, LIST operations
  4. Data transfer: Egress bandwidth
  5. Lifecycle management: Automated tiering

Optimization Techniques

Lifecycle Management

Automatically move data to cheaper storage classes:

def optimize_storage_lifecycle(bucket_name):
    """Setup cost-optimized lifecycle policy"""
    s3_client = boto3.client('s3')

    lifecycle_config = {
        'Rules': [
            {
                'Id': 'MoveToIA',
                'Status': 'Enabled',
                'Prefix': 'data/',
                'Transitions': [
                    {'Days': 30, 'StorageClass': 'STANDARD_IA'}  # ~50% cheaper
                ]
            },
            {
                'Id': 'MoveToGlacier',
                'Status': 'Enabled',
                'Prefix': 'archive/',
                'Transitions': [
                    {'Days': 90, 'StorageClass': 'GLACIER'}  # ~80% cheaper
                ]
            },
            {
                'Id': 'DeleteOldVersions',
                'Status': 'Enabled',
                'NoncurrentVersionTransitions': [
                    {'NoncurrentDays': 30, 'StorageClass': 'GLACIER'}
                ],
                'NoncurrentVersionExpiration': {'NoncurrentDays': 365}
            }
        ]
    }

    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=lifecycle_config
    )

Deduplication

Eliminate duplicate data:

import hashlib

class DeduplicationManager:
    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.content_hash_index = {}  # hash -> object_key

    def store_with_dedup(self, data, object_key):
        """Store object with deduplication"""
        content_hash = hashlib.sha256(data).hexdigest()

        if content_hash in self.content_hash_index:
            # Create reference instead of storing duplicate
            existing_key = self.content_hash_index[content_hash]
            self.storage.create_reference(object_key, existing_key)
            print(f"Deduplicated: {object_key} -> {existing_key}")
        else:
            # Store new object
            self.storage.put_object(object_key, data)
            self.content_hash_index[content_hash] = object_key
            print(f"Stored: {object_key}")

Compression Analysis

import gzip

def analyze_compression_savings(file_paths):
    """Analyze potential savings from compression"""
    total_original = 0
    total_compressed = 0

    for file_path in file_paths:
        with open(file_path, 'rb') as f:
            original_data = f.read()
        compressed_data = gzip.compress(original_data)

        original_size = len(original_data)
        compressed_size = len(compressed_data)

        total_original += original_size
        total_compressed += compressed_size

        savings = (1 - compressed_size / original_size) * 100
        print(f"{file_path}: {original_size} -> {compressed_size} bytes ({savings:.1f}% savings)")

    total_savings = (1 - total_compressed / total_original) * 100
    print(f"\nTotal: {total_original} -> {total_compressed} bytes ({total_savings:.1f}% savings)")

Cost Calculator

class StorageCostCalculator:
    def __init__(self):
        # AWS S3 pricing (example, adjust for your region)
        self.pricing = {
            'STANDARD': {'storage': 0.023, 'requests': 0.0004},  # per GB/month, per 1000 requests
            'STANDARD_IA': {'storage': 0.0125, 'requests': 0.001},
            'GLACIER': {'storage': 0.004, 'retrieval': 0.01},  # per GB retrieval
            'data_transfer_out': 0.09  # per GB
        }

    def calculate_monthly_cost(self, storage_gb, storage_class='STANDARD',
                               requests=0, data_transfer_gb=0):
        """Calculate monthly storage cost"""
        storage_cost = storage_gb * self.pricing[storage_class]['storage']
        # .get() so classes without a per-request tier here (e.g. GLACIER) don't fail
        request_cost = (requests / 1000) * self.pricing[storage_class].get('requests', 0)
        transfer_cost = data_transfer_gb * self.pricing['data_transfer_out']

        total = storage_cost + request_cost + transfer_cost

        return {
            'storage_cost': storage_cost,
            'request_cost': request_cost,
            'transfer_cost': transfer_cost,
            'total_cost': total
        }

    def compare_storage_classes(self, storage_gb, requests=0):
        """Compare costs across storage classes"""
        results = {}
        for storage_class in ['STANDARD', 'STANDARD_IA', 'GLACIER']:
            results[storage_class] = self.calculate_monthly_cost(
                storage_gb, storage_class, requests
            )
        return results

# Usage
calculator = StorageCostCalculator()
costs = calculator.compare_storage_classes(storage_gb=1000, requests=100000)
for class_name, cost_info in costs.items():
    print(f"{class_name}: ${cost_info['total_cost']:.2f}/month")

Case Studies

Case Study 1: E-Commerce Platform Migration

Scenario: A large e-commerce platform needs to migrate from on-premises storage to cloud object storage.

Requirements:

  • 50TB of product images and videos
  • 10 million objects
  • Global CDN integration
  • 99.99% availability
  • Cost optimization

Solution:

class ECommerceStorageMigration:
    def __init__(self, source_path, s3_bucket, cdn_distribution_id):
        self.source_path = source_path
        self.s3_bucket = s3_bucket
        self.cdn_id = cdn_distribution_id
        self.s3_client = boto3.client('s3')
        self.cloudfront = boto3.client('cloudfront')

    def migrate_with_optimization(self):
        """Migrate with storage class optimization"""
        # Images: Standard (frequently accessed)
        # Videos: Standard-IA (less frequent)
        # Old products: Glacier (archive)

        image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.webp']
        video_extensions = ['.mp4', '.webm', '.mov']

        for root, dirs, files in os.walk(self.source_path):
            for file in files:
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, self.source_path)

                # Determine storage class
                ext = os.path.splitext(file)[1].lower()
                if ext in image_extensions:
                    storage_class = 'STANDARD'
                    prefix = 'images/'
                elif ext in video_extensions:
                    storage_class = 'STANDARD_IA'
                    prefix = 'videos/'
                else:
                    storage_class = 'STANDARD'
                    prefix = 'assets/'

                # Upload with appropriate storage class
                self.s3_client.upload_file(
                    file_path,
                    self.s3_bucket,
                    f"{prefix}{relative_path}",
                    ExtraArgs={'StorageClass': storage_class}
                )

        # Setup lifecycle policies
        self.setup_lifecycle_policies()

        # Invalidate CDN cache
        self.cloudfront.create_invalidation(
            DistributionId=self.cdn_id,
            InvalidationBatch={
                'Paths': {'Quantity': 1, 'Items': ['/*']},
                'CallerReference': str(time.time())
            }
        )

    def setup_lifecycle_policies(self):
        """Setup automated lifecycle management"""
        # Move old images to IA after 90 days
        # Archive videos after 180 days
        lifecycle_config = {
            'Rules': [
                {
                    'Id': 'ImageLifecycle',
                    'Status': 'Enabled',
                    'Prefix': 'images/',
                    'Transitions': [
                        {'Days': 90, 'StorageClass': 'STANDARD_IA'},
                        {'Days': 365, 'StorageClass': 'GLACIER'}
                    ]
                },
                {
                    'Id': 'VideoLifecycle',
                    'Status': 'Enabled',
                    'Prefix': 'videos/',
                    'Transitions': [
                        {'Days': 180, 'StorageClass': 'GLACIER'}
                    ]
                }
            ]
        }

        self.s3_client.put_bucket_lifecycle_configuration(
            Bucket=self.s3_bucket,
            LifecycleConfiguration=lifecycle_config
        )

# Results:
# - Migration completed in 2 weeks
# - 40% cost reduction vs on-premises
# - 99.99% availability achieved
# - CDN integration reduced latency by 60%

Case Study 2: Big Data Analytics Platform

Scenario: A company needs distributed storage for Hadoop-based analytics workloads processing 100TB daily.

Requirements:

  • HDFS cluster for Hadoop workloads
  • High throughput for MapReduce jobs
  • Fault tolerance (3x replication)
  • Integration with S3 for archival

Solution:

class BigDataStorageArchitecture:
    def __init__(self):
        self.hdfs_cluster = None
        self.s3_archive_bucket = 'analytics-archive'

    def setup_hdfs_cluster(self, namenodes, datanodes):
        """Setup HDFS cluster configuration"""
        # NameNode HA configuration
        hdfs_config = {
            'dfs.nameservices': 'analytics-cluster',
            'dfs.ha.namenodes.analytics-cluster': 'nn1,nn2',
            'dfs.replication': 3,
            'dfs.blocksize': 268435456,  # 256MB for large files
            'dfs.namenode.handler.count': 200,  # High concurrency
            'dfs.datanode.max.xcievers': 4096
        }

        # DataNode optimization
        datanode_config = {
            'dfs.datanode.data.dir': '/data1,/data2,/data3',  # Multiple disks
            'dfs.datanode.balance.bandwidthPerSec': 104857600  # 100MB/s
        }

        return {**hdfs_config, **datanode_config}

    def archive_to_s3(self, hdfs_path, retention_days=90):
        """Archive old data from HDFS to S3"""
        # DistCp for efficient transfer
        distcp_command = f"""
        hadoop distcp \
            -D fs.s3a.access.key=$AWS_ACCESS_KEY \
            -D fs.s3a.secret.key=$AWS_SECRET_KEY \
            hdfs://namenode:9000{hdfs_path} \
            s3a://{self.s3_archive_bucket}/archive/
        """

        # After a successful copy, delete from HDFS:
        # hdfs dfs -rm -r {hdfs_path}

        return distcp_command

# Results:
# - 3x replication provides 99.9% durability
# - 256MB blocks optimized for large files
# - S3 archival reduces HDFS storage by 70%
# - DistCp enables efficient bulk transfers

Case Study 3: Multi-Region Content Delivery

Scenario: A media streaming service needs to serve content globally with low latency and high availability.

Requirements:

  • Content stored in multiple regions
  • Automatic failover
  • Low latency for global users
  • Cost-effective storage

Solution:

class MultiRegionContentDelivery:
    def __init__(self, primary_region='us-east-1'):
        self.primary_region = primary_region
        self.replica_regions = ['eu-west-1', 'ap-southeast-1', 'us-west-2']
        self.regions_config = {}

        for region in [primary_region] + self.replica_regions:
            self.regions_config[region] = boto3.client('s3', region_name=region)

    def setup_cross_region_replication(self, bucket_name):
        """Setup automatic cross-region replication"""
        # Create buckets in all regions (bucket names are globally unique,
        # so replicas get a region suffix)
        for region, client in self.regions_config.items():
            try:
                if region == self.primary_region:
                    client.create_bucket(Bucket=bucket_name)
                else:
                    client.create_bucket(
                        Bucket=f"{bucket_name}-{region}",
                        CreateBucketConfiguration={'LocationConstraint': region}
                    )
            except Exception as e:
                print(f"Bucket may already exist in {region}: {e}")

        # Enable versioning (required for replication)
        for region, client in self.regions_config.items():
            regional_bucket = (bucket_name if region == self.primary_region
                               else f"{bucket_name}-{region}")
            client.put_bucket_versioning(
                Bucket=regional_bucket,
                VersioningConfiguration={'Status': 'Enabled'}
            )

        # Setup replication from primary to replicas
        replication_role = 'arn:aws:iam::account:role/replication-role'

        rules = []
        for i, region in enumerate(self.replica_regions):
            rules.append({
                'Id': f'ReplicateTo{region}',
                'Status': 'Enabled',
                'Priority': i + 1,
                'Destination': {
                    'Bucket': f'arn:aws:s3:::{bucket_name}-{region}',
                    'StorageClass': 'STANDARD'
                }
            })

        self.regions_config[self.primary_region].put_bucket_replication(
            Bucket=bucket_name,
            ReplicationConfiguration={
                'Role': replication_role,
                'Rules': rules
            }
        )

    def get_content_url(self, object_key, user_region):
        """Get optimal content URL based on user region"""
        region_mapping = {
            'US': 'us-east-1',
            'EU': 'eu-west-1',
            'AP': 'ap-southeast-1',
            'US-WEST': 'us-west-2'
        }

        optimal_region = region_mapping.get(user_region, self.primary_region)
        bucket_name = 'content-bucket'

        # Generate CloudFront URL or direct S3 URL
        url = f"https://{bucket_name}.s3.{optimal_region}.amazonaws.com/{object_key}"
        return url

# Results:
# - Average latency reduced from 200ms to 50ms
# - 99.99% availability with automatic failover
# - Cross-region replication ensures data durability
# - Regional caching reduces bandwidth costs by 40%

Q&A Section

Q1: What's the difference between object storage and block storage?

A: Object storage stores data as objects with metadata in a flat namespace, accessed via REST APIs. Block storage provides raw block-level access, typically used by filesystems. Object storage excels at scalability and web applications, while block storage is better for databases and low-latency applications.

Q2: How does HDFS handle node failures?

A: HDFS replicates each block across multiple DataNodes (default: 3 replicas). If a DataNode fails, the NameNode detects the failure through heartbeat mechanisms and initiates replication to restore the replication factor. The system continues operating with reduced redundancy until replication completes.

Q3: What is eventual consistency, and when is it acceptable?

A: Eventual consistency means the system will become consistent over time if no new updates occur. It's acceptable for:

  • Social media feeds (slight delays acceptable)
  • DNS systems (propagation delays expected)
  • Analytics data (not real-time critical)
  • Content delivery (caching acceptable)

It's NOT acceptable for:

  • Financial transactions
  • Inventory management
  • Medical records
  • Real-time collaborative editing

Q4: How do I choose between S3 storage classes?

A: Choose based on access patterns:

  • Standard: Frequently accessed data (< 30 days)
  • Standard-IA: Infrequently accessed (monthly or less)
  • Glacier: Archive data (quarterly or less)
  • Intelligent-Tiering: Unknown or changing access patterns

Consider retrieval costs: Glacier has low storage costs but charges for retrieval.
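A rough break-even sketch (illustrative rates, not current AWS pricing) shows how retrieval volume erodes Glacier's storage savings:

```python
def monthly_cost(gb, storage_rate, retrieval_rate=0.0, retrieved_gb=0.0):
    """Storage plus retrieval cost per month.
    Rates are illustrative placeholders, not current AWS pricing."""
    return gb * storage_rate + retrieved_gb * retrieval_rate

gb = 1000
standard = monthly_cost(gb, 0.023)
# Glacier is cheap to hold but charges per GB retrieved
glacier_10pct = monthly_cost(gb, 0.004, retrieval_rate=0.01, retrieved_gb=100)
glacier_full  = monthly_cost(gb, 0.004, retrieval_rate=0.01, retrieved_gb=1000)
print(round(standard, 2))       # 23.0
print(round(glacier_10pct, 2))  # 5.0  — still far cheaper
print(round(glacier_full, 2))   # 14.0 — retrieval narrows the gap
```

The more often data comes back out, the weaker the case for the archive tier; that is exactly what Intelligent-Tiering automates.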

Q5: What is a quorum in distributed systems?

A: A quorum is the minimum number of nodes that must agree for an operation to succeed. For N replicas:

  • Read quorum: (N/2) + 1
  • Write quorum: (N/2) + 1

This ensures at least one node has the latest data and prevents split-brain scenarios.
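The overlap condition can be checked mechanically: strong reads need R + W > N so every read quorum intersects every write quorum, and writes need W > N/2 so two writes cannot both succeed on disjoint node sets:

```python
def quorums_overlap(n, r, w):
    """True if the (R, W) choice gives read-write and write-write overlap:
    R + W > N and W > N / 2."""
    return r + w > n and w > n / 2

print(quorums_overlap(5, 3, 3))  # True  — majority quorums
print(quorums_overlap(5, 1, 5))  # True  — fast reads, slow writes
print(quorums_overlap(5, 2, 3))  # False — a read can miss the latest write
```

Systems like Cassandra expose this knob directly: ONE, QUORUM, and ALL are just different (R, W) points on this trade-off.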

Q6: How does Ceph ensure data consistency?

A: Ceph relies on several mechanisms:

  1. CRUSH algorithm: Deterministic data placement
  2. Placement Groups (PGs): Logical grouping of objects
  3. OSD maps: Track cluster state and data location
  4. Quorum-based operations: Monitors maintain consensus
  5. Replication: Multiple OSDs store each object

The system maintains consistency through these mechanisms while allowing tunable consistency levels.

Q7: What are the trade-offs of synchronous vs asynchronous replication?

A:

Synchronous:

  • ✅ Strong consistency
  • ✅ No data loss risk
  • ❌ Higher latency
  • ❌ Lower throughput
  • ❌ Fails if replica unavailable

Asynchronous:

  • ✅ Lower latency
  • ✅ Higher throughput
  • ✅ Better availability
  • ❌ Risk of data loss
  • ❌ Eventual consistency

Choose based on RPO requirements: zero data loss needs synchronous replication.

Q8: How can I optimize storage costs for a large-scale application?

A: Strategies include:

  1. Lifecycle policies: Automatically move data to cheaper storage classes
  2. Compression: Reduce storage requirements (typically 2-6x)
  3. Deduplication: Eliminate duplicate data
  4. Right-sizing: Choose appropriate storage classes
  5. Monitoring: Track usage and optimize based on patterns
  6. Regional optimization: Store data in cheaper regions when possible

Q9: What is the CAP theorem, and why can't I have all three properties?

A: CAP theorem states you can guarantee at most two of: Consistency, Availability, Partition tolerance. Partition tolerance is mandatory in distributed systems (networks fail), forcing a choice between C and A. This is a fundamental trade-off: strong consistency requires coordination (reducing availability), while high availability during partitions allows inconsistency.

Q10: How do I implement disaster recovery for cloud storage?

A: Implement a comprehensive DR strategy:

  1. Backup strategy: Regular automated backups (full + incremental)
  2. Multi-region replication: Geographic redundancy
  3. Versioning: Enable object versioning for point-in-time recovery
  4. Lifecycle policies: Automate backup retention and archival
  5. Testing: Regularly test restore procedures
  6. Documentation: Maintain runbooks for recovery scenarios
  7. Monitoring: Alert on backup failures and replication lag

Define RTO and RPO requirements to guide your strategy.

Summary Cheat Sheet

Storage Types Comparison

  Type     Access             Use Case             Example
  ------   ----------------   ------------------   -----------------
  Block    Block-level I/O    Databases, VMs       AWS EBS, Ceph RBD
  File     File system APIs   Shared filesystems   AWS EFS, NFS
  Object   REST APIs          Web apps, archives   S3, OSS, Ceph RGW

Consistency Models

  Model             Guarantee                    Use Case
  ---------------   --------------------------   ---------------------
  Strong            All reads see latest write   Financial systems
  Eventual          Consistent over time         Social media, CDN
  Causal            Preserves causality          Collaborative systems
  Read-your-writes  User sees own writes         Web applications

Replication Strategies

  Strategy             Consistency   Performance   Use Case
  ------------------   -----------   -----------   -------------------
  Master-Slave Sync    Strong        Lower         Critical data
  Master-Slave Async   Eventual      Higher        High throughput
  Multi-Master         Eventual      Highest       Global systems
  Quorum-based         Tunable       Moderate      Distributed systems

Storage Optimization Checklist

Key Metrics to Monitor

  • Durability: 99.999999999% (11 nines) for object storage
  • Availability: 99.99% (4 nines) for production systems
  • Latency: P50, P99, P999 percentiles
  • Throughput: IOPS and bandwidth
  • Cost: Storage, requests, transfer costs
  • Replication lag: For async replication
  • Backup success rate: For DR readiness

Common Patterns

  • Write-Through Cache: Write to cache and storage simultaneously
  • Write-Back Cache: Write to cache, flush to storage later
  • Read-Through Cache: Check cache; read from storage on a miss
  • Write-Around Cache: Write directly to storage, invalidate cache

  • Sharding: Distribute data across multiple storage nodes
  • Partitioning: Divide data by key ranges or hash
  • Replication: Copy data to multiple nodes for redundancy
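A minimal sketch contrasting write-through and write-back, with plain dictionaries standing in for the cache and backing store:

```python
class Store:
    def __init__(self):
        self.cache, self.disk = {}, {}

    def write_through(self, k, v):
        # Cache and backing store updated together: safe, but slower writes
        self.cache[k] = v
        self.disk[k] = v

    def write_back(self, k, v):
        # Cache only; disk is updated on flush: fast, but a crash
        # before flush loses the write
        self.cache[k] = v

    def flush(self):
        self.disk.update(self.cache)

s = Store()
s.write_through('a', 1)
s.write_back('b', 2)
print(s.disk.get('a'), s.disk.get('b'))  # 1 None — 'b' not yet persisted
s.flush()
print(s.disk.get('b'))                   # 2
```

Real write-back caches batch the flushes, which is exactly where the throughput gain and the data-loss window both come from.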


This comprehensive guide covers the essential aspects of cloud storage systems and distributed architecture. From theoretical foundations to practical implementations, these concepts form the backbone of modern cloud infrastructure. Whether you're designing a new system or optimizing an existing one, understanding these principles is crucial for building scalable, reliable, and cost-effective storage solutions.

  • Post title: Cloud Computing (3): Storage Systems and Distributed Architecture
  • Post author: Chen Kai
  • Create time: 2023-01-25 00:00:00
  • Post link: https://www.chenk.top/en/cloud-computing-storage-distributed-systems/
  • Copyright notice: All articles in this blog are licensed under BY-NC-SA unless stated otherwise.