/*
 * Repository viewer residue (not part of the Java source):
 *   Newer / Older
 *   cactus / src / main / java / com / cube / gossip / GossipProtocol.java
 *   @agalyaramadoss agalyaramadoss on 16 Feb 20 KB version update with 21
 */
package com.cube.gossip;

import com.cube.cluster.ClusterNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.*;
import java.util.concurrent.*;

/**
 * Gossip Protocol Implementation for Cube Database
 * 
 * Features:
 * - SWIM (Scalable Weakly-consistent Infection-style Membership) protocol
 * - Efficient failure detection
 * - Eventually consistent cluster state
 * - Configurable gossip intervals and fanout
 */
public class GossipProtocol {
    
    // Shared SLF4J logger for this class.
    private static final Logger logger = LoggerFactory.getLogger(GossipProtocol.class);
    
    // Id of the node this instance represents; also the key of its own entry in nodeStates.
    private final String localNodeId;
    // Membership table: node id -> state. The constructor inserts the local node.
    private final ConcurrentHashMap<String, NodeState> nodeStates;
    // Timing, fanout and port settings.
    private final GossipConfig config;
    // Runs the periodic gossip-round and failure-detection tasks scheduled by start().
    private final ScheduledExecutorService scheduler;
    // Fixed pool (config.gossipFanout threads) on which each round's sends are executed.
    private final ExecutorService gossipExecutor;
    // Sends gossip messages; started on config.protocolPort. Not defined in this file.
    private final GossipMessageHandler messageHandler;
    // Registered event listeners (a CopyOnWriteArrayList).
    private final List<GossipListener> listeners;

    /** Returns the identifier of the node this protocol instance represents. */
    public String getLocalNodeId() {
        return this.localNodeId;
    }

    /** Returns the live membership table (the internal map itself, not a copy). */
    public ConcurrentHashMap<String, NodeState> getNodeStates() {
        return this.nodeStates;
    }

    /** Returns the configuration this instance was created with. */
    public GossipConfig getConfig() {
        return this.config;
    }

    /** Returns the executor that runs the periodic gossip and failure-detection tasks. */
    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    /** Returns the worker pool on which individual gossip sends are executed. */
    public ExecutorService getGossipExecutor() {
        return this.gossipExecutor;
    }

    /** Returns the handler used to send gossip messages. */
    public GossipMessageHandler getMessageHandler() {
        return this.messageHandler;
    }

    /** Returns the live list of registered listeners (the internal list itself, not a copy). */
    public List<GossipListener> getListeners() {
        return this.listeners;
    }

    /**
     * Mutable membership record for a single cluster node.
     *
     * <p>The identity ({@code nodeId}, {@code host}, {@code port}) is fixed at construction.
     * Status, heartbeat counter, suspicion counter and the last-update timestamp change over
     * time; they are {@code volatile}, but the increment operations are plain
     * read-modify-write sequences.
     */
    public static class NodeState implements Serializable {
        private final String nodeId;
        private final String host;
        private final int port;
        private volatile Status status;
        private volatile long heartbeatCounter;
        private volatile long lastUpdateTime;
        private volatile int suspicionCounter;
        private final Map<String, String> metadata;

        /** Lifecycle states of a node. */
        public enum Status {
            ALIVE,      // Node is healthy
            SUSPECTED,  // Node might be down
            DEAD,       // Node is confirmed down
            LEAVING,    // Node is gracefully shutting down
            JOINING     // Node is joining the cluster
        }

        /**
         * Creates a record in the {@code JOINING} state with zeroed counters, empty metadata
         * and the current wall-clock time as its last-update timestamp.
         *
         * @param nodeId identifier of the node
         * @param host   host the node is reachable on
         * @param port   port the node is reachable on
         */
        public NodeState(String nodeId, String host, int port) {
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
            this.metadata = new ConcurrentHashMap<>();
            this.status = Status.JOINING;
            this.heartbeatCounter = 0;
            this.suspicionCounter = 0;
            touch();
        }

        public String getNodeId() {
            return nodeId;
        }

        public String getHost() {
            return host;
        }

        public int getPort() {
            return port;
        }

        public Status getStatus() {
            return status;
        }

        public long getHeartbeatCounter() {
            return heartbeatCounter;
        }

        public long getLastUpdateTime() {
            return lastUpdateTime;
        }

        /** Returns the live metadata map (not a copy). */
        public Map<String, String> getMetadata() {
            return metadata;
        }

        public int getSuspicionCounter() {
            return suspicionCounter;
        }

        /** Sets the status and refreshes the last-update timestamp. */
        public void setStatus(Status status) {
            this.status = status;
            touch();
        }

        /** Advances the heartbeat by one, refreshes the timestamp and clears the suspicion counter. */
        public void incrementHeartbeat() {
            heartbeatCounter += 1;
            touch();
            suspicionCounter = 0;
        }

        /** Advances the suspicion counter by one and refreshes the timestamp. */
        public void incrementSuspicion() {
            suspicionCounter += 1;
            touch();
        }

        /** Stamps the record with the current wall-clock time. */
        private void touch() {
            this.lastUpdateTime = System.currentTimeMillis();
        }

        @Override
        public String toString() {
            final String pattern = "NodeState{id=%s, host=%s, port=%d, status=%s, heartbeat=%d}";
            return String.format(pattern, nodeId, host, port, status, heartbeatCounter);
        }
    }
    
    /**
     * Immutable tuning parameters for the gossip protocol.
     *
     * <p>All values are fixed at construction. Read-only accessors are provided so that code
     * outside the enclosing class (for example, whatever receives the result of
     * {@code getConfig()}) can inspect the settings.
     */
    public static class GossipConfig {
        private final long gossipIntervalMs;      // How often to gossip
        private final int gossipFanout;           // Number of nodes to gossip to
        private final long suspicionTimeoutMs;    // Time before marking node as suspected
        private final long failureTimeoutMs;      // Time before marking node as dead
        private final int maxSuspicionCount;      // Max suspicion count before marking dead
        private final int protocolPort;           // Port for gossip communication

        /**
         * @param gossipIntervalMs   period between gossip rounds, in milliseconds
         * @param gossipFanout       number of peers contacted per round
         * @param suspicionTimeoutMs silence after which an ALIVE node is suspected, in milliseconds
         * @param failureTimeoutMs   silence after which a SUSPECTED node is marked dead, in milliseconds
         * @param maxSuspicionCount  suspicion count at which a SUSPECTED node is marked dead
         * @param protocolPort       port the message handler is started on
         */
        public GossipConfig(long gossipIntervalMs, int gossipFanout,
                           long suspicionTimeoutMs, long failureTimeoutMs,
                           int maxSuspicionCount, int protocolPort) {
            this.gossipIntervalMs = gossipIntervalMs;
            this.gossipFanout = gossipFanout;
            this.suspicionTimeoutMs = suspicionTimeoutMs;
            this.failureTimeoutMs = failureTimeoutMs;
            this.maxSuspicionCount = maxSuspicionCount;
            this.protocolPort = protocolPort;
        }

        /** Returns the default settings listed inline below. */
        public static GossipConfig defaultConfig() {
            return new GossipConfig(
                1000,    // Gossip every 1 second
                3,       // Gossip to 3 random nodes
                5000,    // Suspect after 5 seconds
                15000,   // Mark dead after 15 seconds
                3,       // Max 3 suspicions before dead
                7946     // Default gossip port
            );
        }

        /** @return period between gossip rounds, in milliseconds */
        public long getGossipIntervalMs() {
            return gossipIntervalMs;
        }

        /** @return number of peers contacted per gossip round */
        public int getGossipFanout() {
            return gossipFanout;
        }

        /** @return time without updates before an ALIVE node is suspected, in milliseconds */
        public long getSuspicionTimeoutMs() {
            return suspicionTimeoutMs;
        }

        /** @return time without updates before a SUSPECTED node is marked dead, in milliseconds */
        public long getFailureTimeoutMs() {
            return failureTimeoutMs;
        }

        /** @return suspicion count at which a SUSPECTED node is marked dead */
        public int getMaxSuspicionCount() {
            return maxSuspicionCount;
        }

        /** @return port used for gossip communication */
        public int getProtocolPort() {
            return protocolPort;
        }
    }
    
    /**
     * A single gossip message exchanged between nodes.
     *
     * <p>Instances are immutable apart from the contents of the carried state map, which is
     * stored by reference. The timestamp is taken from the sender's wall clock at construction.
     */
    public static class GossipMessage implements Serializable {
        /** Kinds of message the protocol defines. */
        public enum Type {
            PING,           // Heartbeat check
            PING_REQ,       // Indirect ping request
            ACK,            // Acknowledgment
            ALIVE,          // Node is alive announcement
            SUSPECT,        // Node suspected announcement
            DEAD,           // Node dead announcement
            JOIN,           // Join request
            LEAVE,          // Leave announcement
            STATE_SYNC      // Full state synchronization
        }

        private final Type type;
        private final String fromNodeId;
        private final String targetNodeId;
        private final long timestamp;
        private final Map<String, NodeState> nodeStates;
        private final long heartbeatCounter;

        /**
         * Creates a message with no target, an empty state map and a heartbeat counter of 0.
         *
         * @param type       message type
         * @param fromNodeId id of the sending node
         */
        public GossipMessage(Type type, String fromNodeId) {
            this(type, fromNodeId, null, new HashMap<>(), 0);
        }

        /**
         * @param type             message type
         * @param fromNodeId       id of the sending node
         * @param targetNodeId     id of the intended recipient; may be {@code null}
         * @param nodeStates       membership state to carry; {@code null} is treated as empty
         * @param heartbeatCounter sender's heartbeat counter
         */
        public GossipMessage(Type type, String fromNodeId, String targetNodeId,
                           Map<String, NodeState> nodeStates, long heartbeatCounter) {
            this.type = type;
            this.fromNodeId = fromNodeId;
            this.targetNodeId = targetNodeId;
            this.timestamp = System.currentTimeMillis();
            // Never store null so that getNodeStates() can be iterated without a null check.
            this.nodeStates = (nodeStates != null) ? nodeStates : new HashMap<>();
            this.heartbeatCounter = heartbeatCounter;
        }

        public Type getType() { return type; }
        public String getFromNodeId() { return fromNodeId; }
        /** @return id of the intended recipient, or {@code null} if none was given */
        public String getTargetNodeId() { return targetNodeId; }
        /** @return construction time in milliseconds since the epoch */
        public long getTimestamp() { return timestamp; }
        /** @return the carried state map (the stored reference, not a copy) */
        public Map<String, NodeState> getNodeStates() { return nodeStates; }
        public long getHeartbeatCounter() { return heartbeatCounter; }
    }
    
    /**
     * Callback interface for cluster membership events.
     *
     * <p>Implementations are registered through {@code addListener}. Exceptions thrown by a
     * callback are caught and logged by the notifier, so one failing listener does not stop
     * the others from being called.
     */
    public interface GossipListener {
        /** Invoked when a previously unknown node is added to the membership table. */
        void onNodeJoined(NodeState node);
        /** Invoked after a node's entry has been removed from the membership table. */
        void onNodeLeft(NodeState node);
        /** Invoked when a node's status becomes {@code SUSPECTED}. */
        void onNodeSuspected(NodeState node);
        /** Invoked when merging received state changes a node's status to {@code ALIVE}. */
        void onNodeAlive(NodeState node);
        /** Invoked when a node's status becomes {@code DEAD}. */
        void onNodeDead(NodeState node);
    }
    
    /**
     * Creates a protocol instance and registers the local node as {@code ALIVE}.
     *
     * <p>No threads are started and nothing is scheduled here; call {@link #start()} for that.
     *
     * @param localNodeId id of the local node; must not be {@code null}
     * @param host        host recorded for the local node
     * @param port        port recorded for the local node
     * @param config      protocol settings; must not be {@code null}
     * @throws NullPointerException if {@code localNodeId} or {@code config} is {@code null}
     */
    public GossipProtocol(String localNodeId, String host, int port, GossipConfig config) {
        this.localNodeId = Objects.requireNonNull(localNodeId, "localNodeId");
        this.config = Objects.requireNonNull(config, "config");
        this.nodeStates = new ConcurrentHashMap<>();
        this.listeners = new CopyOnWriteArrayList<>();

        // Add local node
        NodeState localNode = new NodeState(localNodeId, host, port);
        localNode.setStatus(NodeState.Status.ALIVE);
        nodeStates.put(localNodeId, localNode);

        this.scheduler = Executors.newScheduledThreadPool(2, r -> {
            Thread t = new Thread(r, "Gossip-Scheduler");
            t.setDaemon(true);
            return t;
        });
        this.gossipExecutor = Executors.newFixedThreadPool(config.gossipFanout, r -> {
            Thread t = new Thread(r, "Gossip-Worker");
            t.setDaemon(true);
            return t;
        });

        // Created last: the handler is handed `this`, so every other field must already be
        // assigned and the local node registered before the reference escapes.
        this.messageHandler = new GossipMessageHandler(this);
    }
    
    /**
     * Starts the protocol: schedules the periodic gossip round and the failure detector,
     * then starts the message handler on the configured port.
     */
    public void start() {
        logger.info("Starting gossip protocol for node: {}", localNodeId);

        final long gossipPeriodMs = config.gossipIntervalMs;
        final long detectionDelayMs = config.suspicionTimeoutMs;
        final long detectionPeriodMs = config.suspicionTimeoutMs / 2;
        final int port = config.protocolPort;

        // Gossip rounds begin immediately and repeat every gossip interval.
        scheduler.scheduleAtFixedRate(
            () -> performGossipRound(), 0, gossipPeriodMs, TimeUnit.MILLISECONDS);

        // Failure detection waits one suspicion timeout, then runs twice per timeout.
        scheduler.scheduleAtFixedRate(
            () -> detectFailures(), detectionDelayMs, detectionPeriodMs, TimeUnit.MILLISECONDS);

        messageHandler.start(port);

        logger.info("Gossip protocol started on port {}", port);
    }
    
    /**
     * Runs one gossip round: bumps the local heartbeat, picks targets and hands one send
     * task per target to the worker pool. Any exception is logged rather than propagated.
     */
    private void performGossipRound() {
        try {
            NodeState self = nodeStates.get(localNodeId);
            if (self != null) {
                self.incrementHeartbeat();
            }

            List<NodeState> peers = selectGossipTargets();
            if (!peers.isEmpty()) {
                logger.debug("Gossiping with {} nodes", peers.size());
                peers.forEach(peer -> gossipExecutor.submit(() -> gossipWithNode(peer)));
            } else {
                logger.debug("No nodes to gossip with");
            }
        } catch (Exception e) {
            logger.error("Error in gossip round", e);
        }
    }
    
    /**
     * Picks up to {@code gossipFanout} remote nodes in the ALIVE state, in random order.
     *
     * @return a shuffled selection of ALIVE peers; empty if there are none
     */
    private List<NodeState> selectGossipTargets() {
        List<NodeState> candidates = nodeStates.values().stream()
            .filter(n -> !n.getNodeId().equals(localNodeId)
                && n.getStatus() == NodeState.Status.ALIVE)
            .collect(java.util.stream.Collectors.toCollection(ArrayList::new));

        Collections.shuffle(candidates);

        int count = Math.min(config.gossipFanout, candidates.size());
        return candidates.subList(0, count);
    }
    
    /**
     * Sends the local view of the cluster (every node not marked DEAD) to one peer as a
     * STATE_SYNC message. If anything fails, the peer is suspected.
     *
     * @param target peer to send to
     */
    private void gossipWithNode(NodeState target) {
        try {
            Map<String, NodeState> view = new HashMap<>();
            nodeStates.forEach((id, state) -> {
                // Dead nodes are not propagated.
                if (state.getStatus() != NodeState.Status.DEAD) {
                    view.put(id, state);
                }
            });

            long localHeartbeat = nodeStates.get(localNodeId).getHeartbeatCounter();
            GossipMessage sync = new GossipMessage(
                GossipMessage.Type.STATE_SYNC, localNodeId, target.getNodeId(), view, localHeartbeat);

            messageHandler.sendMessage(target.getHost(), target.getPort(), sync);

            logger.debug("Sent gossip to {}", target.getNodeId());
        } catch (Exception e) {
            logger.warn("Failed to gossip with {}: {}", target.getNodeId(), e.getMessage());
            suspectNode(target.getNodeId());
        }
    }
    
    /**
     * Ages every remote node according to how long ago it was last updated:
     * ALIVE nodes become suspected after the suspicion timeout, SUSPECTED nodes are marked
     * dead after the failure timeout or once their suspicion count reaches the maximum, and
     * DEAD nodes are removed after three failure timeouts.
     *
     * <p>This runs as a fixed-rate scheduled task. An exception escaping such a task
     * suppresses all of its later executions, so failures are caught and logged per node
     * instead of being allowed to propagate.
     */
    private void detectFailures() {
        long now = System.currentTimeMillis();

        for (NodeState node : nodeStates.values()) {
            try {
                if (node.getNodeId().equals(localNodeId)) {
                    continue; // Skip local node
                }

                long timeSinceUpdate = now - node.getLastUpdateTime();

                switch (node.getStatus()) {
                    case ALIVE:
                        // Check if node should be suspected
                        if (timeSinceUpdate > config.suspicionTimeoutMs) {
                            suspectNode(node.getNodeId());
                        }
                        break;

                    case SUSPECTED:
                        // Check if node should be marked dead
                        if (timeSinceUpdate > config.failureTimeoutMs ||
                            node.getSuspicionCounter() >= config.maxSuspicionCount) {
                            markNodeDead(node.getNodeId());
                        }
                        break;

                    case DEAD:
                        // Remove dead nodes after some time
                        if (timeSinceUpdate > config.failureTimeoutMs * 3) {
                            removeNode(node.getNodeId());
                        }
                        break;

                    default:
                        // JOINING and LEAVING nodes are not aged by this method.
                        break;
                }
            } catch (Exception e) {
                // Keep checking the remaining nodes and keep the scheduled task alive.
                logger.error("Error in failure detection", e);
            }
        }
    }
    
    /**
     * Moves a node from ALIVE to SUSPECTED, bumps its suspicion counter and notifies
     * listeners. Does nothing if the node is unknown or not currently ALIVE.
     *
     * @param nodeId id of the node to suspect
     */
    private void suspectNode(String nodeId) {
        NodeState node = nodeStates.get(nodeId);
        if (node == null || node.getStatus() != NodeState.Status.ALIVE) {
            return;
        }

        node.setStatus(NodeState.Status.SUSPECTED);
        node.incrementSuspicion();
        logger.warn("Node {} is now SUSPECTED", nodeId);
        notifyListeners(listener -> listener.onNodeSuspected(node));
    }
    
    /**
     * Marks a node DEAD and notifies listeners. Does nothing if the node is unknown or
     * already DEAD.
     *
     * @param nodeId id of the node to mark dead
     */
    private void markNodeDead(String nodeId) {
        NodeState node = nodeStates.get(nodeId);
        boolean alreadyDead = node != null && node.getStatus() == NodeState.Status.DEAD;
        if (node == null || alreadyDead) {
            return;
        }

        node.setStatus(NodeState.Status.DEAD);
        logger.error("Node {} is now DEAD", nodeId);
        notifyListeners(listener -> listener.onNodeDead(node));
    }
    
    /**
     * Deletes a node's entry from the membership table and, if an entry was actually
     * removed, notifies listeners that the node has left.
     *
     * @param nodeId id of the node to remove
     */
    private void removeNode(String nodeId) {
        NodeState removed = nodeStates.remove(nodeId);
        if (removed == null) {
            return;
        }

        logger.info("Removed node {} from cluster", nodeId);
        notifyListeners(listener -> listener.onNodeLeft(removed));
    }
    
    /**
     * Merges membership state received from a peer into the local view.
     *
     * <ul>
     *   <li>Unknown nodes are inserted (atomically) and reported via {@code onNodeJoined};
     *       their last-update time is restamped with the local clock so that clock skew
     *       between peers cannot make a fresh entry look stale.</li>
     *   <li>For known nodes, an entry with a strictly greater heartbeat counter wins: the
     *       heartbeat and status are adopted, the suspicion counter is cleared (as
     *       {@code incrementHeartbeat} does) and listeners are told about ALIVE, SUSPECTED
     *       and DEAD transitions.</li>
     *   <li>The local node's own status is never taken from a peer; only a greater
     *       heartbeat counter is adopted.</li>
     *   <li>A {@code null} map and entries with a null key, value or status are ignored.</li>
     * </ul>
     *
     * @param receivedStates node id to state, as received from a peer; may be {@code null}
     */
    public void mergeState(Map<String, NodeState> receivedStates) {
        if (receivedStates == null) {
            return;
        }

        for (Map.Entry<String, NodeState> entry : receivedStates.entrySet()) {
            String nodeId = entry.getKey();
            NodeState receivedNode = entry.getValue();

            // The map was built elsewhere: skip entries that cannot be stored
            // (ConcurrentHashMap rejects nulls) or whose status could not be switched on.
            if (nodeId == null || receivedNode == null || receivedNode.getStatus() == null) {
                logger.warn("Ignoring malformed gossip entry for node {}", nodeId);
                continue;
            }

            if (nodeId.equals(localNodeId)) {
                mergeOwnEntry(receivedNode);
                continue;
            }

            NodeState knownNode = nodeStates.get(nodeId);
            if (knownNode == null) {
                // Restamp with the local clock before publishing the entry.
                receivedNode.lastUpdateTime = System.currentTimeMillis();
                knownNode = nodeStates.putIfAbsent(nodeId, receivedNode);
                if (knownNode == null) {
                    // New node discovered
                    logger.info("Discovered new node: {}", receivedNode);
                    notifyListeners(l -> l.onNodeJoined(receivedNode));
                    continue;
                }
                // Another thread inserted the node first; merge into that entry instead.
            }

            mergeKnownNode(nodeId, knownNode, receivedNode);
        }
    }

    /** Adopts a greater heartbeat counter for the local node; its status is left untouched. */
    private void mergeOwnEntry(NodeState receivedNode) {
        NodeState self = nodeStates.get(localNodeId);
        if (self != null && receivedNode.getHeartbeatCounter() > self.getHeartbeatCounter()) {
            self.heartbeatCounter = receivedNode.getHeartbeatCounter();
        }
    }

    /** Applies a received entry to an existing remote node if its heartbeat is strictly newer. */
    private void mergeKnownNode(String nodeId, NodeState knownNode, NodeState receivedNode) {
        if (receivedNode.getHeartbeatCounter() <= knownNode.getHeartbeatCounter()) {
            return;
        }

        NodeState.Status oldStatus = knownNode.getStatus();
        NodeState.Status newStatus = receivedNode.getStatus();

        knownNode.heartbeatCounter = receivedNode.getHeartbeatCounter();
        // A newer heartbeat clears accumulated suspicion, matching incrementHeartbeat().
        knownNode.suspicionCounter = 0;
        // setStatus also refreshes lastUpdateTime.
        knownNode.setStatus(newStatus);

        if (oldStatus == newStatus) {
            return;
        }

        switch (newStatus) {
            case ALIVE:
                logger.info("Node {} is now ALIVE", nodeId);
                notifyListeners(l -> l.onNodeAlive(knownNode));
                break;
            case SUSPECTED:
                logger.warn("Node {} is now SUSPECTED", nodeId);
                notifyListeners(l -> l.onNodeSuspected(knownNode));
                break;
            case DEAD:
                logger.error("Node {} is now DEAD", nodeId);
                notifyListeners(l -> l.onNodeDead(knownNode));
                break;
            default:
                // JOINING and LEAVING have no dedicated listener callback.
                break;
        }
    }
    
    /**
     * Sends a JOIN message to each seed node.
     *
     * <p>Each seed is a {@code host:port} string. The port is taken after the <em>last</em>
     * colon so that hosts which themselves contain colons (for example a bracketed IPv6
     * literal such as {@code [::1]:7946}) are parsed correctly. A malformed or unreachable
     * seed is logged and skipped; it does not stop the remaining seeds from being tried.
     *
     * @param seedNodes seed addresses; {@code null} is treated as an empty list
     */
    public void join(List<String> seedNodes) {
        logger.info("Joining cluster via seeds: {}", seedNodes);

        if (seedNodes == null) {
            return;
        }

        for (String seed : seedNodes) {
            try {
                if (seed == null) {
                    throw new IllegalArgumentException("seed address is null");
                }

                String address = seed.trim();
                int separator = address.lastIndexOf(':');
                if (separator <= 0 || separator == address.length() - 1) {
                    throw new IllegalArgumentException("expected host:port");
                }

                String host = address.substring(0, separator);
                int port = Integer.parseInt(address.substring(separator + 1));
                if (port < 1 || port > 65535) {
                    throw new IllegalArgumentException("port out of range: " + port);
                }

                // Send join message
                GossipMessage joinMsg = new GossipMessage(
                    GossipMessage.Type.JOIN,
                    localNodeId
                );

                messageHandler.sendMessage(host, port, joinMsg);
                logger.info("Sent join request to {}", seed);

            } catch (Exception e) {
                logger.error("Failed to join via seed {}: {}", seed, e.getMessage());
            }
        }
    }
    
    /**
     * Announces a graceful departure: marks the local node LEAVING and sends a LEAVE message
     * to every other known node. A failed send is logged and does not stop the others.
     */
    public void leave() {
        logger.info("Leaving cluster gracefully");

        NodeState self = nodeStates.get(localNodeId);
        if (self != null) {
            self.setStatus(NodeState.Status.LEAVING);
        }

        GossipMessage farewell = new GossipMessage(GossipMessage.Type.LEAVE, localNodeId);

        for (NodeState peer : nodeStates.values()) {
            if (peer.getNodeId().equals(localNodeId)) {
                continue; // do not send to ourselves
            }
            try {
                messageHandler.sendMessage(peer.getHost(), peer.getPort(), farewell);
            } catch (Exception e) {
                logger.warn("Failed to send leave message to {}", peer.getNodeId());
            }
        }
    }
    
    /**
     * Leaves the cluster and stops the protocol.
     *
     * <p>The executors and the message handler are shut down even if announcing the
     * departure throws. Each executor gets up to five seconds to finish; one that has not
     * terminated by then, or when the calling thread is interrupted while waiting, is
     * cancelled with {@code shutdownNow()}. The interrupt flag is restored on interruption.
     */
    public void shutdown() {
        logger.info("Shutting down gossip protocol");

        try {
            leave();
        } finally {
            scheduler.shutdown();
            gossipExecutor.shutdown();
            messageHandler.shutdown();

            try {
                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                    scheduler.shutdownNow();
                }
                if (!gossipExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                    gossipExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                // Stop waiting, cancel whatever is still running, and preserve the interrupt.
                scheduler.shutdownNow();
                gossipExecutor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    /**
     * Returns a snapshot of the membership table.
     *
     * @return a new map holding the current entries; the {@code NodeState} values are the
     *         live objects, not copies
     */
    public Map<String, NodeState> getClusterState() {
        Map<String, NodeState> snapshot = new HashMap<>(nodeStates);
        return snapshot;
    }
    
    /**
     * Returns every node whose status is currently ALIVE.
     *
     * @return a new list of ALIVE nodes; empty if there are none
     */
    public List<NodeState> getAliveNodes() {
        List<NodeState> alive = new ArrayList<>();
        for (NodeState node : nodeStates.values()) {
            if (node.getStatus() == NodeState.Status.ALIVE) {
                alive.add(node);
            }
        }
        return alive;
    }
    
    /**
     * Registers a listener for membership events.
     *
     * <p>A {@code null} listener is ignored with a warning: storing it would make every
     * later notification fail on that entry and log an error each time.
     *
     * @param listener listener to register; {@code null} is ignored
     */
    public void addListener(GossipListener listener) {
        if (listener == null) {
            logger.warn("Ignoring null gossip listener");
            return;
        }
        listeners.add(listener);
    }
    
    /**
     * Applies {@code action} to every registered listener in turn. An exception thrown for
     * one listener is logged and does not prevent the remaining listeners from being called.
     *
     * @param action callback invocation to perform on each listener
     */
    private void notifyListeners(java.util.function.Consumer<GossipListener> action) {
        listeners.forEach(listener -> {
            try {
                action.accept(listener);
            } catch (Exception e) {
                logger.error("Error notifying listener", e);
            }
        });
    }
    
    /**
     * Builds a map of summary statistics: local node id, total/alive/suspected/dead node
     * counts, and the configured gossip interval and fanout.
     *
     * @return a new map of statistic name to value
     */
    public Map<String, Object> getStatistics() {
        long suspected = 0L;
        for (NodeState node : nodeStates.values()) {
            if (node.getStatus() == NodeState.Status.SUSPECTED) {
                suspected++;
            }
        }

        long dead = 0L;
        for (NodeState node : nodeStates.values()) {
            if (node.getStatus() == NodeState.Status.DEAD) {
                dead++;
            }
        }

        Map<String, Object> stats = new HashMap<>();
        stats.put("localNodeId", localNodeId);
        stats.put("totalNodes", nodeStates.size());
        stats.put("aliveNodes", getAliveNodes().size());
        stats.put("suspectedNodes", suspected);
        stats.put("deadNodes", dead);
        stats.put("gossipInterval", config.gossipIntervalMs);
        stats.put("gossipFanout", config.gossipFanout);
        return stats;
    }
}