Newer
Older
noctua / src / main / java / com / cube / replication / ReadRepairManager.java
@agalyaramadoss agalyaramadoss on 13 Feb 9 KB first commit
package com.cube.replication;

import com.cube.cluster.ClusterNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

/**
 * Read Repair Manager - keeps replicas consistent by comparing their responses during reads.
 * When the responses disagree, the most recent value is written back to every replica that
 * is missing it or holds a different one.
 *
 * <p>Repairs run on a dedicated pool of daemon threads owned by this instance; call
 * {@link #shutdown()} to release it.
 */
public class ReadRepairManager {

    private static final Logger logger = LoggerFactory.getLogger(ReadRepairManager.class);

    /** Maximum time, in seconds, that {@link #performReadRepairBlocking} waits for a repair. */
    private static final long BLOCKING_REPAIR_TIMEOUT_SECONDS = 5;

    /** Time, in seconds, that {@link #shutdown()} waits for queued repairs before forcing a stop. */
    private static final long SHUTDOWN_GRACE_SECONDS = 10;

    /**
     * Ordering used to pick the canonical response: highest timestamp first, and for equal
     * timestamps the lexicographically greatest value (bytes compared as unsigned). The
     * tie-break makes the winner independent of the order in which replicas answered.
     * Only applied to responses whose value is non-null.
     */
    private static final Comparator<ReadResponse> CANONICAL_ORDER =
        Comparator.comparingLong(ReadResponse::getTimestamp)
            .thenComparing(ReadResponse::getValue, ReadRepairManager::compareBytes);

    // Held as ThreadPoolExecutor so getStats() can read pool metrics without casting.
    private final ThreadPoolExecutor repairExecutor;
    private final long readRepairChance; // Percentage (0-100)

    /**
     * Represents a read response from a replica.
     */
    public static class ReadResponse {
        private final ClusterNode node;
        private final String key;
        private final byte[] value;
        private final long timestamp;
        private final boolean found;

        /**
         * @param node      replica that produced this response
         * @param key       key that was read
         * @param value     value held by the replica, or {@code null} if it has none;
         *                  the array is stored as-is, not copied
         * @param timestamp timestamp associated with the value
         */
        public ReadResponse(ClusterNode node, String key, byte[] value, long timestamp) {
            this.node = node;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            // A response counts as "found" exactly when it carries a value.
            this.found = value != null;
        }

        public ClusterNode getNode() { return node; }
        public String getKey() { return key; }
        public byte[] getValue() { return value; }
        public long getTimestamp() { return timestamp; }
        public boolean isFound() { return found; }
    }

    /**
     * Represents the outcome of a read repair.
     */
    public static class ReadRepairResult {
        private final String key;
        private final boolean repairNeeded;
        private final int repairedNodes;
        private final byte[] canonicalValue;

        /**
         * @param key            key that was examined (may be {@code null} when there were no responses)
         * @param repairNeeded   whether at least one replica disagreed with the canonical value
         * @param repairedNodes  number of replicas successfully repaired
         * @param canonicalValue winning value, or {@code null} if no replica had one
         */
        public ReadRepairResult(String key, boolean repairNeeded, int repairedNodes, byte[] canonicalValue) {
            this.key = key;
            this.repairNeeded = repairNeeded;
            this.repairedNodes = repairedNodes;
            this.canonicalValue = canonicalValue;
        }

        public String getKey() { return key; }
        public boolean isRepairNeeded() { return repairNeeded; }
        public int getRepairedNodes() { return repairedNodes; }
        public byte[] getCanonicalValue() { return canonicalValue; }
    }

    /**
     * Creates a manager with its own repair thread pool.
     *
     * @param readRepairChance percentage of reads that should trigger read repair; values
     *                         of 100 or more mean "always", values of 0 or less mean "never"
     */
    public ReadRepairManager(long readRepairChance) {
        this.readRepairChance = readRepairChance;

        ThreadFactory daemonFactory = r -> {
            Thread t = new Thread(r, "CubeDB-ReadRepair");
            t.setDaemon(true);
            return t;
        };

        // Same configuration Executors.newFixedThreadPool(n, factory) produces, built
        // directly so the ThreadPoolExecutor type is available without a downcast.
        int poolSize = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
        this.repairExecutor = new ThreadPoolExecutor(
            poolSize,
            poolSize,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            daemonFactory
        );

        logger.info("Read repair manager initialized with {}% chance", readRepairChance);
    }

    /**
     * Check if read repair should be performed for this read.
     *
     * @return {@code true} always when the configured chance is 100 or more, {@code false}
     *         always when it is 0 or less, otherwise {@code true} with roughly that
     *         percentage probability
     */
    public boolean shouldPerformReadRepair() {
        if (readRepairChance >= 100) {
            return true;
        }
        if (readRepairChance <= 0) {
            return false;
        }
        // ThreadLocalRandom avoids sharing one generator between all reading threads.
        return ThreadLocalRandom.current().nextInt(100) < readRepairChance;
    }

    /**
     * Perform read repair by comparing responses from all replicas.
     *
     * <p>The canonical response is the found response with the highest timestamp; ties are
     * broken by the greatest value (unsigned byte order) so the choice does not depend on
     * response order. Every replica that has no value, an older timestamp, or a different
     * value is then repaired on the repair executor.
     *
     * <p>If nothing needs repairing the returned future is already complete. Otherwise it
     * completes once every repair attempt has finished; individual repair failures are
     * logged and do not fail the future.
     *
     * @param responses Responses from all replica nodes; {@code null} is treated as empty.
     *                  The key reported in the result is taken from the first response.
     * @param repairCallback Callback to write repaired values to nodes
     * @return Future holding the canonical (most recent) value and the repair outcome
     */
    public CompletableFuture<ReadRepairResult> performReadRepair(
            List<ReadResponse> responses,
            RepairCallback repairCallback) {

        if (responses == null || responses.isEmpty()) {
            return CompletableFuture.completedFuture(
                new ReadRepairResult(null, false, 0, null));
        }

        String key = responses.get(0).getKey();

        ReadResponse canonical = responses.stream()
            .filter(ReadResponse::isFound)
            .max(CANONICAL_ORDER)
            .orElse(null);

        if (canonical == null) {
            // No value found on any replica
            return CompletableFuture.completedFuture(
                new ReadRepairResult(key, false, 0, null));
        }

        List<ClusterNode> nodesToRepair = new ArrayList<>();
        for (ReadResponse response : responses) {
            if (needsRepair(response, canonical)) {
                nodesToRepair.add(response.getNode());
            }
        }

        if (nodesToRepair.isEmpty()) {
            return CompletableFuture.completedFuture(
                new ReadRepairResult(key, false, 0, canonical.getValue()));
        }

        // Perform async repair
        return CompletableFuture.supplyAsync(() -> {
            int repairedCount = 0;

            for (ClusterNode node : nodesToRepair) {
                try {
                    boolean success = repairCallback.repairNode(
                        node,
                        canonical.getKey(),
                        canonical.getValue(),
                        canonical.getTimestamp()
                    );

                    if (success) {
                        repairedCount++;
                        logger.debug("Repaired node {} for key {}", node.getNodeId(), key);
                    }
                } catch (Exception e) {
                    // Best effort: one failing replica must not stop the others from being repaired.
                    logger.error("Failed to repair node {} for key {}", node.getNodeId(), key, e);
                }
            }

            if (repairedCount > 0) {
                logger.info("Read repair completed for key {}: repaired {} of {} nodes",
                    key, repairedCount, nodesToRepair.size());
            }

            return new ReadRepairResult(key, true, repairedCount, canonical.getValue());

        }, repairExecutor);
    }

    /**
     * Perform blocking read repair: runs {@link #performReadRepair} and waits up to five
     * seconds for it to finish.
     *
     * @param responses Responses from all replica nodes
     * @param repairCallback Callback to write repaired values to nodes
     * @return The repair outcome
     * @throws Exception if the wait times out, is interrupted, or the repair task fails
     */
    public ReadRepairResult performReadRepairBlocking(
            List<ReadResponse> responses,
            RepairCallback repairCallback) throws Exception {

        return performReadRepair(responses, repairCallback)
            .get(BLOCKING_REPAIR_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Detect conflicts between replicas.
     *
     * <p>A conflict exists when the found responses do not all share the same value and
     * timestamp. Responses without a value are ignored when deciding, but are included in
     * the returned list.
     *
     * @param responses Responses from all replica nodes; {@code null} is treated as empty
     * @return A copy of all responses if a conflict exists, otherwise an empty list
     */
    public List<ReadResponse> detectConflicts(List<ReadResponse> responses) {
        List<ReadResponse> conflicts = new ArrayList<>();

        if (responses == null || responses.size() < 2) {
            return conflicts;
        }

        // Compare every found response against the first found one; any mismatch means
        // there is more than one distinct (value, timestamp) pair.
        ReadResponse reference = null;
        for (ReadResponse response : responses) {
            if (!response.isFound()) {
                continue;
            }
            if (reference == null) {
                reference = response;
                continue;
            }
            boolean sameVersion = response.getTimestamp() == reference.getTimestamp()
                && Arrays.equals(response.getValue(), reference.getValue());
            if (!sameVersion) {
                conflicts.addAll(responses);
                return conflicts;
            }
        }

        return conflicts;
    }

    /**
     * Get read repair statistics.
     *
     * @return A new map with the configured chance and the repair pool's current thread
     *         count, active task count, and queue length
     */
    public Map<String, Object> getStats() {
        Map<String, Object> stats = new HashMap<>();
        stats.put("readRepairChance", readRepairChance + "%");
        stats.put("repairThreads", repairExecutor.getPoolSize());
        stats.put("activeRepairs", repairExecutor.getActiveCount());
        stats.put("queuedRepairs", repairExecutor.getQueue().size());
        return stats;
    }

    /**
     * Shutdown the repair executor. Waits up to ten seconds for submitted repairs to
     * finish, then forces termination. If the calling thread is interrupted while waiting,
     * termination is forced immediately and the interrupt flag is restored.
     */
    public void shutdown() {
        repairExecutor.shutdown();
        try {
            if (!repairExecutor.awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
                repairExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            repairExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }

        logger.info("Read repair manager shutdown");
    }

    /**
     * Decides whether a replica's response is out of sync with the canonical response:
     * it has no value, an older timestamp, or a different value.
     */
    private static boolean needsRepair(ReadResponse response, ReadResponse canonical) {
        return !response.isFound()
            || response.getTimestamp() < canonical.getTimestamp()
            || !Arrays.equals(response.getValue(), canonical.getValue());
    }

    /**
     * Lexicographically compares two non-null byte arrays, treating bytes as unsigned;
     * when one array is a prefix of the other, the shorter one sorts first.
     */
    private static int compareBytes(byte[] left, byte[] right) {
        int shared = Math.min(left.length, right.length);
        for (int i = 0; i < shared; i++) {
            int byteOrder = Integer.compare(left[i] & 0xFF, right[i] & 0xFF);
            if (byteOrder != 0) {
                return byteOrder;
            }
        }
        return Integer.compare(left.length, right.length);
    }

    /**
     * Callback interface for repairing nodes
     */
    @FunctionalInterface
    public interface RepairCallback {
        /**
         * Write the canonical value to a node
         * 
         * @param node The node to repair
         * @param key The key to write
         * @param value The canonical value
         * @param timestamp The timestamp of the canonical value
         * @return true if repair succeeded
         * @throws Exception if the write fails; the manager logs it and moves on to the next node
         */
        boolean repairNode(ClusterNode node, String key, byte[] value, long timestamp) throws Exception;
    }
}