package com.cube.examples;
import com.cube.cluster.ClusterNode;
import com.cube.consistency.ConsistencyLevel;
import com.cube.replication.*;
import com.cube.storage.LSMStorageEngine;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
 * Runnable examples demonstrating Phase 2 features: consistency levels,
 * hinted handoff, read repair, replication strategies, and a complete
 * write/read workflow through a {@code ReplicationCoordinator}.
 *
 * <p>Every example is a static method that prints its progress to standard
 * output. {@link #main(String[])} runs them in order.
 */
public class Phase2Examples {

    /** Maximum number of hints per node handed to the example hinted-handoff managers. */
    private static final int MAX_HINTS_PER_NODE = 1000;

    /** Hint window handed to the example hinted-handoff managers (1 hour). */
    private static final int HINT_WINDOW_MS = 3600000;

    /**
     * Runs all five examples in sequence.
     *
     * @param args ignored
     * @throws Exception if any example fails; the remaining examples are not run
     */
    public static void main(String[] args) throws Exception {
        System.out.println("=== Cube Database Phase 2 Examples ===\n");
        example1_ConsistencyLevels();
        example2_HintedHandoff();
        example3_ReadRepair();
        example4_ReplicationStrategies();
        example5_CompleteWorkflow();
        System.out.println("\n=== All Phase 2 examples completed! ===");
    }

    /**
     * Example 1: Consistency Levels.
     *
     * <p>Prints how many responses ONE, TWO, QUORUM and ALL require for a
     * replication factor of 3, followed by a few per-level properties.
     */
    public static void example1_ConsistencyLevels() {
        System.out.println("Example 1: Consistency Levels");
        System.out.println("-------------------------------");
        int rf = 3; // Replication Factor
        System.out.println("With RF=" + rf + ":");
        System.out.println(" ONE requires: " + ConsistencyLevel.ONE.getRequiredResponses(rf) + " response(s)");
        System.out.println(" TWO requires: " + ConsistencyLevel.TWO.getRequiredResponses(rf) + " response(s)");
        System.out.println(" QUORUM requires: " + ConsistencyLevel.QUORUM.getRequiredResponses(rf) + " response(s)");
        System.out.println(" ALL requires: " + ConsistencyLevel.ALL.getRequiredResponses(rf) + " response(s)");
        System.out.println("\nConsistency Level Properties:");
        System.out.println(" QUORUM is quorum: " + ConsistencyLevel.QUORUM.isQuorum());
        System.out.println(" ANY allows hints: " + ConsistencyLevel.ANY.allowsHints());
        System.out.println(" LOCAL_ONE is local: " + ConsistencyLevel.LOCAL_ONE.isLocal());
        System.out.println();
    }

    /**
     * Example 2: Hinted Handoff.
     *
     * <p>Stores three hints (two for node-2, one for node-3), prints the
     * per-node and total counts, replays the hints for node-2 with a callback
     * that always reports success, and prints the remaining count.
     *
     * @throws Exception if the hinted-handoff manager fails; the manager is
     *     still shut down in that case
     */
    public static void example2_HintedHandoff() throws Exception {
        System.out.println("Example 2: Hinted Handoff");
        System.out.println("--------------------------");
        HintedHandoffManager hintedHandoff = new HintedHandoffManager(
                "/tmp/cube-hints-example",
                MAX_HINTS_PER_NODE,
                HINT_WINDOW_MS);
        try {
            // Simulate storing hints for unavailable nodes
            System.out.println("Storing hints for unavailable nodes...");
            hintedHandoff.storeHint("node-2", "user:1", utf8("Alice"));
            hintedHandoff.storeHint("node-2", "user:2", utf8("Bob"));
            hintedHandoff.storeHint("node-3", "user:3", utf8("Charlie"));
            System.out.println("✓ Stored 3 hints");
            System.out.println(" Hints for node-2: " + hintedHandoff.getHintCount("node-2"));
            System.out.println(" Hints for node-3: " + hintedHandoff.getHintCount("node-3"));
            System.out.println(" Total hints: " + hintedHandoff.getTotalHintCount());

            // Simulate replaying hints
            System.out.println("\nReplaying hints for node-2...");
            // One-element array so the lambda can mutate the counter.
            int[] replayed = {0};
            hintedHandoff.replayHintsForNode("node-2", hint -> {
                System.out.println(" → Replaying hint: key=" + hint.getKey());
                replayed[0]++;
                return true; // Simulate successful replay
            });
            System.out.println("✓ Replayed " + replayed[0] + " hints");
            System.out.println(" Remaining hints for node-2: " + hintedHandoff.getHintCount("node-2"));
        } finally {
            // Shut the manager down even when a step above throws.
            hintedHandoff.shutdown();
        }
        System.out.println();
    }

    /**
     * Example 3: Read Repair.
     *
     * <p>Builds three replica responses for the same key where node-2 carries
     * the highest timestamp, asks the read-repair manager to detect conflicts,
     * then performs a blocking read repair with a callback that records each
     * node it is asked to repair and always reports success.
     *
     * @throws Exception if read repair fails; the manager is still shut down
     *     in that case
     */
    public static void example3_ReadRepair() throws Exception {
        System.out.println("Example 3: Read Repair");
        System.out.println("-----------------------");
        ReadRepairManager readRepair = new ReadRepairManager(100); // 100% chance
        try {
            // Create cluster nodes
            ClusterNode node1 = new ClusterNode("node-1", "host1", 8080);
            ClusterNode node2 = new ClusterNode("node-2", "host2", 8080);
            ClusterNode node3 = new ClusterNode("node-3", "host3", 8080);

            // Simulate responses with inconsistent values
            System.out.println("Received responses from replicas:");
            List<ReadRepairManager.ReadResponse> responses = new ArrayList<>();
            responses.add(new ReadRepairManager.ReadResponse(node1, "user:1", utf8("Alice_v1"), 1000));
            responses.add(new ReadRepairManager.ReadResponse(node2, "user:1", utf8("Alice_v2"), 2000)); // Newest
            responses.add(new ReadRepairManager.ReadResponse(node3, "user:1", utf8("Alice_v1"), 1000));
            System.out.println(" node-1: value=Alice_v1, timestamp=1000");
            System.out.println(" node-2: value=Alice_v2, timestamp=2000 (newest)");
            System.out.println(" node-3: value=Alice_v1, timestamp=1000");

            // Detect conflicts
            List<ReadRepairManager.ReadResponse> conflicts = readRepair.detectConflicts(responses);
            System.out.println("\n✓ Detected " + conflicts.size() + " conflicting responses");

            // Perform read repair
            System.out.println("\nPerforming read repair...");
            List<String> repairedNodes = new ArrayList<>();
            ReadRepairManager.ReadRepairResult result = readRepair.performReadRepairBlocking(
                    responses,
                    (node, key, value, timestamp) -> {
                        System.out.println(" → Repairing " + node.getNodeId() + " with newest value");
                        repairedNodes.add(node.getNodeId());
                        return true;
                    });
            System.out.println("\n✓ Read repair completed:");
            System.out.println(" Canonical value: " + fromUtf8(result.getCanonicalValue()));
            System.out.println(" Repair needed: " + result.isRepairNeeded());
            System.out.println(" Nodes repaired: " + result.getRepairedNodes());
            System.out.println(" Repaired nodes: " + repairedNodes);
        } finally {
            // Shut the manager down even when a step above throws.
            readRepair.shutdown();
        }
        System.out.println();
    }

    /**
     * Example 4: Replication Strategies.
     *
     * <p>Creates five nodes across two datacenters (dc1 with three racks, dc2
     * with two) and prints the replicas chosen for the key {@code user:123}
     * by the simple strategy and by the network-topology strategy.
     */
    public static void example4_ReplicationStrategies() {
        System.out.println("Example 4: Replication Strategies");
        System.out.println("----------------------------------");
        // Create cluster nodes
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node-1", "10.0.0.1", 8080, "dc1", "rack1"));
        nodes.add(new ClusterNode("node-2", "10.0.0.2", 8080, "dc1", "rack2"));
        nodes.add(new ClusterNode("node-3", "10.0.0.3", 8080, "dc1", "rack3"));
        nodes.add(new ClusterNode("node-4", "10.0.0.4", 8080, "dc2", "rack1"));
        nodes.add(new ClusterNode("node-5", "10.0.0.5", 8080, "dc2", "rack2"));

        // Simple Strategy
        System.out.println("Simple Replication Strategy:");
        SimpleReplicationStrategy simpleStrategy = new SimpleReplicationStrategy();
        List<ClusterNode> simpleReplicas = simpleStrategy.getReplicaNodes("user:123", 3, nodes);
        System.out.println(" Replicas for 'user:123' (RF=3):");
        for (ClusterNode node : simpleReplicas) {
            System.out.println(" - " + node.getNodeId() + " (" + node.getEndpoint() + ")");
        }

        // Network Topology Strategy
        System.out.println("\nNetwork Topology Strategy:");
        Map<String, Integer> dcRF = new HashMap<>();
        dcRF.put("dc1", 2);
        dcRF.put("dc2", 2);
        NetworkTopologyReplicationStrategy ntsStrategy =
                new NetworkTopologyReplicationStrategy(dcRF);
        // NOTE(review): the replication-factor argument is 3 while the per-DC
        // map totals 4 — confirm which of the two this strategy honors.
        List<ClusterNode> ntsReplicas = ntsStrategy.getReplicaNodes("user:123", 3, nodes);
        System.out.println(" Replicas for 'user:123' (dc1=2, dc2=2):");
        for (ClusterNode node : ntsReplicas) {
            System.out.println(" - " + node.getNodeId()
                    + " (dc=" + node.getDatacenter()
                    + ", rack=" + node.getRack() + ")");
        }
        System.out.println();
    }

    /**
     * Example 5: Complete Workflow.
     *
     * <p>Wires a storage engine, hinted-handoff manager, read-repair manager
     * and simple replication strategy into a {@code ReplicationCoordinator},
     * writes two keys (CL=ONE and CL=QUORUM), reads them back with the same
     * consistency levels, and prints the coordinator's statistics.
     *
     * @throws Exception if any component fails; the coordinator is still shut
     *     down and the storage engine still closed in that case
     */
    public static void example5_CompleteWorkflow() throws Exception {
        System.out.println("Example 5: Complete Replication Workflow");
        System.out.println("-----------------------------------------");
        // Single source of truth for the value passed to the coordinator and printed below.
        final int replicationFactor = 3;

        // Initialize components
        LSMStorageEngine storage = new LSMStorageEngine("/tmp/cube-phase2-example");
        try {
            // NOTE(review): these two managers are never shut down directly here;
            // presumably coordinator.shutdown() takes care of them — confirm.
            HintedHandoffManager hintedHandoff = new HintedHandoffManager(
                    "/tmp/cube-phase2-hints", MAX_HINTS_PER_NODE, HINT_WINDOW_MS);
            ReadRepairManager readRepair = new ReadRepairManager(10);
            ReplicationStrategy strategy = new SimpleReplicationStrategy();
            ReplicationCoordinator coordinator = new ReplicationCoordinator(
                    storage,
                    strategy,
                    hintedHandoff,
                    readRepair,
                    replicationFactor, // RF=3
                    5000, // 5s write timeout
                    3000 // 3s read timeout
            );
            try {
                // Create cluster
                // NOTE(review): only one node is registered although RF is 3; whether
                // the QUORUM operations below succeed depends on the coordinator — confirm.
                List<ClusterNode> nodes = new ArrayList<>();
                nodes.add(new ClusterNode("node-1", "localhost", 8080));
                System.out.println("Cluster initialized:");
                System.out.println(" Nodes: " + nodes.size());
                System.out.println(" Replication Factor: " + replicationFactor);
                System.out.println(" Strategy: " + strategy.getName());

                // Perform writes with different consistency levels
                System.out.println("\n--- Write Operations ---");
                System.out.println("Writing 'user:alice' with CL=ONE...");
                ReplicationCoordinator.WriteResult writeOne = coordinator.write(
                        "user:alice",
                        utf8("Alice Johnson"),
                        ConsistencyLevel.ONE,
                        nodes);
                System.out.println("✓ Success: " + writeOne.isSuccess()
                        + ", replicas: " + writeOne.getSuccessfulWrites());

                System.out.println("\nWriting 'user:bob' with CL=QUORUM...");
                ReplicationCoordinator.WriteResult writeQuorum = coordinator.write(
                        "user:bob",
                        utf8("Bob Smith"),
                        ConsistencyLevel.QUORUM,
                        nodes);
                System.out.println("✓ Success: " + writeQuorum.isSuccess()
                        + ", replicas: " + writeQuorum.getSuccessfulWrites());

                // Perform reads
                System.out.println("\n--- Read Operations ---");
                System.out.println("Reading 'user:alice' with CL=ONE...");
                ReplicationCoordinator.ReadResult readOne = coordinator.read(
                        "user:alice",
                        ConsistencyLevel.ONE,
                        nodes);
                if (readOne.isSuccess()) {
                    System.out.println("✓ Value: " + fromUtf8(readOne.getValue()));
                    System.out.println(" Responses: " + readOne.getResponsesReceived());
                    System.out.println(" Read repair: " + readOne.isRepairPerformed());
                }

                System.out.println("\nReading 'user:bob' with CL=QUORUM...");
                ReplicationCoordinator.ReadResult readQuorum = coordinator.read(
                        "user:bob",
                        ConsistencyLevel.QUORUM,
                        nodes);
                if (readQuorum.isSuccess()) {
                    System.out.println("✓ Value: " + fromUtf8(readQuorum.getValue()));
                    System.out.println(" Responses: " + readQuorum.getResponsesReceived());
                }

                // Show statistics
                System.out.println("\n--- Replication Statistics ---");
                Map<String, Object> stats = coordinator.getStats();
                System.out.println("Replication Factor: " + stats.get("replicationFactor"));
                System.out.println("Write Timeout: " + stats.get("writeTimeoutMs") + "ms");
                System.out.println("Read Timeout: " + stats.get("readTimeoutMs") + "ms");
                System.out.println("Pending Hints: " + stats.get("pendingHints"));
                System.out.println("Replication Strategy: " + stats.get("replicationStrategy"));
            } finally {
                // Cleanup: coordinator first, same order as the success path always used.
                coordinator.shutdown();
            }
        } finally {
            // Close storage even if the coordinator failed to build or shut down.
            storage.close();
        }
        System.out.println("\n✓ Workflow completed successfully");
        System.out.println();
    }

    /** Encodes {@code text} as UTF-8 so the bytes do not depend on the platform default charset. */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes {@code bytes} as UTF-8, the counterpart of {@link #utf8(String)}. */
    private static String fromUtf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}