package com.cube.replication;
import com.cube.cluster.ClusterNode;
import java.util.*;
/**
 * Network-topology-aware replication strategy.
 *
 * <p>For every datacenter that has a configured replication factor, selects up to that many
 * live nodes from the nodes offered by the caller. Nodes on racks not yet used in that
 * datacenter are preferred; a rack is reused only when the datacenter does not offer enough
 * distinct racks with live nodes to satisfy its replication factor.
 */
public class NetworkTopologyReplicationStrategy implements ReplicationStrategy {

    /** Replicas wanted per datacenter name; a private copy of the constructor argument. */
    private final Map<String, Integer> datacenterReplicationFactors;

    /**
     * Creates a strategy from per-datacenter replication factors.
     *
     * @param dcReplicationFactors datacenter name mapped to the number of replicas wanted in
     *     that datacenter; the map is copied, so later changes by the caller have no effect
     * @throws NullPointerException if {@code dcReplicationFactors} is null
     */
    public NetworkTopologyReplicationStrategy(Map<String, Integer> dcReplicationFactors) {
        Objects.requireNonNull(dcReplicationFactors, "dcReplicationFactors");
        this.datacenterReplicationFactors = new HashMap<>(dcReplicationFactors);
    }

    /**
     * Chooses the replica nodes for a key.
     *
     * <p>A primary node is picked from {@code availableNodes} by hashing the key. Each
     * configured datacenter is then scanned starting at the primary node (when the primary
     * belongs to that datacenter, otherwise at the datacenter's first node), and live nodes
     * are collected until the datacenter's replication factor is met or its nodes run out.
     * Datacenters are visited in the iteration order of the internal {@link HashMap}.
     *
     * @param key the key being placed; its {@code hashCode()} selects the primary node
     * @param replicationFactor not used by this strategy; the per-datacenter factors given to
     *     the constructor decide how many replicas are chosen
     * @param availableNodes candidate nodes; nodes reporting {@code isAlive() == false} are
     *     never selected
     * @return the selected nodes, possibly fewer than configured and empty when
     *     {@code availableNodes} is empty; never null
     */
    @Override
    public List<ClusterNode> getReplicaNodes(String key, int replicationFactor, List<ClusterNode> availableNodes) {
        List<ClusterNode> replicas = new ArrayList<>();
        // Nothing to choose from; also avoids a modulo-by-zero when hashing the key.
        if (availableNodes.isEmpty()) {
            return replicas;
        }

        // Group nodes by datacenter, keeping the caller's ordering inside each group.
        Map<String, List<ClusterNode>> nodesByDc = new HashMap<>();
        for (ClusterNode node : availableNodes) {
            nodesByDc.computeIfAbsent(node.getDatacenter(), k -> new ArrayList<>()).add(node);
        }

        // The primary node anchors where the scan starts inside its own datacenter.
        ClusterNode primaryNode = availableNodes.get(getStartIndex(key, availableNodes.size()));

        for (Map.Entry<String, Integer> entry : datacenterReplicationFactors.entrySet()) {
            List<ClusterNode> dcNodes = nodesByDc.get(entry.getKey());
            if (dcNodes == null) {
                // No candidate node belongs to this configured datacenter.
                continue;
            }
            selectReplicasInDatacenter(dcNodes, primaryNode, entry.getValue(), replicas);
        }
        return replicas;
    }

    /**
     * Appends up to {@code dcRF} live nodes of one datacenter to {@code replicas}: first one
     * node per unused rack, then any remaining live nodes if the target is still not met.
     */
    private static void selectReplicasInDatacenter(
            List<ClusterNode> dcNodes, ClusterNode primaryNode, int dcRF, List<ClusterNode> replicas) {
        int nodeCount = dcNodes.size();
        // Start at the primary node when it lives in this datacenter, otherwise at index 0.
        int start = Math.max(0, dcNodes.indexOf(primaryNode));
        Set<String> usedRacks = new HashSet<>();
        int selected = 0;

        // Pass 1: at most one live node per rack, so replicas land on distinct racks.
        for (int offset = 0; offset < nodeCount && selected < dcRF; offset++) {
            ClusterNode node = dcNodes.get((start + offset) % nodeCount);
            // The rack is recorded only when the node is actually taken (short-circuit order).
            if (node.isAlive() && !replicas.contains(node) && usedRacks.add(node.getRack())) {
                replicas.add(node);
                selected++;
            }
        }

        // Pass 2: fewer usable racks than replicas wanted, so racks may now be reused.
        for (int offset = 0; offset < nodeCount && selected < dcRF; offset++) {
            ClusterNode node = dcNodes.get((start + offset) % nodeCount);
            if (node.isAlive() && !replicas.contains(node)) {
                replicas.add(node);
                selected++;
            }
        }
    }

    /**
     * Maps a key to an index in {@code [0, nodeCount)}.
     *
     * <p>The remainder is taken before the absolute value because
     * {@code Math.abs(Integer.MIN_VALUE)} is still negative and would give a negative index.
     * For every other hash the result equals {@code Math.abs(hash) % nodeCount}.
     */
    private int getStartIndex(String key, int nodeCount) {
        return Math.abs(key.hashCode() % nodeCount);
    }

    /** Returns the fixed name of this strategy. */
    @Override
    public String getName() {
        return "NetworkTopologyStrategy";
    }

    /**
     * Returns the configured replication factor per datacenter.
     *
     * @return a fresh copy of the configuration; modifying it does not affect this strategy
     */
    public Map<String, Integer> getDatacenterReplicationFactors() {
        return new HashMap<>(datacenterReplicationFactors);
    }
}