package com.cube.replication;
import com.cube.cluster.ClusterNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Pattern;
import java.util.stream.Stream;
/**
 * Hinted Handoff Manager - Stores writes for temporarily unavailable nodes.
 * When a node is down, hints are stored locally and replayed when the node recovers.
 *
 * <p>Each hint is kept in an in-memory per-node queue and also written to its own file at
 * {@code <hintsDir>/<nodeId>/<sanitizedKey>-<timestamp>.hint}, so pending hints can be
 * reloaded when a new manager is constructed over the same directory.
 *
 * <p>In-memory state is held in concurrent collections. The per-node limit check in
 * {@link #storeHint} is not atomic with the insert, so the limit may be slightly exceeded
 * when several threads store hints for the same node at once.
 */
public class HintedHandoffManager {
    private static final Logger logger = LoggerFactory.getLogger(HintedHandoffManager.class);

    /** Matches every character that is not allowed verbatim in a hint file name. */
    private static final Pattern UNSAFE_FILENAME_CHARS = Pattern.compile("[^a-zA-Z0-9.-]");

    private final Path hintsDirectory;
    private final Map<String, Queue<Hint>> hintsByNode; // nodeId -> hints
    private final ScheduledExecutorService replayExecutor;
    private final int maxHintsPerNode;
    private final long hintWindowMs;

    /**
     * A single write that could not be delivered to its target node.
     *
     * <p>The serialized form (field names, types and {@code serialVersionUID}) is what is
     * stored in hint files on disk, so it must stay stable.
     */
    public static class Hint implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String targetNodeId;
        private final String key;
        private final byte[] value;
        private final long timestamp;
        private final long expirationTime;

        /**
         * Creates a hint stamped with the current wall-clock time.
         *
         * @param targetNodeId id of the node the write was meant for
         * @param key          key of the write
         * @param value        payload of the write; copied, may be {@code null}
         * @param hintWindowMs how long, in milliseconds from now, the hint stays valid
         */
        public Hint(String targetNodeId, String key, byte[] value, long hintWindowMs) {
            this.targetNodeId = targetNodeId;
            this.key = key;
            // Defensive copy: the caller may reuse its buffer after handing the write over.
            this.value = value == null ? null : value.clone();
            this.timestamp = System.currentTimeMillis();
            long expiry = timestamp + hintWindowMs;
            if (hintWindowMs > 0 && expiry < timestamp) {
                // The addition overflowed; treat a huge window as "never expires"
                // instead of wrapping around to "already expired".
                expiry = Long.MAX_VALUE;
            }
            this.expirationTime = expiry;
        }

        public String getTargetNodeId() { return targetNodeId; }

        public String getKey() { return key; }

        /** @return a copy of the payload, or {@code null} if the hint has none */
        public byte[] getValue() { return value == null ? null : value.clone(); }

        /** @return creation time of the hint in epoch milliseconds */
        public long getTimestamp() { return timestamp; }

        /** @return {@code true} once the current time is past the hint's expiration time */
        public boolean isExpired() {
            return System.currentTimeMillis() > expirationTime;
        }
    }

    /**
     * Creates the manager, loads any hints already present under {@code hintsDir} and
     * schedules the periodic replay task (every 10 seconds on a daemon thread).
     *
     * @param hintsDir        directory that holds one sub-directory of hint files per node;
     *                        created if missing
     * @param maxHintsPerNode maximum number of hints accepted per node by {@link #storeHint}
     * @param hintWindowMs    validity window, in milliseconds, given to newly stored hints
     * @throws IOException if the directory cannot be created or listed
     */
    public HintedHandoffManager(String hintsDir, int maxHintsPerNode, long hintWindowMs) throws IOException {
        this.hintsDirectory = Paths.get(hintsDir);
        this.maxHintsPerNode = maxHintsPerNode;
        this.hintWindowMs = hintWindowMs;
        this.hintsByNode = new ConcurrentHashMap<>();
        Files.createDirectories(hintsDirectory);

        // Load existing hints from disk BEFORE creating the executor, so that a failure
        // here does not leave behind an executor nobody can shut down.
        loadHints();

        this.replayExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "CubeDB-HintReplay");
            t.setDaemon(true);
            return t;
        });
        // Schedule periodic replay
        replayExecutor.scheduleAtFixedRate(this::replayHints, 10, 10, TimeUnit.SECONDS);
        logger.info("Hinted handoff manager initialized");
    }

    /**
     * Store a hint for a temporarily unavailable node.
     *
     * <p>The hint is dropped (with a warning) if the node already holds
     * {@code maxHintsPerNode} hints. A failure to write the hint file is logged; the hint
     * is still kept in memory in that case.
     *
     * @param targetNodeId id of the unavailable node; must not be {@code null}
     * @param key          key of the write; must not be {@code null}
     * @param value        payload of the write
     * @throws NullPointerException if {@code targetNodeId} or {@code key} is {@code null}
     */
    public void storeHint(String targetNodeId, String key, byte[] value) {
        // Fail before touching any state: a hint with a null key cannot be persisted and
        // would otherwise stay behind in the in-memory queue.
        Objects.requireNonNull(targetNodeId, "targetNodeId");
        Objects.requireNonNull(key, "key");

        Queue<Hint> hints = hintsByNode.computeIfAbsent(targetNodeId, k -> new ConcurrentLinkedQueue<>());
        // Check if we've exceeded max hints for this node
        if (hints.size() >= maxHintsPerNode) {
            logger.warn("Max hints reached for node {}, dropping hint", targetNodeId);
            return;
        }
        Hint hint = new Hint(targetNodeId, key, value, hintWindowMs);
        hints.add(hint);
        // Persist hint to disk
        try {
            persistHint(hint);
        } catch (IOException e) {
            logger.error("Failed to persist hint for node {}", targetNodeId, e);
        }
        logger.debug("Stored hint for node {} (key: {})", targetNodeId, key);
    }

    /**
     * Get the number of hints for a specific node.
     *
     * @param targetNodeId id of the node
     * @return number of hints currently queued in memory for that node, 0 if none
     */
    public int getHintCount(String targetNodeId) {
        Queue<Hint> hints = hintsByNode.get(targetNodeId);
        return hints != null ? hints.size() : 0;
    }

    /**
     * Get total number of hints across all nodes.
     *
     * @return sum of the in-memory queue sizes of all nodes
     */
    public int getTotalHintCount() {
        return hintsByNode.values().stream()
                .mapToInt(Queue::size)
                .sum();
    }

    /**
     * Replay hints for a specific node.
     *
     * <p>Expired hints are discarded. Every other hint is handed to {@code callback}; it is
     * removed (from memory and disk) only when the callback returns {@code true}. Hints for
     * which the callback returns {@code false} or throws stay queued. If the callback throws
     * {@link InterruptedException}, the thread's interrupt flag is restored and the pass
     * stops early.
     *
     * @param targetNodeId id of the node whose hints should be replayed
     * @param callback     delivers one hint to the node and reports success
     */
    public void replayHintsForNode(String targetNodeId, HintReplayCallback callback) {
        Queue<Hint> hints = hintsByNode.get(targetNodeId);
        if (hints == null || hints.isEmpty()) {
            return;
        }
        int replayed = 0;
        int expired = 0;
        int failed = 0;
        for (Iterator<Hint> iterator = hints.iterator(); iterator.hasNext(); ) {
            Hint hint = iterator.next();
            // Remove expired hints
            if (hint.isExpired()) {
                iterator.remove();
                expired++;
                deleteHintFile(targetNodeId, hint);
                continue;
            }
            // Try to replay hint
            try {
                if (callback.replayHint(hint)) {
                    iterator.remove();
                    replayed++;
                    deleteHintFile(targetNodeId, hint);
                } else {
                    failed++;
                }
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop this pass.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while replaying hints for node {}", targetNodeId);
                failed++;
                break;
            } catch (Exception e) {
                logger.error("Failed to replay hint for node {}", targetNodeId, e);
                failed++;
            }
        }
        if (replayed > 0 || expired > 0) {
            logger.info("Hint replay for node {}: replayed={}, expired={}, failed={}",
                    targetNodeId, replayed, expired, failed);
        }
    }

    /**
     * Periodic replay task. Currently a stub: it only logs the nodes that have hints.
     */
    private void replayHints() {
        for (String nodeId : hintsByNode.keySet()) {
            // Note: In a real implementation, you would check if the node is now alive
            // and only replay if it is. For now, we'll leave this as a stub.
            logger.debug("Checking hints for node {}", nodeId);
        }
    }

    /**
     * Builds the file name of a hint: the key with every character outside
     * {@code [a-zA-Z0-9.-]} replaced by {@code _}, followed by the hint timestamp.
     */
    private static String hintFileName(Hint hint) {
        String safeKey = UNSAFE_FILENAME_CHARS.matcher(hint.getKey()).replaceAll("_");
        return safeKey + "-" + hint.getTimestamp() + ".hint";
    }

    /**
     * Resolves the directory holding the hint files of {@code nodeId}.
     *
     * @throws IOException if the id does not resolve to a location strictly inside the
     *                     hints directory (for example {@code ""} or {@code ".."}), which
     *                     would otherwise let callers write or delete outside of it
     */
    private Path resolveNodeDirectory(String nodeId) throws IOException {
        Path base = hintsDirectory.toAbsolutePath().normalize();
        Path candidate = base.resolve(nodeId).normalize();
        if (candidate.equals(base) || !candidate.startsWith(base)) {
            throw new IOException("Node id '" + nodeId + "' does not map to a directory inside " + base);
        }
        return hintsDirectory.resolve(nodeId);
    }

    /**
     * Persist hint to disk, creating the node's directory if needed.
     *
     * @throws IOException if the directory or file cannot be written
     */
    private void persistHint(Hint hint) throws IOException {
        Path nodeHintsDir = resolveNodeDirectory(hint.getTargetNodeId());
        Files.createDirectories(nodeHintsDir);
        Path hintFile = nodeHintsDir.resolve(hintFileName(hint));
        try (ObjectOutputStream oos = new ObjectOutputStream(
                new BufferedOutputStream(Files.newOutputStream(hintFile)))) {
            oos.writeObject(hint);
        }
    }

    /**
     * Load hints from disk: every sub-directory of the hints directory is treated as one
     * node, named after the node id.
     *
     * @throws IOException if a directory cannot be listed
     */
    private void loadHints() throws IOException {
        if (!Files.exists(hintsDirectory)) {
            return;
        }
        try (DirectoryStream<Path> nodeDirs = Files.newDirectoryStream(hintsDirectory)) {
            for (Path nodeDir : nodeDirs) {
                if (Files.isDirectory(nodeDir)) {
                    loadHintsForNode(nodeDir);
                }
            }
        }
        int totalHints = getTotalHintCount();
        if (totalHints > 0) {
            logger.info("Loaded {} hints from disk", totalHints);
        }
    }

    /**
     * Loads all {@code *.hint} files of one node directory into that node's queue.
     * Expired hints are deleted from disk; unreadable files are logged and left in place.
     */
    private void loadHintsForNode(Path nodeDir) throws IOException {
        String nodeId = nodeDir.getFileName().toString();
        List<Hint> live = new ArrayList<>();
        try (DirectoryStream<Path> hintFiles = Files.newDirectoryStream(nodeDir, "*.hint")) {
            for (Path hintFile : hintFiles) {
                Hint hint = readHintFile(hintFile);
                if (hint == null) {
                    continue;
                }
                // Skip expired hints. The file is deleted only after readHintFile has
                // closed its stream; deleting a file that is still open fails on Windows.
                if (hint.isExpired()) {
                    try {
                        Files.deleteIfExists(hintFile);
                    } catch (IOException e) {
                        logger.error("Failed to delete expired hint file {}", hintFile, e);
                    }
                    continue;
                }
                live.add(hint);
            }
        }
        if (live.isEmpty()) {
            return;
        }
        // Directory listing order is unspecified; restore creation order so hints are
        // replayed oldest-first, as they would have been before the restart.
        live.sort(Comparator.comparingLong(Hint::getTimestamp));
        hintsByNode.computeIfAbsent(nodeId, k -> new ConcurrentLinkedQueue<>()).addAll(live);
    }

    /**
     * Reads one hint file.
     *
     * @return the hint, or {@code null} if the file could not be read or does not contain
     *         a {@link Hint} (the problem is logged)
     */
    private Hint readHintFile(Path hintFile) {
        // NOTE(security): this uses native Java deserialization. It is only safe as long as
        // the hints directory is writable by this process alone; consider an
        // ObjectInputFilter or a non-native format if that cannot be guaranteed.
        try (ObjectInputStream ois = new ObjectInputStream(
                new BufferedInputStream(Files.newInputStream(hintFile)))) {
            Object loaded = ois.readObject();
            if (loaded instanceof Hint) {
                return (Hint) loaded;
            }
            logger.error("Failed to load hint from {}: unexpected content {}", hintFile,
                    loaded == null ? "null" : loaded.getClass().getName());
            return null;
        } catch (ClassNotFoundException | IOException e) {
            logger.error("Failed to load hint from {}", hintFile, e);
            return null;
        }
    }

    /**
     * Delete the file of exactly this hint from the given node's directory.
     *
     * <p>Only the file named after the hint's key and timestamp is removed, so other
     * pending hints (same key with another timestamp, or keys sharing a prefix) keep
     * their files. Failures are logged, not thrown.
     */
    private void deleteHintFile(String nodeId, Hint hint) {
        try {
            Path hintFile = resolveNodeDirectory(nodeId).resolve(hintFileName(hint));
            Files.deleteIfExists(hintFile);
        } catch (IOException e) {
            logger.error("Failed to delete hint file", e);
        }
    }

    /**
     * Clear all hints for a node, both in memory and on disk.
     *
     * <p>Failures while deleting files are logged, not thrown.
     *
     * @param targetNodeId id of the node whose hints are discarded
     */
    public void clearHintsForNode(String targetNodeId) {
        hintsByNode.remove(targetNodeId);
        try {
            Path nodeHintsDir = resolveNodeDirectory(targetNodeId);
            if (Files.exists(nodeHintsDir)) {
                // Files.walk holds an open directory handle until the stream is closed.
                try (Stream<Path> paths = Files.walk(nodeHintsDir)) {
                    // Reverse order visits children before their parent directory.
                    paths.sorted(Comparator.reverseOrder()).forEach(this::deleteAndLogFailure);
                }
            }
        } catch (IOException e) {
            logger.error("Failed to clear hints directory for {}", targetNodeId, e);
        }
        logger.info("Cleared all hints for node {}", targetNodeId);
    }

    /** Deletes one path, logging instead of throwing on failure. */
    private void deleteAndLogFailure(Path path) {
        try {
            Files.delete(path);
        } catch (IOException e) {
            logger.error("Failed to delete {}", path, e);
        }
    }

    /**
     * Shutdown the hint replay executor, waiting up to 5 seconds for a running task
     * before cancelling it.
     */
    public void shutdown() {
        replayExecutor.shutdown();
        try {
            if (!replayExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                replayExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            replayExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        logger.info("Hinted handoff manager shutdown");
    }

    /**
     * Callback interface for replaying hints.
     */
    @FunctionalInterface
    public interface HintReplayCallback {
        /**
         * Delivers one hint to its target node.
         *
         * @param hint the hint to deliver
         * @return {@code true} if the hint was delivered and can be discarded,
         *         {@code false} to keep it queued for a later attempt
         * @throws Exception on delivery failure; the hint is kept queued
         */
        boolean replayHint(Hint hint) throws Exception;
    }
}