package com.cube.gossip;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.*;
import java.util.concurrent.*;

/**
 * Handles network communication for gossip protocol
 */
public class GossipMessageHandler {
    
    private static final Logger logger = LoggerFactory.getLogger(GossipMessageHandler.class);
    
    private final GossipProtocol gossipProtocol;
    private ServerSocket serverSocket;
    private ExecutorService listenerExecutor;
    private volatile boolean running;
    
    public GossipMessageHandler(GossipProtocol gossipProtocol) {
        this.gossipProtocol = gossipProtocol;
        this.running = false;
    }
    
    /**
     * Start listening for gossip messages
     */
    public void start(int port) {
        try {
            serverSocket = new ServerSocket(port);
            running = true;
            
            listenerExecutor = Executors.newCachedThreadPool(r -> {
                Thread t = new Thread(r, "Gossip-Listener");
                t.setDaemon(true);
                return t;
            });
            
            // Start accepting connections
            listenerExecutor.submit(this::acceptConnections);
            
            logger.info("Gossip message handler started on port {}", port);
            
        } catch (IOException e) {
            logger.error("Failed to start gossip message handler", e);
            throw new RuntimeException("Failed to start gossip listener", e);
        }
    }
    
    /**
     * Accept incoming connections
     */
    private void acceptConnections() {
        while (running) {
            try {
                Socket socket = serverSocket.accept();
                socket.setSoTimeout(5000); // 5 second timeout
                
                listenerExecutor.submit(() -> handleConnection(socket));
                
            } catch (SocketException e) {
                if (running) {
                    logger.error("Socket error in accept loop", e);
                }
                break;
            } catch (IOException e) {
                if (running) {
                    logger.error("Error accepting connection", e);
                }
            }
        }
    }
    
    /**
     * Handle an incoming connection
     */
    private void handleConnection(Socket socket) {
        try (
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())
        ) {
            // Read message
            GossipProtocol.GossipMessage message = 
                (GossipProtocol.GossipMessage) in.readObject();
            
            logger.debug("Received {} from {}", message.getType(), message.getFromNodeId());
            
            // Process message
            GossipProtocol.GossipMessage response = processMessage(message);
            
            // Send response if needed
            if (response != null) {
                out.writeObject(response);
                out.flush();
            }
            
        } catch (Exception e) {
            logger.warn("Error handling connection: {}", e.getMessage());
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                // Ignore
            }
        }
    }
    
    /**
     * Process a received message
     */
    private GossipProtocol.GossipMessage processMessage(GossipProtocol.GossipMessage message) {
        switch (message.getType()) {
            case STATE_SYNC:
                // Merge received state
                gossipProtocol.mergeState(message.getNodeStates());
                
                // Send ACK with our state
                return new GossipProtocol.GossipMessage(
                    GossipProtocol.GossipMessage.Type.ACK,
                    gossipProtocol.getLocalNodeId(),
                    message.getFromNodeId(),
                    gossipProtocol.getClusterState(),
                    0
                );
                
            case PING:
                // Respond with ACK
                return new GossipProtocol.GossipMessage(
                    GossipProtocol.GossipMessage.Type.ACK,
                    gossipProtocol.getLocalNodeId(),
                    message.getFromNodeId(),
                    null,
                    0
                );
                
            case JOIN:
                // Add joining node to cluster
                gossipProtocol.mergeState(message.getNodeStates());
                
                // Send full state to new node
                return new GossipProtocol.GossipMessage(
                    GossipProtocol.GossipMessage.Type.STATE_SYNC,
                    gossipProtocol.getLocalNodeId(),
                    message.getFromNodeId(),
                    gossipProtocol.getClusterState(),
                    0
                );
                
            case LEAVE:
                // Mark node as leaving
                logger.info("Node {} is leaving", message.getFromNodeId());
                return null;
                
            case ACK:
                // Acknowledgment received
                return null;
                
            default:
                logger.warn("Unknown message type: {}", message.getType());
                return null;
        }
    }
    
    /**
     * Send a message to a node
     */
    public void sendMessage(String host, int port, GossipProtocol.GossipMessage message) 
            throws IOException {
        
        Socket socket = null;
        try {
            socket = new Socket();
            socket.connect(new InetSocketAddress(host, port), 3000); // 3 second connection timeout
            socket.setSoTimeout(5000); // 5 second read timeout
            
            try (
                ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream in = new ObjectInputStream(socket.getInputStream())
            ) {
                // Send message
                out.writeObject(message);
                out.flush();
                
                // Wait for response if needed
                if (message.getType() == GossipProtocol.GossipMessage.Type.STATE_SYNC ||
                    message.getType() == GossipProtocol.GossipMessage.Type.JOIN ||
                    message.getType() == GossipProtocol.GossipMessage.Type.PING) {
                    
                    try {
                        GossipProtocol.GossipMessage response = 
                            (GossipProtocol.GossipMessage) in.readObject();
                        
                        if (response.getType() == GossipProtocol.GossipMessage.Type.ACK ||
                            response.getType() == GossipProtocol.GossipMessage.Type.STATE_SYNC) {
                            // Merge response state
                            if (response.getNodeStates() != null) {
                                gossipProtocol.mergeState(response.getNodeStates());
                            }
                        }
                    } catch (SocketTimeoutException e) {
                        // No response, that's okay for some message types
                        logger.debug("No response from {}:{}", host, port);
                    }
                }
            }
            
        } catch (ConnectException e) {
            throw new IOException("Could not connect to " + host + ":" + port, e);
        } catch (SocketTimeoutException e) {
            throw new IOException("Timeout connecting to " + host + ":" + port, e);
        } catch (ClassNotFoundException e) {
            throw new IOException("Invalid response from " + host + ":" + port, e);
        } finally {
            if (socket != null && !socket.isClosed()) {
                try {
                    socket.close();
                } catch (IOException e) {
                    // Ignore
                }
            }
        }
    }
    
    /**
     * Shutdown the message handler
     */
    public void shutdown() {
        running = false;
        
        try {
            if (serverSocket != null && !serverSocket.isClosed()) {
                serverSocket.close();
            }
        } catch (IOException e) {
            logger.error("Error closing server socket", e);
        }
        
        if (listenerExecutor != null) {
            listenerExecutor.shutdown();
            try {
                if (!listenerExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                    listenerExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                listenerExecutor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
}
