Cube database now includes a SWIM-based Gossip Protocol for distributed cluster membership and failure detection!
A gossip protocol is a decentralized, peer-to-peer communication pattern in which nodes periodically exchange information with randomly chosen peers, much as gossip spreads through a social network.
✅ Decentralized - No single point of failure
✅ Scalable - Works efficiently with thousands of nodes
✅ Eventually Consistent - All nodes converge to the same view
✅ Fault Tolerant - Handles node failures gracefully
✅ Self-Healing - Automatically detects and recovers from failures
┌─────────────────────────────────────────────────────────┐
│              Gossip Protocol Architecture               │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────┐      ┌─────────┐      ┌─────────┐          │
│  │ Node A  │◄────►│ Node B  │◄────►│ Node C  │          │
│  │ ALIVE   │      │ ALIVE   │      │SUSPECTED│          │
│  └─────────┘      └─────────┘      └─────────┘          │
│       ▲                ▲                ▲               │
│       │ Gossip         │ Gossip         │               │
│       │ Messages       │ Messages       │               │
│       │                │                │               │
│       ▼                ▼                ▼               │
│  ┌─────────┐      ┌─────────┐      ┌─────────┐          │
│  │ Node D  │◄────►│ Node E  │◄────►│ Node F  │          │
│  │ ALIVE   │      │ ALIVE   │      │ DEAD    │          │
│  └─────────┘      └─────────┘      └─────────┘          │
│                                                         │
└─────────────────────────────────────────────────────────┘
JOINING ──┐
          │
          ▼
        ALIVE ──────► SUSPECTED ──────► DEAD ──────► REMOVED
          ▲               │
          │               │
          └───────────────┘
     (Heartbeat)      (Timeout)      (Confirmed)
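The lifecycle in the diagram can be written down as a small state machine. The following is a minimal sketch, assuming the only legal transitions are the arrows drawn above; the `NodeStatus` enum and its methods are hypothetical names for illustration, not Cube's actual types.

```java
import java.util.EnumSet;
import java.util.Set;

/** Membership states from the lifecycle diagram (illustrative, not Cube's own enum). */
enum NodeStatus {
    JOINING, ALIVE, SUSPECTED, DEAD, REMOVED;

    /** Returns the states reachable from this one in a single step. */
    Set<NodeStatus> nextStates() {
        switch (this) {
            case JOINING:   return EnumSet.of(ALIVE);
            case ALIVE:     return EnumSet.of(SUSPECTED);
            case SUSPECTED: return EnumSet.of(ALIVE, DEAD); // a heartbeat revives, a timeout kills
            case DEAD:      return EnumSet.of(REMOVED);
            default:        return EnumSet.noneOf(NodeStatus.class); // REMOVED is terminal
        }
    }

    /** True if the diagram contains an arrow from this state to the target. */
    boolean canTransitionTo(NodeStatus target) {
        return nextStates().contains(target);
    }
}
```

Under this reading, a node that restarts after being declared DEAD does not jump straight back to ALIVE; it re-enters the cluster through JOINING.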
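import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.cube.gossip.GossipProtocol;
import com.cube.gossip.GossipProtocol.GossipConfig;
import com.cube.gossip.GossipProtocol.NodeState;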
// Create configuration
GossipConfig config = new GossipConfig(
    1000,   // Gossip every 1 second
    3,      // Gossip to 3 random nodes
    5000,   // Suspect after 5 seconds
    15000,  // Mark dead after 15 seconds
    3,      // Max 3 suspicions before dead
    7946    // Gossip protocol port
);
// Or use the defaults instead of the explicit values above:
// GossipConfig config = GossipConfig.defaultConfig();
// Initialize gossip protocol
GossipProtocol gossip = new GossipProtocol(
    "node-1",     // Local node ID
    "localhost",  // Local host
    8080,         // Application port
    config        // Configuration
);
// Start gossip
gossip.start();
// Specify seed nodes
List<String> seeds = Arrays.asList(
    "192.168.1.100:7946",
    "192.168.1.101:7946",
    "192.168.1.102:7946"
);
// Join the cluster
gossip.join(seeds);
// Add event listener
gossip.addListener(new GossipProtocol.GossipListener() {
    @Override
    public void onNodeJoined(GossipProtocol.NodeState node) {
        System.out.println("Node joined: " + node.getNodeId());
    }

    @Override
    public void onNodeLeft(GossipProtocol.NodeState node) {
        System.out.println("Node left: " + node.getNodeId());
    }

    @Override
    public void onNodeSuspected(GossipProtocol.NodeState node) {
        System.out.println("Node suspected: " + node.getNodeId());
    }

    @Override
    public void onNodeAlive(GossipProtocol.NodeState node) {
        System.out.println("Node recovered: " + node.getNodeId());
    }

    @Override
    public void onNodeDead(GossipProtocol.NodeState node) {
        System.out.println("Node confirmed dead: " + node.getNodeId());
    }
});
// Get all alive nodes
List<NodeState> aliveNodes = gossip.getAliveNodes();
System.out.println("Alive nodes: " + aliveNodes.size());
// Get full cluster state
Map<String, NodeState> clusterState = gossip.getClusterState();
// Get statistics
Map<String, Object> stats = gossip.getStatistics();
System.out.println("Total nodes: " + stats.get("totalNodes"));
System.out.println("Alive nodes: " + stats.get("aliveNodes"));
System.out.println("Dead nodes: " + stats.get("deadNodes"));
// Leave cluster gracefully
gossip.leave();
// Shutdown gossip protocol
gossip.shutdown();
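One way to make sure the leave-then-shutdown sequence runs on a normal JVM exit is a shutdown hook. This is only a sketch: `GracefulShutdown` and its `Membership` interface are hypothetical stand-ins for the two calls shown above, so the snippet compiles without the Cube classes.

```java
/** Illustrative helper; Membership is a hypothetical stand-in for the gossip object. */
final class GracefulShutdown {
    private GracefulShutdown() {}

    /** The two lifecycle calls used in the quick start. */
    interface Membership {
        void leave();
        void shutdown();
    }

    /** Builds a task that leaves the cluster, then shuts down even if leave() throws. */
    static Runnable leaveThenShutdown(Membership membership) {
        return () -> {
            try {
                membership.leave();
            } finally {
                membership.shutdown();
            }
        };
    }

    /** Registers the task so it runs when the JVM exits normally or on SIGTERM. */
    static void install(Membership membership) {
        Runtime.getRuntime().addShutdownHook(
            new Thread(leaveThenShutdown(membership), "gossip-shutdown"));
    }
}
```

Shutdown hooks do not run when the process is killed with `kill -9`, so peers can only learn of that kind of exit through the failure detector.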
Every node periodically (default: 1 second):
1. Increment local heartbeat counter
2. Select 3 random alive nodes (fanout)
3. Send current cluster state to each
4. Receive and merge their cluster states
5. Update local view of the cluster
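Step 2 of the round above, choosing the gossip targets, might look roughly like this. It is a sketch under the assumption that targets are drawn uniformly at random without replacement and that a node never picks itself; `PeerSelector` and `pickTargets` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical helper that picks the gossip targets for one round. */
final class PeerSelector {
    private PeerSelector() {}

    /** Returns up to `fanout` distinct peers in random order, never the local node. */
    static List<String> pickTargets(List<String> aliveNodeIds, String localId,
                                    int fanout, Random rng) {
        List<String> candidates = new ArrayList<>(aliveNodeIds);
        candidates.remove(localId);            // a node does not gossip with itself
        Collections.shuffle(candidates, rng);  // uniform random order
        int count = Math.min(fanout, candidates.size());
        return new ArrayList<>(candidates.subList(0, count));
    }
}
```

In a cluster with fewer peers than the fanout, the method simply returns every peer, so a two-node cluster still gossips every round.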
Time 0:   Node A is ALIVE
   │
   ▼
Time 5s:  No heartbeat → SUSPECTED
   │
   ▼
Time 15s: Still no heartbeat → DEAD
   │
   ▼
Time 45s: Remove from cluster
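The timeline amounts to classifying a node by how long ago its last heartbeat was seen. A minimal sketch, assuming the thresholds are inclusive and using the 45-second removal point from the timeline (which does not appear among the documented configuration fields); `FailureTimeline` is a hypothetical name.

```java
/** Illustrative classifier for the failure-detection timeline. */
final class FailureTimeline {
    static final long SUSPICION_TIMEOUT_MS = 5_000;   // default suspicionTimeoutMs
    static final long FAILURE_TIMEOUT_MS = 15_000;    // default failureTimeoutMs
    static final long REMOVAL_TIMEOUT_MS = 45_000;    // taken from the timeline; assumed fixed

    private FailureTimeline() {}

    /** Maps the time since the last heartbeat to a status name. */
    static String classify(long msSinceLastHeartbeat) {
        if (msSinceLastHeartbeat >= REMOVAL_TIMEOUT_MS) return "REMOVED";
        if (msSinceLastHeartbeat >= FAILURE_TIMEOUT_MS) return "DEAD";
        if (msSinceLastHeartbeat >= SUSPICION_TIMEOUT_MS) return "SUSPECTED";
        return "ALIVE";
    }
}
```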
When receiving cluster state from another node:
For each node in received state:
    If node is new:
        → Add to local cluster
        → Notify listeners (onNodeJoined)
    If node exists:
        Compare heartbeat counters
        If received counter > local counter:
            → Update local state
            → Update status (ALIVE/SUSPECTED/DEAD)
            → Notify listeners if status changed
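The merge rule above can be sketched in a few lines of Java. This is an illustration rather than Cube's implementation: `StateMerger` and `NodeRecord` are hypothetical, the record holds only a heartbeat counter and a status string, and listener notification is reduced to returning the IDs of newly discovered nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative merge of a received cluster state into the local one. */
final class StateMerger {
    private StateMerger() {}

    /** Minimal stand-in for a node's gossip record (hypothetical shape). */
    static final class NodeRecord {
        final long heartbeat;
        final String status;

        NodeRecord(long heartbeat, String status) {
            this.heartbeat = heartbeat;
            this.status = status;
        }
    }

    /** Merges `received` into `local` in place; returns IDs of nodes seen for the first time. */
    static List<String> merge(Map<String, NodeRecord> local, Map<String, NodeRecord> received) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, NodeRecord> entry : received.entrySet()) {
            NodeRecord mine = local.get(entry.getKey());
            if (mine == null) {
                local.put(entry.getKey(), entry.getValue());   // new node: add it
                joined.add(entry.getKey());
            } else if (entry.getValue().heartbeat > mine.heartbeat) {
                local.put(entry.getKey(), entry.getValue());   // fresher info wins
            }
            // Otherwise the received record is stale or equal and is ignored.
        }
        return joined;
    }
}
```

Because only a strictly higher heartbeat counter replaces the local record, merging the same message twice leaves the local state unchanged.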
gossipIntervalMs = 1000; // How often to gossip (milliseconds)
gossipFanout = 3; // Number of nodes to gossip with each round
suspicionTimeoutMs = 5000; // Time before marking node as suspected
failureTimeoutMs = 15000; // Time before marking node as dead
maxSuspicionCount = 3; // Number of suspicions before marking dead
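These settings interact, so a quick sanity check before building a configuration can catch obvious mistakes. The rules below are assumptions about what a sensible combination looks like, not validation that Cube is known to perform; `GossipSettingsCheck` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sanity check for the gossip timing parameters. */
final class GossipSettingsCheck {
    private GossipSettingsCheck() {}

    /** Returns a list of human-readable problems; empty means the values look consistent. */
    static List<String> problems(long gossipIntervalMs, int gossipFanout,
                                 long suspicionTimeoutMs, long failureTimeoutMs) {
        List<String> found = new ArrayList<>();
        if (gossipIntervalMs <= 0) {
            found.add("gossipIntervalMs must be positive");
        }
        if (gossipFanout < 1) {
            found.add("gossipFanout must be at least 1");
        }
        if (suspicionTimeoutMs <= gossipIntervalMs) {
            // Otherwise a node could be suspected before it had a chance to gossip once.
            found.add("suspicionTimeoutMs should exceed gossipIntervalMs");
        }
        if (failureTimeoutMs <= suspicionTimeoutMs) {
            found.add("failureTimeoutMs should exceed suspicionTimeoutMs");
        }
        return found;
    }
}
```

With the defaults listed above, `problems(1000, 3, 5000, 15000)` returns an empty list.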
package com.cube.examples;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.cube.gossip.GossipProtocol;
import com.cube.gossip.GossipProtocol.*;
import com.cube.storage.LSMStorageEngine;

public class CubeWithGossip {
    public static void main(String[] args) throws Exception {
        // Node configuration
        String nodeId = args[0];                     // "node-1"
        String host = args[1];                       // "192.168.1.100"
        int appPort = Integer.parseInt(args[2]);     // 8080
        int gossipPort = Integer.parseInt(args[3]);  // 7946

        // Initialize storage
        LSMStorageEngine storage = new LSMStorageEngine("/data/" + nodeId);

        // Initialize gossip
        GossipConfig config = GossipConfig.defaultConfig();
        GossipProtocol gossip = new GossipProtocol(nodeId, host, appPort, config);

        // Add cluster event handlers
        gossip.addListener(new GossipListener() {
            @Override
            public void onNodeJoined(NodeState node) {
                System.out.println("✓ Node joined: " + node.getNodeId());
                // Update routing tables, redistribute data, etc.
            }

            @Override
            public void onNodeLeft(NodeState node) {
                System.out.println("✗ Node left: " + node.getNodeId());
                // Remove from routing, rebalance data
            }

            @Override
            public void onNodeSuspected(NodeState node) {
                System.out.println("⚠ Node suspected: " + node.getNodeId());
                // Don't route new requests, but keep existing
            }

            @Override
            public void onNodeAlive(NodeState node) {
                System.out.println("✓ Node recovered: " + node.getNodeId());
                // Re-enable routing to this node
            }

            @Override
            public void onNodeDead(NodeState node) {
                System.out.println("✗ Node confirmed dead: " + node.getNodeId());
                // Trigger data replication, remove from cluster
            }
        });

        // Start gossip
        gossip.start();

        // Join cluster via seeds
        if (args.length > 4) {
            String seedsStr = args[4];  // "192.168.1.100:7946,192.168.1.101:7946"
            List<String> seeds = Arrays.asList(seedsStr.split(","));
            gossip.join(seeds);
        }

        // Monitor cluster
        while (true) {
            Thread.sleep(10000);  // Every 10 seconds
            Map<String, Object> stats = gossip.getStatistics();
            System.out.println("\n=== Cluster Status ===");
            System.out.println("Total nodes: " + stats.get("totalNodes"));
            System.out.println("Alive nodes: " + stats.get("aliveNodes"));
            System.out.println("Suspected: " + stats.get("suspectedNodes"));
            System.out.println("Dead nodes: " + stats.get("deadNodes"));

            List<NodeState> alive = gossip.getAliveNodes();
            System.out.println("\nAlive nodes:");
            for (NodeState node : alive) {
                System.out.println(" - " + node.getNodeId() + " (" +
                        node.getHost() + ":" + node.getPort() + ")");
            }
        }
    }
}
# Terminal 1: Start node 1
java -jar cube-db.jar \
  --node-id=node-1 \
  --host=192.168.1.100 \
  --port=8080 \
  --gossip-port=7946

# Terminal 2: Start node 2 and join
java -jar cube-db.jar \
  --node-id=node-2 \
  --host=192.168.1.101 \
  --port=8080 \
  --gossip-port=7946 \
  --seeds=192.168.1.100:7946

# Terminal 3: Start node 3 and join
java -jar cube-db.jar \
  --node-id=node-3 \
  --host=192.168.1.102 \
  --port=8080 \
  --gossip-port=7946 \
  --seeds=192.168.1.100:7946,192.168.1.101:7946
# Kill node 2
kill -9 <pid-of-node-2>

# Observe on node 1:
Time 0s: Node 2 stopped
Time 5s: Node 2 marked as SUSPECTED
Time 15s: Node 2 marked as DEAD

# Restart node 2
java -jar cube-db.jar --node-id=node-2 ... --seeds=192.168.1.100:7946

# Observe:
Time 0s: Node 2 sends JOIN
Time 1s: Node 2 marked as ALIVE
Time 2s: All nodes see node 2 as ALIVE
# Partition network between node1/node2 and node3 (run on node 1 and node 2)
iptables -A INPUT -s 192.168.1.102 -j DROP

# Observe:
Node 1 & 2: See each other as ALIVE, node 3 as DEAD
Node 3: Sees itself as ALIVE, nodes 1 & 2 as DEAD

# Heal partition (run on node 1 and node 2)
iptables -D INPUT -s 192.168.1.102 -j DROP

# Observe:
All nodes exchange state and converge to consistent view
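Per node per second:
= gossipFanout × messageSize × (1000 / gossipIntervalMs)

Example with fanout=3, message=10KB, gossipIntervalMs=1000:
= 3 × 10KB = 30KB/s outbound per node
≈ 30KB/s inbound per node on average

Total cluster traffic:
= 30KB/s × nodes_in_cluster

For 100 nodes:
= 3MB/s total cluster traffic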
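The bandwidth estimate is easy to recompute for other cluster sizes. A small sketch, assuming decimal units (10KB = 10,000 bytes) and counting only the state each node pushes per round; `GossipBandwidth` is a hypothetical helper.

```java
/** Illustrative calculator for the bandwidth estimate. */
final class GossipBandwidth {
    private GossipBandwidth() {}

    /** Outbound bytes per second for a single node. */
    static double perNodeBytesPerSec(int fanout, long messageBytes, long gossipIntervalMs) {
        double roundsPerSecond = 1000.0 / gossipIntervalMs;
        return fanout * (double) messageBytes * roundsPerSecond;
    }

    /** Total bytes per second sent across the whole cluster. */
    static double clusterBytesPerSec(int nodeCount, int fanout,
                                     long messageBytes, long gossipIntervalMs) {
        return nodeCount * perNodeBytesPerSec(fanout, messageBytes, gossipIntervalMs);
    }
}
```

Doubling `gossipIntervalMs` halves both figures, which is why the interval is the first knob to turn when bandwidth is a concern.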
Time to detect failure:
= suspicionTimeoutMs + (gossipIntervalMs × ⌈log₂(N)⌉)

Example with 100 nodes (log₂(100) ≈ 6.64, rounded up to 7):
= 5000ms + (1000ms × 7) = 12 seconds
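The same estimate in code, as a sketch that assumes the logarithm is base 2 and rounded up to whole gossip rounds (which is what makes the 100-node example come out at 7 rounds); `DetectionTime` is a hypothetical helper.

```java
/** Illustrative calculator for the failure-detection estimate. */
final class DetectionTime {
    private DetectionTime() {}

    /** Returns suspicionTimeoutMs + gossipIntervalMs * ceil(log2(nodeCount)). */
    static long estimateMs(long suspicionTimeoutMs, long gossipIntervalMs, int nodeCount) {
        if (nodeCount < 1) {
            throw new IllegalArgumentException("nodeCount must be at least 1");
        }
        // Integer form of ceil(log2(n)); avoids floating-point rounding at powers of two.
        int rounds = 32 - Integer.numberOfLeadingZeros(nodeCount - 1);
        return suspicionTimeoutMs + gossipIntervalMs * rounds;
    }
}
```

Because the round count grows logarithmically, going from 100 to 1000 nodes adds only three more gossip intervals to the estimate.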
Per node:
= nodeCount × (nodeStateSize + heartbeatCounter + metadata)
= nodeCount × ~1KB

For 1000 nodes:
= ~1MB memory
Symptom: Nodes frequently marked as SUSPECTED
Solutions:
- Increase suspicionTimeoutMs
- Decrease gossipIntervalMs
Symptom: Takes too long to detect failed nodes
Solutions:
- Decrease suspicionTimeoutMs
- Decrease gossipIntervalMs
- Increase gossipFanout
Symptom: Too much bandwidth used
Solutions:
- Decrease gossipFanout
- Increase gossipIntervalMs
✅ Use Seed Nodes: Maintain 3-5 stable seed nodes
✅ Monitor Cluster: Track alive/dead node counts
✅ Graceful Shutdown: Always call leave() before shutdown
✅ Tune Timeouts: Based on network latency
✅ Handle Events: Implement all listener methods
✅ Test Failures: Regularly test node failures
✅ Implemented: SWIM-based gossip protocol
✅ Features: Failure detection, cluster membership, state sync
✅ Scalable: Handles thousands of nodes
✅ Fault Tolerant: Self-healing and eventually consistent
✅ Easy Integration: Simple API, event-driven
Cube database now has enterprise-grade cluster management! 🎉