package com.cube.storage;

import java.io.*;
import java.nio.file.*;
import java.util.*;

/**
 * SSTable (Sorted String Table) for Cube database.
 */
public class SSTable implements AutoCloseable {
    
    private static final int MAGIC_NUMBER = 0x43554245; // "CUBE"
    private static final int VERSION = 1;
    
    private final Path filePath;
    private final long creationTime;
    private final long size;
    private final int keyCount;
    
    private SSTable(Path filePath, long creationTime, long size, int keyCount) {
        this.filePath = filePath;
        this.creationTime = creationTime;
        this.size = size;
        this.keyCount = keyCount;
    }
    
    public static SSTable create(Path filePath, Map<String, byte[]> entries) throws IOException {
        long creationTime = System.currentTimeMillis();
        
        try (DataOutputStream dos = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(filePath)))) {
            
            dos.writeInt(MAGIC_NUMBER);
            dos.writeInt(VERSION);
            dos.writeInt(entries.size());
            dos.writeLong(creationTime);
            
            for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
                byte[] keyBytes = entry.getKey().getBytes("UTF-8");
                dos.writeInt(keyBytes.length);
                dos.write(keyBytes);
                
                dos.writeInt(entry.getValue().length);
                dos.write(entry.getValue());
            }
            
            dos.flush();
        }
        
        long size = Files.size(filePath);
        return new SSTable(filePath, creationTime, size, entries.size());
    }
    
    public static SSTable open(Path filePath) throws IOException {
        try (DataInputStream dis = new DataInputStream(
                new BufferedInputStream(Files.newInputStream(filePath)))) {
            
            int magic = dis.readInt();
            if (magic != MAGIC_NUMBER) {
                throw new IOException("Invalid SSTable file");
            }
            
            int version = dis.readInt();
            int keyCount = dis.readInt();
            long creationTime = dis.readLong();
            
            long fileSize = Files.size(filePath);
            return new SSTable(filePath, creationTime, fileSize, keyCount);
        }
    }
    
    public byte[] get(String key) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(filePath.toFile(), "r")) {
            raf.seek(4 + 4 + 4 + 8);
            
            int keysRead = 0;
            while (keysRead < keyCount && raf.getFilePointer() < raf.length()) {
                int keyLen = raf.readInt();
                byte[] keyBytes = new byte[keyLen];
                raf.readFully(keyBytes);
                String currentKey = new String(keyBytes, "UTF-8");
                
                int valueLen = raf.readInt();
                
                if (currentKey.equals(key)) {
                    byte[] value = new byte[valueLen];
                    raf.readFully(value);
                    return value;
                } else {
                    raf.skipBytes(valueLen);
                }
                
                keysRead++;
            }
        }
        
        return null;
    }
    
    public Collection<String> scan(String prefix) throws IOException {
        List<String> result = new ArrayList<>();
        
        try (RandomAccessFile raf = new RandomAccessFile(filePath.toFile(), "r")) {
            raf.seek(4 + 4 + 4 + 8);
            
            int keysRead = 0;
            while (keysRead < keyCount && raf.getFilePointer() < raf.length()) {
                int keyLen = raf.readInt();
                byte[] keyBytes = new byte[keyLen];
                raf.readFully(keyBytes);
                String currentKey = new String(keyBytes, "UTF-8");
                
                int valueLen = raf.readInt();
                raf.skipBytes(valueLen);
                
                if (currentKey.startsWith(prefix)) {
                    result.add(currentKey);
                }
                
                keysRead++;
            }
        }
        
        return result;
    }
    
    public Map<String, byte[]> scanEntries(String prefix) throws IOException {
        Map<String, byte[]> result = new TreeMap<>();
        
        try (RandomAccessFile raf = new RandomAccessFile(filePath.toFile(), "r")) {
            raf.seek(4 + 4 + 4 + 8);
            
            int keysRead = 0;
            while (keysRead < keyCount && raf.getFilePointer() < raf.length()) {
                int keyLen = raf.readInt();
                byte[] keyBytes = new byte[keyLen];
                raf.readFully(keyBytes);
                String currentKey = new String(keyBytes, "UTF-8");
                
                int valueLen = raf.readInt();
                byte[] value = new byte[valueLen];
                raf.readFully(value);
                
                if (currentKey.startsWith(prefix)) {
                    result.put(currentKey, value);
                }
                
                keysRead++;
            }
        }
        
        return result;
    }
    
    public Map<String, byte[]> getAll() throws IOException {
        return scanEntries("");
    }
    
    public void delete() throws IOException {
        Files.deleteIfExists(filePath);
    }
    
    public Path getFilePath() { return filePath; }
    public long getCreationTime() { return creationTime; }
    public long getSize() { return size; }
    public int getKeyCount() { return keyCount; }
    
    @Override
    public void close() throws IOException {
    }
    
    @Override
    public String toString() {
        return "SSTable{file=" + filePath.getFileName() + ", keys=" + keyCount + ", size=" + size + "}";
    }
}
