Newer
Older
cactus / src / main / java / com / cube / cluster / ClusterUtils.java
@agalyaramadoss agalyaramadoss on 16 Feb 16 KB version update with 21
package com.cube.cluster;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * Cluster utilities for managing node topology, health monitoring, and membership.
 */
public class ClusterUtils {
    
    private static final Logger logger = LoggerFactory.getLogger(ClusterUtils.class);
    
    /**
     * Node health checker - monitors node health and updates state
     */
    public static class HealthChecker {
        private final Map<String, ClusterNode> nodes;
        private final ScheduledExecutorService scheduler;
        private final long checkIntervalMs;
        private final long timeoutMs;
        
        public HealthChecker(Map<String, ClusterNode> nodes, long checkIntervalMs, long timeoutMs) {
            this.nodes = nodes;
            this.checkIntervalMs = checkIntervalMs;
            this.timeoutMs = timeoutMs;
            this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "HealthChecker");
                t.setDaemon(true);
                return t;
            });
        }
        
        public void start() {
            scheduler.scheduleAtFixedRate(
                this::checkAllNodes,
                0,
                checkIntervalMs,
                TimeUnit.MILLISECONDS
            );
            logger.info("Health checker started (interval: {}ms, timeout: {}ms)", 
                checkIntervalMs, timeoutMs);
        }
        
        private void checkAllNodes() {
            long now = System.currentTimeMillis();
            
            for (ClusterNode node : nodes.values()) {
                Duration timeSinceHeartbeat = Duration.between(
                    node.getLastHeartbeat(), Instant.now());
                
                if (timeSinceHeartbeat.toMillis() > timeoutMs) {
                    if (node.getState() == ClusterNode.NodeState.ALIVE) {
                        node.setState(ClusterNode.NodeState.SUSPECTED);
                        logger.warn("Node {} marked as SUSPECTED (no heartbeat for {}ms)",
                            node.getNodeId(), timeSinceHeartbeat.toMillis());
                    } else if (node.getState() == ClusterNode.NodeState.SUSPECTED &&
                               timeSinceHeartbeat.toMillis() > timeoutMs * 2) {
                        node.setState(ClusterNode.NodeState.DEAD);
                        logger.error("Node {} marked as DEAD", node.getNodeId());
                    }
                } else if (node.getState() != ClusterNode.NodeState.ALIVE) {
                    node.setState(ClusterNode.NodeState.ALIVE);
                    logger.info("Node {} recovered to ALIVE", node.getNodeId());
                }
            }
        }
        
        public void shutdown() {
            scheduler.shutdown();
            try {
                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                    scheduler.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduler.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    /**
     * Cluster topology information
     */
    public static class Topology {
        private final List<ClusterNode> nodes;
        private final Map<String, List<ClusterNode>> datacenterNodes;
        private final Map<String, List<ClusterNode>> rackNodes;
        
        public Topology(List<ClusterNode> nodes) {
            this.nodes = new ArrayList<>(nodes);
            this.datacenterNodes = new HashMap<>();
            this.rackNodes = new HashMap<>();
            
            buildTopology();
        }
        
        private void buildTopology() {
            for (ClusterNode node : nodes) {
                // Group by datacenter
                datacenterNodes.computeIfAbsent(node.getDatacenter(), k -> new ArrayList<>())
                    .add(node);
                
                // Group by rack
                String rackKey = node.getDatacenter() + ":" + node.getRack();
                rackNodes.computeIfAbsent(rackKey, k -> new ArrayList<>())
                    .add(node);
            }
        }
        
        public List<ClusterNode> getAllNodes() {
            return Collections.unmodifiableList(nodes);
        }
        
        public List<ClusterNode> getAliveNodes() {
            return nodes.stream()
                .filter(ClusterNode::isAlive)
                .collect(Collectors.toList());
        }
        
        public List<ClusterNode> getNodesByDatacenter(String datacenter) {
            return Collections.unmodifiableList(
                datacenterNodes.getOrDefault(datacenter, Collections.emptyList()));
        }
        
        public List<ClusterNode> getNodesByRack(String datacenter, String rack) {
            String key = datacenter + ":" + rack;
            return Collections.unmodifiableList(
                rackNodes.getOrDefault(key, Collections.emptyList()));
        }
        
        public Set<String> getDatacenters() {
            return Collections.unmodifiableSet(datacenterNodes.keySet());
        }
        
        public int getTotalNodeCount() {
            return nodes.size();
        }
        
        public int getAliveNodeCount() {
            return (int) nodes.stream().filter(ClusterNode::isAlive).count();
        }
        
        public void printTopology() {
            System.out.println("\n╔════════════════════════════════════════════════════════════╗");
            System.out.println("║                   Cluster Topology                         ║");
            System.out.println("╠════════════════════════════════════════════════════════════╣");
            System.out.printf("║ Total Nodes:  %-44d ║%n", getTotalNodeCount());
            System.out.printf("║ Alive Nodes:  %-44d ║%n", getAliveNodeCount());
            System.out.printf("║ Datacenters:  %-44d ║%n", getDatacenters().size());
            System.out.println("╠════════════════════════════════════════════════════════════╣");
            
            for (String dc : getDatacenters()) {
                List<ClusterNode> dcNodes = getNodesByDatacenter(dc);
                System.out.printf("║ Datacenter: %-46s ║%n", dc);
                
                Map<String, Long> rackGroups = dcNodes.stream()
                    .collect(Collectors.groupingBy(ClusterNode::getRack, Collectors.counting()));
                
                for (Map.Entry<String, Long> entry : rackGroups.entrySet()) {
                    System.out.printf("║   Rack %-10s %-37s ║%n",
                        entry.getKey() + ":", entry.getValue() + " nodes");
                    
                    List<ClusterNode> rackNodes = getNodesByRack(dc, entry.getKey());
                    for (ClusterNode node : rackNodes) {
                        String status = node.isAlive() ? "✓" : "✗";
                        System.out.printf("║     %s %-20s %-28s ║%n",
                            status, node.getNodeId(), node.getEndpoint());
                    }
                }
            }
            
            System.out.println("╚════════════════════════════════════════════════════════════╝\n");
        }
    }
    
    /**
     * Token ring for consistent hashing
     */
    public static class TokenRing {
        private final TreeMap<Long, ClusterNode> ring;
        private final int virtualNodes;
        
        public TokenRing(List<ClusterNode> nodes, int virtualNodes) {
            this.ring = new TreeMap<>();
            this.virtualNodes = virtualNodes;
            
            for (ClusterNode node : nodes) {
                addNode(node);
            }
        }
        
        public void addNode(ClusterNode node) {
            for (int i = 0; i < virtualNodes; i++) {
                String key = node.getNodeId() + ":" + i;
                long hash = hashToLong(key);
                ring.put(hash, node);
            }
        }
        
        public void removeNode(ClusterNode node) {
            for (int i = 0; i < virtualNodes; i++) {
                String key = node.getNodeId() + ":" + i;
                long hash = hashToLong(key);
                ring.remove(hash);
            }
        }
        
        public ClusterNode getNodeForKey(String key) {
            if (ring.isEmpty()) {
                return null;
            }
            
            long hash = hashToLong(key);
            Map.Entry<Long, ClusterNode> entry = ring.ceilingEntry(hash);
            
            if (entry == null) {
                entry = ring.firstEntry();
            }
            
            return entry.getValue();
        }
        
        public List<ClusterNode> getNodesForKey(String key, int count) {
            List<ClusterNode> result = new ArrayList<>();
            Set<ClusterNode> seen = new HashSet<>();
            
            if (ring.isEmpty()) {
                return result;
            }
            
            long hash = hashToLong(key);
            
            for (Map.Entry<Long, ClusterNode> entry : ring.tailMap(hash).entrySet()) {
                ClusterNode node = entry.getValue();
                if (!seen.contains(node)) {
                    result.add(node);
                    seen.add(node);
                    if (result.size() >= count) {
                        return result;
                    }
                }
            }
            
            // Wrap around
            for (Map.Entry<Long, ClusterNode> entry : ring.headMap(hash).entrySet()) {
                ClusterNode node = entry.getValue();
                if (!seen.contains(node)) {
                    result.add(node);
                    seen.add(node);
                    if (result.size() >= count) {
                        return result;
                    }
                }
            }
            
            return result;
        }
        
        private long hashToLong(String key) {
            return (long) key.hashCode() & 0xffffffffL;
        }
        
        public int size() {
            return ring.size();
        }
        
        public void printRing() {
            System.out.println("\n╔════════════════════════════════════════════════════════════╗");
            System.out.println("║                     Token Ring                             ║");
            System.out.println("╠════════════════════════════════════════════════════════════╣");
            System.out.printf("║ Virtual Nodes per Node: %-34d ║%n", virtualNodes);
            System.out.printf("║ Total Tokens:           %-34d ║%n", ring.size());
            System.out.println("╠════════════════════════════════════════════════════════════╣");
            
            Map<ClusterNode, Long> nodeCounts = new HashMap<>();
            for (ClusterNode node : ring.values()) {
                nodeCounts.put(node, nodeCounts.getOrDefault(node, 0L) + 1);
            }
            
            for (Map.Entry<ClusterNode, Long> entry : nodeCounts.entrySet()) {
                System.out.printf("║ %-30s %27d ║%n",
                    entry.getKey().getNodeId(), entry.getValue());
            }
            
            System.out.println("╚════════════════════════════════════════════════════════════╝\n");
        }
    }
    
    /**
     * Cluster statistics aggregator
     */
    public static class StatsAggregator {
        
        public static Map<String, Object> aggregateClusterStats(List<ClusterNode> nodes) {
            Map<String, Object> stats = new HashMap<>();
            
            stats.put("totalNodes", nodes.size());
            stats.put("aliveNodes", nodes.stream().filter(ClusterNode::isAlive).count());
            stats.put("deadNodes", nodes.stream()
                .filter(n -> n.getState() == ClusterNode.NodeState.DEAD).count());
            stats.put("suspectedNodes", nodes.stream()
                .filter(n -> n.getState() == ClusterNode.NodeState.SUSPECTED).count());
            
            Map<String, Long> dcDistribution = nodes.stream()
                .collect(Collectors.groupingBy(ClusterNode::getDatacenter, Collectors.counting()));
            stats.put("datacenterDistribution", dcDistribution);
            
            Map<String, Long> stateDistribution = nodes.stream()
                .collect(Collectors.groupingBy(n -> n.getState().name(), Collectors.counting()));
            stats.put("stateDistribution", stateDistribution);
            
            return stats;
        }
        
        public static void printClusterStats(Map<String, Object> stats) {
            System.out.println("\n╔════════════════════════════════════════════════════════════╗");
            System.out.println("║                  Cluster Statistics                        ║");
            System.out.println("╠════════════════════════════════════════════════════════════╣");
            System.out.printf("║ Total Nodes:      %-40d ║%n", stats.get("totalNodes"));
            System.out.printf("║ Alive Nodes:      %-40d ║%n", stats.get("aliveNodes"));
            System.out.printf("║ Dead Nodes:       %-40d ║%n", stats.get("deadNodes"));
            System.out.printf("║ Suspected Nodes:  %-40d ║%n", stats.get("suspectedNodes"));
            
            @SuppressWarnings("unchecked")
            Map<String, Long> dcDist = (Map<String, Long>) stats.get("datacenterDistribution");
            if (!dcDist.isEmpty()) {
                System.out.println("╠════════════════════════════════════════════════════════════╣");
                System.out.println("║ Datacenter Distribution:                                   ║");
                for (Map.Entry<String, Long> entry : dcDist.entrySet()) {
                    System.out.printf("║   %-20s %-35s ║%n",
                        entry.getKey() + ":", entry.getValue() + " nodes");
                }
            }
            
            System.out.println("╚════════════════════════════════════════════════════════════╝\n");
        }
    }
    
    /**
     * Node discovery helper
     */
    public static class NodeDiscovery {
        
        /**
         * Discover nodes from a seed list
         */
        public static List<ClusterNode> discoverFromSeeds(List<String> seedAddresses) {
            List<ClusterNode> discovered = new ArrayList<>();
            
            for (String seed : seedAddresses) {
                String[] parts = seed.split(":");
                if (parts.length != 2) {
                    logger.warn("Invalid seed address: {}", seed);
                    continue;
                }
                
                try {
                    String host = parts[0];
                    int port = Integer.parseInt(parts[1]);
                    
                    String nodeId = "node-" + host + "-" + port;
                    ClusterNode node = new ClusterNode(nodeId, host, port);
                    discovered.add(node);
                    
                    logger.info("Discovered node: {} ({}:{})", nodeId, host, port);
                } catch (NumberFormatException e) {
                    logger.error("Invalid port in seed address: {}", seed);
                }
            }
            
            return discovered;
        }
        
        /**
         * Generate seed list from nodes
         */
        public static List<String> generateSeedList(List<ClusterNode> nodes) {
            return nodes.stream()
                .map(n -> n.getHost() + ":" + n.getPort())
                .collect(Collectors.toList());
        }
    }
}