package com.cube.replication;

import com.cube.cluster.ClusterNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Hinted Handoff Manager - stores writes destined for temporarily unavailable nodes.
 *
 * <p>Each hint is kept in an in-memory per-node queue and mirrored to disk as one file under
 * {@code <hintsDir>/<nodeId>/<sanitizedKey>-<timestamp>.hint}, so pending hints can be reloaded
 * when the manager is constructed again. Hints are removed (from memory and disk) once they
 * have been replayed successfully or have expired.
 *
 * <p>In-memory state lives in a {@link ConcurrentHashMap} of {@link ConcurrentLinkedQueue}s.
 * The per-node limit check in {@link #storeHint} is not atomic with the insertion, so under
 * concurrent writers the limit is approximate rather than exact.
 */
public class HintedHandoffManager {

    private static final Logger logger = LoggerFactory.getLogger(HintedHandoffManager.class);

    /** Matches every character that is not allowed in a hint file name. */
    private static final java.util.regex.Pattern UNSAFE_FILENAME_CHARS =
        java.util.regex.Pattern.compile("[^a-zA-Z0-9.-]");

    /** File extension of persisted hints. */
    private static final String HINT_FILE_SUFFIX = ".hint";

    private final Path hintsDirectory;
    private final Map<String, Queue<Hint>> hintsByNode; // nodeId -> hints
    private final ScheduledExecutorService replayExecutor;
    private final int maxHintsPerNode;
    private final long hintWindowMs;

    /**
     * A single buffered write for a node that was unavailable when the write happened.
     * Instances are serialized to disk, so the field layout must stay compatible with
     * {@code serialVersionUID = 1L}.
     */
    public static class Hint implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String targetNodeId;
        private final String key;
        private final byte[] value;
        private final long timestamp;
        private final long expirationTime;

        /**
         * Creates a hint stamped with the current wall-clock time.
         *
         * @param targetNodeId id of the node the write is destined for
         * @param key          key of the buffered write
         * @param value        value of the buffered write (stored by reference, not copied)
         * @param hintWindowMs how long, in milliseconds, the hint stays valid
         */
        public Hint(String targetNodeId, String key, byte[] value, long hintWindowMs) {
            this.targetNodeId = targetNodeId;
            this.key = key;
            this.value = value;
            this.timestamp = System.currentTimeMillis();
            // Saturate instead of overflowing: a huge window must mean "practically never
            // expires", not "already expired" (which a wrapped-around negative sum would).
            this.expirationTime = hintWindowMs > Long.MAX_VALUE - timestamp
                ? Long.MAX_VALUE
                : timestamp + hintWindowMs;
        }

        public String getTargetNodeId() { return targetNodeId; }
        public String getKey() { return key; }
        public byte[] getValue() { return value; }
        public long getTimestamp() { return timestamp; }

        /** @return {@code true} once the current time is past the hint's expiration time */
        public boolean isExpired() {
            return System.currentTimeMillis() > expirationTime;
        }
    }

    /**
     * Creates the manager, reloads any unexpired hints found on disk and schedules the
     * periodic replay task (every 10 seconds, on a daemon thread).
     *
     * @param hintsDir        directory under which hints are persisted; created if missing
     * @param maxHintsPerNode maximum number of hints buffered per target node
     * @param hintWindowMs    validity window, in milliseconds, applied to newly stored hints
     * @throws IOException if the hints directory cannot be created or listed
     */
    public HintedHandoffManager(String hintsDir, int maxHintsPerNode, long hintWindowMs) throws IOException {
        this.hintsDirectory = Paths.get(hintsDir);
        this.maxHintsPerNode = maxHintsPerNode;
        this.hintWindowMs = hintWindowMs;
        this.hintsByNode = new ConcurrentHashMap<>();

        Files.createDirectories(hintsDirectory);

        // Load existing hints from disk BEFORE creating the executor, so a failure here
        // does not leave behind an executor that nobody can shut down.
        loadHints();

        this.replayExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "CubeDB-HintReplay");
            t.setDaemon(true);
            return t;
        });

        // Schedule periodic replay
        replayExecutor.scheduleAtFixedRate(this::replayHints, 10, 10, TimeUnit.SECONDS);

        logger.info("Hinted handoff manager initialized");
    }

    /**
     * Stores a hint for a temporarily unavailable node, in memory and on disk.
     *
     * <p>The hint is dropped (with a warning) when the node already holds
     * {@code maxHintsPerNode} hints. A failure to persist is logged but the hint is still
     * kept in memory.
     *
     * @param targetNodeId id of the unavailable node
     * @param key          key of the write
     * @param value        value of the write
     */
    public void storeHint(String targetNodeId, String key, byte[] value) {
        Queue<Hint> hints = hintsByNode.computeIfAbsent(targetNodeId, k -> new ConcurrentLinkedQueue<>());

        // Check if we've exceeded max hints for this node
        if (hints.size() >= maxHintsPerNode) {
            logger.warn("Max hints reached for node {}, dropping hint", targetNodeId);
            return;
        }

        Hint hint = new Hint(targetNodeId, key, value, hintWindowMs);
        hints.add(hint);

        // Persist hint to disk
        try {
            persistHint(hint);
        } catch (IOException e) {
            logger.error("Failed to persist hint for node {}", targetNodeId, e);
        }

        logger.debug("Stored hint for node {} (key: {})", targetNodeId, key);
    }

    /**
     * Returns the number of hints currently buffered in memory for a specific node.
     *
     * @param targetNodeId id of the node
     * @return number of buffered hints, or 0 if none are known for the node
     */
    public int getHintCount(String targetNodeId) {
        Queue<Hint> hints = hintsByNode.get(targetNodeId);
        return hints != null ? hints.size() : 0;
    }

    /**
     * Returns the total number of hints buffered in memory across all nodes.
     */
    public int getTotalHintCount() {
        return hintsByNode.values().stream()
            .mapToInt(Queue::size)
            .sum();
    }

    /**
     * Replays the buffered hints of one node through the given callback.
     *
     * <p>Expired hints are discarded without being replayed. A hint is removed (memory and
     * disk) only when the callback returns {@code true}; hints for which the callback returns
     * {@code false} or throws stay queued for a later attempt.
     *
     * @param targetNodeId id of the node whose hints should be replayed
     * @param callback     invoked once per unexpired hint
     */
    public void replayHintsForNode(String targetNodeId, HintReplayCallback callback) {
        Queue<Hint> hints = hintsByNode.get(targetNodeId);
        if (hints == null || hints.isEmpty()) {
            return;
        }

        int replayed = 0;
        int expired = 0;
        int failed = 0;

        Iterator<Hint> iterator = hints.iterator();
        while (iterator.hasNext()) {
            Hint hint = iterator.next();

            // Remove expired hints
            if (hint.isExpired()) {
                iterator.remove();
                expired++;
                deleteHintFile(targetNodeId, hint);
                continue;
            }

            // Try to replay hint
            try {
                boolean success = callback.replayHint(hint);
                if (success) {
                    iterator.remove();
                    replayed++;
                    deleteHintFile(targetNodeId, hint);
                } else {
                    failed++;
                }
            } catch (Exception e) {
                logger.error("Failed to replay hint for node {}", targetNodeId, e);
                failed++;
            }
        }

        if (replayed > 0 || expired > 0) {
            logger.info("Hint replay for node {}: replayed={}, expired={}, failed={}",
                targetNodeId, replayed, expired, failed);
        }
    }

    /**
     * Periodic replay task. Currently a stub: it only logs the nodes that have hints.
     */
    private void replayHints() {
        for (String nodeId : hintsByNode.keySet()) {
            // Note: In a real implementation, you would check if the node is now alive
            // and only replay if it is. For now, we'll leave this as a stub.
            logger.debug("Checking hints for node {}", nodeId);
        }
    }

    /**
     * Resolves the on-disk directory of a node and verifies it lies strictly inside the
     * hints directory, so ids such as {@code ""}, {@code ".."} or an absolute path can never
     * make this class write to or delete from a location outside its own storage.
     *
     * @throws IOException if the node id is not a valid path or escapes the hints directory
     */
    private Path resolveNodeDir(String nodeId) throws IOException {
        Path base = hintsDirectory.toAbsolutePath().normalize();
        Path nodeDir;
        try {
            nodeDir = base.resolve(nodeId).normalize();
        } catch (InvalidPathException e) {
            throw new IOException("Invalid node id for hints directory: " + nodeId, e);
        }
        if (nodeDir.equals(base) || !nodeDir.startsWith(base)) {
            throw new IOException("Node id escapes hints directory: " + nodeId);
        }
        return nodeDir;
    }

    /**
     * Builds the file name of a hint: the key with every character outside
     * {@code [a-zA-Z0-9.-]} replaced by {@code _}, followed by the hint timestamp.
     * Used for both writing and deleting so the two always agree.
     */
    private static String hintFileName(Hint hint) {
        String safeKey = UNSAFE_FILENAME_CHARS.matcher(hint.getKey()).replaceAll("_");
        return safeKey + "-" + hint.getTimestamp() + HINT_FILE_SUFFIX;
    }

    /**
     * Persists a hint to disk via Java serialization.
     *
     * @throws IOException if the node directory or the hint file cannot be written
     */
    private void persistHint(Hint hint) throws IOException {
        Path nodeHintsDir = resolveNodeDir(hint.getTargetNodeId());
        Files.createDirectories(nodeHintsDir);

        Path hintFile = nodeHintsDir.resolve(hintFileName(hint));

        try (ObjectOutputStream oos = new ObjectOutputStream(
                new BufferedOutputStream(Files.newOutputStream(hintFile)))) {
            oos.writeObject(hint);
        }
    }

    /**
     * Loads hints from disk into memory. Each sub-directory of the hints directory is treated
     * as one node (directory name = node id). Expired hints are deleted, unreadable files are
     * logged and skipped.
     *
     * @throws IOException if a directory cannot be listed
     */
    private void loadHints() throws IOException {
        if (!Files.exists(hintsDirectory)) {
            return;
        }

        try (DirectoryStream<Path> nodeDirs = Files.newDirectoryStream(hintsDirectory)) {
            for (Path nodeDir : nodeDirs) {
                if (!Files.isDirectory(nodeDir)) {
                    continue;
                }

                List<Hint> loaded = loadHintsFromDirectory(nodeDir);
                if (loaded.isEmpty()) {
                    continue;
                }

                // Directory iteration order is unspecified; restore the original write order
                // so older writes are replayed before newer ones.
                loaded.sort(Comparator.comparingLong(Hint::getTimestamp));

                String nodeId = nodeDir.getFileName().toString();
                hintsByNode.computeIfAbsent(nodeId, k -> new ConcurrentLinkedQueue<>()).addAll(loaded);
            }
        }

        int totalHints = getTotalHintCount();
        if (totalHints > 0) {
            logger.info("Loaded {} hints from disk", totalHints);
        }
    }

    /**
     * Reads every {@code *.hint} file of one node directory and returns the unexpired hints.
     * Files holding expired hints are deleted.
     */
    private List<Hint> loadHintsFromDirectory(Path nodeDir) throws IOException {
        List<Hint> loaded = new ArrayList<>();

        try (DirectoryStream<Path> hintFiles = Files.newDirectoryStream(nodeDir, "*" + HINT_FILE_SUFFIX)) {
            for (Path hintFile : hintFiles) {
                Hint hint = readHint(hintFile);
                if (hint == null) {
                    continue;
                }

                // Skip expired hints. The file's input stream is already closed at this
                // point, which matters on platforms that refuse to delete open files.
                if (hint.isExpired()) {
                    try {
                        Files.deleteIfExists(hintFile);
                    } catch (IOException e) {
                        logger.error("Failed to delete expired hint {}", hintFile, e);
                    }
                    continue;
                }

                loaded.add(hint);
            }
        }

        return loaded;
    }

    /**
     * Deserializes one hint file.
     *
     * @return the hint, or {@code null} if the file is unreadable or does not contain a hint
     */
    private Hint readHint(Path hintFile) {
        // NOTE(security): this uses Java native deserialization. It is only safe as long as
        // the hints directory is writable by trusted processes alone; consider an
        // ObjectInputFilter or a different format if that cannot be guaranteed.
        try (ObjectInputStream ois = new ObjectInputStream(
                new BufferedInputStream(Files.newInputStream(hintFile)))) {
            return (Hint) ois.readObject();
        } catch (ClassNotFoundException | ClassCastException | IOException e) {
            logger.error("Failed to load hint from {}", hintFile, e);
            return null;
        }
    }

    /**
     * Deletes the on-disk file of exactly one hint. Other pending hints for the same key
     * (or for keys sharing a prefix) are left untouched.
     */
    private void deleteHintFile(String nodeId, Hint hint) {
        try {
            Files.deleteIfExists(resolveNodeDir(nodeId).resolve(hintFileName(hint)));
        } catch (IOException e) {
            logger.error("Failed to delete hint file", e);
        }
    }

    /**
     * Clears all hints for a node, both in memory and on disk.
     *
     * @param targetNodeId id of the node whose hints should be discarded
     */
    public void clearHintsForNode(String targetNodeId) {
        hintsByNode.remove(targetNodeId);

        try {
            Path nodeHintsDir = resolveNodeDir(targetNodeId);
            if (Files.exists(nodeHintsDir)) {
                // Files.walk holds open directory handles until the stream is closed.
                try (java.util.stream.Stream<Path> paths = Files.walk(nodeHintsDir)) {
                    // Reverse order deletes children before their parent directory.
                    paths.sorted(Comparator.reverseOrder())
                        .forEach(path -> {
                            try {
                                Files.delete(path);
                            } catch (IOException e) {
                                logger.error("Failed to delete {}", path, e);
                            }
                        });
                }
            }
        } catch (IOException | UncheckedIOException e) {
            logger.error("Failed to clear hints directory for {}", targetNodeId, e);
        }

        logger.info("Cleared all hints for node {}", targetNodeId);
    }

    /**
     * Shuts down the hint replay executor, waiting up to 5 seconds for a running task
     * before forcing termination. Restores the interrupt flag if interrupted while waiting.
     */
    public void shutdown() {
        replayExecutor.shutdown();
        try {
            if (!replayExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                replayExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            replayExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }

        logger.info("Hinted handoff manager shutdown");
    }

    /**
     * Callback interface for replaying hints.
     */
    @FunctionalInterface
    public interface HintReplayCallback {
        /**
         * Delivers one hint to its target node.
         *
         * @param hint the hint to replay
         * @return {@code true} if the hint was delivered and may be discarded
         * @throws Exception if delivery fails; the hint is then kept for a later attempt
         */
        boolean replayHint(Hint hint) throws Exception;
    }
}
