Newer
Older
noctua / src / test / java / com / cube / replication / ReplicationTest.java
@agalyaramadoss agalyaramadoss on 13 Feb 9 KB first commit
package com.cube.replication;

import com.cube.cluster.ClusterNode;
import com.cube.consistency.ConsistencyLevel;
import com.cube.storage.LSMStorageEngine;
import org.junit.jupiter.api.*;

import java.io.IOException;
import java.nio.file.*;
import java.util.*;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Tests for Phase 2: Replication and Consistency
 */
public class ReplicationTest {
    
    private Path testDir;
    private LSMStorageEngine storage;
    private HintedHandoffManager hintedHandoff;
    private ReadRepairManager readRepair;
    private ReplicationCoordinator coordinator;
    
    @BeforeEach
    public void setUp() throws IOException {
        testDir = Files.createTempDirectory("cube-replication-test-");
        storage = new LSMStorageEngine(testDir.toString());
        
        Path hintsDir = testDir.resolve("hints");
        hintedHandoff = new HintedHandoffManager(hintsDir.toString(), 1000, 3600000);
        
        readRepair = new ReadRepairManager(100); // 100% read repair chance
        
        ReplicationStrategy strategy = new SimpleReplicationStrategy();
        
        coordinator = new ReplicationCoordinator(
            storage,
            strategy,
            hintedHandoff,
            readRepair,
            3, // RF=3
            5000, // 5s write timeout
            3000  // 3s read timeout
        );
    }
    
    @AfterEach
    public void tearDown() throws IOException {
        if (coordinator != null) {
            coordinator.shutdown();
        }
        if (storage != null) {
            storage.close();
        }
        deleteDirectory(testDir);
    }
    
    @Test
    public void testConsistencyLevelCalculation() {
        // QUORUM with RF=3 should require 2 responses
        assertEquals(2, ConsistencyLevel.QUORUM.getRequiredResponses(3));
        
        // QUORUM with RF=5 should require 3 responses
        assertEquals(3, ConsistencyLevel.QUORUM.getRequiredResponses(5));
        
        // ALL should require all replicas
        assertEquals(3, ConsistencyLevel.ALL.getRequiredResponses(3));
        assertEquals(5, ConsistencyLevel.ALL.getRequiredResponses(5));
        
        // ONE should require 1
        assertEquals(1, ConsistencyLevel.ONE.getRequiredResponses(3));
    }
    
    @Test
    public void testSimpleReplicationStrategy() {
        SimpleReplicationStrategy strategy = new SimpleReplicationStrategy();
        
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "host1", 8080));
        nodes.add(new ClusterNode("node2", "host2", 8080));
        nodes.add(new ClusterNode("node3", "host3", 8080));
        
        // Get replicas for a key
        List<ClusterNode> replicas = strategy.getReplicaNodes("testkey", 2, nodes);
        
        assertFalse(replicas.isEmpty());
        assertTrue(replicas.size() <= 2);
    }
    
    @Test
    public void testHintedHandoff() {
        String targetNode = "node-2";
        String key = "test-key";
        byte[] value = "test-value".getBytes();
        
        // Store hint
        hintedHandoff.storeHint(targetNode, key, value);
        
        // Verify hint was stored
        assertEquals(1, hintedHandoff.getHintCount(targetNode));
        assertEquals(1, hintedHandoff.getTotalHintCount());
    }
    
    @Test
    public void testHintedHandoffReplay() {
        String targetNode = "node-2";
        String key = "test-key";
        byte[] value = "test-value".getBytes();
        
        // Store hint
        hintedHandoff.storeHint(targetNode, key, value);
        
        // Replay hints
        List<String> replayedKeys = new ArrayList<>();
        hintedHandoff.replayHintsForNode(targetNode, hint -> {
            replayedKeys.add(hint.getKey());
            return true; // Success
        });
        
        // Verify hint was replayed
        assertTrue(replayedKeys.contains(key));
        assertEquals(0, hintedHandoff.getHintCount(targetNode));
    }
    
    @Test
    public void testReadRepairDetection() {
        ReadRepairManager manager = new ReadRepairManager(100);
        
        ClusterNode node1 = new ClusterNode("node1", "host1", 8080);
        ClusterNode node2 = new ClusterNode("node2", "host2", 8080);
        ClusterNode node3 = new ClusterNode("node3", "host3", 8080);
        
        // Create responses with different values
        List<ReadRepairManager.ReadResponse> responses = new ArrayList<>();
        responses.add(new ReadRepairManager.ReadResponse(node1, "key1", "value1".getBytes(), 1000));
        responses.add(new ReadRepairManager.ReadResponse(node2, "key1", "value2".getBytes(), 2000)); // Newer
        responses.add(new ReadRepairManager.ReadResponse(node3, "key1", "value1".getBytes(), 1000));
        
        // Detect conflicts
        List<ReadRepairManager.ReadResponse> conflicts = manager.detectConflicts(responses);
        
        assertEquals(3, conflicts.size(), "Should detect conflicts when values differ");
        
        manager.shutdown();
    }
    
    @Test
    public void testReadRepairChoosesNewestValue() throws Exception {
        ReadRepairManager manager = new ReadRepairManager(100);
        
        ClusterNode node1 = new ClusterNode("node1", "host1", 8080);
        ClusterNode node2 = new ClusterNode("node2", "host2", 8080);
        ClusterNode node3 = new ClusterNode("node3", "host3", 8080);
        
        List<ReadRepairManager.ReadResponse> responses = new ArrayList<>();
        responses.add(new ReadRepairManager.ReadResponse(node1, "key1", "old".getBytes(), 1000));
        responses.add(new ReadRepairManager.ReadResponse(node2, "key1", "new".getBytes(), 3000)); // Newest
        responses.add(new ReadRepairManager.ReadResponse(node3, "key1", "old".getBytes(), 1000));
        
        // Perform read repair
        List<String> repairedNodes = new ArrayList<>();
        ReadRepairManager.ReadRepairResult result = manager.performReadRepairBlocking(
            responses,
            (node, key, value, timestamp) -> {
                repairedNodes.add(node.getNodeId());
                return true;
            }
        );
        
        // Verify newest value was chosen
        assertArrayEquals("new".getBytes(), result.getCanonicalValue());
        
        // Verify repair was needed
        assertTrue(result.isRepairNeeded());
        
        // Verify old nodes were repaired
        assertEquals(2, result.getRepairedNodes());
        assertTrue(repairedNodes.contains("node1"));
        assertTrue(repairedNodes.contains("node3"));
        
        manager.shutdown();
    }
    
    @Test
    public void testNetworkTopologyStrategy() {
        Map<String, Integer> dcRF = new HashMap<>();
        dcRF.put("dc1", 3);
        dcRF.put("dc2", 2);
        
        NetworkTopologyReplicationStrategy strategy = 
            new NetworkTopologyReplicationStrategy(dcRF);
        
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "host1", 8080, "dc1", "rack1"));
        nodes.add(new ClusterNode("node2", "host2", 8080, "dc1", "rack2"));
        nodes.add(new ClusterNode("node3", "host3", 8080, "dc1", "rack3"));
        nodes.add(new ClusterNode("node4", "host4", 8080, "dc2", "rack1"));
        nodes.add(new ClusterNode("node5", "host5", 8080, "dc2", "rack2"));
        
        List<ClusterNode> replicas = strategy.getReplicaNodes("testkey", 3, nodes);
        
        assertFalse(replicas.isEmpty());
        assertEquals("NetworkTopologyStrategy", strategy.getName());
    }
    
    @Test
    public void testWriteWithConsistencyOne() throws IOException {
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "localhost", 8080));
        
        ReplicationCoordinator.WriteResult result = coordinator.write(
            "key1",
            "value1".getBytes(),
            ConsistencyLevel.ONE,
            nodes
        );
        
        assertTrue(result.isSuccess());
        assertEquals(1, result.getSuccessfulWrites());
    }
    
    @Test
    public void testReadWithConsistencyOne() throws IOException {
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "localhost", 8080));
        
        // Write first
        coordinator.writeLocal("key1", "value1".getBytes());
        
        // Read
        ReplicationCoordinator.ReadResult result = coordinator.read(
            "key1",
            ConsistencyLevel.ONE,
            nodes
        );
        
        assertTrue(result.isSuccess());
        assertArrayEquals("value1".getBytes(), result.getValue());
    }
    
    @Test
    public void testCoordinatorStats() {
        Map<String, Object> stats = coordinator.getStats();
        
        assertNotNull(stats);
        assertEquals(3, stats.get("replicationFactor"));
        assertEquals(5000L, stats.get("writeTimeoutMs"));
        assertEquals(3000L, stats.get("readTimeoutMs"));
    }
    
    private void deleteDirectory(Path dir) throws IOException {
        if (Files.exists(dir)) {
            Files.walk(dir)
                .sorted(Comparator.reverseOrder())
                .forEach(path -> {
                    try {
                        Files.delete(path);
                    } catch (IOException e) {
                        // Ignore
                    }
                });
        }
    }
}