package com.cube.gossip;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
/**
 * Handles network communication for the gossip protocol.
 *
 * <p>Listens on a TCP port for incoming {@code GossipProtocol.GossipMessage} objects, hands them
 * to the owning {@link GossipProtocol}, and sends outbound messages to peers. Messages are
 * exchanged using Java native serialization over plain sockets.
 *
 * <p>SECURITY NOTE(review): incoming data is deserialized with {@link ObjectInputStream} without
 * any filtering. Native deserialization of data from untrusted peers is unsafe; if the gossip port
 * can be reached by untrusted hosts, consider a deserialization filter or a different wire format.
 */
public class GossipMessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(GossipMessageHandler.class);

    /** Maximum time to wait for an outbound TCP connection to be established. */
    private static final int CONNECT_TIMEOUT_MS = 3000;
    /** Read timeout applied to both accepted and outbound sockets. */
    private static final int READ_TIMEOUT_MS = 5000;
    /** How long {@link #shutdown()} waits for in-flight tasks before forcing termination. */
    private static final int SHUTDOWN_WAIT_SECONDS = 5;

    private final GossipProtocol gossipProtocol;
    private ServerSocket serverSocket;
    private ExecutorService listenerExecutor;
    private volatile boolean running;

    /**
     * Creates a handler that delegates state merging and state lookup to the given protocol.
     *
     * @param gossipProtocol protocol instance used to merge received state and build replies
     */
    public GossipMessageHandler(GossipProtocol gossipProtocol) {
        this.gossipProtocol = gossipProtocol;
        this.running = false;
    }

    /**
     * Starts listening for gossip messages on the given port.
     *
     * <p>Opens the server socket, creates a cached pool of daemon threads and submits the accept
     * loop to it.
     *
     * @param port TCP port to bind the listener to
     * @throws IllegalStateException if the handler is already running
     * @throws RuntimeException if the server socket cannot be opened
     */
    public void start(int port) {
        // Starting twice would overwrite the socket/executor fields and orphan the first listener.
        if (running) {
            throw new IllegalStateException("Gossip message handler is already running");
        }
        try {
            serverSocket = new ServerSocket(port);
            running = true;
            listenerExecutor = Executors.newCachedThreadPool(r -> {
                Thread t = new Thread(r, "Gossip-Listener");
                t.setDaemon(true);
                return t;
            });
            // Start accepting connections
            listenerExecutor.submit(this::acceptConnections);
            logger.info("Gossip message handler started on port {}", port);
        } catch (IOException e) {
            logger.error("Failed to start gossip message handler", e);
            throw new RuntimeException("Failed to start gossip listener", e);
        }
    }

    /**
     * Accepts incoming connections until the handler is stopped or the server socket fails.
     *
     * <p>A {@link SocketException} from {@code accept()} ends the loop (this is what closing the
     * server socket in {@link #shutdown()} produces); any other {@link IOException} is logged and
     * the loop keeps going. Failures while preparing an individual accepted socket never end the
     * loop.
     */
    private void acceptConnections() {
        while (running) {
            Socket accepted;
            try {
                accepted = serverSocket.accept();
            } catch (SocketException e) {
                if (running) {
                    logger.error("Socket error in accept loop", e);
                }
                break;
            } catch (IOException e) {
                if (running) {
                    logger.error("Error accepting connection", e);
                }
                continue;
            }
            dispatch(accepted);
        }
    }

    /**
     * Configures an accepted socket and hands it to the executor.
     *
     * <p>Kept separate from the accept loop so that a broken client socket or a rejected task
     * (executor already shut down) only drops that one connection instead of terminating the
     * listener, and so that the socket is closed rather than leaked in those cases.
     *
     * @param socket freshly accepted client socket
     */
    private void dispatch(Socket socket) {
        try {
            socket.setSoTimeout(READ_TIMEOUT_MS);
            listenerExecutor.submit(() -> handleConnection(socket));
        } catch (SocketException | RejectedExecutionException e) {
            if (running) {
                logger.warn("Dropping accepted connection from {}: {}",
                        socket.getRemoteSocketAddress(), e.toString());
            } else {
                logger.debug("Dropping accepted connection during shutdown: {}", e.toString());
            }
            closeQuietly(socket);
        }
    }

    /**
     * Handles one incoming connection: reads a single message, processes it and writes the
     * response, if any. The socket is always closed when this method returns.
     *
     * @param socket accepted client socket
     */
    private void handleConnection(Socket socket) {
        // NOTE(review): this deserializes whatever the peer sends; see the class-level security note.
        try (
            Socket connection = socket;
            ObjectInputStream in = new ObjectInputStream(connection.getInputStream());
            ObjectOutputStream out = new ObjectOutputStream(connection.getOutputStream())
        ) {
            // Push the serialization stream header out so the peer's ObjectInputStream
            // constructor is not left waiting for it.
            out.flush();

            // Read message
            Object payload = in.readObject();
            if (!(payload instanceof GossipProtocol.GossipMessage)) {
                logger.warn("Ignoring unexpected payload from {}: {}",
                        connection.getRemoteSocketAddress(),
                        payload == null ? "null" : payload.getClass().getName());
                return;
            }
            GossipProtocol.GossipMessage message = (GossipProtocol.GossipMessage) payload;
            logger.debug("Received {} from {}", message.getType(), message.getFromNodeId());

            // Process message
            GossipProtocol.GossipMessage response = processMessage(message);

            // Send response if needed
            if (response != null) {
                out.writeObject(response);
                out.flush();
            }
        } catch (Exception e) {
            // Boundary catch: one bad connection must not take down the worker thread.
            // toString() is used because getMessage() is null for e.g. EOFException.
            logger.warn("Error handling connection: {}", e.toString());
        }
    }

    /**
     * Processes a received message and builds the reply to send back.
     *
     * <ul>
     *   <li>{@code STATE_SYNC}: merges the received node states, replies with {@code ACK}
     *       carrying the local cluster state.</li>
     *   <li>{@code PING}: replies with an {@code ACK} that carries no state.</li>
     *   <li>{@code JOIN}: merges the received node states, replies with {@code STATE_SYNC}
     *       carrying the local cluster state.</li>
     *   <li>{@code LEAVE}: only logged here (no state change is made in this method).</li>
     *   <li>{@code ACK} and anything else: no reply.</li>
     * </ul>
     *
     * @param message the message read from the peer
     * @return the reply to send, or {@code null} when no reply is needed
     */
    private GossipProtocol.GossipMessage processMessage(GossipProtocol.GossipMessage message) {
        switch (message.getType()) {
            case STATE_SYNC:
                // Merge received state
                gossipProtocol.mergeState(message.getNodeStates());
                // Send ACK with our state
                return new GossipProtocol.GossipMessage(
                    GossipProtocol.GossipMessage.Type.ACK,
                    gossipProtocol.getLocalNodeId(),
                    message.getFromNodeId(),
                    gossipProtocol.getClusterState(),
                    0
                );
            case PING:
                // Respond with ACK
                return new GossipProtocol.GossipMessage(
                    GossipProtocol.GossipMessage.Type.ACK,
                    gossipProtocol.getLocalNodeId(),
                    message.getFromNodeId(),
                    null,
                    0
                );
            case JOIN:
                // Add joining node to cluster
                gossipProtocol.mergeState(message.getNodeStates());
                // Send full state to new node
                return new GossipProtocol.GossipMessage(
                    GossipProtocol.GossipMessage.Type.STATE_SYNC,
                    gossipProtocol.getLocalNodeId(),
                    message.getFromNodeId(),
                    gossipProtocol.getClusterState(),
                    0
                );
            case LEAVE:
                // NOTE(review): only logs; presumably the departure is recorded elsewhere - confirm.
                logger.info("Node {} is leaving", message.getFromNodeId());
                return null;
            case ACK:
                // Acknowledgment received
                return null;
            default:
                logger.warn("Unknown message type: {}", message.getType());
                return null;
        }
    }

    /**
     * Sends a message to a node and, for {@code STATE_SYNC}, {@code JOIN} and {@code PING},
     * waits for the reply and merges any node states it carries.
     *
     * <p>A read timeout while waiting for the reply is logged at debug level and is not treated
     * as an error.
     *
     * @param host destination host name or address
     * @param port destination TCP port
     * @param message message to send
     * @throws IOException if the connection cannot be established, times out, or the peer sends
     *         something that is not a valid gossip message
     */
    public void sendMessage(String host, int port, GossipProtocol.GossipMessage message)
            throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), CONNECT_TIMEOUT_MS);
            socket.setSoTimeout(READ_TIMEOUT_MS);
            try (
                ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream in = new ObjectInputStream(socket.getInputStream())
            ) {
                // Send message
                out.writeObject(message);
                out.flush();

                // Wait for response if needed
                if (expectsResponse(message.getType())) {
                    readAndMergeResponse(in, host, port);
                }
            }
        } catch (ConnectException e) {
            throw new IOException("Could not connect to " + host + ":" + port, e);
        } catch (SocketTimeoutException e) {
            throw new IOException("Timeout connecting to " + host + ":" + port, e);
        } catch (ClassNotFoundException e) {
            throw new IOException("Invalid response from " + host + ":" + port, e);
        }
    }

    /** Returns whether the sender waits for a reply after sending a message of this type. */
    private static boolean expectsResponse(GossipProtocol.GossipMessage.Type type) {
        return type == GossipProtocol.GossipMessage.Type.STATE_SYNC
            || type == GossipProtocol.GossipMessage.Type.JOIN
            || type == GossipProtocol.GossipMessage.Type.PING;
    }

    /**
     * Reads one reply from the peer and merges its node states when it is an {@code ACK} or
     * {@code STATE_SYNC} that carries any.
     */
    private void readAndMergeResponse(ObjectInputStream in, String host, int port)
            throws IOException, ClassNotFoundException {
        try {
            Object payload = in.readObject();
            // Validate instead of casting blindly so a bad peer surfaces as the documented
            // IOException rather than a ClassCastException/NullPointerException.
            if (!(payload instanceof GossipProtocol.GossipMessage)) {
                throw new IOException("Invalid response from " + host + ":" + port);
            }
            GossipProtocol.GossipMessage response = (GossipProtocol.GossipMessage) payload;
            boolean carriesState =
                response.getType() == GossipProtocol.GossipMessage.Type.ACK
                    || response.getType() == GossipProtocol.GossipMessage.Type.STATE_SYNC;
            // Merge response state
            if (carriesState && response.getNodeStates() != null) {
                gossipProtocol.mergeState(response.getNodeStates());
            }
        } catch (SocketTimeoutException e) {
            // No response, that's okay for some message types
            logger.debug("No response from {}:{}", host, port);
        }
    }

    /**
     * Shuts down the message handler.
     *
     * <p>Clears the running flag, closes the server socket (which unblocks the accept loop), then
     * shuts the executor down, forcing termination if tasks do not finish within the wait period.
     */
    public void shutdown() {
        running = false;
        try {
            if (serverSocket != null && !serverSocket.isClosed()) {
                serverSocket.close();
            }
        } catch (IOException e) {
            logger.error("Error closing server socket", e);
        }
        if (listenerExecutor != null) {
            listenerExecutor.shutdown();
            try {
                if (!listenerExecutor.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                    listenerExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                listenerExecutor.shutdownNow();
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Closes a socket on a best-effort basis; a failure to close is not actionable here. */
    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close itself fails.
        }
    }
}