# Cube Database - Phase 2: Consistency & Replication ✅

## Overview

Phase 2 adds distributed database capabilities with tunable consistency levels, read repair, and hinted handoff, bringing Cube much closer to Cassandra's replication model!

## New Features

### 1. Tunable Consistency Levels

Control the trade-off between consistency, availability, and performance:

- **ANY** - Fastest writes, weakest consistency (a stored hint counts as success)
- **ONE** - One replica must respond
- **TWO** - Two replicas must respond  
- **THREE** - Three replicas must respond
- **QUORUM** - Majority of replicas (⌊RF/2⌋ + 1, e.g. 2 of 3)
- **ALL** - All replicas must respond (strongest consistency)
- **LOCAL_ONE** - One replica in local datacenter
- **LOCAL_QUORUM** - Quorum in local datacenter
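
Internally, each level boils down to a required number of replica acknowledgements. A minimal sketch of that mapping (`requiredAcks` is a hypothetical helper, not Cube's actual coordinator code):

```java
public class AckCalculator {
    // Translate a consistency level into the number of replica acks to wait for.
    // ANY is simplified here: in practice a stored hint also counts as success.
    static int requiredAcks(String level, int replicationFactor) {
        switch (level) {
            case "ANY":
            case "ONE":    return 1;
            case "TWO":    return 2;
            case "THREE":  return 3;
            case "QUORUM": return (replicationFactor / 2) + 1; // integer division
            case "ALL":    return replicationFactor;
            default: throw new IllegalArgumentException("Unknown level: " + level);
        }
    }

    public static void main(String[] args) {
        System.out.println("QUORUM, RF=3: " + requiredAcks("QUORUM", 3) + " acks"); // 2
        System.out.println("QUORUM, RF=5: " + requiredAcks("QUORUM", 5) + " acks"); // 3
    }
}
```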

### 2. Read Repair

Automatically detects and repairs inconsistencies during reads:
- Compares responses from all replicas
- Chooses the most recent value (highest timestamp)
- Asynchronously propagates correct value to stale replicas
- Configurable read repair probability (0-100%)
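
The timestamp comparison at the heart of read repair fits in a few lines. In this sketch the `Replica` record is hypothetical and stands in for Cube's actual `ReadResponse` type:

```java
import java.util.Comparator;
import java.util.List;

public class NewestValue {
    // Hypothetical stand-in for a replica's read response.
    record Replica(String nodeId, byte[] value, long timestamp) {}

    // The canonical value is the one with the highest write timestamp.
    static Replica newest(List<Replica> responses) {
        return responses.stream()
                .max(Comparator.comparingLong(Replica::timestamp))
                .orElseThrow();
    }

    // Any replica older than the winner is stale and needs an async repair write.
    static List<String> staleNodes(List<Replica> responses, Replica winner) {
        return responses.stream()
                .filter(r -> r.timestamp() < winner.timestamp())
                .map(Replica::nodeId)
                .toList();
    }
}
```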

### 3. Hinted Handoff

Handles temporarily unavailable nodes:
- Stores writes as "hints" when target node is down
- Automatically replays hints when node recovers
- Configurable hint window and max hints per node
- Persists hints to disk for durability

### 4. Replication Strategies

**SimpleReplicationStrategy:**
- Places replicas on consecutive nodes
- Good for single-datacenter deployments
- Uses consistent hashing for key distribution

**NetworkTopologyStrategy:**
- Rack and datacenter aware
- Distributes replicas across racks for fault tolerance
- Supports multi-datacenter deployments
- Configurable replication factor per DC

## Architecture

```
┌─────────────────────────────────────────────────────────┐
│           Replication Coordinator                        │
├─────────────────────────────────────────────────────────┤
│                                                           │
│  Write Path:                                             │
│  ┌──────────┐                                            │
│  │  Client  │                                            │
│  └────┬─────┘                                            │
│       │ CL=QUORUM                                        │
│       ▼                                                  │
│  ┌──────────────┐                                        │
│  │ Coordinator  │                                        │
│  └───┬──┬───┬───┘                                        │
│      │  │   │ Write to RF=3 replicas                    │
│      ▼  ▼   ▼                                            │
│   Node1 Node2 Node3                                      │
│     ✓    ✓    ✗ (down)                                   │
│              │                                           │
│              ▼                                           │
│         [Hinted Handoff]                                 │
│         Store hint for Node3                             │
│                                                           │
│  Read Path with Read Repair:                            │
│  ┌──────────┐                                            │
│  │  Client  │                                            │
│  └────┬─────┘                                            │
│       │ CL=QUORUM                                        │
│       ▼                                                  │
│  ┌──────────────┐                                        │
│  │ Coordinator  │                                        │
│  └───┬──┬───┬───┘                                        │
│      │  │   │ Read from replicas                        │
│      ▼  ▼   ▼                                            │
│   Node1 Node2 Node3                                      │
│   v1,t1 v2,t2 v1,t1                                      │
│      │  │   │                                            │
│      └──┴───┘                                            │
│         │ Compare responses                              │
│         ▼                                                │
│    Choose v2 (newest)                                    │
│         │                                                │
│         ▼                                                │
│    [Read Repair]                                         │
│    Repair Node1 & Node3                                  │
│                                                           │
└─────────────────────────────────────────────────────────┘
```

## Usage Examples

### Consistency Levels

```java
import com.cube.consistency.ConsistencyLevel;
import com.cube.replication.ReplicationCoordinator;

// Write with QUORUM (strong consistency)
ReplicationCoordinator.WriteResult result = coordinator.write(
    "user:123",
    "Alice".getBytes(),
    ConsistencyLevel.QUORUM,
    clusterNodes
);

if (result.isSuccess()) {
    System.out.println("Wrote to " + result.getSuccessfulWrites() + " replicas");
}

// Read with ONE (fast, eventual consistency)
ReplicationCoordinator.ReadResult readResult = coordinator.read(
    "user:123",
    ConsistencyLevel.ONE,
    clusterNodes
);

if (readResult.isSuccess()) {
    String value = new String(readResult.getValue());
    System.out.println("Read value: " + value);
}

// Write with ALL (maximum consistency)
coordinator.write(
    "important:data",
    "critical".getBytes(),
    ConsistencyLevel.ALL,
    clusterNodes
);
```

### Hinted Handoff

```java
import com.cube.replication.HintedHandoffManager;

// Initialize hinted handoff
HintedHandoffManager hintedHandoff = new HintedHandoffManager(
    "/var/lib/cube/hints",  // Hints directory
    10000,                   // Max hints per node
    3600000                  // 1 hour hint window
);

// Store hint for unavailable node
hintedHandoff.storeHint(
    "node-2",                    // Target node
    "user:123",                  // Key
    "Alice".getBytes()           // Value
);

// Replay hints when node recovers
hintedHandoff.replayHintsForNode("node-2", hint -> {
    // Send hint to node over network
    return sendToNode(hint.getTargetNodeId(), hint.getKey(), hint.getValue());
});

// Get hint statistics
int totalHints = hintedHandoff.getTotalHintCount();
int node2Hints = hintedHandoff.getHintCount("node-2");
```

### Read Repair

```java
import com.cube.replication.ReadRepairManager;
import com.cube.replication.ReadRepairManager.ReadResponse;

import java.util.ArrayList;
import java.util.List;

// Initialize read repair with 10% probability
ReadRepairManager readRepair = new ReadRepairManager(10);

// Collect responses from replicas (node1..node3 are ClusterNode references)
List<ReadResponse> responses = new ArrayList<>();
responses.add(new ReadResponse(node1, "key1", "old".getBytes(), 1000));
responses.add(new ReadResponse(node2, "key1", "new".getBytes(), 2000)); // Newer
responses.add(new ReadResponse(node3, "key1", "old".getBytes(), 1000));

// Perform read repair
ReadRepairManager.ReadRepairResult result = readRepair.performReadRepairBlocking(
    responses,
    (node, key, value, timestamp) -> {
        // Repair the node
        sendRepairToNode(node, key, value, timestamp);
        return true;
    }
);

// Check result
if (result.isRepairNeeded()) {
    System.out.println("Repaired " + result.getRepairedNodes() + " nodes");
}

byte[] canonicalValue = result.getCanonicalValue(); // "new"
```

### Replication Strategies

**Simple Strategy:**
```java
import com.cube.replication.SimpleReplicationStrategy;

ReplicationStrategy strategy = new SimpleReplicationStrategy();

List<ClusterNode> replicas = strategy.getReplicaNodes(
    "user:123",      // Key
    3,               // Replication factor
    allNodes         // Available nodes
);

System.out.println("Replicas: " + replicas);
```

**Network Topology Strategy:**
```java
import com.cube.replication.NetworkTopologyReplicationStrategy;

import java.util.HashMap;
import java.util.Map;

// Configure replication per datacenter
Map<String, Integer> dcRF = new HashMap<>();
dcRF.put("us-east", 3);
dcRF.put("us-west", 2);
dcRF.put("eu-west", 2);

ReplicationStrategy strategy = new NetworkTopologyReplicationStrategy(dcRF);

List<ClusterNode> replicas = strategy.getReplicaNodes(
    "user:123",
    3,               // Global RF (the per-DC map above takes precedence)
    allNodes
);

// Will place 3 replicas in us-east, 2 in us-west, 2 in eu-west
```

### Complete Example

```java
import com.cube.cluster.ClusterNode;
import com.cube.consistency.ConsistencyLevel;
import com.cube.replication.*;
import com.cube.storage.LSMStorageEngine;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class Phase2Example {
    public static void main(String[] args) throws Exception {
        // Initialize storage
        LSMStorageEngine storage = new LSMStorageEngine("/var/lib/cube/data");
        
        // Initialize components
        HintedHandoffManager hintedHandoff = new HintedHandoffManager(
            "/var/lib/cube/hints", 10000, 3600000);
        
        ReadRepairManager readRepair = new ReadRepairManager(10);
        
        ReplicationStrategy strategy = new SimpleReplicationStrategy();
        
        ReplicationCoordinator coordinator = new ReplicationCoordinator(
            storage,
            strategy,
            hintedHandoff,
            readRepair,
            3,      // RF=3
            5000,   // 5s write timeout
            3000    // 3s read timeout
        );
        
        // Define cluster
        List<ClusterNode> nodes = new ArrayList<>();
        nodes.add(new ClusterNode("node1", "10.0.0.1", 8080));
        nodes.add(new ClusterNode("node2", "10.0.0.2", 8080));
        nodes.add(new ClusterNode("node3", "10.0.0.3", 8080));
        
        // Strong consistency write
        ReplicationCoordinator.WriteResult writeResult = coordinator.write(
            "user:alice",
            "Alice Johnson".getBytes(),
            ConsistencyLevel.QUORUM,  // Wait for 2 of 3 replicas
            nodes
        );
        
        System.out.println("Write successful: " + writeResult.isSuccess());
        System.out.println("Replicas written: " + writeResult.getSuccessfulWrites());
        
        // Fast eventual consistency read
        ReplicationCoordinator.ReadResult readResult = coordinator.read(
            "user:alice",
            ConsistencyLevel.ONE,  // Read from first available replica
            nodes
        );
        
        if (readResult.isSuccess()) {
            String value = new String(readResult.getValue());
            System.out.println("Value: " + value);
            System.out.println("Read repair performed: " + readResult.isRepairPerformed());
        }
        
        // Get statistics
        Map<String, Object> stats = coordinator.getStats();
        System.out.println("Replication stats: " + stats);
        
        // Cleanup
        coordinator.shutdown();
        storage.close();
    }
}
```

## Configuration

### Consistency Level Selection Guide

| Use Case | Write CL | Read CL | Explanation |
|----------|----------|---------|-------------|
| **High Availability** | ONE | ONE | Fastest, eventual consistency |
| **Balanced** | QUORUM | QUORUM | Strong consistency, good performance |
| **Strong Consistency** | QUORUM | ALL | Ensure all reads see latest write |
| **Maximum Consistency** | ALL | ALL | Strictest, slowest |
| **Session Consistency** | ONE | QUORUM | Fast writes; reads usually recent (no strict guarantee, since 1 + 2 is not > RF=3) |
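
The guide follows the standard replica-overlap rule: a read is guaranteed to see the latest acknowledged write when the write and read ack counts sum to more than RF (W + R > RF), because the two replica sets must then intersect. A sketch of the check:

```java
public class OverlapRule {
    // W + R > RF guarantees the read and write replica sets overlap,
    // so at least one replica in every read holds the latest write.
    static boolean readYourWrites(int writeAcks, int readAcks, int rf) {
        return writeAcks + readAcks > rf;
    }

    public static void main(String[] args) {
        int rf = 3;
        int quorum = (rf / 2) + 1;                               // 2
        System.out.println(readYourWrites(quorum, quorum, rf));  // QUORUM/QUORUM: true
        System.out.println(readYourWrites(1, 1, rf));            // ONE/ONE: false
        System.out.println(readYourWrites(1, quorum, rf));       // ONE/QUORUM: false
    }
}
```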

### Replication Factor Guidelines

- **RF=1**: No redundancy; any node failure loses data
- **RF=2**: Limited fault tolerance (survives 1 node failure)
- **RF=3**: Good balance (survives 2 node failures at CL=ONE, 1 at QUORUM) - **recommended**
- **RF=5**: High availability (survives 4 node failures at CL=ONE, 2 at QUORUM)
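
How many node failures an RF actually tolerates depends on the consistency level in use. A quick back-of-envelope calculation (not Cube API):

```java
public class FaultTolerance {
    // Node failures tolerated while still gathering the required ack count.
    static int tolerated(int rf, int requiredAcks) {
        return rf - requiredAcks;
    }

    public static void main(String[] args) {
        int rf = 3;
        int quorum = (rf / 2) + 1;
        System.out.println("RF=3, CL=ONE:    " + tolerated(rf, 1));      // 2
        System.out.println("RF=3, CL=QUORUM: " + tolerated(rf, quorum)); // 1
        System.out.println("RF=3, CL=ALL:    " + tolerated(rf, rf));     // 0
    }
}
```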

### Read Repair Configuration

```java
// Always perform read repair
ReadRepairManager alwaysRepair = new ReadRepairManager(100);

// 10% chance (probabilistic)
ReadRepairManager sampledRepair = new ReadRepairManager(10);

// Never perform read repair
ReadRepairManager noRepair = new ReadRepairManager(0);
```

### Hinted Handoff Configuration

```java
HintedHandoffManager hintedHandoff = new HintedHandoffManager(
    "/var/lib/cube/hints",  // Directory for hints
    10000,                   // Max hints per node (prevent overflow)
    3600000                  // Hint window: 1 hour (discard older hints)
);
```

## Performance Characteristics

### Consistency Level Impact

| CL | Write Latency | Read Latency | Consistency | Availability |
|----|---------------|--------------|-------------|--------------|
| ANY | Lowest | N/A | Weakest | Highest |
| ONE | Very Low | Very Low | Weak | High |
| QUORUM | Medium | Medium | Strong | Medium |
| ALL | Highest | Highest | Strongest | Lowest |

### Read Repair Overhead

- **0% chance**: No overhead, eventual consistency
- **10% chance**: ~10% of reads slightly slower, good balance
- **100% chance**: All reads check consistency, strongest guarantee
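
The probabilistic trigger amounts to a percentage roll on each read; Cube's actual `ReadRepairManager` internals may differ, but the idea is:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RepairChance {
    private final int chancePercent; // 0..100

    RepairChance(int chancePercent) {
        if (chancePercent < 0 || chancePercent > 100)
            throw new IllegalArgumentException("chance must be 0-100");
        this.chancePercent = chancePercent;
    }

    // Roll once per read: true means "compare all replica responses this time".
    boolean shouldRepair() {
        return ThreadLocalRandom.current().nextInt(100) < chancePercent;
    }
}
```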

### Hinted Handoff

- **Storage**: ~1KB per hint
- **Replay**: Background process, minimal impact
- **Network**: Replayed when node recovers
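
Combined with the max-hints-per-node limit, the ~1KB-per-hint figure bounds worst-case hint storage. A back-of-envelope sketch:

```java
public class HintCapacity {
    // Worst case: every down node fills its entire hint quota.
    static long worstCaseBytes(int downNodes, int maxHintsPerNode, int bytesPerHint) {
        return (long) downNodes * maxHintsPerNode * bytesPerHint;
    }

    public static void main(String[] args) {
        // One down node, 10,000 hints, ~1 KB each
        long bytes = worstCaseBytes(1, 10_000, 1024);
        System.out.println("One down node: " + bytes + " bytes (~10 MB)");
    }
}
```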

## Testing

```bash
# Run Phase 2 tests
mvn test -Dtest=ReplicationTest

# Expected output:
[INFO] Tests run: 13, Failures: 0, Errors: 0, Skipped: 0
```

## Monitoring

```java
// Get replication statistics
Map<String, Object> stats = coordinator.getStats();

System.out.println("Replication Factor: " + stats.get("replicationFactor"));
System.out.println("Pending Hints: " + stats.get("pendingHints"));
System.out.println("Read Repair Stats: " + stats.get("readRepairStats"));
System.out.println("Active Tasks: " + stats.get("activeReplicationTasks"));
```

## Common Patterns

### Strong Consistency Pattern
```java
// Ensure readers always see latest write
coordinator.write(key, value, ConsistencyLevel.QUORUM, nodes);
coordinator.read(key, ConsistencyLevel.QUORUM, nodes);
```

### High Availability Pattern
```java
// Maximize availability with eventual consistency
coordinator.write(key, value, ConsistencyLevel.ONE, nodes);
coordinator.read(key, ConsistencyLevel.ONE, nodes);
```

### Session Consistency Pattern
```java
// Fast writes; QUORUM reads see most recent writes on the read path
coordinator.write(key, value, ConsistencyLevel.ONE, nodes);
Thread.sleep(10); // Best-effort pause; does not guarantee replication has finished
coordinator.read(key, ConsistencyLevel.QUORUM, nodes);
```

## Troubleshooting

### "Not enough replicas available"
**Cause**: Fewer nodes than replication factor  
**Solution**: Reduce RF or add more nodes

### "Write timeout"
**Cause**: Nodes too slow or unreachable  
**Solution**: Increase write timeout or use lower consistency level

### "Too many hints"
**Cause**: Node down for extended period  
**Solution**: Investigate node issues, consider manual repair

### "Read repair conflicts"
**Cause**: Network partitions or clock skew  
**Solution**: Use NTP for time sync, check network stability

## Next Steps - Phase 3

- [ ] Bloom Filters for faster negative lookups
- [ ] Compression (Snappy, LZ4)
- [ ] Leveled compaction strategy
- [ ] Anti-entropy repair (Merkle trees)
- [ ] Streaming for node replacement

---

**Phase 2 Complete! Cube is now a true distributed database!** 🎉

**Key Achievements:**
- ✅ Tunable consistency levels
- ✅ Read repair for consistency
- ✅ Hinted handoff for availability
- ✅ Multiple replication strategies
- ✅ Comprehensive testing
