# Cube Database - Gossip Protocol Implementation

## Overview

Cube database now includes a **SWIM-based Gossip Protocol** for distributed cluster membership and failure detection!

## What is Gossip Protocol?

Gossip protocol is a decentralized, peer-to-peer communication pattern where nodes periodically exchange information with random neighbors, similar to how gossip spreads in social networks.

### Key Benefits:
✅ **Decentralized** - No single point of failure  
✅ **Scalable** - Works efficiently with thousands of nodes  
✅ **Eventually Consistent** - All nodes converge to same view  
✅ **Fault Tolerant** - Handles node failures gracefully  
✅ **Self-Healing** - Automatically detects and recovers from failures  

---

## Architecture

### SWIM Protocol (Scalable Weakly-consistent Infection-style Membership)

```
┌─────────────────────────────────────────────────────────┐
│              Gossip Protocol Architecture               │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────┐      ┌─────────┐      ┌─────────┐        │
│  │ Node A  │◄────►│ Node B  │◄────►│ Node C  │        │
│  │ ALIVE   │      │ ALIVE   │      │ SUSPECTED│        │
│  └─────────┘      └─────────┘      └─────────┘        │
│       ▲                ▲                 ▲             │
│       │    Gossip      │     Gossip      │             │
│       │   Messages     │    Messages     │             │
│       │                │                 │             │
│       ▼                ▼                 ▼             │
│  ┌─────────┐      ┌─────────┐      ┌─────────┐        │
│  │ Node D  │◄────►│ Node E  │◄────►│ Node F  │        │
│  │ ALIVE   │      │ ALIVE   │      │  DEAD   │        │
│  └─────────┘      └─────────┘      └─────────┘        │
│                                                         │
└─────────────────────────────────────────────────────────┘
```

### Node States

```
JOINING ──┐
          │
          ▼
        ALIVE ──────► SUSPECTED ──────► DEAD ──────► REMOVED
          ▲              │                            
          │              │                            
          └──────────────┘                            
         (Heartbeat)    (Timeout)     (Confirmed)
```

---

## Quick Start

### 1. Initialize Gossip Protocol

```java
import com.cube.gossip.GossipProtocol;
import com.cube.gossip.GossipProtocol.GossipConfig;

// Create configuration
GossipConfig config = new GossipConfig(
    1000,    // Gossip every 1 second
    3,       // Gossip to 3 random nodes
    5000,    // Suspect after 5 seconds
    15000,   // Mark dead after 15 seconds
    3,       // Max 3 suspicions before dead
    7946     // Gossip protocol port
);

// Or use defaults
GossipConfig config = GossipConfig.defaultConfig();

// Initialize gossip protocol
GossipProtocol gossip = new GossipProtocol(
    "node-1",          // Local node ID
    "localhost",       // Local host
    8080,              // Application port
    config             // Configuration
);

// Start gossip
gossip.start();
```

### 2. Join a Cluster

```java
// Specify seed nodes
List<String> seeds = Arrays.asList(
    "192.168.1.100:7946",
    "192.168.1.101:7946",
    "192.168.1.102:7946"
);

// Join the cluster
gossip.join(seeds);
```

### 3. Monitor Cluster Events

```java
// Add event listener
gossip.addListener(new GossipProtocol.GossipListener() {
    @Override
    public void onNodeJoined(GossipProtocol.NodeState node) {
        System.out.println("Node joined: " + node.getNodeId());
    }
    
    @Override
    public void onNodeLeft(GossipProtocol.NodeState node) {
        System.out.println("Node left: " + node.getNodeId());
    }
    
    @Override
    public void onNodeSuspected(GossipProtocol.NodeState node) {
        System.out.println("Node suspected: " + node.getNodeId());
    }
    
    @Override
    public void onNodeAlive(GossipProtocol.NodeState node) {
        System.out.println("Node recovered: " + node.getNodeId());
    }
    
    @Override
    public void onNodeDead(GossipProtocol.NodeState node) {
        System.out.println("Node confirmed dead: " + node.getNodeId());
    }
});
```

### 4. Query Cluster State

```java
// Get all alive nodes
List<NodeState> aliveNodes = gossip.getAliveNodes();
System.out.println("Alive nodes: " + aliveNodes.size());

// Get full cluster state
Map<String, NodeState> clusterState = gossip.getClusterState();

// Get statistics
Map<String, Object> stats = gossip.getStatistics();
System.out.println("Total nodes: " + stats.get("totalNodes"));
System.out.println("Alive nodes: " + stats.get("aliveNodes"));
System.out.println("Dead nodes: " + stats.get("deadNodes"));
```

### 5. Graceful Shutdown

```java
// Leave cluster gracefully
gossip.leave();

// Shutdown gossip protocol
gossip.shutdown();
```

---

## How It Works

### 1. Gossip Rounds

Every node periodically (default: 1 second):

```
1. Increment local heartbeat counter
2. Select 3 random alive nodes (fanout)
3. Send current cluster state to each
4. Receive and merge their cluster states
5. Update local view of the cluster
```

### 2. Failure Detection

```
Time 0: Node A is ALIVE
  │
  ▼
Time 5s: No heartbeat → SUSPECTED
  │
  ▼
Time 15s: Still no heartbeat → DEAD
  │
  ▼
Time 45s: Remove from cluster
```

### 3. State Merging

When receiving cluster state from another node:

```java
For each node in received state:
    If node is new:
        → Add to local cluster
        → Notify listeners (onNodeJoined)
    
    If node exists:
        Compare heartbeat counters
        If received counter > local counter:
            → Update local state
            → Update status (ALIVE/SUSPECTED/DEAD)
            → Notify listeners if status changed
```

### 4. Message Types

- **STATE_SYNC**: Full cluster state exchange
- **PING**: Heartbeat check
- **ACK**: Acknowledgment
- **JOIN**: New node joining
- **LEAVE**: Node leaving gracefully
- **ALIVE**: Node is alive announcement
- **SUSPECT**: Node suspected announcement
- **DEAD**: Node dead announcement

---

## Configuration Guide

### Gossip Interval
```java
gossipIntervalMs = 1000; // How often to gossip (milliseconds)
```
- **Lower** (500ms): Faster failure detection, more network traffic
- **Higher** (5000ms): Less network traffic, slower failure detection
- **Recommended**: 1000ms (1 second)

### Gossip Fanout
```java
gossipFanout = 3; // Number of nodes to gossip with each round
```
- **Lower** (1-2): Less network traffic, slower convergence
- **Higher** (5-10): Faster convergence, more network traffic
- **Recommended**: 3 for small clusters, 5 for large clusters

### Timeouts
```java
suspicionTimeoutMs = 5000;  // Time before marking node as suspected
failureTimeoutMs = 15000;   // Time before marking node as dead
```
- **Network latency**: Add 2-3x expected latency
- **Node restart time**: Set higher if nodes restart frequently
- **False positives**: Increase timeouts to reduce

### Max Suspicion Count
```java
maxSuspicionCount = 3; // Number of suspicions before marking dead
```
- Prevents single network glitch from marking node as dead
- Recommended: 3-5 suspicions

---

## Integration with Cube Database

### Complete Example

```java
package com.cube.examples;

import com.cube.gossip.GossipProtocol;
import com.cube.gossip.GossipProtocol.*;
import com.cube.storage.LSMStorageEngine;

public class CubeWithGossip {
    
    public static void main(String[] args) throws Exception {
        // Node configuration
        String nodeId = args[0];      // "node-1"
        String host = args[1];         // "192.168.1.100"
        int appPort = Integer.parseInt(args[2]);     // 8080
        int gossipPort = Integer.parseInt(args[3]);  // 7946
        
        // Initialize storage
        LSMStorageEngine storage = new LSMStorageEngine("/data/" + nodeId);
        
        // Initialize gossip
        GossipConfig config = GossipConfig.defaultConfig();
        GossipProtocol gossip = new GossipProtocol(nodeId, host, appPort, config);
        
        // Add cluster event handlers
        gossip.addListener(new GossipListener() {
            @Override
            public void onNodeJoined(NodeState node) {
                System.out.println("✓ Node joined: " + node.getNodeId());
                // Update routing tables, redistribute data, etc.
            }
            
            @Override
            public void onNodeLeft(NodeState node) {
                System.out.println("✗ Node left: " + node.getNodeId());
                // Remove from routing, rebalance data
            }
            
            @Override
            public void onNodeSuspected(NodeState node) {
                System.out.println("⚠ Node suspected: " + node.getNodeId());
                // Don't route new requests, but keep existing
            }
            
            @Override
            public void onNodeAlive(NodeState node) {
                System.out.println("✓ Node recovered: " + node.getNodeId());
                // Re-enable routing to this node
            }
            
            @Override
            public void onNodeDead(NodeState node) {
                System.out.println("✗ Node confirmed dead: " + node.getNodeId());
                // Trigger data replication, remove from cluster
            }
        });
        
        // Start gossip
        gossip.start();
        
        // Join cluster via seeds
        if (args.length > 4) {
            String seedsStr = args[4];  // "192.168.1.100:7946,192.168.1.101:7946"
            List<String> seeds = Arrays.asList(seedsStr.split(","));
            gossip.join(seeds);
        }
        
        // Monitor cluster
        while (true) {
            Thread.sleep(10000); // Every 10 seconds
            
            Map<String, Object> stats = gossip.getStatistics();
            System.out.println("\n=== Cluster Status ===");
            System.out.println("Total nodes: " + stats.get("totalNodes"));
            System.out.println("Alive nodes: " + stats.get("aliveNodes"));
            System.out.println("Suspected: " + stats.get("suspectedNodes"));
            System.out.println("Dead nodes: " + stats.get("deadNodes"));
            
            List<NodeState> alive = gossip.getAliveNodes();
            System.out.println("\nAlive nodes:");
            for (NodeState node : alive) {
                System.out.println("  - " + node.getNodeId() + " (" + 
                    node.getHost() + ":" + node.getPort() + ")");
            }
        }
    }
}
```

---

## Testing Scenarios

### Scenario 1: Start 3-Node Cluster

```bash
# Terminal 1: Start node 1
java -jar cube-db.jar \
  --node-id=node-1 \
  --host=192.168.1.100 \
  --port=8080 \
  --gossip-port=7946

# Terminal 2: Start node 2 and join
java -jar cube-db.jar \
  --node-id=node-2 \
  --host=192.168.1.101 \
  --port=8080 \
  --gossip-port=7946 \
  --seeds=192.168.1.100:7946

# Terminal 3: Start node 3 and join
java -jar cube-db.jar \
  --node-id=node-3 \
  --host=192.168.1.102 \
  --port=8080 \
  --gossip-port=7946 \
  --seeds=192.168.1.100:7946,192.168.1.101:7946
```

### Scenario 2: Simulate Node Failure

```bash
# Kill node 2
kill -9 <pid-of-node-2>

# Observe on node 1:
Time 0s: Node 2 stopped
Time 5s: Node 2 marked as SUSPECTED
Time 15s: Node 2 marked as DEAD

# Restart node 2
java -jar cube-db.jar --node-id=node-2 ... --seeds=192.168.1.100:7946

# Observe:
Time 0s: Node 2 sends JOIN
Time 1s: Node 2 marked as ALIVE
Time 2s: All nodes see node 2 as ALIVE
```

### Scenario 3: Network Partition

```bash
# Partition network between node1/node2 and node3
iptables -A INPUT -s 192.168.1.102 -j DROP

# Observe:
Node 1 & 2: See each other as ALIVE, node 3 as DEAD
Node 3: Sees itself as ALIVE, nodes 1 & 2 as DEAD

# Heal partition
iptables -D INPUT -s 192.168.1.102 -j DROP

# Observe:
All nodes exchange state and converge to consistent view
```

---

## Performance Characteristics

### Network Traffic

```
Per node per second:
= gossipFanout × messageSize

Example with fanout=3, message=10KB:
= 3 × 10KB = 30KB/s outbound
= 30KB/s × nodes_in_cluster inbound

For 100 nodes:
= 3MB/s total cluster traffic
```

### Convergence Time

```
Time to detect failure:
= suspicionTimeoutMs + (gossipIntervalMs × log(N))

Example with 100 nodes:
= 5000ms + (1000ms × 7) = 12 seconds
```

### Memory Usage

```
Per node:
= nodeCount × (nodeStateSize + heartbeatCounter + metadata)
= nodeCount × ~1KB

For 1000 nodes:
= ~1MB memory
```

---

## Troubleshooting

### High False Positive Rate

**Symptom**: Nodes frequently marked as SUSPECTED  
**Solutions**:
- Increase `suspicionTimeoutMs`
- Increase `gossipIntervalMs`
- Check network latency

### Slow Failure Detection

**Symptom**: Takes too long to detect failed nodes  
**Solutions**:
- Decrease `suspicionTimeoutMs`
- Decrease `gossipIntervalMs`
- Increase `gossipFanout`

### High Network Traffic

**Symptom**: Too much bandwidth used  
**Solutions**:
- Decrease `gossipFanout`
- Increase `gossipIntervalMs`
- Optimize message size

---

## Best Practices

✅ **Use Seed Nodes**: Maintain 3-5 stable seed nodes  
✅ **Monitor Cluster**: Track alive/dead node counts  
✅ **Graceful Shutdown**: Always call `leave()` before shutdown  
✅ **Tune Timeouts**: Based on network latency  
✅ **Handle Events**: Implement all listener methods  
✅ **Test Failures**: Regularly test node failures  

---

## Summary

✅ **Implemented**: SWIM-based gossip protocol  
✅ **Features**: Failure detection, cluster membership, state sync  
✅ **Scalable**: Handles thousands of nodes  
✅ **Fault Tolerant**: Self-healing and eventually consistent  
✅ **Easy Integration**: Simple API, event-driven  

**Cube database now has enterprise-grade cluster management!** 🎉
