cactus / GOSSIP_PROTOCOL_GUIDE.md
@agalyaramadoss agalyaramadoss on 16 Feb 14 KB added document

Cube Database - Gossip Protocol Implementation

Overview

Cube database now includes a SWIM-based gossip protocol for distributed cluster membership and failure detection.

What is Gossip Protocol?

Gossip protocol is a decentralized, peer-to-peer communication pattern where nodes periodically exchange information with random neighbors, similar to how gossip spreads in social networks.

Key Benefits:

Decentralized - No single point of failure
Scalable - Works efficiently with thousands of nodes
Eventually Consistent - All nodes converge to same view
Fault Tolerant - Handles node failures gracefully
Self-Healing - Automatically detects and recovers from failures


Architecture

SWIM Protocol (Scalable Weakly-consistent Infection-style Process Group Membership)

┌─────────────────────────────────────────────────────────┐
│              Gossip Protocol Architecture               │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────┐      ┌─────────┐      ┌─────────┐          │
│  │ Node A  │◄────►│ Node B  │◄────►│ Node C  │          │
│  │ ALIVE   │      │ ALIVE   │      │SUSPECTED│          │
│  └─────────┘      └─────────┘      └─────────┘          │
│       ▲                ▲                ▲               │
│       │    Gossip      │     Gossip     │               │
│       │   Messages     │    Messages    │               │
│       │                │                │               │
│       ▼                ▼                ▼               │
│  ┌─────────┐      ┌─────────┐      ┌─────────┐          │
│  │ Node D  │◄────►│ Node E  │◄────►│ Node F  │          │
│  │ ALIVE   │      │ ALIVE   │      │  DEAD   │          │
│  └─────────┘      └─────────┘      └─────────┘          │
│                                                         │
└─────────────────────────────────────────────────────────┘

Node States

JOINING ──┐
          │
          ▼
        ALIVE ──────► SUSPECTED ──────► DEAD ──────► REMOVED
          ▲              │                            
          │              │                            
          └──────────────┘                            
         (Heartbeat)    (Timeout)     (Confirmed)
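
The transitions in the diagram can be written down as a small state machine. The following is only a sketch: the `NodeStatus` enum and its methods are hypothetical names chosen for illustration and are not taken from the `com.cube.gossip` package.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of the node lifecycle shown in the diagram above. */
enum NodeStatus {
    JOINING, ALIVE, SUSPECTED, DEAD, REMOVED;

    /** Returns the states reachable from this one according to the diagram. */
    Set<NodeStatus> nextStates() {
        switch (this) {
            case JOINING:   return EnumSet.of(ALIVE);
            case ALIVE:     return EnumSet.of(SUSPECTED);        // timeout
            case SUSPECTED: return EnumSet.of(ALIVE, DEAD);      // heartbeat or confirmed
            case DEAD:      return EnumSet.of(REMOVED);
            default:        return EnumSet.noneOf(NodeStatus.class); // REMOVED is terminal
        }
    }

    /** True if the diagram allows moving from this state to the target. */
    boolean canTransitionTo(NodeStatus target) {
        return nextStates().contains(target);
    }
}
```

For example, `NodeStatus.SUSPECTED.canTransitionTo(NodeStatus.ALIVE)` is true (a heartbeat arrived), while `NodeStatus.ALIVE.canTransitionTo(NodeStatus.DEAD)` is false because a node is always suspected first.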

Quick Start

1. Initialize Gossip Protocol

import com.cube.gossip.GossipProtocol;
import com.cube.gossip.GossipProtocol.GossipConfig;

// Create configuration
GossipConfig config = new GossipConfig(
    1000,    // Gossip every 1 second
    3,       // Gossip to 3 random nodes
    5000,    // Suspect after 5 seconds
    15000,   // Mark dead after 15 seconds
    3,       // Max 3 suspicions before dead
    7946     // Gossip protocol port
);

// Or, instead of the constructor above, use the defaults:
// GossipConfig config = GossipConfig.defaultConfig();

// Initialize gossip protocol
GossipProtocol gossip = new GossipProtocol(
    "node-1",          // Local node ID
    "localhost",       // Local host
    8080,              // Application port
    config             // Configuration
);

// Start gossip
gossip.start();

2. Join a Cluster

import java.util.Arrays;
import java.util.List;

// Specify seed nodes
List<String> seeds = Arrays.asList(
    "192.168.1.100:7946",
    "192.168.1.101:7946",
    "192.168.1.102:7946"
);

// Join the cluster
gossip.join(seeds);

3. Monitor Cluster Events

// Add event listener
gossip.addListener(new GossipProtocol.GossipListener() {
    @Override
    public void onNodeJoined(GossipProtocol.NodeState node) {
        System.out.println("Node joined: " + node.getNodeId());
    }
    
    @Override
    public void onNodeLeft(GossipProtocol.NodeState node) {
        System.out.println("Node left: " + node.getNodeId());
    }
    
    @Override
    public void onNodeSuspected(GossipProtocol.NodeState node) {
        System.out.println("Node suspected: " + node.getNodeId());
    }
    
    @Override
    public void onNodeAlive(GossipProtocol.NodeState node) {
        System.out.println("Node recovered: " + node.getNodeId());
    }
    
    @Override
    public void onNodeDead(GossipProtocol.NodeState node) {
        System.out.println("Node confirmed dead: " + node.getNodeId());
    }
});

4. Query Cluster State

// Get all alive nodes
List<GossipProtocol.NodeState> aliveNodes = gossip.getAliveNodes();
System.out.println("Alive nodes: " + aliveNodes.size());

// Get full cluster state
Map<String, GossipProtocol.NodeState> clusterState = gossip.getClusterState();

// Get statistics
Map<String, Object> stats = gossip.getStatistics();
System.out.println("Total nodes: " + stats.get("totalNodes"));
System.out.println("Alive nodes: " + stats.get("aliveNodes"));
System.out.println("Dead nodes: " + stats.get("deadNodes"));

5. Graceful Shutdown

// Leave cluster gracefully
gossip.leave();

// Shutdown gossip protocol
gossip.shutdown();

How It Works

1. Gossip Rounds

Every node periodically (default: 1 second):

1. Increment local heartbeat counter
2. Select gossipFanout random alive nodes (default fanout: 3)
3. Send current cluster state to each
4. Receive and merge their cluster states
5. Update local view of the cluster
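
Step 2 of the round, picking the gossip targets, might look roughly like the sketch below. `GossipRoundSketch` and `selectPeers` are hypothetical names used for illustration; the actual selection logic inside `GossipProtocol` is not shown in this guide and may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of fanout-based peer selection for one gossip round. */
final class GossipRoundSketch {

    /** Picks up to {@code fanout} distinct peers at random, never the local node. */
    static List<String> selectPeers(List<String> aliveNodeIds, String localId,
                                    int fanout, Random rng) {
        List<String> candidates = new ArrayList<>(aliveNodeIds);
        candidates.remove(localId);             // a node does not gossip with itself
        Collections.shuffle(candidates, rng);   // uniform random order
        int count = Math.max(0, Math.min(fanout, candidates.size()));
        return new ArrayList<>(candidates.subList(0, count));
    }
}
```

When fewer than `fanout` peers are alive, the sketch simply returns all of them, so a two-node cluster still gossips every round.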

2. Failure Detection

Time 0: Node A is ALIVE
  │
  ▼
Time 5s: No heartbeat → SUSPECTED
  │
  ▼
Time 15s: Still no heartbeat → DEAD
  │
  ▼
Time 45s: Remove from cluster
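
The timeline above amounts to comparing the time since the last heartbeat against a series of thresholds. The sketch below illustrates that idea only: `FailureDetectorSketch`, `classify`, and the `removalTimeoutMs` parameter are hypothetical names, and the 45-second removal threshold is taken from the timeline rather than from any documented configuration field.

```java
/** Hypothetical sketch: derive a node's status from its heartbeat age. */
final class FailureDetectorSketch {

    enum Status { ALIVE, SUSPECTED, DEAD, REMOVED }

    /**
     * Classifies a node by how long ago its last heartbeat was seen.
     * The thresholds are checked from the largest to the smallest.
     */
    static Status classify(long msSinceLastHeartbeat,
                           long suspicionTimeoutMs,
                           long failureTimeoutMs,
                           long removalTimeoutMs) {
        if (msSinceLastHeartbeat >= removalTimeoutMs) return Status.REMOVED;
        if (msSinceLastHeartbeat >= failureTimeoutMs) return Status.DEAD;
        if (msSinceLastHeartbeat >= suspicionTimeoutMs) return Status.SUSPECTED;
        return Status.ALIVE;
    }
}
```

With the values from the timeline (5000, 15000, 45000), a node whose last heartbeat is 4999 ms old is still ALIVE, and one that has been silent for 15000 ms is DEAD.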

3. State Merging

When receiving cluster state from another node:

For each node in received state:
    If node is new:
        → Add to local cluster
        → Notify listeners (onNodeJoined)
    
    If node exists:
        Compare heartbeat counters
        If received counter > local counter:
            → Update local state
            → Update status (ALIVE/SUSPECTED/DEAD)
            → Notify listeners if status changed
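
The merge rule above can be sketched in Java as follows. This is an illustration under stated assumptions: `StateMergeSketch` and `NodeInfo` are hypothetical stand-ins for the library's own classes, and listener notifications are modelled as a list of strings rather than real `GossipListener` callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of merging a received cluster view into the local one. */
final class StateMergeSketch {

    /** Simplified stand-in for NodeState: a heartbeat counter and a status. */
    static final class NodeInfo {
        final long heartbeat;
        final String status; // "ALIVE", "SUSPECTED" or "DEAD"

        NodeInfo(long heartbeat, String status) {
            this.heartbeat = heartbeat;
            this.status = status;
        }
    }

    /** Merges {@code received} into {@code local}; returns the events a listener would see. */
    static List<String> merge(Map<String, NodeInfo> local, Map<String, NodeInfo> received) {
        List<String> events = new ArrayList<>();
        for (Map.Entry<String, NodeInfo> e : received.entrySet()) {
            String nodeId = e.getKey();
            NodeInfo theirs = e.getValue();
            NodeInfo ours = local.get(nodeId);
            if (ours == null) {
                // Unknown node: add it and report a join
                local.put(nodeId, theirs);
                events.add("joined " + nodeId);
            } else if (theirs.heartbeat > ours.heartbeat) {
                // Newer information wins; report only real status changes
                local.put(nodeId, theirs);
                if (!theirs.status.equals(ours.status)) {
                    events.add(nodeId + " -> " + theirs.status);
                }
            }
            // Equal or older heartbeat: keep the local entry unchanged
        }
        return events;
    }
}
```

Because an entry is replaced only when the received heartbeat counter is strictly greater, stale views from slow peers cannot overwrite fresher local information.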

4. Message Types

  • STATE_SYNC: Full cluster state exchange
  • PING: Heartbeat check
  • ACK: Acknowledgment
  • JOIN: New node joining
  • LEAVE: Node leaving gracefully
  • ALIVE: Node is alive announcement
  • SUSPECT: Node suspected announcement
  • DEAD: Node dead announcement

Configuration Guide

Gossip Interval

gossipIntervalMs = 1000; // How often to gossip (milliseconds)
  • Lower (500ms): Faster failure detection, more network traffic
  • Higher (5000ms): Less network traffic, slower failure detection
  • Recommended: 1000ms (1 second)

Gossip Fanout

gossipFanout = 3; // Number of nodes to gossip with each round
  • Lower (1-2): Less network traffic, slower convergence
  • Higher (5-10): Faster convergence, more network traffic
  • Recommended: 3 for small clusters, 5 for large clusters

Timeouts

suspicionTimeoutMs = 5000;  // Time before marking node as suspected
failureTimeoutMs = 15000;   // Time before marking node as dead
  • Network latency: Add 2-3x expected latency
  • Node restart time: Set higher if nodes restart frequently
  • False positives: Increase timeouts to reduce

Max Suspicion Count

maxSuspicionCount = 3; // Number of suspicions before marking dead
  • Prevents single network glitch from marking node as dead
  • Recommended: 3-5 suspicions
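
The settings above depend on each other, so it can help to sanity-check them before starting a node. The sketch below is one possible check: `GossipConfigCheck` is a hypothetical helper, not part of `com.cube.gossip`, and the rules it applies are assumptions derived from the guidance in this section rather than constraints enforced by the library.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sanity check for the gossip settings described above. */
final class GossipConfigCheck {

    /** Returns a list of human-readable problems; an empty list means the values look consistent. */
    static List<String> problems(long gossipIntervalMs, int gossipFanout,
                                 long suspicionTimeoutMs, long failureTimeoutMs,
                                 int maxSuspicionCount) {
        List<String> problems = new ArrayList<>();
        if (gossipIntervalMs <= 0) {
            problems.add("gossipIntervalMs must be positive");
        }
        if (gossipFanout < 1) {
            problems.add("gossipFanout must be at least 1");
        }
        if (suspicionTimeoutMs <= gossipIntervalMs) {
            // A node would be suspected before it had a chance to send one heartbeat
            problems.add("suspicionTimeoutMs should exceed gossipIntervalMs");
        }
        if (failureTimeoutMs <= suspicionTimeoutMs) {
            problems.add("failureTimeoutMs should exceed suspicionTimeoutMs");
        }
        if (maxSuspicionCount < 1) {
            problems.add("maxSuspicionCount must be at least 1");
        }
        return problems;
    }
}
```

Calling `GossipConfigCheck.problems(1000, 3, 5000, 15000, 3)` with the values used throughout this guide returns an empty list.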

Integration with Cube Database

Complete Example

package com.cube.examples;

import com.cube.gossip.GossipProtocol;
import com.cube.gossip.GossipProtocol.*;
import com.cube.storage.LSMStorageEngine;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CubeWithGossip {
    
    public static void main(String[] args) throws Exception {
        // Node configuration
        String nodeId = args[0];      // "node-1"
        String host = args[1];         // "192.168.1.100"
        int appPort = Integer.parseInt(args[2]);     // 8080
        int gossipPort = Integer.parseInt(args[3]);  // 7946
        
        // Initialize storage
        LSMStorageEngine storage = new LSMStorageEngine("/data/" + nodeId);
        
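        // Initialize gossip (same values as the Quick Start configuration,
        // with the gossip port taken from the command line)
        GossipConfig config = new GossipConfig(1000, 3, 5000, 15000, 3, gossipPort);
        GossipProtocol gossip = new GossipProtocol(nodeId, host, appPort, config);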
        
        // Add cluster event handlers
        gossip.addListener(new GossipListener() {
            @Override
            public void onNodeJoined(NodeState node) {
                System.out.println("✓ Node joined: " + node.getNodeId());
                // Update routing tables, redistribute data, etc.
            }
            
            @Override
            public void onNodeLeft(NodeState node) {
                System.out.println("✗ Node left: " + node.getNodeId());
                // Remove from routing, rebalance data
            }
            
            @Override
            public void onNodeSuspected(NodeState node) {
                System.out.println("⚠ Node suspected: " + node.getNodeId());
                // Don't route new requests, but keep existing
            }
            
            @Override
            public void onNodeAlive(NodeState node) {
                System.out.println("✓ Node recovered: " + node.getNodeId());
                // Re-enable routing to this node
            }
            
            @Override
            public void onNodeDead(NodeState node) {
                System.out.println("✗ Node confirmed dead: " + node.getNodeId());
                // Trigger data replication, remove from cluster
            }
        });
        
        // Start gossip
        gossip.start();
        
        // Join cluster via seeds
        if (args.length > 4) {
            String seedsStr = args[4];  // "192.168.1.100:7946,192.168.1.101:7946"
            List<String> seeds = Arrays.asList(seedsStr.split(","));
            gossip.join(seeds);
        }
        
        // Monitor cluster
        while (true) {
            Thread.sleep(10000); // Every 10 seconds
            
            Map<String, Object> stats = gossip.getStatistics();
            System.out.println("\n=== Cluster Status ===");
            System.out.println("Total nodes: " + stats.get("totalNodes"));
            System.out.println("Alive nodes: " + stats.get("aliveNodes"));
            System.out.println("Suspected: " + stats.get("suspectedNodes"));
            System.out.println("Dead nodes: " + stats.get("deadNodes"));
            
            List<NodeState> alive = gossip.getAliveNodes();
            System.out.println("\nAlive nodes:");
            for (NodeState node : alive) {
                System.out.println("  - " + node.getNodeId() + " (" + 
                    node.getHost() + ":" + node.getPort() + ")");
            }
        }
    }
}

Testing Scenarios

Scenario 1: Start 3-Node Cluster

# Terminal 1: Start node 1
java -jar cube-db.jar \
  --node-id=node-1 \
  --host=192.168.1.100 \
  --port=8080 \
  --gossip-port=7946

# Terminal 2: Start node 2 and join
java -jar cube-db.jar \
  --node-id=node-2 \
  --host=192.168.1.101 \
  --port=8080 \
  --gossip-port=7946 \
  --seeds=192.168.1.100:7946

# Terminal 3: Start node 3 and join
java -jar cube-db.jar \
  --node-id=node-3 \
  --host=192.168.1.102 \
  --port=8080 \
  --gossip-port=7946 \
  --seeds=192.168.1.100:7946,192.168.1.101:7946

Scenario 2: Simulate Node Failure

# Kill node 2
kill -9 <pid-of-node-2>

# Observe on node 1:
Time 0s: Node 2 stopped
Time 5s: Node 2 marked as SUSPECTED
Time 15s: Node 2 marked as DEAD

# Restart node 2
java -jar cube-db.jar --node-id=node-2 ... --seeds=192.168.1.100:7946

# Observe:
Time 0s: Node 2 sends JOIN
Time 1s: Node 2 marked as ALIVE
Time 2s: All nodes see node 2 as ALIVE

Scenario 3: Network Partition

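# Partition network between node1/node2 and node3
# (run on node 1 and on node 2; blocks traffic to and from node 3)
iptables -A INPUT -s 192.168.1.102 -j DROP
iptables -A OUTPUT -d 192.168.1.102 -j DROP

# Observe (once failureTimeoutMs has elapsed):
Node 1 & 2: See each other as ALIVE, node 3 as DEAD
Node 3: Sees itself as ALIVE, nodes 1 & 2 as DEAD

# Heal partition (again on node 1 and on node 2)
iptables -D INPUT -s 192.168.1.102 -j DROP
iptables -D OUTPUT -d 192.168.1.102 -j DROP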

# Observe:
All nodes exchange state and converge to consistent view

Performance Characteristics

Network Traffic

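Per node per second (with the default 1-second gossip interval):
= gossipFanout × messageSize

Example with fanout=3, message=10KB:
= 3 × 10KB = 30KB/s outbound per node
≈ 30KB/s inbound per node on average (each node is picked by about 3 peers per round)

For 100 nodes:
= 100 × 30KB/s = 3MB/s total cluster traffic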

Convergence Time

Time to detect failure (N = number of nodes, log base 2, rounded up):
= suspicionTimeoutMs + (gossipIntervalMs × log2(N))

Example with 100 nodes (log2(100) ≈ 6.6, rounded up to 7):
= 5000ms + (1000ms × 7) = 12000ms = 12 seconds

Memory Usage

Per node:
= nodeCount × (nodeStateSize + heartbeatCounter + metadata)
= nodeCount × ~1KB

For 1000 nodes:
= ~1MB memory
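
The three estimates in this section can be reproduced with a few lines of arithmetic. The sketch below is a back-of-the-envelope calculator, not a measurement tool: `GossipEstimates` and its method names are hypothetical, the logarithm is assumed to be base 2 and rounded up (which matches the factor of 7 used for 100 nodes), and the traffic formula is scaled by the gossip interval as an assumption so that it reduces to fanout × messageSize at the default 1-second interval.

```java
/** Hypothetical calculator for the rough estimates given in this section. */
final class GossipEstimates {

    /** Outbound traffic per node in KB/s: fanout messages per gossip round. */
    static long outboundKbPerSecond(int fanout, long messageKb, long gossipIntervalMs) {
        return fanout * messageKb * 1000 / gossipIntervalMs;
    }

    /** Total cluster traffic in KB/s: every node sends at the per-node rate. */
    static long clusterKbPerSecond(int nodes, int fanout, long messageKb, long gossipIntervalMs) {
        return nodes * outboundKbPerSecond(fanout, messageKb, gossipIntervalMs);
    }

    /** Smallest k such that 2^k >= n, computed with integer arithmetic. */
    static int ceilLog2(int n) {
        if (n < 1) throw new IllegalArgumentException("n must be at least 1");
        return 32 - Integer.numberOfLeadingZeros(n - 1);
    }

    /** Estimated time until the cluster has detected a failure, in milliseconds. */
    static long detectionTimeMs(long suspicionTimeoutMs, long gossipIntervalMs, int nodes) {
        return suspicionTimeoutMs + gossipIntervalMs * ceilLog2(nodes);
    }

    /** Membership table size in KB, given the per-entry size in KB. */
    static long memoryKb(int nodes, long perNodeKb) {
        return nodes * perNodeKb;
    }
}
```

With the numbers used above, `outboundKbPerSecond(3, 10, 1000)` gives 30, `clusterKbPerSecond(100, 3, 10, 1000)` gives 3000 (about 3MB/s), `detectionTimeMs(5000, 1000, 100)` gives 12000, and `memoryKb(1000, 1)` gives 1000 (about 1MB).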

Troubleshooting

High False Positive Rate

Symptom: Nodes frequently marked as SUSPECTED
Solutions:

  • Increase suspicionTimeoutMs
  • Keep suspicionTimeoutMs at several multiples of gossipIntervalMs, so that one or two delayed rounds do not trigger suspicion
  • Check network latency

Slow Failure Detection

Symptom: Takes too long to detect failed nodes
Solutions:

  • Decrease suspicionTimeoutMs
  • Decrease gossipIntervalMs
  • Increase gossipFanout

High Network Traffic

Symptom: Too much bandwidth used
Solutions:

  • Decrease gossipFanout
  • Increase gossipIntervalMs
  • Optimize message size

Best Practices

Use Seed Nodes: Maintain 3-5 stable seed nodes
Monitor Cluster: Track alive/dead node counts
Graceful Shutdown: Always call leave() before shutdown
Tune Timeouts: Based on network latency
Handle Events: Implement all listener methods
Test Failures: Regularly test node failures


Summary

Implemented: SWIM-based gossip protocol
Features: Failure detection, cluster membership, state sync
Scalable: Handles thousands of nodes
Fault Tolerant: Self-healing and eventually consistent
Easy Integration: Simple API, event-driven

Cube database now has enterprise-grade cluster management! 🎉