Newer
Older
cactus / src / main / java / com / cube / sql / CubicSQLExecutor.java
@agalyaramadoss agalyaramadoss on 16 Feb 23 KB enhanced shell command with format
package com.cube.sql;

import com.cube.index.CubicIndexTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;

/**
 * Enhanced SQL Executor with Cubic Index support
 * Works with existing SQLParser and SQLExecutor
 */
public class CubicSQLExecutor {
    
    private static final Logger logger = LoggerFactory.getLogger(CubicSQLExecutor.class);
    
    private final SQLExecutor sqlExecutor;
    private final ObjectMapper objectMapper;
    
    // Primary indexes: keyspace.table -> CubicIndexTree
    private final Map<String, CubicIndexTree> primaryIndexes;
    
    // Secondary indexes: keyspace.table.column -> CubicIndexTree  
    private final Map<String, CubicIndexTree> secondaryIndexes;
    
    // Index metadata: index name -> index key (keyspace.table.column)
    private final Map<String, String> indexMetadata;
    
    // Statistics
    private long indexHits = 0;
    private long indexMisses = 0;
    private long queriesOptimized = 0;
    private long indexCreations = 0;
    
    public CubicSQLExecutor(SQLExecutor sqlExecutor) {
        this.sqlExecutor = sqlExecutor;
        this.objectMapper = new ObjectMapper();
        this.primaryIndexes = new HashMap<>();
        this.secondaryIndexes = new HashMap<>();
        this.indexMetadata = new HashMap<>();
        
        logger.info("Cubic SQL Executor initialized");
    }
    
    /**
     * Execute SQL with automatic index optimization
     */
    public SQLExecutor.SQLResult executeWithIndex(String sql) {
        try {
            // Check for index-specific commands
            String upperSQL = sql.toUpperCase().trim();
            
            if (upperSQL.startsWith("CREATE INDEX")) {
                return handleCreateIndex(sql);
            } else if (upperSQL.startsWith("DROP INDEX")) {
                return handleDropIndex(sql);
            } else if (upperSQL.startsWith("SHOW INDEXES")) {
                return handleShowIndexes(sql);
            }
            
            // Parse regular SQL
            SQLParser.ParsedSQL parsed = SQLParser.parse(sql);
            
            // Handle based on type
            switch (parsed.getType()) {
                case CREATE_TABLE:
                    return handleCreateTable(sql, parsed);
                    
                case SELECT:
                    return handleSelectWithIndex(sql, parsed);
                    
                case INSERT:
                    return handleInsertWithIndex(sql, parsed);
                    
                case UPDATE:
                    return handleUpdateWithIndex(sql, parsed);
                    
                case DELETE:
                    return handleDeleteWithIndex(sql, parsed);
                    
                default:
                    return sqlExecutor.execute(sql);
            }
            
        } catch (Exception e) {
            logger.error("Error executing SQL with index: {}", e.getMessage(), e);
            return SQLExecutor.SQLResult.error(e.getMessage());
        }
    }
    
    /**
     * Handle CREATE TABLE - automatically create primary index
     */
    private SQLExecutor.SQLResult handleCreateTable(String sql, SQLParser.ParsedSQL parsed) {
        // Execute the CREATE TABLE
        SQLExecutor.SQLResult result = sqlExecutor.execute(sql);
        
        if (result.isSuccess()) {
            // Create primary index
            String tableKey = parsed.getKeyspace() + "." + parsed.getTable();
            CubicIndexTree primaryIndex = new CubicIndexTree(3, 15, true);
            primaryIndexes.put(tableKey, primaryIndex);
            
            logger.info("Created primary cubic index for table: {}", tableKey);
            indexCreations++;
        }
        
        return result;
    }
    
    /**
     * Handle CREATE INDEX
     * Syntax: CREATE INDEX idx_name ON table(column)
     */
    private SQLExecutor.SQLResult handleCreateIndex(String sql) {
        try {
            // Simple regex parsing
            String pattern = "CREATE\\s+INDEX\\s+(\\w+)\\s+ON\\s+([\\w.]+)\\s*\\(\\s*(\\w+)\\s*\\)";
            java.util.regex.Pattern p = java.util.regex.Pattern.compile(pattern, java.util.regex.Pattern.CASE_INSENSITIVE);
            java.util.regex.Matcher m = p.matcher(sql);
            
            if (!m.find()) {
                return SQLExecutor.SQLResult.error("Invalid CREATE INDEX syntax");
            }
            
            String indexName = m.group(1);
            String tableName = m.group(2);
            String columnName = m.group(3);
            
            // Parse table name
            String keyspace = "default";
            String table = tableName;
            if (tableName.contains(".")) {
                String[] parts = tableName.split("\\.");
                keyspace = parts[0];
                table = parts[1];
            }
            
            // Create index key
            String indexKey = keyspace + "." + table + "." + columnName;
            
            // Check if exists
            if (secondaryIndexes.containsKey(indexKey)) {
                return SQLExecutor.SQLResult.error("Index already exists on " + table + "." + columnName);
            }
            
            // Create secondary index
            CubicIndexTree secondaryIndex = new CubicIndexTree(3, 15, true);
            secondaryIndexes.put(indexKey, secondaryIndex);
            indexMetadata.put(indexName, indexKey);
            
            // Populate with existing data
            int keysIndexed = populateSecondaryIndex(keyspace, table, columnName, secondaryIndex);
            
            logger.info("Created secondary index '{}' on {}.{} ({} keys)", 
                indexName, table, columnName, keysIndexed);
            indexCreations++;
            
            return SQLExecutor.SQLResult.success("Index created: " + indexName + 
                " on " + table + "(" + columnName + ") - " + keysIndexed + " keys indexed");
            
        } catch (Exception e) {
            logger.error("Error creating index: {}", e.getMessage());
            return SQLExecutor.SQLResult.error(e.getMessage());
        }
    }
    
    /**
     * Handle DROP INDEX
     */
    private SQLExecutor.SQLResult handleDropIndex(String sql) {
        try {
            String pattern = "DROP\\s+INDEX\\s+(\\w+)";
            java.util.regex.Pattern p = java.util.regex.Pattern.compile(pattern, java.util.regex.Pattern.CASE_INSENSITIVE);
            java.util.regex.Matcher m = p.matcher(sql);
            
            if (!m.find()) {
                return SQLExecutor.SQLResult.error("Invalid DROP INDEX syntax");
            }
            
            String indexName = m.group(1);
            String indexKey = indexMetadata.remove(indexName);
            
            if (indexKey != null) {
                CubicIndexTree removed = secondaryIndexes.remove(indexKey);
                if (removed != null) {
                    removed.clear();
                }
                logger.info("Dropped index: {}", indexName);
                return SQLExecutor.SQLResult.success("Index dropped: " + indexName);
            } else {
                return SQLExecutor.SQLResult.error("Index not found: " + indexName);
            }
            
        } catch (Exception e) {
            return SQLExecutor.SQLResult.error(e.getMessage());
        }
    }
    
    /**
     * Handle SHOW INDEXES
     */
    private SQLExecutor.SQLResult handleShowIndexes(String sql) {
        try {
            String pattern = "SHOW\\s+INDEXES\\s+ON\\s+([\\w.]+)";
            java.util.regex.Pattern p = java.util.regex.Pattern.compile(pattern, java.util.regex.Pattern.CASE_INSENSITIVE);
            java.util.regex.Matcher m = p.matcher(sql);
            
            if (!m.find()) {
                return SQLExecutor.SQLResult.error("Invalid SHOW INDEXES syntax");
            }
            
            String tableName = m.group(1);
            String keyspace = "default";
            String table = tableName;
            
            if (tableName.contains(".")) {
                String[] parts = tableName.split("\\.");
                keyspace = parts[0];
                table = parts[1];
            }
            
            String tableKey = keyspace + "." + table;
            
            StringBuilder result = new StringBuilder();
            result.append("Indexes on ").append(tableKey).append(":\n");
            
            // Primary index
            if (primaryIndexes.containsKey(tableKey)) {
                CubicIndexTree idx = primaryIndexes.get(tableKey);
                result.append("  PRIMARY: ").append(idx.getTotalSize()).append(" keys\n");
            }
            
            // Secondary indexes
            for (Map.Entry<String, String> entry : indexMetadata.entrySet()) {
                String idxName = entry.getKey();
                String idxKey = entry.getValue();
                
                if (idxKey.startsWith(tableKey + ".")) {
                    String column = idxKey.substring(tableKey.length() + 1);
                    CubicIndexTree idx = secondaryIndexes.get(idxKey);
                    if (idx != null) {
                        result.append("  ").append(idxName).append(" (").append(column)
                              .append("): ").append(idx.getTotalSize()).append(" keys\n");
                    }
                }
            }
            
            return SQLExecutor.SQLResult.success(result.toString());
            
        } catch (Exception e) {
            return SQLExecutor.SQLResult.error(e.getMessage());
        }
    }
    
    /**
     * Handle SELECT with index optimization
     */
    private SQLExecutor.SQLResult handleSelectWithIndex(String sql, SQLParser.ParsedSQL parsed) {
        String tableKey = parsed.getKeyspace() + "." + parsed.getTable();
        CubicIndexTree primaryIndex = primaryIndexes.get(tableKey);
        
        // Check if we can use index
        if (primaryIndex != null && parsed.getWhereClause() != null && !parsed.getWhereClause().isEmpty()) {
            String whereColumn = parsed.getWhereClause().keySet().iterator().next();
            String whereValue = parsed.getWhereClause().values().iterator().next();
            
            // Check if querying by primary key
            if (isPrimaryKey(whereColumn)) {
                return executeIndexedSelect(primaryIndex, whereValue, tableKey);
            }
            
            // Check for secondary index
            String secondaryIndexKey = tableKey + "." + whereColumn;
            CubicIndexTree secondaryIndex = secondaryIndexes.get(secondaryIndexKey);
            
            if (secondaryIndex != null) {
                return executeSecondaryIndexSelect(primaryIndex, secondaryIndex, whereValue, tableKey);
            }
        }
        
        // Fall back to regular query
        indexMisses++;
        return sqlExecutor.execute(sql);
    }
    
    /**
     * Execute SELECT using primary index
     */
    private SQLExecutor.SQLResult executeIndexedSelect(CubicIndexTree index, String key, String table) {
        try {
            byte[] data = index.get(key);
            
            if (data != null) {
                indexHits++;
                queriesOptimized++;
                
                Map<String, String> row = deserializeRow(data);
                List<Map<String, String>> rows = Collections.singletonList(row);
                
                logger.debug("Index hit for key: {} (level: {})", key, index.calculateLevel(key));
                
                return SQLExecutor.SQLResult.success("Query executed (cubic-index-optimized)")
                    .withRows(rows);
            } else {
                return SQLExecutor.SQLResult.success("Query executed").withRows(Collections.emptyList());
            }
            
        } catch (Exception e) {
            logger.error("Error in indexed select: {}", e.getMessage());
            indexMisses++;
            return SQLExecutor.SQLResult.error(e.getMessage());
        }
    }
    
    /**
     * Execute SELECT using secondary index
     */
    private SQLExecutor.SQLResult executeSecondaryIndexSelect(CubicIndexTree primaryIndex,
                                                               CubicIndexTree secondaryIndex, 
                                                               String columnValue,
                                                               String table) {
        try {
            byte[] primaryKeyData = secondaryIndex.get(columnValue);
            
            if (primaryKeyData != null) {
                String primaryKey = new String(primaryKeyData, StandardCharsets.UTF_8);
                byte[] rowData = primaryIndex.get(primaryKey);
                
                if (rowData != null) {
                    indexHits++;
                    queriesOptimized++;
                    
                    Map<String, String> row = deserializeRow(rowData);
                    List<Map<String, String>> rows = Collections.singletonList(row);
                    
                    logger.debug("Secondary index hit: {} -> {}", columnValue, primaryKey);
                    
                    return SQLExecutor.SQLResult.success("Query executed (secondary-index-optimized)")
                        .withRows(rows);
                }
            }
            
            return SQLExecutor.SQLResult.success("Query executed").withRows(Collections.emptyList());
            
        } catch (Exception e) {
            logger.error("Error in secondary index select: {}", e.getMessage());
            return SQLExecutor.SQLResult.error(e.getMessage());
        }
    }
    
    /**
     * Handle INSERT with index update
     */
    private SQLExecutor.SQLResult handleInsertWithIndex(String sql, SQLParser.ParsedSQL parsed) {
        SQLExecutor.SQLResult result = sqlExecutor.execute(sql);
        
        if (result.isSuccess()) {
            updateIndexesOnInsert(parsed);
        }
        
        return result;
    }
    
    /**
     * Handle UPDATE with index update
     */
    private SQLExecutor.SQLResult handleUpdateWithIndex(String sql, SQLParser.ParsedSQL parsed) {
        SQLExecutor.SQLResult result = sqlExecutor.execute(sql);
        
        if (result.isSuccess()) {
            updateIndexesOnUpdate(parsed);
        }
        
        return result;
    }
    
    /**
     * Handle DELETE with index update
     */
    private SQLExecutor.SQLResult handleDeleteWithIndex(String sql, SQLParser.ParsedSQL parsed) {
        // Remove from indexes first
        if (parsed.getWhereClause() != null && !parsed.getWhereClause().isEmpty()) {
            String whereValue = parsed.getWhereClause().values().iterator().next();
            String tableKey = parsed.getKeyspace() + "." + parsed.getTable();
            
            CubicIndexTree primaryIndex = primaryIndexes.get(tableKey);
            if (primaryIndex != null) {
                primaryIndex.remove(whereValue);
                logger.debug("Removed from primary index: {}", whereValue);
            }
            
            removeFromSecondaryIndexes(tableKey, whereValue);
        }
        
        return sqlExecutor.execute(sql);
    }
    
    /**
     * Update indexes after INSERT
     */
    private void updateIndexesOnInsert(SQLParser.ParsedSQL parsed) {
        String tableKey = parsed.getKeyspace() + "." + parsed.getTable();
        CubicIndexTree primaryIndex = primaryIndexes.get(tableKey);
        
        if (primaryIndex != null && parsed.getColumns() != null && !parsed.getColumns().isEmpty()) {
            Map<String, String> columns = parsed.getColumns();
            
            // Get primary key (first value)
            String primaryKey = columns.values().iterator().next();
            
            if (primaryKey != null) {
                byte[] rowData = serializeRow(columns);
                primaryIndex.put(primaryKey, rowData);
                
                logger.debug("Updated primary index for key: {} at level: {}",
                    primaryKey, primaryIndex.calculateLevel(primaryKey));
                
                // Update secondary indexes
                updateSecondaryIndexes(tableKey, primaryKey, columns);
            }
        }
    }
    
    /**
     * Update indexes after UPDATE
     */
    private void updateIndexesOnUpdate(SQLParser.ParsedSQL parsed) {
        if (parsed.getWhereClause() == null || parsed.getWhereClause().isEmpty()) {
            return;
        }
        
        String whereValue = parsed.getWhereClause().values().iterator().next();
        String tableKey = parsed.getKeyspace() + "." + parsed.getTable();
        CubicIndexTree primaryIndex = primaryIndexes.get(tableKey);
        
        if (primaryIndex != null && parsed.getColumns() != null) {
            byte[] rowData = serializeRow(parsed.getColumns());
            primaryIndex.put(whereValue, rowData);
            
            logger.debug("Updated primary index for key: {}", whereValue);
            updateSecondaryIndexes(tableKey, whereValue, parsed.getColumns());
        }
    }
    
    /**
     * Update secondary indexes
     */
    private void updateSecondaryIndexes(String tableKey, String primaryKey, Map<String, String> columns) {
        for (Map.Entry<String, String> entry : columns.entrySet()) {
            String column = entry.getKey();
            String value = entry.getValue();
            
            String indexKey = tableKey + "." + column;
            CubicIndexTree secondaryIndex = secondaryIndexes.get(indexKey);
            
            if (secondaryIndex != null) {
                secondaryIndex.put(value, primaryKey.getBytes(StandardCharsets.UTF_8));
                logger.debug("Updated secondary index {}: {} -> {}", column, value, primaryKey);
            }
        }
    }
    
    /**
     * Populate secondary index with existing data
     */
    private int populateSecondaryIndex(String keyspace, String table, String column, CubicIndexTree index) {
        String tableKey = keyspace + "." + table;
        CubicIndexTree primaryIndex = primaryIndexes.get(tableKey);
        
        if (primaryIndex == null) {
            return 0;
        }
        
        int count = 0;
        Set<String> allKeys = primaryIndex.getAllKeys();
        
        for (String primaryKey : allKeys) {
            byte[] rowData = primaryIndex.get(primaryKey);
            if (rowData != null) {
                Map<String, String> row = deserializeRow(rowData);
                String columnValue = row.get(column);
                
                if (columnValue != null) {
                    index.put(columnValue, primaryKey.getBytes(StandardCharsets.UTF_8));
                    count++;
                }
            }
        }
        
        return count;
    }
    
    /**
     * Remove from secondary indexes
     */
    private void removeFromSecondaryIndexes(String tableKey, String primaryKey) {
        for (Map.Entry<String, CubicIndexTree> entry : secondaryIndexes.entrySet()) {
            String indexKey = entry.getKey();
            
            if (indexKey.startsWith(tableKey + ".")) {
                CubicIndexTree index = entry.getValue();
                Set<String> keysToRemove = new HashSet<>();
                
                for (String key : index.getAllKeys()) {
                    byte[] value = index.get(key);
                    if (value != null && new String(value, StandardCharsets.UTF_8).equals(primaryKey)) {
                        keysToRemove.add(key);
                    }
                }
                
                for (String key : keysToRemove) {
                    index.remove(key);
                }
            }
        }
    }
    
    /**
     * Check if column is primary key
     */
    private boolean isPrimaryKey(String column) {
        return "id".equals(column) || column.endsWith("_id") || "pk".equals(column) || column.endsWith("_pk");
    }
    
    /**
     * Serialize row to bytes
     */
    private byte[] serializeRow(Map<String, String> columns) {
        try {
            String json = objectMapper.writeValueAsString(columns);
            return json.getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            logger.error("Error serializing row: {}", e.getMessage());
            return new byte[0];
        }
    }
    
    /**
     * Deserialize row from bytes
     */
    private Map<String, String> deserializeRow(byte[] data) {
        try {
            String json = new String(data, StandardCharsets.UTF_8);
            return objectMapper.readValue(json, new TypeReference<Map<String, String>>() {});
        } catch (Exception e) {
            logger.error("Error deserializing row: {}", e.getMessage());
            return new LinkedHashMap<>();
        }
    }
    
    /**
     * Get index statistics
     */
    public Map<String, Object> getIndexStats() {
        Map<String, Object> stats = new LinkedHashMap<>();
        
        stats.put("indexHits", indexHits);
        stats.put("indexMisses", indexMisses);
        stats.put("hitRate", indexHits + indexMisses > 0 ? 
            String.format("%.2f%%", (double) indexHits / (indexHits + indexMisses) * 100) : "0.00%");
        stats.put("queriesOptimized", queriesOptimized);
        stats.put("primaryIndexes", primaryIndexes.size());
        stats.put("secondaryIndexes", secondaryIndexes.size());
        stats.put("totalIndexes", primaryIndexes.size() + secondaryIndexes.size());
        stats.put("indexCreations", indexCreations);
        
        // Per-table stats
        Map<String, Object> tableStats = new LinkedHashMap<>();
        for (Map.Entry<String, CubicIndexTree> entry : primaryIndexes.entrySet()) {
            Map<String, Object> indexStats = entry.getValue().getStats();
            tableStats.put(entry.getKey(), indexStats);
        }
        stats.put("primaryIndexDetails", tableStats);
        
        // Secondary index stats
        Map<String, Object> secondaryStats = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : indexMetadata.entrySet()) {
            String indexName = entry.getKey();
            String indexKey = entry.getValue();
            CubicIndexTree index = secondaryIndexes.get(indexKey);
            
            if (index != null) {
                Map<String, Object> indexInfo = new LinkedHashMap<>();
                indexInfo.put("indexKey", indexKey);
                indexInfo.put("totalKeys", index.getTotalSize());
                indexInfo.put("levels", index.getLevelCount());
                secondaryStats.put(indexName, indexInfo);
            }
        }
        stats.put("secondaryIndexDetails", secondaryStats);
        
        return stats;
    }
    
    /**
     * Clear all indexes
     */
    public void clearIndexes() {
        for (CubicIndexTree index : primaryIndexes.values()) {
            index.clear();
        }
        for (CubicIndexTree index : secondaryIndexes.values()) {
            index.clear();
        }
        indexMetadata.clear();
        
        indexHits = 0;
        indexMisses = 0;
        queriesOptimized = 0;
        indexCreations = 0;
        
        logger.info("Cleared all cubic indexes");
    }
    
    /**
     * Rebalance all indexes
     */
    public void rebalanceAllIndexes() {
        logger.info("Rebalancing all cubic indexes...");
        
        int rebalanced = 0;
        for (CubicIndexTree index : primaryIndexes.values()) {
            index.rebalance();
            rebalanced++;
        }
        
        for (CubicIndexTree index : secondaryIndexes.values()) {
            index.rebalance();
            rebalanced++;
        }
        
        logger.info("Rebalanced {} cubic indexes", rebalanced);
    }
}