package com.cube.replication;
import com.cube.cluster.ClusterNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Read Repair Manager - Ensures consistency by comparing replicas during reads.
 * When inconsistencies are detected, the most recent value is propagated to all replicas.
 *
 * <p>Conflict resolution is last-write-wins: the found response with the highest timestamp
 * is canonical. When several replicas report the same highest timestamp with different values,
 * the value that is greatest under unsigned lexicographic byte comparison wins. The chosen value
 * therefore does not depend on the order in which responses were collected.
 */
public class ReadRepairManager {
    private static final Logger logger = LoggerFactory.getLogger(ReadRepairManager.class);

    /** Timeout used by {@link #performReadRepairBlocking(List, RepairCallback)}. */
    private static final long DEFAULT_BLOCKING_TIMEOUT_SECONDS = 5;

    /**
     * Orders found responses for canonical selection: higher timestamp wins; equal timestamps
     * are broken deterministically by comparing the value bytes.
     */
    private static final Comparator<ReadResponse> CANONICAL_ORDER =
            Comparator.comparingLong(ReadResponse::getTimestamp)
                    .thenComparing(ReadResponse::getValue, ReadRepairManager::compareValues);

    private final ThreadPoolExecutor repairExecutor;
    private final long readRepairChance; // Percentage (0-100)

    /**
     * Represents a read response from a replica.
     * A response whose value is {@code null} is treated as "not found" on that replica.
     */
    public static class ReadResponse {
        private final ClusterNode node;
        private final String key;
        private final byte[] value;
        private final long timestamp;
        private final boolean found;

        public ReadResponse(ClusterNode node, String key, byte[] value, long timestamp) {
            this.node = node;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.found = value != null;
        }

        public ClusterNode getNode() { return node; }
        public String getKey() { return key; }
        public byte[] getValue() { return value; }
        public long getTimestamp() { return timestamp; }
        public boolean isFound() { return found; }
    }

    /**
     * Represents a read repair result: the key, whether any replica needed repair,
     * how many repairs the callback reported as successful, and the canonical value.
     */
    public static class ReadRepairResult {
        private final String key;
        private final boolean repairNeeded;
        private final int repairedNodes;
        private final byte[] canonicalValue;

        public ReadRepairResult(String key, boolean repairNeeded, int repairedNodes, byte[] canonicalValue) {
            this.key = key;
            this.repairNeeded = repairNeeded;
            this.repairedNodes = repairedNodes;
            this.canonicalValue = canonicalValue;
        }

        public String getKey() { return key; }
        public boolean isRepairNeeded() { return repairNeeded; }
        public int getRepairedNodes() { return repairedNodes; }
        public byte[] getCanonicalValue() { return canonicalValue; }
    }

    /**
     * Creates a manager backed by a fixed pool of daemon repair threads.
     *
     * @param readRepairChance percentage (0-100) of reads that should trigger read repair;
     *                         values at or below 0 disable it, values at or above 100 always enable it
     */
    public ReadRepairManager(long readRepairChance) {
        this.readRepairChance = readRepairChance;
        int threads = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
        AtomicInteger threadCounter = new AtomicInteger();
        // Same configuration as Executors.newFixedThreadPool, but typed as ThreadPoolExecutor
        // so getStats() can read pool metrics without casts.
        this.repairExecutor = new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                r -> {
                    // Numbered names make individual repair threads distinguishable in dumps.
                    Thread t = new Thread(r, "CubeDB-ReadRepair-" + threadCounter.incrementAndGet());
                    t.setDaemon(true);
                    return t;
                });
        logger.info("Read repair manager initialized with {}% chance", readRepairChance);
    }

    /**
     * Check if read repair should be performed for this read.
     *
     * @return {@code true} with probability {@code readRepairChance / 100}
     */
    public boolean shouldPerformReadRepair() {
        if (readRepairChance >= 100) {
            return true;
        }
        if (readRepairChance <= 0) {
            return false;
        }
        // ThreadLocalRandom avoids contention when many reader threads call this concurrently.
        return ThreadLocalRandom.current().nextInt(100) < readRepairChance;
    }

    /**
     * Perform read repair by comparing responses from all replicas.
     * Returns the most recent value and asynchronously repairs inconsistent replicas.
     *
     * <p>A replica needs repair when it has no value, an older timestamp, or a different value
     * than the canonical response. If nothing needs repair, an already-completed future is returned.
     *
     * @param responses Responses from all replica nodes (must not be {@code null})
     * @param repairCallback Callback to write repaired values to nodes
     * @return a future holding the canonical (most recent) value and repair statistics
     */
    public CompletableFuture<ReadRepairResult> performReadRepair(
            List<ReadResponse> responses,
            RepairCallback repairCallback) {
        Objects.requireNonNull(responses, "responses");
        if (responses.isEmpty()) {
            return CompletableFuture.completedFuture(
                    new ReadRepairResult(null, false, 0, null));
        }
        String key = responses.get(0).getKey();

        ReadResponse canonical = responses.stream()
                .filter(ReadResponse::isFound)
                .max(CANONICAL_ORDER)
                .orElse(null);
        if (canonical == null) {
            // No value found on any replica
            return CompletableFuture.completedFuture(
                    new ReadRepairResult(key, false, 0, null));
        }

        List<ClusterNode> nodesToRepair = findNodesToRepair(responses, canonical);
        if (nodesToRepair.isEmpty()) {
            return CompletableFuture.completedFuture(
                    new ReadRepairResult(key, false, 0, canonical.getValue()));
        }

        return CompletableFuture.supplyAsync(
                () -> repairNodes(key, canonical, nodesToRepair, repairCallback),
                repairExecutor);
    }

    /** Collects the nodes whose response is missing, older than, or different from the canonical one. */
    private static List<ClusterNode> findNodesToRepair(List<ReadResponse> responses, ReadResponse canonical) {
        List<ClusterNode> nodesToRepair = new ArrayList<>();
        for (ReadResponse response : responses) {
            // canonical has the maximum timestamp, so ">=" here means "equal timestamp".
            boolean inSync = response.isFound()
                    && response.getTimestamp() >= canonical.getTimestamp()
                    && Arrays.equals(response.getValue(), canonical.getValue());
            if (!inSync) {
                nodesToRepair.add(response.getNode());
            }
        }
        return nodesToRepair;
    }

    /** Writes the canonical value to each node via the callback; runs on the repair executor. */
    private static ReadRepairResult repairNodes(
            String key,
            ReadResponse canonical,
            List<ClusterNode> nodesToRepair,
            RepairCallback repairCallback) {
        int repairedCount = 0;
        for (ClusterNode node : nodesToRepair) {
            try {
                boolean success = repairCallback.repairNode(
                        node,
                        canonical.getKey(),
                        canonical.getValue(),
                        canonical.getTimestamp());
                if (success) {
                    repairedCount++;
                    logger.debug("Repaired node {} for key {}", node.getNodeId(), key);
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt (e.g. from shutdownNow) and stop issuing further repairs.
                Thread.currentThread().interrupt();
                logger.warn("Read repair for key {} interrupted while repairing node {}", key, node.getNodeId());
                break;
            } catch (Exception e) {
                // Best effort: one failing replica must not prevent repairing the others.
                logger.error("Failed to repair node {} for key {}", node.getNodeId(), key, e);
            }
        }
        if (repairedCount > 0) {
            logger.info("Read repair completed for key {}: repaired {} of {} nodes",
                    key, repairedCount, nodesToRepair.size());
        }
        return new ReadRepairResult(key, true, repairedCount, canonical.getValue());
    }

    /**
     * Compares two values as unsigned bytes, lexicographically; a proper prefix sorts first.
     * Used only to break timestamp ties deterministically.
     */
    private static int compareValues(byte[] a, byte[] b) {
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    /**
     * Perform blocking read repair, waiting up to 5 seconds for the repair to finish.
     *
     * @throws Exception if the repair fails, the wait is interrupted, or it times out
     *                   (on timeout the repair keeps running in the background)
     */
    public ReadRepairResult performReadRepairBlocking(
            List<ReadResponse> responses,
            RepairCallback repairCallback) throws Exception {
        return performReadRepairBlocking(
                responses, repairCallback, DEFAULT_BLOCKING_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Perform blocking read repair with a caller-supplied timeout.
     *
     * @param timeout maximum time to wait for the repair to complete
     * @param unit unit of {@code timeout}
     * @throws Exception if the repair fails, the wait is interrupted, or it times out
     *                   (on timeout the repair keeps running in the background)
     */
    public ReadRepairResult performReadRepairBlocking(
            List<ReadResponse> responses,
            RepairCallback repairCallback,
            long timeout,
            TimeUnit unit) throws Exception {
        return performReadRepair(responses, repairCallback).get(timeout, unit);
    }

    /**
     * Detect conflicts between replicas.
     *
     * <p>Found responses conflict when they disagree on the (value, timestamp) pair. If any
     * conflict exists, ALL responses (including not-found ones) are returned; otherwise the list
     * is empty. Replicas that simply lack the value are not by themselves treated as a conflict.
     */
    public List<ReadResponse> detectConflicts(List<ReadResponse> responses) {
        List<ReadResponse> conflicts = new ArrayList<>();
        if (responses.size() < 2) {
            return conflicts;
        }
        ReadResponse reference = null;
        for (ReadResponse response : responses) {
            if (!response.isFound()) {
                continue;
            }
            if (reference == null) {
                reference = response;
            } else if (response.getTimestamp() != reference.getTimestamp()
                    || !Arrays.equals(response.getValue(), reference.getValue())) {
                // More than one distinct (value, timestamp) pair exists.
                conflicts.addAll(responses);
                break;
            }
        }
        return conflicts;
    }

    /**
     * Get read repair statistics: configured chance and current repair pool metrics.
     */
    public Map<String, Object> getStats() {
        Map<String, Object> stats = new HashMap<>();
        stats.put("readRepairChance", readRepairChance + "%");
        stats.put("repairThreads", repairExecutor.getPoolSize());
        stats.put("activeRepairs", repairExecutor.getActiveCount());
        stats.put("queuedRepairs", repairExecutor.getQueue().size());
        return stats;
    }

    /**
     * Shutdown the repair executor, waiting up to 10 seconds before forcing termination.
     */
    public void shutdown() {
        repairExecutor.shutdown();
        try {
            if (!repairExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                repairExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            repairExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        logger.info("Read repair manager shutdown");
    }

    /**
     * Callback interface for repairing nodes
     */
    @FunctionalInterface
    public interface RepairCallback {
        /**
         * Write the canonical value to a node
         *
         * @param node The node to repair
         * @param key The key to write
         * @param value The canonical value
         * @param timestamp The timestamp of the canonical value
         * @return true if repair succeeded
         */
        boolean repairNode(ClusterNode node, String key, byte[] value, long timestamp) throws Exception;
    }
}