package com.cube.cluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* Cluster utilities for managing node topology, health monitoring, and membership.
*/
public class ClusterUtils {
private static final Logger logger = LoggerFactory.getLogger(ClusterUtils.class);
/**
* Node health checker - monitors node health and updates state
*/
public static class HealthChecker {
private final Map<String, ClusterNode> nodes;
private final ScheduledExecutorService scheduler;
private final long checkIntervalMs;
private final long timeoutMs;
public HealthChecker(Map<String, ClusterNode> nodes, long checkIntervalMs, long timeoutMs) {
this.nodes = nodes;
this.checkIntervalMs = checkIntervalMs;
this.timeoutMs = timeoutMs;
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "HealthChecker");
t.setDaemon(true);
return t;
});
}
public void start() {
scheduler.scheduleAtFixedRate(
this::checkAllNodes,
0,
checkIntervalMs,
TimeUnit.MILLISECONDS
);
logger.info("Health checker started (interval: {}ms, timeout: {}ms)",
checkIntervalMs, timeoutMs);
}
private void checkAllNodes() {
long now = System.currentTimeMillis();
for (ClusterNode node : nodes.values()) {
Duration timeSinceHeartbeat = Duration.between(
node.getLastHeartbeat(), Instant.now());
if (timeSinceHeartbeat.toMillis() > timeoutMs) {
if (node.getState() == ClusterNode.NodeState.ALIVE) {
node.setState(ClusterNode.NodeState.SUSPECTED);
logger.warn("Node {} marked as SUSPECTED (no heartbeat for {}ms)",
node.getNodeId(), timeSinceHeartbeat.toMillis());
} else if (node.getState() == ClusterNode.NodeState.SUSPECTED &&
timeSinceHeartbeat.toMillis() > timeoutMs * 2) {
node.setState(ClusterNode.NodeState.DEAD);
logger.error("Node {} marked as DEAD", node.getNodeId());
}
} else if (node.getState() != ClusterNode.NodeState.ALIVE) {
node.setState(ClusterNode.NodeState.ALIVE);
logger.info("Node {} recovered to ALIVE", node.getNodeId());
}
}
}
public void shutdown() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
/**
* Cluster topology information
*/
public static class Topology {
private final List<ClusterNode> nodes;
private final Map<String, List<ClusterNode>> datacenterNodes;
private final Map<String, List<ClusterNode>> rackNodes;
public Topology(List<ClusterNode> nodes) {
this.nodes = new ArrayList<>(nodes);
this.datacenterNodes = new HashMap<>();
this.rackNodes = new HashMap<>();
buildTopology();
}
private void buildTopology() {
for (ClusterNode node : nodes) {
// Group by datacenter
datacenterNodes.computeIfAbsent(node.getDatacenter(), k -> new ArrayList<>())
.add(node);
// Group by rack
String rackKey = node.getDatacenter() + ":" + node.getRack();
rackNodes.computeIfAbsent(rackKey, k -> new ArrayList<>())
.add(node);
}
}
public List<ClusterNode> getAllNodes() {
return Collections.unmodifiableList(nodes);
}
public List<ClusterNode> getAliveNodes() {
return nodes.stream()
.filter(ClusterNode::isAlive)
.collect(Collectors.toList());
}
public List<ClusterNode> getNodesByDatacenter(String datacenter) {
return Collections.unmodifiableList(
datacenterNodes.getOrDefault(datacenter, Collections.emptyList()));
}
public List<ClusterNode> getNodesByRack(String datacenter, String rack) {
String key = datacenter + ":" + rack;
return Collections.unmodifiableList(
rackNodes.getOrDefault(key, Collections.emptyList()));
}
public Set<String> getDatacenters() {
return Collections.unmodifiableSet(datacenterNodes.keySet());
}
public int getTotalNodeCount() {
return nodes.size();
}
public int getAliveNodeCount() {
return (int) nodes.stream().filter(ClusterNode::isAlive).count();
}
public void printTopology() {
System.out.println("\n╔════════════════════════════════════════════════════════════╗");
System.out.println("║ Cluster Topology ║");
System.out.println("╠════════════════════════════════════════════════════════════╣");
System.out.printf("║ Total Nodes: %-44d ║%n", getTotalNodeCount());
System.out.printf("║ Alive Nodes: %-44d ║%n", getAliveNodeCount());
System.out.printf("║ Datacenters: %-44d ║%n", getDatacenters().size());
System.out.println("╠════════════════════════════════════════════════════════════╣");
for (String dc : getDatacenters()) {
List<ClusterNode> dcNodes = getNodesByDatacenter(dc);
System.out.printf("║ Datacenter: %-46s ║%n", dc);
Map<String, Long> rackGroups = dcNodes.stream()
.collect(Collectors.groupingBy(ClusterNode::getRack, Collectors.counting()));
for (Map.Entry<String, Long> entry : rackGroups.entrySet()) {
System.out.printf("║ Rack %-10s %-37s ║%n",
entry.getKey() + ":", entry.getValue() + " nodes");
List<ClusterNode> rackNodes = getNodesByRack(dc, entry.getKey());
for (ClusterNode node : rackNodes) {
String status = node.isAlive() ? "✓" : "✗";
System.out.printf("║ %s %-20s %-28s ║%n",
status, node.getNodeId(), node.getEndpoint());
}
}
}
System.out.println("╚════════════════════════════════════════════════════════════╝\n");
}
}
/**
* Token ring for consistent hashing
*/
public static class TokenRing {
private final TreeMap<Long, ClusterNode> ring;
private final int virtualNodes;
public TokenRing(List<ClusterNode> nodes, int virtualNodes) {
this.ring = new TreeMap<>();
this.virtualNodes = virtualNodes;
for (ClusterNode node : nodes) {
addNode(node);
}
}
public void addNode(ClusterNode node) {
for (int i = 0; i < virtualNodes; i++) {
String key = node.getNodeId() + ":" + i;
long hash = hashToLong(key);
ring.put(hash, node);
}
}
public void removeNode(ClusterNode node) {
for (int i = 0; i < virtualNodes; i++) {
String key = node.getNodeId() + ":" + i;
long hash = hashToLong(key);
ring.remove(hash);
}
}
public ClusterNode getNodeForKey(String key) {
if (ring.isEmpty()) {
return null;
}
long hash = hashToLong(key);
Map.Entry<Long, ClusterNode> entry = ring.ceilingEntry(hash);
if (entry == null) {
entry = ring.firstEntry();
}
return entry.getValue();
}
public List<ClusterNode> getNodesForKey(String key, int count) {
List<ClusterNode> result = new ArrayList<>();
Set<ClusterNode> seen = new HashSet<>();
if (ring.isEmpty()) {
return result;
}
long hash = hashToLong(key);
for (Map.Entry<Long, ClusterNode> entry : ring.tailMap(hash).entrySet()) {
ClusterNode node = entry.getValue();
if (!seen.contains(node)) {
result.add(node);
seen.add(node);
if (result.size() >= count) {
return result;
}
}
}
// Wrap around
for (Map.Entry<Long, ClusterNode> entry : ring.headMap(hash).entrySet()) {
ClusterNode node = entry.getValue();
if (!seen.contains(node)) {
result.add(node);
seen.add(node);
if (result.size() >= count) {
return result;
}
}
}
return result;
}
private long hashToLong(String key) {
return (long) key.hashCode() & 0xffffffffL;
}
public int size() {
return ring.size();
}
public void printRing() {
System.out.println("\n╔════════════════════════════════════════════════════════════╗");
System.out.println("║ Token Ring ║");
System.out.println("╠════════════════════════════════════════════════════════════╣");
System.out.printf("║ Virtual Nodes per Node: %-34d ║%n", virtualNodes);
System.out.printf("║ Total Tokens: %-34d ║%n", ring.size());
System.out.println("╠════════════════════════════════════════════════════════════╣");
Map<ClusterNode, Long> nodeCounts = new HashMap<>();
for (ClusterNode node : ring.values()) {
nodeCounts.put(node, nodeCounts.getOrDefault(node, 0L) + 1);
}
for (Map.Entry<ClusterNode, Long> entry : nodeCounts.entrySet()) {
System.out.printf("║ %-30s %27d ║%n",
entry.getKey().getNodeId(), entry.getValue());
}
System.out.println("╚════════════════════════════════════════════════════════════╝\n");
}
}
/**
* Cluster statistics aggregator
*/
public static class StatsAggregator {
public static Map<String, Object> aggregateClusterStats(List<ClusterNode> nodes) {
Map<String, Object> stats = new HashMap<>();
stats.put("totalNodes", nodes.size());
stats.put("aliveNodes", nodes.stream().filter(ClusterNode::isAlive).count());
stats.put("deadNodes", nodes.stream()
.filter(n -> n.getState() == ClusterNode.NodeState.DEAD).count());
stats.put("suspectedNodes", nodes.stream()
.filter(n -> n.getState() == ClusterNode.NodeState.SUSPECTED).count());
Map<String, Long> dcDistribution = nodes.stream()
.collect(Collectors.groupingBy(ClusterNode::getDatacenter, Collectors.counting()));
stats.put("datacenterDistribution", dcDistribution);
Map<String, Long> stateDistribution = nodes.stream()
.collect(Collectors.groupingBy(n -> n.getState().name(), Collectors.counting()));
stats.put("stateDistribution", stateDistribution);
return stats;
}
public static void printClusterStats(Map<String, Object> stats) {
System.out.println("\n╔════════════════════════════════════════════════════════════╗");
System.out.println("║ Cluster Statistics ║");
System.out.println("╠════════════════════════════════════════════════════════════╣");
System.out.printf("║ Total Nodes: %-40d ║%n", stats.get("totalNodes"));
System.out.printf("║ Alive Nodes: %-40d ║%n", stats.get("aliveNodes"));
System.out.printf("║ Dead Nodes: %-40d ║%n", stats.get("deadNodes"));
System.out.printf("║ Suspected Nodes: %-40d ║%n", stats.get("suspectedNodes"));
@SuppressWarnings("unchecked")
Map<String, Long> dcDist = (Map<String, Long>) stats.get("datacenterDistribution");
if (!dcDist.isEmpty()) {
System.out.println("╠════════════════════════════════════════════════════════════╣");
System.out.println("║ Datacenter Distribution: ║");
for (Map.Entry<String, Long> entry : dcDist.entrySet()) {
System.out.printf("║ %-20s %-35s ║%n",
entry.getKey() + ":", entry.getValue() + " nodes");
}
}
System.out.println("╚════════════════════════════════════════════════════════════╝\n");
}
}
/**
* Node discovery helper
*/
public static class NodeDiscovery {
/**
* Discover nodes from a seed list
*/
public static List<ClusterNode> discoverFromSeeds(List<String> seedAddresses) {
List<ClusterNode> discovered = new ArrayList<>();
for (String seed : seedAddresses) {
String[] parts = seed.split(":");
if (parts.length != 2) {
logger.warn("Invalid seed address: {}", seed);
continue;
}
try {
String host = parts[0];
int port = Integer.parseInt(parts[1]);
String nodeId = "node-" + host + "-" + port;
ClusterNode node = new ClusterNode(nodeId, host, port);
discovered.add(node);
logger.info("Discovered node: {} ({}:{})", nodeId, host, port);
} catch (NumberFormatException e) {
logger.error("Invalid port in seed address: {}", seed);
}
}
return discovered;
}
/**
* Generate seed list from nodes
*/
public static List<String> generateSeedList(List<ClusterNode> nodes) {
return nodes.stream()
.map(n -> n.getHost() + ":" + n.getPort())
.collect(Collectors.toList());
}
}
}