Phase 2 adds distributed database capabilities with tunable consistency levels, read repair, and hinted handoff - making Cube truly Cassandra-like!
- **Tunable consistency levels** (ANY, ONE, QUORUM, ALL) - control the trade-off between consistency, availability, and performance.
- **Read repair** - automatically detects and repairs inconsistencies during reads.
- **Hinted handoff** - handles writes to temporarily unavailable nodes.
- **Replication strategies** - `SimpleReplicationStrategy` and `NetworkTopologyStrategy`.
```
┌─────────────────────────────────────────────────────────┐
│                 Replication Coordinator                 │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Write Path:                                            │
│  ┌──────────┐                                           │
│  │  Client  │                                           │
│  └────┬─────┘                                           │
│       │ CL=QUORUM                                       │
│       ▼                                                 │
│  ┌──────────────┐                                       │
│  │ Coordinator  │                                       │
│  └───┬──┬───┬───┘                                       │
│      │  │   │  Write to RF=3 replicas                   │
│      ▼  ▼   ▼                                           │
│   Node1 Node2 Node3                                     │
│     ✓     ✓     ✗ (down)                                │
│                 │                                       │
│                 ▼                                       │
│          [Hinted Handoff]                               │
│          Store hint for Node3                           │
│                                                         │
│  Read Path with Read Repair:                            │
│  ┌──────────┐                                           │
│  │  Client  │                                           │
│  └────┬─────┘                                           │
│       │ CL=QUORUM                                       │
│       ▼                                                 │
│  ┌──────────────┐                                       │
│  │ Coordinator  │                                       │
│  └───┬──┬───┬───┘                                       │
│      │  │   │  Read from replicas                       │
│      ▼  ▼   ▼                                           │
│   Node1 Node2 Node3                                     │
│   v1,t1 v2,t2 v1,t1                                     │
│     │     │     │                                       │
│     └─────┼─────┘                                       │
│           │ Compare responses                           │
│           ▼                                             │
│    Choose v2 (newest)                                   │
│           │                                             │
│           ▼                                             │
│      [Read Repair]                                      │
│    Repair Node1 & Node3                                 │
│                                                         │
└─────────────────────────────────────────────────────────┘
```
```java
import com.cube.consistency.ConsistencyLevel;
import com.cube.replication.ReplicationCoordinator;

// Write with QUORUM (strong consistency)
ReplicationCoordinator.WriteResult result = coordinator.write(
    "user:123",
    "Alice".getBytes(),
    ConsistencyLevel.QUORUM,
    clusterNodes
);

if (result.isSuccess()) {
    System.out.println("Wrote to " + result.getSuccessfulWrites() + " replicas");
}

// Read with ONE (fast, eventual consistency)
ReplicationCoordinator.ReadResult readResult = coordinator.read(
    "user:123",
    ConsistencyLevel.ONE,
    clusterNodes
);

if (readResult.isSuccess()) {
    String value = new String(readResult.getValue());
    System.out.println("Read value: " + value);
}

// Write with ALL (maximum consistency)
coordinator.write(
    "important:data",
    "critical".getBytes(),
    ConsistencyLevel.ALL,
    clusterNodes
);
```
```java
import com.cube.replication.HintedHandoffManager;

// Initialize hinted handoff
HintedHandoffManager hintedHandoff = new HintedHandoffManager(
    "/var/lib/cube/hints",  // Hints directory
    10000,                  // Max hints per node
    3600000                 // 1 hour hint window
);

// Store hint for unavailable node
hintedHandoff.storeHint(
    "node-2",            // Target node
    "user:123",          // Key
    "Alice".getBytes()   // Value
);

// Replay hints when node recovers
hintedHandoff.replayHintsForNode("node-2", hint -> {
    // Send hint to node over network
    return sendToNode(hint.getTargetNodeId(), hint.getKey(), hint.getValue());
});

// Get hint statistics
int totalHints = hintedHandoff.getTotalHintCount();
int node2Hints = hintedHandoff.getHintCount("node-2");
```
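The hint window bounds how stale a hint may be before it is discarded rather than replayed. The check itself is simple arithmetic; the sketch below is illustrative (the `HintWindow` class and `isExpired` helper are not part of the `HintedHandoffManager` API):

```java
public class HintWindow {
    // A hint is replayable only while it is younger than the hint window.
    static boolean isExpired(long createdAtMillis, long windowMillis, long nowMillis) {
        return nowMillis - createdAtMillis > windowMillis;
    }

    public static void main(String[] args) {
        long window = 3_600_000;  // 1 hour, matching the configuration above
        long created = 0;
        System.out.println(isExpired(created, window, 1_800_000)); // false: 30 minutes old
        System.out.println(isExpired(created, window, 7_200_000)); // true: 2 hours old
    }
}
```

Expiring old hints keeps a long-dead node from receiving a flood of obsolete writes on recovery; anti-entropy repair is the better tool once the window has passed.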
```java
import java.util.ArrayList;
import java.util.List;

import com.cube.replication.ReadRepairManager;
import com.cube.replication.ReadRepairManager.ReadResponse;

// Initialize read repair with 10% probability
ReadRepairManager readRepair = new ReadRepairManager(10);

// Collect responses from replicas
List<ReadResponse> responses = new ArrayList<>();
responses.add(new ReadResponse(node1, "key1", "old".getBytes(), 1000));
responses.add(new ReadResponse(node2, "key1", "new".getBytes(), 2000)); // Newer
responses.add(new ReadResponse(node3, "key1", "old".getBytes(), 1000));

// Perform read repair
ReadRepairManager.ReadRepairResult result = readRepair.performReadRepairBlocking(
    responses,
    (node, key, value, timestamp) -> {
        // Repair the node
        sendRepairToNode(node, key, value, timestamp);
        return true;
    }
);

// Check result
if (result.isRepairNeeded()) {
    System.out.println("Repaired " + result.getRepairedNodes() + " nodes");
}
byte[] canonicalValue = result.getCanonicalValue(); // "new"
```
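The conflict resolution behind this is last-write-wins: the response carrying the highest timestamp becomes the canonical value, and every replica holding an older timestamp is a repair target. A minimal sketch of that selection (the `Response` record and `pickNewest` helper here are illustrative stand-ins, not the `ReadRepairManager` internals):

```java
import java.util.Comparator;
import java.util.List;

public class LastWriteWins {
    // Illustrative stand-in for one replica's read response.
    record Response(String nodeId, byte[] value, long timestamp) {}

    // Pick the response with the highest timestamp (last-write-wins).
    static Response pickNewest(List<Response> responses) {
        return responses.stream()
                .max(Comparator.comparingLong(Response::timestamp))
                .orElseThrow();
    }

    public static void main(String[] args) {
        List<Response> responses = List.of(
                new Response("node1", "old".getBytes(), 1000),
                new Response("node2", "new".getBytes(), 2000),
                new Response("node3", "old".getBytes(), 1000));

        Response canonical = pickNewest(responses);
        System.out.println(canonical.nodeId()); // node2 holds the newest write

        // Every replica older than the canonical timestamp needs repair.
        responses.stream()
                .filter(r -> r.timestamp() < canonical.timestamp())
                .forEach(r -> System.out.println("repair " + r.nodeId()));
    }
}
```

Note that last-write-wins depends on comparable timestamps across nodes, which is why the troubleshooting section below recommends NTP: clock skew can make a stale write look canonical.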
**Simple Strategy:**

```java
import java.util.List;

import com.cube.cluster.ClusterNode;
import com.cube.replication.ReplicationStrategy;
import com.cube.replication.SimpleReplicationStrategy;

ReplicationStrategy strategy = new SimpleReplicationStrategy();
List<ClusterNode> replicas = strategy.getReplicaNodes(
    "user:123",  // Key
    3,           // Replication factor
    allNodes     // Available nodes
);
System.out.println("Replicas: " + replicas);
```
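Cassandra's SimpleStrategy hashes the key onto a ring and then takes the next RF distinct nodes walking clockwise. Assuming Cube's `SimpleReplicationStrategy` behaves similarly, the core selection can be sketched like this (the `pickReplicas` helper and plain string node IDs are hypothetical, not the shipped implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class SimpleStrategySketch {
    // Start at the key's position on the ring and take the next `rf`
    // distinct nodes, wrapping around the end of the node list.
    static List<String> pickReplicas(String key, int rf, List<String> ring) {
        int start = Math.floorMod(key.hashCode(), ring.size());
        List<String> replicas = new ArrayList<>();
        for (int i = 0; i < Math.min(rf, ring.size()); i++) {
            replicas.add(ring.get((start + i) % ring.size()));
        }
        return replicas;
    }

    public static void main(String[] args) {
        List<String> ring = List.of("node1", "node2", "node3", "node4");
        // Three consecutive ring positions, deterministic per key
        System.out.println(pickReplicas("user:123", 3, ring));
    }
}
```

The important properties are that the same key always maps to the same replica set, and that the set size is capped by the cluster size when RF exceeds the number of available nodes.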
**Network Topology Strategy:**

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.cube.cluster.ClusterNode;
import com.cube.replication.NetworkTopologyReplicationStrategy;
import com.cube.replication.ReplicationStrategy;

// Configure replication per datacenter
Map<String, Integer> dcRF = new HashMap<>();
dcRF.put("us-east", 3);
dcRF.put("us-west", 2);
dcRF.put("eu-west", 2);

ReplicationStrategy strategy = new NetworkTopologyReplicationStrategy(dcRF);
List<ClusterNode> replicas = strategy.getReplicaNodes(
    "user:123",
    3,
    allNodes
);
// Will place 3 replicas in us-east, 2 in us-west, 2 in eu-west
```
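Per-datacenter placement boils down to grouping candidate nodes by their datacenter label and taking up to that datacenter's configured count from each group. A hedged sketch of that mechanic (the `Node` record and `pickReplicas` helper are illustrative; the real strategy also does ring-order selection within each DC):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopologySketch {
    // Illustrative node carrying a datacenter label.
    record Node(String id, String dc) {}

    // For each datacenter, accept nodes until its configured replica count is met.
    static List<Node> pickReplicas(Map<String, Integer> dcRF, List<Node> nodes) {
        Map<String, Integer> placed = new HashMap<>();
        List<Node> replicas = new ArrayList<>();
        for (Node n : nodes) {
            int want = dcRF.getOrDefault(n.dc(), 0);
            int have = placed.getOrDefault(n.dc(), 0);
            if (have < want) {
                replicas.add(n);
                placed.put(n.dc(), have + 1);
            }
        }
        return replicas;
    }

    public static void main(String[] args) {
        Map<String, Integer> dcRF = Map.of("us-east", 2, "eu-west", 1);
        List<Node> nodes = List.of(
                new Node("n1", "us-east"), new Node("n2", "us-east"),
                new Node("n3", "us-east"), new Node("n4", "eu-west"),
                new Node("n5", "eu-west"));
        System.out.println(pickReplicas(dcRF, nodes)); // n1, n2 from us-east; n4 from eu-west
    }
}
```

The payoff of datacenter-aware placement is surviving the loss of an entire DC: with replicas pinned to each region, a local quorum can still be assembled elsewhere.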
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.cube.cluster.ClusterNode;
import com.cube.consistency.ConsistencyLevel;
import com.cube.replication.*;
import com.cube.storage.LSMStorageEngine;

public class Phase2Example {
    public static void main(String[] args) throws Exception {
        // Initialize storage
        LSMStorageEngine storage = new LSMStorageEngine("/var/lib/cube/data");

        // Initialize components
        HintedHandoffManager hintedHandoff = new HintedHandoffManager(
            "/var/lib/cube/hints", 10000, 3600000);
        ReadRepairManager readRepair = new ReadRepairManager(10);
        ReplicationStrategy strategy = new SimpleReplicationStrategy();

        ReplicationCoordinator coordinator = new ReplicationCoordinator(
            storage,
            strategy,
            hintedHandoff,
            readRepair,
            3,     // RF=3
            5000,  // 5s write timeout
            3000   // 3s read timeout
        );

        // Define cluster
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "10.0.0.1", 8080));
        nodes.add(new ClusterNode("node2", "10.0.0.2", 8080));
        nodes.add(new ClusterNode("node3", "10.0.0.3", 8080));

        // Strong consistency write
        ReplicationCoordinator.WriteResult writeResult = coordinator.write(
            "user:alice",
            "Alice Johnson".getBytes(),
            ConsistencyLevel.QUORUM,  // Wait for 2 of 3 replicas
            nodes
        );
        System.out.println("Write successful: " + writeResult.isSuccess());
        System.out.println("Replicas written: " + writeResult.getSuccessfulWrites());

        // Fast eventual consistency read
        ReplicationCoordinator.ReadResult readResult = coordinator.read(
            "user:alice",
            ConsistencyLevel.ONE,  // Read from first available replica
            nodes
        );
        if (readResult.isSuccess()) {
            String value = new String(readResult.getValue());
            System.out.println("Value: " + value);
            System.out.println("Read repair performed: " + readResult.isRepairPerformed());
        }

        // Get statistics
        Map<String, Object> stats = coordinator.getStats();
        System.out.println("Replication stats: " + stats);

        // Cleanup
        coordinator.shutdown();
        storage.close();
    }
}
```
| Use Case | Write CL | Read CL | Explanation |
|---|---|---|---|
| High Availability | ONE | ONE | Fastest, eventual consistency |
| Balanced | QUORUM | QUORUM | Strong consistency, good performance |
| Strong Consistency | QUORUM | ALL | Ensure all reads see latest write |
| Maximum Consistency | ALL | ALL | Strictest, slowest |
| Session Consistency | ONE | QUORUM | Fast writes, consistent reads |
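The table rests on simple quorum arithmetic: a write acknowledged by W replicas and a read that consults R replicas are guaranteed to overlap on at least one up-to-date replica whenever W + R > RF. A quick check of that rule (the `acksNeeded` and `stronglyConsistent` helpers are illustrative, not the `ConsistencyLevel` API):

```java
public class QuorumMath {
    // Replica acknowledgements each consistency level requires at a given RF.
    static int acksNeeded(String cl, int rf) {
        return switch (cl) {
            case "ONE" -> 1;
            case "QUORUM" -> rf / 2 + 1;
            case "ALL" -> rf;
            default -> throw new IllegalArgumentException("unknown level: " + cl);
        };
    }

    // Reads are guaranteed to see the latest write iff the write and read
    // replica sets must intersect: W + R > RF.
    static boolean stronglyConsistent(String writeCL, String readCL, int rf) {
        return acksNeeded(writeCL, rf) + acksNeeded(readCL, rf) > rf;
    }

    public static void main(String[] args) {
        int rf = 3;
        System.out.println(acksNeeded("QUORUM", rf));                   // 2
        System.out.println(stronglyConsistent("QUORUM", "QUORUM", rf)); // true:  2 + 2 > 3
        System.out.println(stronglyConsistent("ONE", "ONE", rf));       // false: 1 + 1 <= 3
    }
}
```

This is why QUORUM/QUORUM counts as "strong consistency" in the table while ONE/ONE is only eventual: with RF=3, two quorums of 2 must share a replica, but two reads at ONE may miss the single replica that saw the write.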
```java
// Always perform read repair
ReadRepairManager readRepair = new ReadRepairManager(100);

// 10% chance (probabilistic)
ReadRepairManager readRepair = new ReadRepairManager(10);

// Never perform read repair
ReadRepairManager readRepair = new ReadRepairManager(0);
```
```java
HintedHandoffManager hintedHandoff = new HintedHandoffManager(
    "/var/lib/cube/hints",  // Directory for hints
    10000,                  // Max hints per node (prevent overflow)
    3600000                 // Hint window: 1 hour (discard older hints)
);
```
| CL | Write Latency | Read Latency | Consistency | Availability |
|---|---|---|---|---|
| ANY | Lowest | N/A | Weakest | Highest |
| ONE | Very Low | Very Low | Weak | High |
| QUORUM | Medium | Medium | Strong | Medium |
| ALL | Highest | Highest | Strongest | Lowest |
```bash
# Run Phase 2 tests
mvn test -Dtest=ReplicationTest

# Expected output:
# [INFO] Tests run: 13, Failures: 0, Errors: 0, Skipped: 0
```
```java
// Get replication statistics
Map<String, Object> stats = coordinator.getStats();
System.out.println("Replication Factor: " + stats.get("replicationFactor"));
System.out.println("Pending Hints: " + stats.get("pendingHints"));
System.out.println("Read Repair Stats: " + stats.get("readRepairStats"));
System.out.println("Active Tasks: " + stats.get("activeReplicationTasks"));
```
```java
// Ensure readers always see latest write
coordinator.write(key, value, ConsistencyLevel.QUORUM, nodes);
coordinator.read(key, ConsistencyLevel.QUORUM, nodes);
```

```java
// Maximize availability with eventual consistency
coordinator.write(key, value, ConsistencyLevel.ONE, nodes);
coordinator.read(key, ConsistencyLevel.ONE, nodes);
```

```java
// Fast writes, but ensure reads are consistent
coordinator.write(key, value, ConsistencyLevel.ONE, nodes);
Thread.sleep(10); // Allow replication
coordinator.read(key, ConsistencyLevel.QUORUM, nodes);
```
**Cause:** Fewer live nodes than the replication factor
**Solution:** Reduce RF or add more nodes

**Cause:** Nodes too slow or unreachable
**Solution:** Increase the write timeout or use a lower consistency level

**Cause:** Node down for an extended period
**Solution:** Investigate the node, consider manual repair

**Cause:** Network partitions or clock skew
**Solution:** Use NTP for time sync, check network stability
Phase 2 Complete! Cube is now a true distributed database! 🎉
Key Achievements:

- Tunable consistency levels (ANY, ONE, QUORUM, ALL)
- Read repair that detects and fixes replica inconsistencies on the read path
- Hinted handoff for writes to temporarily unavailable nodes
- Pluggable replication strategies, simple and datacenter-aware