package com.cube.gossip;
import com.cube.cluster.ClusterNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.*;
/**
 * Gossip-based cluster membership and failure detection for Cube Database.
 *
 * <p>Features:
 * <ul>
 *   <li>Message vocabulary modelled on SWIM (Scalable Weakly-consistent Infection-style
 *       Membership); failure detection in this class is driven by heartbeat counters and
 *       timeouts</li>
 *   <li>Each gossip round pushes the local membership view to up to {@code gossipFanout}
 *       randomly chosen ALIVE peers</li>
 *   <li>Eventually consistent cluster state: received views are merged by heartbeat counter</li>
 *   <li>Configurable gossip interval, fanout and failure timeouts</li>
 * </ul>
 *
 * <p>Background work runs on daemon threads owned by this instance; {@link #shutdown()}
 * announces departure and stops them.
 */
public class GossipProtocol {
    private static final Logger logger = LoggerFactory.getLogger(GossipProtocol.class);

    private final String localNodeId;
    private final ConcurrentHashMap<String, NodeState> nodeStates;
    private final GossipConfig config;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService gossipExecutor;
    private final GossipMessageHandler messageHandler;
    private final List<GossipListener> listeners;

    /** Returns the id of the node this instance represents. */
    public String getLocalNodeId() {
        return localNodeId;
    }

    /**
     * Returns the live, mutable membership map (not a copy). Use {@link #getClusterState()}
     * for a point-in-time snapshot.
     */
    public ConcurrentHashMap<String, NodeState> getNodeStates() {
        return nodeStates;
    }

    /** Returns the configuration this instance was created with. */
    public GossipConfig getConfig() {
        return config;
    }

    /** Returns the scheduler that drives gossip rounds and failure detection. */
    public ScheduledExecutorService getScheduler() {
        return scheduler;
    }

    /** Returns the pool that performs the per-target gossip sends. */
    public ExecutorService getGossipExecutor() {
        return gossipExecutor;
    }

    /** Returns the transport used to send and receive gossip messages. */
    public GossipMessageHandler getMessageHandler() {
        return messageHandler;
    }

    /** Returns the live listener list (not a copy). */
    public List<GossipListener> getListeners() {
        return listeners;
    }

    /**
     * One member's state as seen by this node. Instances are {@link Serializable} and are
     * carried in {@link GossipMessage#getNodeStates()}.
     *
     * <p>Fields are {@code volatile} for visibility only; compound updates such as
     * {@link #incrementHeartbeat()} ({@code ++} on a volatile field) are not atomic.
     */
    public static class NodeState implements Serializable {
        private final String nodeId;
        private final String host;
        private final int port;
        private volatile Status status;
        private volatile long heartbeatCounter;
        private volatile long lastUpdateTime;
        private volatile int suspicionCounter;
        private final Map<String, String> metadata;

        /** Membership status of a node. */
        public enum Status {
            ALIVE,      // Node is healthy
            SUSPECTED,  // Node might be down
            DEAD,       // Node is confirmed down
            LEAVING,    // Node is gracefully shutting down
            JOINING     // Node is joining the cluster
        }

        /**
         * Creates a node record in {@link Status#JOINING} state with heartbeat 0, suspicion 0,
         * empty metadata and a last-update time of "now" on the local clock.
         */
        public NodeState(String nodeId, String host, int port) {
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
            this.status = Status.JOINING;
            this.heartbeatCounter = 0;
            this.lastUpdateTime = System.currentTimeMillis();
            this.suspicionCounter = 0;
            this.metadata = new ConcurrentHashMap<>();
        }

        public String getNodeId() { return nodeId; }
        public String getHost() { return host; }
        public int getPort() { return port; }
        public Status getStatus() { return status; }
        public long getHeartbeatCounter() { return heartbeatCounter; }
        public long getLastUpdateTime() { return lastUpdateTime; }
        public Map<String, String> getMetadata() { return metadata; }

        /** Sets the status and refreshes the last-update time. */
        public void setStatus(Status status) {
            this.status = status;
            this.lastUpdateTime = System.currentTimeMillis();
        }

        /** Advances the heartbeat, refreshes the last-update time and clears suspicion. */
        public void incrementHeartbeat() {
            this.heartbeatCounter++;
            this.lastUpdateTime = System.currentTimeMillis();
            this.suspicionCounter = 0;
        }

        /** Increments the suspicion counter and refreshes the last-update time. */
        public void incrementSuspicion() {
            this.suspicionCounter++;
            this.lastUpdateTime = System.currentTimeMillis();
        }

        public int getSuspicionCounter() {
            return suspicionCounter;
        }

        @Override
        public String toString() {
            return String.format("NodeState{id=%s, host=%s, port=%d, status=%s, heartbeat=%d}",
                    nodeId, host, port, status, heartbeatCounter);
        }
    }

    /**
     * Immutable gossip configuration.
     *
     * <p>Timeouts are measured against a node's last-update time, which is refreshed whenever
     * its heartbeat advances or its status changes.
     */
    public static class GossipConfig {
        private final long gossipIntervalMs;   // How often to gossip
        private final int gossipFanout;        // Number of nodes to gossip to
        private final long suspicionTimeoutMs; // Silence before an ALIVE node is suspected
        private final long failureTimeoutMs;   // Time SUSPECTED before being marked dead
        private final int maxSuspicionCount;   // Suspicion count that marks a SUSPECTED node dead
        private final int protocolPort;        // Port for gossip communication

        /**
         * @param gossipIntervalMs   period of gossip rounds; must be &gt; 0
         * @param gossipFanout       peers contacted per round (also the worker pool size); must be &gt; 0
         * @param suspicionTimeoutMs silence after which an ALIVE node is suspected; must be &gt; 0
         * @param failureTimeoutMs   time a node may stay SUSPECTED before it is marked DEAD;
         *                           DEAD nodes are removed after three times this value
         * @param maxSuspicionCount  a SUSPECTED node whose suspicion counter reaches this value is
         *                           marked DEAD without waiting for {@code failureTimeoutMs}
         * @param protocolPort       port passed to the message handler
         * @throws IllegalArgumentException if the interval, fanout or suspicion timeout is not positive
         */
        public GossipConfig(long gossipIntervalMs, int gossipFanout,
                            long suspicionTimeoutMs, long failureTimeoutMs,
                            int maxSuspicionCount, int protocolPort) {
            // These values feed scheduleAtFixedRate / newFixedThreadPool, which reject non-positive
            // arguments; failing here gives a clearer error than failing deep inside the executor.
            if (gossipIntervalMs <= 0) {
                throw new IllegalArgumentException("gossipIntervalMs must be > 0: " + gossipIntervalMs);
            }
            if (gossipFanout <= 0) {
                throw new IllegalArgumentException("gossipFanout must be > 0: " + gossipFanout);
            }
            if (suspicionTimeoutMs <= 0) {
                throw new IllegalArgumentException("suspicionTimeoutMs must be > 0: " + suspicionTimeoutMs);
            }
            this.gossipIntervalMs = gossipIntervalMs;
            this.gossipFanout = gossipFanout;
            this.suspicionTimeoutMs = suspicionTimeoutMs;
            this.failureTimeoutMs = failureTimeoutMs;
            this.maxSuspicionCount = maxSuspicionCount;
            this.protocolPort = protocolPort;
        }

        /** 1 s interval, fanout 3, suspect after 5 s, dead after 15 s, 3 suspicions, port 7946. */
        public static GossipConfig defaultConfig() {
            return new GossipConfig(
                    1000,  // Gossip every 1 second
                    3,     // Gossip to 3 random nodes
                    5000,  // Suspect after 5 seconds
                    15000, // Mark dead after 15 seconds
                    3,     // Max 3 suspicions before dead
                    7946   // Default gossip port
            );
        }

        public long getGossipIntervalMs() { return gossipIntervalMs; }
        public int getGossipFanout() { return gossipFanout; }
        public long getSuspicionTimeoutMs() { return suspicionTimeoutMs; }
        public long getFailureTimeoutMs() { return failureTimeoutMs; }
        public int getMaxSuspicionCount() { return maxSuspicionCount; }
        public int getProtocolPort() { return protocolPort; }

        /** How long DEAD nodes are retained: 3 x failureTimeoutMs, saturating instead of overflowing. */
        private long deadNodeRetentionMs() {
            return failureTimeoutMs > Long.MAX_VALUE / 3 ? Long.MAX_VALUE : failureTimeoutMs * 3;
        }
    }

    /**
     * A gossip message. The {@code nodeStates} map is stored as given (not copied), so callers
     * should not mutate it after construction.
     */
    public static class GossipMessage implements Serializable {
        /** Message kinds. */
        public enum Type {
            PING,       // Heartbeat check
            PING_REQ,   // Indirect ping request
            ACK,        // Acknowledgment
            ALIVE,      // Node is alive announcement
            SUSPECT,    // Node suspected announcement
            DEAD,       // Node dead announcement
            JOIN,       // Join request
            LEAVE,      // Leave announcement
            STATE_SYNC  // Full state synchronization
        }

        private final Type type;
        private final String fromNodeId;
        private final String targetNodeId;
        private final long timestamp;
        private final Map<String, NodeState> nodeStates;
        private final long heartbeatCounter;

        /** Creates a message with no target, an empty state map and heartbeat 0. */
        public GossipMessage(Type type, String fromNodeId) {
            this(type, fromNodeId, null, new HashMap<>(), 0);
        }

        /** Creates a message timestamped with the sender's current wall-clock time. */
        public GossipMessage(Type type, String fromNodeId, String targetNodeId,
                             Map<String, NodeState> nodeStates, long heartbeatCounter) {
            this.type = type;
            this.fromNodeId = fromNodeId;
            this.targetNodeId = targetNodeId;
            this.timestamp = System.currentTimeMillis();
            this.nodeStates = nodeStates;
            this.heartbeatCounter = heartbeatCounter;
        }

        public Type getType() { return type; }
        public String getFromNodeId() { return fromNodeId; }
        public String getTargetNodeId() { return targetNodeId; }
        public long getTimestamp() { return timestamp; }
        public Map<String, NodeState> getNodeStates() { return nodeStates; }
        public long getHeartbeatCounter() { return heartbeatCounter; }
    }

    /**
     * Membership event callbacks. Listeners are invoked synchronously on whichever internal
     * thread observed the change; exceptions they throw are caught and logged.
     */
    public interface GossipListener {
        void onNodeJoined(NodeState node);
        void onNodeLeft(NodeState node);
        void onNodeSuspected(NodeState node);
        void onNodeAlive(NodeState node);
        void onNodeDead(NodeState node);
    }

    /**
     * Creates a protocol instance whose local node is registered as ALIVE. Nothing runs until
     * {@link #start()} is called.
     *
     * @throws NullPointerException if {@code localNodeId} or {@code config} is null
     */
    public GossipProtocol(String localNodeId, String host, int port, GossipConfig config) {
        this.localNodeId = Objects.requireNonNull(localNodeId, "localNodeId");
        this.config = Objects.requireNonNull(config, "config");
        this.nodeStates = new ConcurrentHashMap<>();
        this.listeners = new CopyOnWriteArrayList<>();
        this.scheduler = Executors.newScheduledThreadPool(2, daemonThreadFactory("Gossip-Scheduler"));
        this.gossipExecutor = Executors.newFixedThreadPool(
                config.gossipFanout, daemonThreadFactory("Gossip-Worker"));

        // Add local node
        NodeState localNode = new NodeState(localNodeId, host, port);
        localNode.setStatus(NodeState.Status.ALIVE);
        nodeStates.put(localNodeId, localNode);

        // Created last: the handler receives `this`, so every other field must be initialised first.
        this.messageHandler = new GossipMessageHandler(this);
    }

    /** Daemon thread factory producing uniquely numbered names ("prefix-1", "prefix-2", ...). */
    private static ThreadFactory daemonThreadFactory(String prefix) {
        java.util.concurrent.atomic.AtomicInteger sequence = new java.util.concurrent.atomic.AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + "-" + sequence.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    /**
     * Starts periodic gossip rounds, periodic failure detection and the message handler.
     */
    public void start() {
        logger.info("Starting gossip protocol for node: {}", localNodeId);
        scheduler.scheduleAtFixedRate(
                this::performGossipRound,
                0,
                config.gossipIntervalMs,
                TimeUnit.MILLISECONDS
        );
        // Check twice per suspicion window; clamp so a tiny timeout cannot yield an illegal 0 period.
        scheduler.scheduleAtFixedRate(
                this::detectFailures,
                config.suspicionTimeoutMs,
                Math.max(1, config.suspicionTimeoutMs / 2),
                TimeUnit.MILLISECONDS
        );
        messageHandler.start(config.protocolPort);
        logger.info("Gossip protocol started on port {}", config.protocolPort);
    }

    /**
     * One gossip round: advance the local heartbeat, then hand one send task per selected
     * target to the worker pool. Exceptions are logged so the scheduled task keeps running.
     */
    private void performGossipRound() {
        try {
            NodeState localNode = nodeStates.get(localNodeId);
            if (localNode != null) {
                localNode.incrementHeartbeat();
            }
            List<NodeState> targets = selectGossipTargets();
            if (targets.isEmpty()) {
                logger.debug("No nodes to gossip with");
                return;
            }
            logger.debug("Gossiping with {} nodes", targets.size());
            for (NodeState target : targets) {
                gossipExecutor.submit(() -> gossipWithNode(target));
            }
        } catch (Exception e) {
            logger.error("Error in gossip round", e);
        }
    }

    /** Picks up to {@code gossipFanout} random ALIVE peers (never the local node). */
    private List<NodeState> selectGossipTargets() {
        List<NodeState> aliveNodes = new ArrayList<>();
        for (NodeState node : nodeStates.values()) {
            if (!node.getNodeId().equals(localNodeId) && node.getStatus() == NodeState.Status.ALIVE) {
                aliveNodes.add(node);
            }
        }
        Collections.shuffle(aliveNodes);
        return aliveNodes.subList(0, Math.min(config.gossipFanout, aliveNodes.size()));
    }

    /**
     * Sends a STATE_SYNC carrying every non-DEAD entry of the local view to {@code target}.
     * A failed send marks the target SUSPECTED.
     */
    private void gossipWithNode(NodeState target) {
        try {
            Map<String, NodeState> statesToSend = new HashMap<>();
            nodeStates.forEach((id, state) -> {
                if (state.getStatus() != NodeState.Status.DEAD) {
                    statesToSend.put(id, state);
                }
            });
            NodeState self = nodeStates.get(localNodeId);
            long heartbeat = self != null ? self.getHeartbeatCounter() : 0;
            GossipMessage message = new GossipMessage(
                    GossipMessage.Type.STATE_SYNC,
                    localNodeId,
                    target.getNodeId(),
                    statesToSend,
                    heartbeat
            );
            messageHandler.sendMessage(target.getHost(), target.getPort(), message);
            logger.debug("Sent gossip to {}", target.getNodeId());
        } catch (Exception e) {
            logger.warn("Failed to gossip with {}: {}", target.getNodeId(), e.getMessage());
            suspectNode(target.getNodeId());
        }
    }

    /**
     * Applies timeout rules to every remote node. Each node is checked inside its own
     * try/catch: an exception escaping a scheduleAtFixedRate task would silently cancel all
     * future failure detection.
     */
    private void detectFailures() {
        long now = System.currentTimeMillis();
        for (NodeState node : nodeStates.values()) {
            if (localNodeId.equals(node.getNodeId())) {
                continue; // Skip local node
            }
            try {
                checkNode(node, now - node.getLastUpdateTime());
            } catch (Exception e) {
                logger.error("Failure detection failed for node {}", node.getNodeId(), e);
            }
        }
    }

    /** Status-specific timeout handling for a single remote node. */
    private void checkNode(NodeState node, long timeSinceUpdate) {
        switch (node.getStatus()) {
            case ALIVE:
                if (timeSinceUpdate > config.suspicionTimeoutMs) {
                    suspectNode(node.getNodeId());
                }
                break;
            case SUSPECTED:
                if (timeSinceUpdate > config.failureTimeoutMs
                        || node.getSuspicionCounter() >= config.maxSuspicionCount) {
                    markNodeDead(node.getNodeId());
                }
                break;
            case DEAD:
                if (timeSinceUpdate > config.deadNodeRetentionMs()) {
                    removeNode(node.getNodeId());
                }
                break;
            case JOINING:
            case LEAVING:
                // Without this, a node that never finished joining (or announced departure) and
                // then went silent would stay in the map forever; retire it via the DEAD path.
                if (timeSinceUpdate > config.failureTimeoutMs) {
                    markNodeDead(node.getNodeId());
                }
                break;
        }
    }

    /** Moves an ALIVE node to SUSPECTED, bumps its suspicion counter and notifies listeners. */
    private void suspectNode(String nodeId) {
        NodeState node = nodeStates.get(nodeId);
        if (node != null && node.getStatus() == NodeState.Status.ALIVE) {
            node.setStatus(NodeState.Status.SUSPECTED);
            node.incrementSuspicion();
            logger.warn("Node {} is now SUSPECTED", nodeId);
            notifyListeners(l -> l.onNodeSuspected(node));
        }
    }

    /** Marks a non-DEAD node DEAD and notifies listeners. */
    private void markNodeDead(String nodeId) {
        NodeState node = nodeStates.get(nodeId);
        if (node != null && node.getStatus() != NodeState.Status.DEAD) {
            node.setStatus(NodeState.Status.DEAD);
            logger.error("Node {} is now DEAD", nodeId);
            notifyListeners(l -> l.onNodeDead(node));
        }
    }

    /** Drops a node from the membership map and reports it via {@code onNodeLeft}. */
    private void removeNode(String nodeId) {
        NodeState node = nodeStates.remove(nodeId);
        if (node != null) {
            logger.info("Removed node {} from cluster", nodeId);
            notifyListeners(l -> l.onNodeLeft(node));
        }
    }

    /**
     * Merges a peer's membership view into the local one.
     *
     * <p>Unknown nodes are added as an independent local copy whose last-update time uses this
     * node's clock, and are reported once via {@code onNodeJoined}. For a known node, the
     * received entry wins only if its heartbeat counter is strictly higher. A win also clears
     * the node's suspicion counter, mirroring {@link NodeState#incrementHeartbeat()}. A peer's
     * view of the local node never overrides the local status. If that peer has seen a higher
     * heartbeat (e.g. from an earlier run with the same id), the local counter is advanced past
     * it so the next gossip round supersedes the stale entry.
     *
     * @param receivedStates node id to state as received from a peer; {@code null} is ignored,
     *                       as are entries with a null id, value or status
     */
    public void mergeState(Map<String, NodeState> receivedStates) {
        if (receivedStates == null) {
            return;
        }
        for (Map.Entry<String, NodeState> entry : receivedStates.entrySet()) {
            String nodeId = entry.getKey();
            NodeState receivedNode = entry.getValue();
            if (nodeId == null || receivedNode == null || receivedNode.getStatus() == null) {
                logger.debug("Ignoring malformed gossip entry for node {}", nodeId);
                continue;
            }
            if (nodeId.equals(localNodeId)) {
                refuteStaleSelfView(receivedNode);
                continue;
            }
            NodeState known = nodeStates.get(nodeId);
            if (known == null) {
                NodeState discovered = localCopyOf(nodeId, receivedNode);
                // putIfAbsent so concurrent merges cannot both "discover" the same node.
                known = nodeStates.putIfAbsent(nodeId, discovered);
                if (known == null) {
                    logger.info("Discovered new node: {}", discovered);
                    notifyListeners(l -> l.onNodeJoined(discovered));
                    continue;
                }
            }
            mergeKnownNode(nodeId, known, receivedNode);
        }
    }

    /** Builds an independent local record for a newly discovered node, timestamped locally. */
    private static NodeState localCopyOf(String nodeId, NodeState received) {
        NodeState copy = new NodeState(nodeId, received.getHost(), received.getPort());
        copy.setStatus(received.getStatus());
        copy.heartbeatCounter = received.getHeartbeatCounter();
        Map<String, String> metadata = received.getMetadata();
        if (metadata != null) {
            copy.getMetadata().putAll(metadata);
        }
        return copy;
    }

    /** Adopts {@code received} into {@code known} if it carries a strictly newer heartbeat. */
    private void mergeKnownNode(String nodeId, NodeState known, NodeState received) {
        if (received.getHeartbeatCounter() <= known.getHeartbeatCounter()) {
            return; // stale or duplicate information
        }
        NodeState.Status oldStatus = known.getStatus();
        NodeState.Status newStatus = received.getStatus();
        known.heartbeatCounter = received.getHeartbeatCounter();
        // Heartbeat progress is proof of life: clear accumulated suspicion (as incrementHeartbeat does).
        known.suspicionCounter = 0;
        known.setStatus(newStatus); // also refreshes lastUpdateTime on the local clock
        if (oldStatus != newStatus && newStatus == NodeState.Status.ALIVE) {
            logger.info("Node {} is now ALIVE", nodeId);
            notifyListeners(l -> l.onNodeAlive(known));
        }
    }

    /** Keeps the local node authoritative over its own state when a peer's view is ahead. */
    private void refuteStaleSelfView(NodeState receivedSelf) {
        NodeState self = nodeStates.get(localNodeId);
        if (self != null && receivedSelf.getHeartbeatCounter() > self.getHeartbeatCounter()) {
            long advanced = receivedSelf.getHeartbeatCounter() + 1;
            logger.info("Peer reported heartbeat {} for local node (local {}); advancing to {}",
                    receivedSelf.getHeartbeatCounter(), self.getHeartbeatCounter(), advanced);
            self.heartbeatCounter = advanced;
            self.lastUpdateTime = System.currentTimeMillis();
        }
    }

    /**
     * Sends a JOIN message to each seed.
     *
     * <p>Seeds are {@code host:port}; the port is taken after the last ':', and a bracketed
     * IPv6 host such as {@code [::1]:7946} is unwrapped. Failures are logged per seed.
     */
    public void join(List<String> seedNodes) {
        logger.info("Joining cluster via seeds: {}", seedNodes);
        for (String seed : seedNodes) {
            try {
                int sep = seed.lastIndexOf(':');
                if (sep <= 0 || sep == seed.length() - 1) {
                    throw new IllegalArgumentException("expected host:port");
                }
                String host = seed.substring(0, sep).trim();
                if (host.startsWith("[") && host.endsWith("]")) {
                    host = host.substring(1, host.length() - 1);
                }
                int port = Integer.parseInt(seed.substring(sep + 1).trim());
                GossipMessage joinMsg = new GossipMessage(GossipMessage.Type.JOIN, localNodeId);
                messageHandler.sendMessage(host, port, joinMsg);
                logger.info("Sent join request to {}", seed);
            } catch (Exception e) {
                logger.error("Failed to join via seed {}: {}", seed, e.getMessage());
            }
        }
    }

    /**
     * Marks the local node LEAVING and sends a LEAVE message to every other node that is not
     * already DEAD (sends to nodes believed dead would most likely just fail or time out).
     */
    public void leave() {
        logger.info("Leaving cluster gracefully");
        NodeState localNode = nodeStates.get(localNodeId);
        if (localNode != null) {
            localNode.setStatus(NodeState.Status.LEAVING);
        }
        GossipMessage leaveMsg = new GossipMessage(GossipMessage.Type.LEAVE, localNodeId);
        for (NodeState node : nodeStates.values()) {
            if (node.getNodeId().equals(localNodeId) || node.getStatus() == NodeState.Status.DEAD) {
                continue;
            }
            try {
                messageHandler.sendMessage(node.getHost(), node.getPort(), leaveMsg);
            } catch (Exception e) {
                logger.warn("Failed to send leave message to {}: {}", node.getNodeId(), e.getMessage());
            }
        }
    }

    /**
     * Announces departure, stops the scheduler and worker pool, then shuts down the message
     * handler. The handler is stopped last so in-flight sends are not cut off. Each pool gets
     * up to 5 s to finish before being forced with {@code shutdownNow()}.
     */
    public void shutdown() {
        logger.info("Shutting down gossip protocol");
        leave();
        scheduler.shutdown();
        gossipExecutor.shutdown();
        try {
            awaitOrForce(scheduler);
            awaitOrForce(gossipExecutor);
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            gossipExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            messageHandler.shutdown();
        }
    }

    /** Waits up to 5 s for {@code executor} to terminate, then forces it. */
    private static void awaitOrForce(ExecutorService executor) throws InterruptedException {
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }

    /** Returns a snapshot copy of the membership map (the NodeState objects are shared). */
    public Map<String, NodeState> getClusterState() {
        return new HashMap<>(nodeStates);
    }

    /** Returns all nodes currently ALIVE, including the local node when it is ALIVE. */
    public List<NodeState> getAliveNodes() {
        return nodeStates.values().stream()
                .filter(n -> n.getStatus() == NodeState.Status.ALIVE)
                .collect(java.util.stream.Collectors.toList());
    }

    /**
     * Registers a listener for membership events.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    public void addListener(GossipListener listener) {
        listeners.add(Objects.requireNonNull(listener, "listener"));
    }

    /**
     * Unregisters a previously added listener.
     *
     * @return true if the listener was registered
     */
    public boolean removeListener(GossipListener listener) {
        return listeners.remove(listener);
    }

    /** Invokes {@code action} on each listener, logging (not propagating) listener failures. */
    private void notifyListeners(java.util.function.Consumer<GossipListener> action) {
        for (GossipListener listener : listeners) {
            try {
                action.accept(listener);
            } catch (Exception e) {
                logger.error("Error notifying listener", e);
            }
        }
    }

    /**
     * Returns counts by status plus the gossip interval and fanout. The map is keyed by
     * "localNodeId", "totalNodes", "aliveNodes", "suspectedNodes", "deadNodes",
     * "gossipInterval" and "gossipFanout".
     */
    public Map<String, Object> getStatistics() {
        Map<String, Object> stats = new HashMap<>();
        stats.put("localNodeId", localNodeId);
        stats.put("totalNodes", nodeStates.size());
        stats.put("aliveNodes", getAliveNodes().size());
        stats.put("suspectedNodes", countWithStatus(NodeState.Status.SUSPECTED));
        stats.put("deadNodes", countWithStatus(NodeState.Status.DEAD));
        stats.put("gossipInterval", config.gossipIntervalMs);
        stats.put("gossipFanout", config.gossipFanout);
        return stats;
    }

    /** Counts nodes currently in {@code status}. */
    private long countWithStatus(NodeState.Status status) {
        return nodeStates.values().stream().filter(n -> n.getStatus() == status).count();
    }
}