/*
 * Repository: noctua
 * Path: src/main/java/com/cube/replication/NetworkTopologyReplicationStrategy.java
 * History: first commit by @agalyaramadoss on 13 Feb (3 KB)
 */
package com.cube.replication;

import com.cube.cluster.ClusterNode;
import java.util.*;

/**
 * Network topology aware replication strategy.
 *
 * <p>For every datacenter that has a configured replication factor, replicas are
 * chosen from the nodes of that datacenter, preferring one replica per rack and
 * only reusing a rack when there are not enough distinct racks with live nodes.
 */
public class NetworkTopologyReplicationStrategy implements ReplicationStrategy {

    /** Replication factor per datacenter name; private copy of the constructor argument. */
    private final Map<String, Integer> datacenterReplicationFactors;

    /**
     * Creates a strategy with the given per-datacenter replication factors.
     *
     * @param dcReplicationFactors datacenter name to number of replicas wanted in that
     *                             datacenter; the map is copied, so later changes by
     *                             the caller have no effect on this strategy
     * @throws NullPointerException if {@code dcReplicationFactors} is {@code null}
     */
    public NetworkTopologyReplicationStrategy(Map<String, Integer> dcReplicationFactors) {
        Objects.requireNonNull(dcReplicationFactors, "dcReplicationFactors");
        this.datacenterReplicationFactors = new HashMap<>(dcReplicationFactors);
    }

    /**
     * Selects the replica nodes for a key.
     *
     * <p>The key's hash picks a primary node from {@code availableNodes}. In the
     * datacenter that contains the primary node, selection starts at that node; in
     * every other datacenter it starts at the first node of that datacenter. Only
     * nodes reporting {@code isAlive()} are selected.
     *
     * @param key               key whose replicas are wanted
     * @param replicationFactor not consulted by this strategy; the per-datacenter
     *                          factors given at construction are used instead
     * @param availableNodes    candidate nodes
     * @return a new mutable list of the selected nodes, grouped by datacenter; empty
     *         when {@code availableNodes} is {@code null} or empty
     */
    @Override
    public List<ClusterNode> getReplicaNodes(String key, int replicationFactor, List<ClusterNode> availableNodes) {
        List<ClusterNode> replicas = new ArrayList<>();
        if (availableNodes == null || availableNodes.isEmpty()) {
            // Nothing to choose from; also avoids a modulo-by-zero in getStartIndex.
            return replicas;
        }

        // Group nodes by datacenter, keeping the order they have in availableNodes.
        Map<String, List<ClusterNode>> nodesByDc = new HashMap<>();
        for (ClusterNode node : availableNodes) {
            nodesByDc.computeIfAbsent(node.getDatacenter(), k -> new ArrayList<>()).add(node);
        }

        ClusterNode primaryNode = availableNodes.get(getStartIndex(key, availableNodes.size()));

        for (Map.Entry<String, Integer> entry : datacenterReplicationFactors.entrySet()) {
            List<ClusterNode> dcNodes = nodesByDc.get(entry.getKey());
            Integer dcRF = entry.getValue();
            // Skip datacenters with no candidate nodes or no replicas requested.
            if (dcNodes == null || dcRF == null || dcRF <= 0) {
                continue;
            }
            selectForDatacenter(dcNodes, primaryNode, dcRF, replicas);
        }

        return replicas;
    }

    /**
     * Appends up to {@code dcRF} live nodes of one datacenter to {@code replicas}.
     *
     * @param dcNodes     non-empty list of the datacenter's candidate nodes
     * @param primaryNode node selected by the key hash; the walk starts there when
     *                    it belongs to {@code dcNodes}, otherwise at index 0
     * @param dcRF        number of replicas wanted in this datacenter
     * @param replicas    result list to append to; nodes already in it are skipped
     */
    private void selectForDatacenter(List<ClusterNode> dcNodes, ClusterNode primaryNode,
                                     int dcRF, List<ClusterNode> replicas) {
        int size = dcNodes.size();
        int start = Math.max(dcNodes.indexOf(primaryNode), 0);

        Set<String> usedRacks = new HashSet<>();
        int selected = 0;

        // Pass 1: walk the datacenter once, taking at most one live node per rack.
        for (int step = 0; step < size && selected < dcRF; step++) {
            ClusterNode node = dcNodes.get((start + step) % size);
            if (!usedRacks.contains(node.getRack()) && node.isAlive() && !replicas.contains(node)) {
                replicas.add(node);
                usedRacks.add(node.getRack());
                selected++;
            }
        }

        // Pass 2: not enough distinct racks, so fill the remainder with any live
        // node that has not been chosen yet, even if its rack is already used.
        for (int step = 0; step < size && selected < dcRF; step++) {
            ClusterNode node = dcNodes.get((start + step) % size);
            if (node.isAlive() && !replicas.contains(node)) {
                replicas.add(node);
                selected++;
            }
        }
    }

    /**
     * Maps a key to an index in {@code [0, nodeCount)}.
     *
     * @param key       key to hash
     * @param nodeCount number of nodes; must be positive
     * @return the non-negative index derived from {@code key.hashCode()}
     */
    private int getStartIndex(String key, int nodeCount) {
        // floorMod rather than Math.abs(hash) % n: Math.abs(Integer.MIN_VALUE) is
        // still negative and would yield a negative index.
        return Math.floorMod(key.hashCode(), nodeCount);
    }

    /** Returns the name of this strategy. */
    @Override
    public String getName() {
        return "NetworkTopologyStrategy";
    }

    /**
     * Returns the configured per-datacenter replication factors.
     *
     * @return a new map; changes to it do not affect this strategy
     */
    public Map<String, Integer> getDatacenterReplicationFactors() {
        return new HashMap<>(datacenterReplicationFactors);
    }
}