noctua / PHASE2_README.md
@agalyaramadoss on 13 Feb (first commit)

Cube Database - Phase 2: Consistency & Replication ✅

Overview

Phase 2 adds distributed database capabilities with tunable consistency levels, read repair, and hinted handoff - making Cube truly Cassandra-like!

New Features

1. Tunable Consistency Levels

Control the trade-off between consistency, availability, and performance:

  • ANY - Fastest writes, weakest consistency (accepts hints)
  • ONE - One replica must respond
  • TWO - Two replicas must respond
  • THREE - Three replicas must respond
  • QUORUM - Majority of replicas ((RF/2) + 1)
  • ALL - All replicas must respond (strongest consistency)
  • LOCAL_ONE - One replica in local datacenter
  • LOCAL_QUORUM - Quorum in local datacenter
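
The mapping from consistency level to the number of replica acknowledgements required can be sketched as follows. This is a minimal illustration; the enum and method names are assumptions for the sketch, not Cube's actual API, and the datacenter-local levels are omitted for brevity:

```java
// Illustrative mapping from consistency level to required replica acks.
public class ConsistencyDemo {
    enum Level { ANY, ONE, TWO, THREE, QUORUM, ALL }

    static int requiredAcks(Level level, int rf) {
        switch (level) {
            case ANY:    return 0;            // a stored hint alone satisfies the write
            case ONE:    return 1;
            case TWO:    return 2;
            case THREE:  return 3;
            case QUORUM: return (rf / 2) + 1; // majority of replicas
            case ALL:    return rf;           // every replica must respond
            default:     throw new IllegalArgumentException("unknown level");
        }
    }

    public static void main(String[] args) {
        System.out.println(requiredAcks(Level.QUORUM, 3)); // 2
        System.out.println(requiredAcks(Level.ALL, 5));    // 5
    }
}
```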

2. Read Repair

Automatically detects and repairs inconsistencies during reads:

  • Compares responses from all replicas
  • Chooses the most recent value (highest timestamp)
  • Asynchronously propagates correct value to stale replicas
  • Configurable read repair probability (0-100%)
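
The core decision in the steps above is "choose the most recent value". A minimal sketch of that selection, with illustrative types rather than Cube's actual classes:

```java
import java.util.Comparator;
import java.util.List;

// Pick the replica response with the highest timestamp as the canonical value.
public class NewestValueDemo {
    record Response(String node, String value, long timestamp) {}

    static Response newest(List<Response> responses) {
        return responses.stream()
                .max(Comparator.comparingLong(Response::timestamp))
                .orElseThrow();
    }

    public static void main(String[] args) {
        List<Response> responses = List.of(
                new Response("node1", "v1", 1000),
                new Response("node2", "v2", 2000), // newest wins
                new Response("node3", "v1", 1000));
        System.out.println(newest(responses).value()); // v2
    }
}
```

Replicas whose responses differ from the canonical value are then repaired asynchronously.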

3. Hinted Handoff

Handles temporarily unavailable nodes:

  • Stores writes as "hints" when target node is down
  • Automatically replays hints when node recovers
  • Configurable hint window and max hints per node
  • Persists hints to disk for durability
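
The hint-window rule above amounts to a simple age check: a hint older than the configured window is discarded instead of replayed. A sketch with illustrative names:

```java
// A hint is replayable only while its age is within the hint window.
public class HintWindowDemo {
    static boolean isReplayable(long hintCreatedAtMillis, long nowMillis, long windowMillis) {
        return nowMillis - hintCreatedAtMillis <= windowMillis;
    }

    public static void main(String[] args) {
        long window = 3_600_000L; // 1 hour, matching the configuration shown later
        System.out.println(isReplayable(0L, 1_800_000L, window)); // true  (30 min old)
        System.out.println(isReplayable(0L, 7_200_000L, window)); // false (2 h old)
    }
}
```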

4. Replication Strategies

SimpleReplicationStrategy:

  • Places replicas on consecutive nodes
  • Good for single-datacenter deployments
  • Uses consistent hashing for key distribution

NetworkTopologyStrategy:

  • Rack and datacenter aware
  • Distributes replicas across racks for fault tolerance
  • Supports multi-datacenter deployments
  • Configurable replication factor per DC
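
The "consecutive nodes" idea behind SimpleReplicationStrategy can be sketched as follows. This is an illustration only: the real strategy uses consistent hashing on a token ring, while this sketch just hashes the key to a starting position and walks forward RF nodes:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified consecutive-node placement: hash the key onto a ring of nodes,
// then take the next RF nodes (wrapping around).
public class SimplePlacementDemo {
    static List<String> replicas(String key, int rf, List<String> ring) {
        int start = Math.floorMod(key.hashCode(), ring.size());
        List<String> out = new ArrayList<>();
        for (int i = 0; i < Math.min(rf, ring.size()); i++) {
            out.add(ring.get((start + i) % ring.size())); // consecutive nodes
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> ring = List.of("node1", "node2", "node3", "node4");
        System.out.println(replicas("user:123", 3, ring)); // three distinct nodes
    }
}
```

NetworkTopologyStrategy refines this by skipping nodes in a rack or datacenter that already holds a replica.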

Architecture

┌─────────────────────────────────────────────────────────┐
│           Replication Coordinator                        │
├─────────────────────────────────────────────────────────┤
│                                                           │
│  Write Path:                                             │
│  ┌──────────┐                                            │
│  │  Client  │                                            │
│  └────┬─────┘                                            │
│       │ CL=QUORUM                                        │
│       ▼                                                  │
│  ┌──────────────┐                                        │
│  │ Coordinator  │                                        │
│  └───┬──┬───┬───┘                                        │
│      │  │   │ Write to RF=3 replicas                    │
│      ▼  ▼   ▼                                            │
│   Node1 Node2 Node3                                      │
│     ✓    ✓    ✗ (down)                                   │
│              │                                           │
│              ▼                                           │
│         [Hinted Handoff]                                 │
│         Store hint for Node3                             │
│                                                           │
│  Read Path with Read Repair:                            │
│  ┌──────────┐                                            │
│  │  Client  │                                            │
│  └────┬─────┘                                            │
│       │ CL=QUORUM                                        │
│       ▼                                                  │
│  ┌──────────────┐                                        │
│  │ Coordinator  │                                        │
│  └───┬──┬───┬───┘                                        │
│      │  │   │ Read from replicas                        │
│      ▼  ▼   ▼                                            │
│   Node1 Node2 Node3                                      │
│   v1,t1 v2,t2 v1,t1                                      │
│      │  │   │                                            │
│      └──┴───┘                                            │
│         │ Compare responses                              │
│         ▼                                                │
│    Choose v2 (newest)                                    │
│         │                                                │
│         ▼                                                │
│    [Read Repair]                                         │
│    Repair Node1 & Node3                                  │
│                                                           │
└─────────────────────────────────────────────────────────┘
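
The write path in the diagram can be simulated in a few lines: with RF=3 and CL=QUORUM, two acknowledgements succeed even though one replica is down, and the down replica receives a hint. Node states are hard-coded here purely to mirror the diagram:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WritePathDemo {
    // Count acknowledgements from live replicas; down replicas get hints.
    static int acksFrom(Map<String, Boolean> nodeUp, List<String> hinted) {
        int acks = 0;
        for (Map.Entry<String, Boolean> e : nodeUp.entrySet()) {
            if (e.getValue()) acks++;
            else hinted.add(e.getKey());
        }
        return acks;
    }

    public static void main(String[] args) {
        Map<String, Boolean> nodeUp = new LinkedHashMap<>();
        nodeUp.put("node1", true);
        nodeUp.put("node2", true);
        nodeUp.put("node3", false); // down, as in the diagram

        List<String> hinted = new ArrayList<>();
        int acks = acksFrom(nodeUp, hinted);
        int required = (3 / 2) + 1; // QUORUM with RF=3
        System.out.println("acks=" + acks + " success=" + (acks >= required)); // acks=2 success=true
        System.out.println("hints stored for " + hinted);                      // [node3]
    }
}
```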

Usage Examples

Consistency Levels

import com.cube.consistency.ConsistencyLevel;
import com.cube.replication.ReplicationCoordinator;

// Write with QUORUM (strong consistency)
ReplicationCoordinator.WriteResult result = coordinator.write(
    "user:123",
    "Alice".getBytes(),
    ConsistencyLevel.QUORUM,
    clusterNodes
);

if (result.isSuccess()) {
    System.out.println("Wrote to " + result.getSuccessfulWrites() + " replicas");
}

// Read with ONE (fast, eventual consistency)
ReplicationCoordinator.ReadResult readResult = coordinator.read(
    "user:123",
    ConsistencyLevel.ONE,
    clusterNodes
);

if (readResult.isSuccess()) {
    String value = new String(readResult.getValue());
    System.out.println("Read value: " + value);
}

// Write with ALL (maximum consistency)
coordinator.write(
    "important:data",
    "critical".getBytes(),
    ConsistencyLevel.ALL,
    clusterNodes
);

Hinted Handoff

import com.cube.replication.HintedHandoffManager;

// Initialize hinted handoff
HintedHandoffManager hintedHandoff = new HintedHandoffManager(
    "/var/lib/cube/hints",  // Hints directory
    10000,                   // Max hints per node
    3600000                  // 1 hour hint window
);

// Store hint for unavailable node
hintedHandoff.storeHint(
    "node-2",                    // Target node
    "user:123",                  // Key
    "Alice".getBytes()           // Value
);

// Replay hints when node recovers
hintedHandoff.replayHintsForNode("node-2", hint -> {
    // Send hint to node over network
    return sendToNode(hint.getTargetNodeId(), hint.getKey(), hint.getValue());
});

// Get hint statistics
int totalHints = hintedHandoff.getTotalHintCount();
int node2Hints = hintedHandoff.getHintCount("node-2");

Read Repair

import com.cube.replication.ReadRepairManager;
import com.cube.replication.ReadRepairManager.ReadResponse;

// Initialize read repair with 10% probability
ReadRepairManager readRepair = new ReadRepairManager(10);

// Collect responses from replicas
List<ReadResponse> responses = new ArrayList<>();
responses.add(new ReadResponse(node1, "key1", "old".getBytes(), 1000));
responses.add(new ReadResponse(node2, "key1", "new".getBytes(), 2000)); // Newer
responses.add(new ReadResponse(node3, "key1", "old".getBytes(), 1000));

// Perform read repair
ReadRepairManager.ReadRepairResult result = readRepair.performReadRepairBlocking(
    responses,
    (node, key, value, timestamp) -> {
        // Repair the node
        sendRepairToNode(node, key, value, timestamp);
        return true;
    }
);

// Check result
if (result.isRepairNeeded()) {
    System.out.println("Repaired " + result.getRepairedNodes() + " nodes");
}

byte[] canonicalValue = result.getCanonicalValue(); // "new"

Replication Strategies

Simple Strategy:

import com.cube.replication.SimpleReplicationStrategy;

ReplicationStrategy strategy = new SimpleReplicationStrategy();

List<ClusterNode> replicas = strategy.getReplicaNodes(
    "user:123",      // Key
    3,               // Replication factor
    allNodes         // Available nodes
);

System.out.println("Replicas: " + replicas);

Network Topology Strategy:

import com.cube.replication.NetworkTopologyReplicationStrategy;

// Configure replication per datacenter
Map<String, Integer> dcRF = new HashMap<>();
dcRF.put("us-east", 3);
dcRF.put("us-west", 2);
dcRF.put("eu-west", 2);

ReplicationStrategy strategy = new NetworkTopologyReplicationStrategy(dcRF);

List<ClusterNode> replicas = strategy.getReplicaNodes(
    "user:123",
    3,
    allNodes
);

// Will place 3 replicas in us-east, 2 in us-west, 2 in eu-west

Complete Example

import com.cube.cluster.ClusterNode;
import com.cube.consistency.ConsistencyLevel;
import com.cube.replication.*;
import com.cube.storage.LSMStorageEngine;

public class Phase2Example {
    public static void main(String[] args) throws Exception {
        // Initialize storage
        LSMStorageEngine storage = new LSMStorageEngine("/var/lib/cube/data");
        
        // Initialize components
        HintedHandoffManager hintedHandoff = new HintedHandoffManager(
            "/var/lib/cube/hints", 10000, 3600000);
        
        ReadRepairManager readRepair = new ReadRepairManager(10);
        
        ReplicationStrategy strategy = new SimpleReplicationStrategy();
        
        ReplicationCoordinator coordinator = new ReplicationCoordinator(
            storage,
            strategy,
            hintedHandoff,
            readRepair,
            3,      // RF=3
            5000,   // 5s write timeout
            3000    // 3s read timeout
        );
        
        // Define cluster
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "10.0.0.1", 8080));
        nodes.add(new ClusterNode("node2", "10.0.0.2", 8080));
        nodes.add(new ClusterNode("node3", "10.0.0.3", 8080));
        
        // Strong consistency write
        ReplicationCoordinator.WriteResult writeResult = coordinator.write(
            "user:alice",
            "Alice Johnson".getBytes(),
            ConsistencyLevel.QUORUM,  // Wait for 2 of 3 replicas
            nodes
        );
        
        System.out.println("Write successful: " + writeResult.isSuccess());
        System.out.println("Replicas written: " + writeResult.getSuccessfulWrites());
        
        // Fast eventual consistency read
        ReplicationCoordinator.ReadResult readResult = coordinator.read(
            "user:alice",
            ConsistencyLevel.ONE,  // Read from first available replica
            nodes
        );
        
        if (readResult.isSuccess()) {
            String value = new String(readResult.getValue());
            System.out.println("Value: " + value);
            System.out.println("Read repair performed: " + readResult.isRepairPerformed());
        }
        
        // Get statistics
        Map<String, Object> stats = coordinator.getStats();
        System.out.println("Replication stats: " + stats);
        
        // Cleanup
        coordinator.shutdown();
        storage.close();
    }
}

Configuration

Consistency Level Selection Guide

| Use Case | Write CL | Read CL | Explanation |
|---|---|---|---|
| High Availability | ONE | ONE | Fastest, eventual consistency |
| Balanced | QUORUM | QUORUM | Strong consistency, good performance |
| Strong Consistency | QUORUM | ALL | Ensures all reads see the latest write |
| Maximum Consistency | ALL | ALL | Strictest, slowest |
| Session Consistency | ONE | QUORUM | Fast writes, consistent reads |
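
The strong-consistency rows follow from the quorum-overlap rule W + R > N: when the write set and read set must intersect, every read contacts at least one replica holding the latest write. A quick check:

```java
// Overlap rule: reads are guaranteed to see the latest write iff W + R > N.
public class OverlapDemo {
    static boolean overlaps(int writeAcks, int readAcks, int rf) {
        return writeAcks + readAcks > rf;
    }

    public static void main(String[] args) {
        int rf = 3, quorum = (rf / 2) + 1;
        System.out.println(overlaps(quorum, quorum, rf)); // true:  QUORUM writes + QUORUM reads
        System.out.println(overlaps(1, 1, rf));           // false: ONE + ONE is only eventual
    }
}
```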

Replication Factor Guidelines

  • RF=1: No redundancy, single point of failure
  • RF=2: Data survives 1 node failure, but QUORUM (=2) needs both replicas up
  • RF=3: Recommended balance: data survives 2 failures, QUORUM operations tolerate 1
  • RF=5: High availability: data survives 4 failures, QUORUM operations tolerate 2
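
The guidelines above come from two small formulas: data survives RF − 1 node failures, while QUORUM operations tolerate RF − ((RF/2) + 1) failures:

```java
// Quorum size and fault tolerance for each replication factor.
public class RfDemo {
    static int quorum(int rf) {
        return (rf / 2) + 1;
    }

    public static void main(String[] args) {
        for (int rf : new int[]{1, 2, 3, 5}) {
            System.out.println("RF=" + rf
                    + " quorum=" + quorum(rf)
                    + " quorumTolerates=" + (rf - quorum(rf))
                    + " dataSurvives=" + (rf - 1));
        }
        // RF=3 -> quorum=2, quorum operations tolerate 1 failure, data survives 2
    }
}
```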

Read Repair Configuration

// Always perform read repair
ReadRepairManager readRepair = new ReadRepairManager(100);

// 10% chance (probabilistic)
ReadRepairManager readRepair = new ReadRepairManager(10);

// Never perform read repair
ReadRepairManager readRepair = new ReadRepairManager(0);

Hinted Handoff Configuration

HintedHandoffManager hintedHandoff = new HintedHandoffManager(
    "/var/lib/cube/hints",  // Directory for hints
    10000,                   // Max hints per node (prevent overflow)
    3600000                  // Hint window: 1 hour (discard older hints)
);

Performance Characteristics

Consistency Level Impact

| CL | Write Latency | Read Latency | Consistency | Availability |
|---|---|---|---|---|
| ANY | Lowest | N/A | Weakest | Highest |
| ONE | Very Low | Very Low | Weak | High |
| QUORUM | Medium | Medium | Strong | Medium |
| ALL | Highest | Highest | Strongest | Lowest |

Read Repair Overhead

  • 0% chance: No overhead, eventual consistency
  • 10% chance: ~10% of reads slightly slower, good balance
  • 100% chance: All reads check consistency, strongest guarantee
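
A probabilistic trigger like the one described above can be sketched as a single percentage check per read; the method name is illustrative, not Cube's actual API:

```java
import java.util.concurrent.ThreadLocalRandom;

// Repair roughly `chancePercent` percent of reads.
public class RepairChanceDemo {
    static boolean shouldRepair(int chancePercent) {
        return ThreadLocalRandom.current().nextInt(100) < chancePercent;
    }

    public static void main(String[] args) {
        System.out.println(shouldRepair(100)); // always true
        System.out.println(shouldRepair(0));   // always false
    }
}
```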

Hinted Handoff

  • Storage: ~1KB per hint
  • Replay: Background process, minimal impact
  • Network: Replayed when node recovers

Testing

# Run Phase 2 tests
mvn test -Dtest=ReplicationTest

# Expected output:
[INFO] Tests run: 13, Failures: 0, Errors: 0, Skipped: 0

Monitoring

// Get replication statistics
Map<String, Object> stats = coordinator.getStats();

System.out.println("Replication Factor: " + stats.get("replicationFactor"));
System.out.println("Pending Hints: " + stats.get("pendingHints"));
System.out.println("Read Repair Stats: " + stats.get("readRepairStats"));
System.out.println("Active Tasks: " + stats.get("activeReplicationTasks"));

Common Patterns

Strong Consistency Pattern

// Ensure readers always see latest write
coordinator.write(key, value, ConsistencyLevel.QUORUM, nodes);
coordinator.read(key, ConsistencyLevel.QUORUM, nodes);

High Availability Pattern

// Maximize availability with eventual consistency
coordinator.write(key, value, ConsistencyLevel.ONE, nodes);
coordinator.read(key, ConsistencyLevel.ONE, nodes);

Session Consistency Pattern

// Fast writes, but ensure reads are consistent
coordinator.write(key, value, ConsistencyLevel.ONE, nodes);
Thread.sleep(10); // Heuristic pause to let replication catch up; not a hard guarantee
coordinator.read(key, ConsistencyLevel.QUORUM, nodes);

Troubleshooting

"Not enough replicas available"

Cause: Fewer nodes than replication factor
Solution: Reduce RF or add more nodes

"Write timeout"

Cause: Nodes too slow or unreachable
Solution: Increase write timeout or use lower consistency level

"Too many hints"

Cause: Node down for extended period
Solution: Investigate node issues, consider manual repair

"Read repair conflicts"

Cause: Network partitions or clock skew
Solution: Use NTP for time sync, check network stability

Next Steps - Phase 3

  • Bloom Filters for faster negative lookups
  • Compression (Snappy, LZ4)
  • Leveled compaction strategy
  • Anti-entropy repair (Merkle trees)
  • Streaming for node replacement

Phase 2 Complete! Cube is now a true distributed database! 🎉

Key Achievements:

  • ✅ Tunable consistency levels
  • ✅ Read repair for consistency
  • ✅ Hinted handoff for availability
  • ✅ Multiple replication strategies
  • ✅ Comprehensive testing