package com.cube.replication;
import com.cube.cluster.ClusterNode;
import com.cube.consistency.ConsistencyLevel;
import com.cube.storage.LSMStorageEngine;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Tests for Phase 2: Replication and Consistency.
 *
 * <p>Each test runs against a fresh temporary directory holding an
 * {@link LSMStorageEngine}, a {@link HintedHandoffManager}, a
 * {@link ReadRepairManager} and a {@link ReplicationCoordinator} wired
 * together in {@link #setUp()}. The directory is removed again in
 * {@link #tearDown()}.
 */
public class ReplicationTest {

    private Path testDir;
    private LSMStorageEngine storage;
    private HintedHandoffManager hintedHandoff;
    private ReadRepairManager readRepair;
    private ReplicationCoordinator coordinator;

    /**
     * Creates the temporary directory and builds the coordinator under test
     * with RF=3, a 5s write timeout and a 3s read timeout.
     *
     * @throws IOException if the temporary directory or storage cannot be created
     */
    @BeforeEach
    public void setUp() throws IOException {
        testDir = Files.createTempDirectory("cube-replication-test-");
        storage = new LSMStorageEngine(testDir.toString());

        Path hintsDir = testDir.resolve("hints");
        // NOTE(review): the two numeric arguments look like a hint limit and a
        // time window in milliseconds -- confirm against HintedHandoffManager.
        hintedHandoff = new HintedHandoffManager(hintsDir.toString(), 1000, 3600000);
        readRepair = new ReadRepairManager(100); // 100% read repair chance

        ReplicationStrategy strategy = new SimpleReplicationStrategy();
        coordinator = new ReplicationCoordinator(
            storage,
            strategy,
            hintedHandoff,
            readRepair,
            3,    // RF=3
            5000, // 5s write timeout
            3000  // 3s read timeout
        );
    }

    /**
     * Shuts down the coordinator, closes the storage engine and deletes the
     * temporary directory.
     *
     * <p>The steps are chained with {@code finally} so that a failure in an
     * earlier step does not prevent the later ones from running.
     *
     * @throws IOException if closing the storage or walking the directory fails
     */
    @AfterEach
    public void tearDown() throws IOException {
        // NOTE(review): readRepair and hintedHandoff are not shut down here;
        // presumably coordinator.shutdown() owns them -- confirm.
        try {
            if (coordinator != null) {
                coordinator.shutdown();
            }
        } finally {
            try {
                if (storage != null) {
                    storage.close();
                }
            } finally {
                deleteDirectory(testDir);
            }
        }
    }

    /** Checks the number of required responses for QUORUM, ALL and ONE. */
    @Test
    public void testConsistencyLevelCalculation() {
        // QUORUM with RF=3 should require 2 responses
        assertEquals(2, ConsistencyLevel.QUORUM.getRequiredResponses(3));
        // QUORUM with RF=5 should require 3 responses
        assertEquals(3, ConsistencyLevel.QUORUM.getRequiredResponses(5));

        // ALL should require all replicas
        assertEquals(3, ConsistencyLevel.ALL.getRequiredResponses(3));
        assertEquals(5, ConsistencyLevel.ALL.getRequiredResponses(5));

        // ONE should require 1
        assertEquals(1, ConsistencyLevel.ONE.getRequiredResponses(3));
    }

    /**
     * Asks the simple strategy for 2 replicas out of 3 nodes and checks that
     * it returns at least one and at most two nodes.
     */
    @Test
    public void testSimpleReplicationStrategy() {
        SimpleReplicationStrategy strategy = new SimpleReplicationStrategy();

        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "host1", 8080));
        nodes.add(new ClusterNode("node2", "host2", 8080));
        nodes.add(new ClusterNode("node3", "host3", 8080));

        // Get replicas for a key
        List<ClusterNode> replicas = strategy.getReplicaNodes("testkey", 2, nodes);

        assertFalse(replicas.isEmpty());
        assertTrue(replicas.size() <= 2);
    }

    /** Stores a single hint and checks the per-node and total hint counts. */
    @Test
    public void testHintedHandoff() {
        String targetNode = "node-2";
        String key = "test-key";
        byte[] value = utf8("test-value");

        // Store hint
        hintedHandoff.storeHint(targetNode, key, value);

        // Verify hint was stored
        assertEquals(1, hintedHandoff.getHintCount(targetNode));
        assertEquals(1, hintedHandoff.getTotalHintCount());
    }

    /**
     * Stores a hint, replays it through a callback that reports success, and
     * checks that the key was seen and no hints remain for the node.
     */
    @Test
    public void testHintedHandoffReplay() {
        String targetNode = "node-2";
        String key = "test-key";
        byte[] value = utf8("test-value");

        // Store hint
        hintedHandoff.storeHint(targetNode, key, value);

        // Replay hints
        List<String> replayedKeys = new ArrayList<>();
        hintedHandoff.replayHintsForNode(targetNode, hint -> {
            replayedKeys.add(hint.getKey());
            return true; // Success
        });

        // Verify hint was replayed
        assertTrue(replayedKeys.contains(key));
        assertEquals(0, hintedHandoff.getHintCount(targetNode));
    }

    /**
     * Feeds three responses for the same key, one of which differs in value
     * and timestamp, and expects {@code detectConflicts} to return all three.
     */
    @Test
    public void testReadRepairDetection() {
        ReadRepairManager manager = new ReadRepairManager(100);
        try {
            ClusterNode node1 = new ClusterNode("node1", "host1", 8080);
            ClusterNode node2 = new ClusterNode("node2", "host2", 8080);
            ClusterNode node3 = new ClusterNode("node3", "host3", 8080);

            // Create responses with different values
            List<ReadRepairManager.ReadResponse> responses = new ArrayList<>();
            responses.add(new ReadRepairManager.ReadResponse(node1, "key1", utf8("value1"), 1000));
            responses.add(new ReadRepairManager.ReadResponse(node2, "key1", utf8("value2"), 2000)); // Newer
            responses.add(new ReadRepairManager.ReadResponse(node3, "key1", utf8("value1"), 1000));

            // Detect conflicts
            List<ReadRepairManager.ReadResponse> conflicts = manager.detectConflicts(responses);

            assertEquals(3, conflicts.size(), "Should detect conflicts when values differ");
        } finally {
            // Release the manager even when an assertion above fails.
            manager.shutdown();
        }
    }

    /**
     * Runs a blocking read repair over two stale responses and one newer one
     * and checks that the newer value wins and both stale nodes are repaired.
     *
     * @throws Exception if the blocking read repair fails
     */
    @Test
    public void testReadRepairChoosesNewestValue() throws Exception {
        ReadRepairManager manager = new ReadRepairManager(100);
        try {
            ClusterNode node1 = new ClusterNode("node1", "host1", 8080);
            ClusterNode node2 = new ClusterNode("node2", "host2", 8080);
            ClusterNode node3 = new ClusterNode("node3", "host3", 8080);

            List<ReadRepairManager.ReadResponse> responses = new ArrayList<>();
            responses.add(new ReadRepairManager.ReadResponse(node1, "key1", utf8("old"), 1000));
            responses.add(new ReadRepairManager.ReadResponse(node2, "key1", utf8("new"), 3000)); // Newest
            responses.add(new ReadRepairManager.ReadResponse(node3, "key1", utf8("old"), 1000));

            // Perform read repair, recording which nodes the callback is invoked for
            List<String> repairedNodes = new ArrayList<>();
            ReadRepairManager.ReadRepairResult result = manager.performReadRepairBlocking(
                responses,
                (node, key, value, timestamp) -> {
                    repairedNodes.add(node.getNodeId());
                    return true;
                }
            );

            // Verify newest value was chosen
            assertArrayEquals(utf8("new"), result.getCanonicalValue());

            // Verify repair was needed
            assertTrue(result.isRepairNeeded());

            // Verify old nodes were repaired
            assertEquals(2, result.getRepairedNodes());
            assertTrue(repairedNodes.contains("node1"));
            assertTrue(repairedNodes.contains("node3"));
        } finally {
            // Release the manager even when an assertion above fails.
            manager.shutdown();
        }
    }

    /**
     * Builds a two-datacenter topology (dc1 with RF 3, dc2 with RF 2) and
     * checks that the strategy returns replicas and reports its name.
     */
    @Test
    public void testNetworkTopologyStrategy() {
        Map<String, Integer> dcRF = new HashMap<>();
        dcRF.put("dc1", 3);
        dcRF.put("dc2", 2);

        NetworkTopologyReplicationStrategy strategy =
            new NetworkTopologyReplicationStrategy(dcRF);

        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "host1", 8080, "dc1", "rack1"));
        nodes.add(new ClusterNode("node2", "host2", 8080, "dc1", "rack2"));
        nodes.add(new ClusterNode("node3", "host3", 8080, "dc1", "rack3"));
        nodes.add(new ClusterNode("node4", "host4", 8080, "dc2", "rack1"));
        nodes.add(new ClusterNode("node5", "host5", 8080, "dc2", "rack2"));

        List<ClusterNode> replicas = strategy.getReplicaNodes("testkey", 3, nodes);

        assertFalse(replicas.isEmpty());
        assertEquals("NetworkTopologyStrategy", strategy.getName());
    }

    /**
     * Writes one key at consistency level ONE against a single-node list and
     * expects a successful result with exactly one successful write.
     *
     * @throws IOException if the write fails
     */
    @Test
    public void testWriteWithConsistencyOne() throws IOException {
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "localhost", 8080));

        ReplicationCoordinator.WriteResult result = coordinator.write(
            "key1",
            utf8("value1"),
            ConsistencyLevel.ONE,
            nodes
        );

        assertTrue(result.isSuccess());
        assertEquals(1, result.getSuccessfulWrites());
    }

    /**
     * Writes a value locally, reads it back at consistency level ONE and
     * expects the same bytes.
     *
     * @throws IOException if the local write or the read fails
     */
    @Test
    public void testReadWithConsistencyOne() throws IOException {
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "localhost", 8080));

        // Write first
        coordinator.writeLocal("key1", utf8("value1"));

        // Read
        ReplicationCoordinator.ReadResult result = coordinator.read(
            "key1",
            ConsistencyLevel.ONE,
            nodes
        );

        assertTrue(result.isSuccess());
        assertArrayEquals(utf8("value1"), result.getValue());
    }

    /** Checks that the coordinator stats echo the values passed in {@link #setUp()}. */
    @Test
    public void testCoordinatorStats() {
        Map<String, Object> stats = coordinator.getStats();

        assertNotNull(stats);
        assertEquals(3, stats.get("replicationFactor"));
        assertEquals(5000L, stats.get("writeTimeoutMs"));
        assertEquals(3000L, stats.get("readTimeoutMs"));
    }

    /**
     * Encodes test data as UTF-8 so the bytes do not depend on the platform
     * default charset.
     *
     * @param text the text to encode
     * @return the UTF-8 bytes of {@code text}
     */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Recursively deletes {@code dir}, children before parents. Does nothing
     * when {@code dir} is {@code null} or does not exist.
     *
     * @param dir the directory to remove; may be {@code null} if set-up failed early
     * @throws IOException if the directory tree cannot be walked
     */
    private void deleteDirectory(Path dir) throws IOException {
        if (dir == null || !Files.exists(dir)) {
            return;
        }
        // Files.walk holds open directory handles until the stream is closed.
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits the deepest entries first, so directories
            // are already empty by the time they are deleted.
            paths.sorted(Comparator.reverseOrder())
                .forEach(path -> {
                    try {
                        Files.delete(path);
                    } catch (IOException ignored) {
                        // Best-effort cleanup: a leftover temp file must not
                        // fail the test that has already finished.
                    }
                });
        }
    }
}