// File: cactus/src/main/java/com/cube/sql/CubicSQLExecutor.java
// Revision note: "added document" - @agalyaramadoss, 16 Feb (33 KB)
package com.cube.sql;

import com.cube.index.CubicIndexTree;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Enhanced SQL Executor with Cubic Index support
 * 
 * Features:
 * - Automatic index creation for frequently queried columns
 * - Range query optimization using Cubic Index
 * - Prefix search acceleration
 * - Secondary index support
 * - Index statistics and monitoring
 */
public class CubicSQLExecutor {
    
    private static final Logger logger = LoggerFactory.getLogger(CubicSQLExecutor.class);
    
    private final SQLParser sqlParser;
    private final SQLExecutor sqlExecutor;
    private final ObjectMapper objectMapper;
    
    // Primary indexes: keyspace.table -> CubicIndexTree
    // Maps primary key -> serialized row data
    private final Map<String, CubicIndexTree> primaryIndexes;
    
    // Secondary indexes: keyspace.table.column -> CubicIndexTree  
    // Maps column value -> primary key(s)
    private final Map<String, CubicIndexTree> secondaryIndexes;
    
    // Index metadata: index name -> index key
    private final Map<String, String> indexMetadata;
    
    // Index statistics
    private long indexHits = 0;
    private long indexMisses = 0;
    private long queriesOptimized = 0;
    private long indexCreations = 0;
    
    /**
     * Builds an executor around the given parser and plain SQL executor,
     * starting with empty index registries.
     *
     * @param sqlParser   parser collaborator kept for this executor
     * @param sqlExecutor executor that statements are delegated to
     */
    public CubicSQLExecutor(SQLParser sqlParser, SQLExecutor sqlExecutor) {
        // Collaborators supplied by the caller.
        this.sqlExecutor = sqlExecutor;
        this.sqlParser = sqlParser;

        // Registries start out empty and fill as tables/indexes are created.
        this.indexMetadata = new HashMap<>();
        this.secondaryIndexes = new HashMap<>();
        this.primaryIndexes = new HashMap<>();

        // JSON mapper used when rows are written to / read from an index.
        this.objectMapper = new ObjectMapper();

        logger.info("Cubic SQL Executor initialized with index support");
    }
    
    /**
     * Parses the statement and routes it to the index-aware handler for its type.
     * Statement types without a dedicated handler are passed unchanged to the
     * wrapped SQL executor.
     *
     * @param sql the SQL text to run
     * @return the handler's result map, or an error map (see
     *         {@code createErrorResult}) when anything throws
     */
    public Map<String, Object> executeWithIndex(String sql) {
        try {
            final IndexedParsedSQL statement = CubicIndexSQLParser.parseWithIndex(sql);
            final Map<String, Object> outcome;

            switch (statement.getType()) {
                case CREATE_TABLE:
                    outcome = handleCreateTable(statement);
                    break;
                case CREATE_INDEX:
                    outcome = handleCreateIndex(statement);
                    break;
                case DROP_INDEX:
                    outcome = handleDropIndex(statement);
                    break;
                case SHOW_INDEXES:
                    outcome = handleShowIndexes(statement);
                    break;
                case SELECT:
                    outcome = handleSelectWithIndex(statement);
                    break;
                case INSERT:
                    outcome = handleInsertWithIndex(statement);
                    break;
                case UPDATE:
                    outcome = handleUpdateWithIndex(statement);
                    break;
                case DELETE:
                    outcome = handleDeleteWithIndex(statement);
                    break;
                default:
                    // No index-aware handling for this type: delegate as-is.
                    outcome = sqlExecutor.execute(sql);
                    break;
            }

            return outcome;
        } catch (Exception failure) {
            logger.error("Error executing SQL with index: {}", failure.getMessage(), failure);
            return createErrorResult(failure.getMessage());
        }
    }
    
    
    /**
     * Runs CREATE TABLE through the wrapped executor and, when that succeeds,
     * registers a fresh primary index under the table's full name.
     *
     * @param parsed the parsed CREATE TABLE statement
     * @return the wrapped executor's result, unchanged
     */
    private Map<String, Object> handleCreateTable(IndexedParsedSQL parsed) {
        Map<String, Object> creation = sqlExecutor.execute(parsed.getOriginalSQL());

        // Nothing to index if the table was not created.
        if (!isSuccess(creation)) {
            return creation;
        }

        String tableKey = parsed.getFullTableName();
        primaryIndexes.put(tableKey, new CubicIndexTree(3, 15, true));

        logger.info("Created primary cubic index for table: {}", tableKey);
        indexCreations++;

        return creation;
    }
    
    /**
     * Handle CREATE INDEX - create a secondary index on one column.
     *
     * Syntax: CREATE INDEX idx_name ON table(column)
     *
     * The request is rejected when the column already has a secondary index, or
     * when the index name is already registered. Without the name check a reused
     * name would overwrite the name-to-key mapping, leaving the earlier index
     * registered but impossible to drop or list by name.
     *
     * @param parsed the parsed CREATE INDEX statement
     * @return a success map with "message", "keysIndexed" and "indexType", or an
     *         error map when the index cannot be created
     */
    private Map<String, Object> handleCreateIndex(IndexedParsedSQL parsed) {
        String indexName = parsed.getIndexName();
        String table = parsed.getTable();
        String column = parsed.getIndexColumn();
        String keyspace = parsed.getKeyspace();

        // Create index key
        String indexKey = CubicIndexSQLParser.generateIndexKey(keyspace, table, column);

        // Check if an index already exists on this column
        if (secondaryIndexes.containsKey(indexKey)) {
            return createErrorResult("Index already exists on " + table + "." + column);
        }

        // Check if the name is already taken by another index
        if (indexMetadata.containsKey(indexName)) {
            return createErrorResult("Index name already in use: " + indexName);
        }

        // Create secondary index
        CubicIndexTree secondaryIndex = new CubicIndexTree(3, 15, true);
        secondaryIndexes.put(indexKey, secondaryIndex);
        indexMetadata.put(indexName, indexKey);

        // Populate index with existing data
        int keysIndexed = populateSecondaryIndex(keyspace, table, column, secondaryIndex);

        logger.info("Created secondary cubic index '{}' on {}.{} ({} keys indexed)",
            indexName, table, column, keysIndexed);
        indexCreations++;

        Map<String, Object> result = new LinkedHashMap<>();
        result.put("success", true);
        result.put("message", "Index created: " + indexName + " on " + table + "(" + column + ")");
        result.put("keysIndexed", keysIndexed);
        result.put("indexType", "CUBIC");
        return result;
    }
    
    /**
     * Drops the secondary index registered under the statement's index name.
     *
     * @param parsed the parsed DROP INDEX statement
     * @return a success map, or an error map when the name is not registered
     */
    private Map<String, Object> handleDropIndex(IndexedParsedSQL parsed) {
        final String indexName = parsed.getIndexName();

        // Unregister the name first; a null key means the name was unknown.
        final String indexKey = indexMetadata.remove(indexName);
        if (indexKey == null) {
            return createErrorResult("Index not found: " + indexName);
        }

        final CubicIndexTree dropped = secondaryIndexes.remove(indexKey);
        if (dropped != null) {
            dropped.clear();
        }

        logger.info("Dropped index: {}", indexName);

        Map<String, Object> response = new LinkedHashMap<>();
        response.put("success", true);
        response.put("message", "Index dropped: " + indexName);
        return response;
    }
    
    /**
     * Lists the indexes registered for the statement's table: the primary index
     * (if any) followed by every named secondary index whose key starts with the
     * table's full name.
     *
     * @param parsed the parsed SHOW INDEXES statement
     * @return a success map with "table", "indexes" and "indexCount"
     */
    private Map<String, Object> handleShowIndexes(IndexedParsedSQL parsed) {
        final String tableKey = parsed.getFullTableName();
        final String secondaryPrefix = tableKey + ".";
        final List<Map<String, Object>> listing = new ArrayList<>();

        // Primary index entry; the column falls back to "id" when none is known.
        if (primaryIndexes.containsKey(tableKey)) {
            Map<String, Object> primaryInfo = new LinkedHashMap<>();
            primaryInfo.put("name", "PRIMARY");
            primaryInfo.put("column", parsed.getPrimaryKey() != null ? parsed.getPrimaryKey() : "id");
            primaryInfo.put("type", "PRIMARY");
            primaryInfo.put("keys", primaryIndexes.get(tableKey).getTotalSize());
            listing.add(primaryInfo);
        }

        // Secondary index entries belonging to this table.
        for (Map.Entry<String, String> named : indexMetadata.entrySet()) {
            String indexKey = named.getValue();
            if (!indexKey.startsWith(secondaryPrefix)) {
                continue;
            }

            String[] keyParts = CubicIndexSQLParser.parseIndexKey(indexKey);
            if (keyParts == null) {
                continue;
            }

            Map<String, Object> secondaryInfo = new LinkedHashMap<>();
            secondaryInfo.put("name", named.getKey());
            secondaryInfo.put("column", keyParts[2]);
            secondaryInfo.put("type", "SECONDARY");
            secondaryInfo.put("keys", secondaryIndexes.get(indexKey).getTotalSize());
            listing.add(secondaryInfo);
        }

        Map<String, Object> response = new LinkedHashMap<>();
        response.put("success", true);
        response.put("table", tableKey);
        response.put("indexes", listing);
        response.put("indexCount", listing.size());
        return response;
    }
    
    /**
     * Answers a SELECT from an index when possible: the primary index when the
     * WHERE column is treated as the primary key, otherwise a secondary index on
     * the WHERE column. Any other case counts as an index miss and is delegated
     * to the wrapped executor.
     *
     * @param parsed the parsed SELECT statement
     * @return the index-backed result, or the wrapped executor's result
     */
    private Map<String, Object> handleSelectWithIndex(IndexedParsedSQL parsed) {
        final String tableKey = parsed.getFullTableName();
        final CubicIndexTree primary = primaryIndexes.get(tableKey);

        if (primary != null && parsed.hasWhereClause()) {
            final String column = parsed.getWhereColumn();
            final String value = parsed.getWhereValue();

            // Primary-key lookups go straight to the primary index.
            if (isPrimaryKeyColumn(column, parsed.getPrimaryKey())) {
                return executeIndexedSelect(parsed, primary, value);
            }

            // Otherwise try a secondary index registered for this column.
            final CubicIndexTree secondary = secondaryIndexes.get(tableKey + "." + column);
            if (secondary != null) {
                return executeSecondaryIndexSelect(parsed, primary, secondary, value);
            }
        }

        // No usable index: record the miss and run the query normally.
        indexMisses++;
        return sqlExecutor.execute(parsed.getOriginalSQL());
    }
    
    /**
     * Execute SELECT using the primary index.
     *
     * A hit returns the single stored row together with the cubic level reported
     * by the index for that key; a miss returns an empty row list. If the lookup
     * throws, the failure is logged with its stack trace and the query is
     * delegated to the wrapped executor.
     *
     * @param parsed the parsed SELECT statement (used for the fallback)
     * @param index  the table's primary index
     * @param key    the primary key value to look up
     * @return the result map for the lookup
     */
    private Map<String, Object> executeIndexedSelect(IndexedParsedSQL parsed,
                                                      CubicIndexTree index,
                                                      String key) {
        try {
            // Use cubic index for fast lookup
            byte[] data = index.get(key);

            if (data != null) {
                indexHits++;
                queriesOptimized++;

                // Deserialize row
                Map<String, String> row = deserializeRow(data);
                List<Map<String, String>> rows = Collections.singletonList(row);

                // Computed once and reused for both the log line and the result.
                Object cubicLevel = index.calculateLevel(key);
                logger.debug("Index hit for key: {} (cubic level: {})", key, cubicLevel);

                Map<String, Object> result = new LinkedHashMap<>();
                result.put("success", true);
                result.put("message", "Query executed (cubic-index-optimized)");
                result.put("rows", rows);
                result.put("rowCount", 1);
                result.put("indexUsed", "PRIMARY");
                result.put("cubicLevel", cubicLevel);
                return result;
            } else {
                indexMisses++;
                Map<String, Object> result = new LinkedHashMap<>();
                result.put("success", true);
                result.put("rows", Collections.emptyList());
                result.put("rowCount", 0);
                return result;
            }

        } catch (Exception e) {
            // Pass the throwable so the stack trace is not lost.
            logger.error("Error in indexed select: {}", e.getMessage(), e);
            // Fall back to regular query
            return sqlExecutor.execute(parsed.getOriginalSQL());
        }
    }
    
    /**
     * Execute SELECT using a secondary index.
     *
     * The secondary index is read for the column value to obtain a primary key,
     * which is then resolved through the primary index. A single row is returned
     * on a hit. Misses (no mapping, or no row for the mapped key) are counted in
     * {@code indexMisses}, matching the primary-index path. If the lookup throws,
     * the failure is logged with its stack trace and the query is delegated to
     * the wrapped executor.
     *
     * @param parsed         the parsed SELECT statement (used for the fallback)
     * @param primaryIndex   the table's primary index
     * @param secondaryIndex the secondary index on the WHERE column
     * @param columnValue    the value to look up
     * @return the result map for the lookup
     */
    private Map<String, Object> executeSecondaryIndexSelect(IndexedParsedSQL parsed,
                                                             CubicIndexTree primaryIndex,
                                                             CubicIndexTree secondaryIndex,
                                                             String columnValue) {
        try {
            // Secondary index maps column value -> primary key
            byte[] primaryKeyData = secondaryIndex.get(columnValue);

            if (primaryKeyData != null) {
                String primaryKey = new String(primaryKeyData, StandardCharsets.UTF_8);

                // Now lookup by primary key
                byte[] rowData = primaryIndex.get(primaryKey);

                if (rowData != null) {
                    indexHits++;
                    queriesOptimized++;

                    Map<String, String> row = deserializeRow(rowData);
                    List<Map<String, String>> rows = Collections.singletonList(row);

                    logger.debug("Secondary index hit for value: {} -> key: {}",
                        columnValue, primaryKey);

                    Map<String, Object> result = new LinkedHashMap<>();
                    result.put("success", true);
                    result.put("message", "Query executed (secondary-index-optimized)");
                    result.put("rows", rows);
                    result.put("rowCount", 1);
                    result.put("indexUsed", "SECONDARY");
                    return result;
                }
            }

            // Nothing found through the indexes: count it like a primary miss.
            indexMisses++;
            Map<String, Object> result = new LinkedHashMap<>();
            result.put("success", true);
            result.put("rows", Collections.emptyList());
            result.put("rowCount", 0);
            return result;

        } catch (Exception e) {
            // Pass the throwable so the stack trace is not lost.
            logger.error("Error in secondary index select: {}", e.getMessage(), e);
            return sqlExecutor.execute(parsed.getOriginalSQL());
        }
    }
    
    /**
     * Runs the INSERT through the wrapped executor, then mirrors the new row
     * into the indexes if the statement succeeded.
     *
     * @param parsed the parsed INSERT statement
     * @return the wrapped executor's result, unchanged
     */
    private Map<String, Object> handleInsertWithIndex(IndexedParsedSQL parsed) {
        final Map<String, Object> insertOutcome = sqlExecutor.execute(parsed.getOriginalSQL());

        // Indexes are only touched once the row is known to be stored.
        if (!isSuccess(insertOutcome)) {
            return insertOutcome;
        }

        updateIndexesOnInsert(parsed);
        return insertOutcome;
    }
    
    /**
     * Runs the UPDATE through the wrapped executor, then refreshes the indexes
     * if the statement succeeded.
     *
     * @param parsed the parsed UPDATE statement
     * @return the wrapped executor's result, unchanged
     */
    private Map<String, Object> handleUpdateWithIndex(IndexedParsedSQL parsed) {
        final Map<String, Object> updateOutcome = sqlExecutor.execute(parsed.getOriginalSQL());

        // Indexes are only touched once the change is known to be applied.
        if (!isSuccess(updateOutcome)) {
            return updateOutcome;
        }

        updateIndexesOnUpdate(parsed);
        return updateOutcome;
    }
    
    /**
     * Handle DELETE with index update.
     *
     * The DELETE runs first; index entries are removed only when it succeeds.
     * Removing them beforehand would leave the indexes missing a row that still
     * exists whenever the DELETE fails, so later indexed lookups would report
     * "no rows" for live data. This ordering also matches the INSERT and UPDATE
     * handlers, which touch the indexes only after a successful statement.
     *
     * The WHERE value is used as the primary key when pruning, as before.
     *
     * @param parsed the parsed DELETE statement
     * @return the wrapped executor's result, unchanged
     */
    private Map<String, Object> handleDeleteWithIndex(IndexedParsedSQL parsed) {
        // Execute the DELETE
        Map<String, Object> result = sqlExecutor.execute(parsed.getOriginalSQL());

        String whereValue = parsed.getWhereValue();
        if (!isSuccess(result) || whereValue == null) {
            return result;
        }

        String tableKey = parsed.getFullTableName();
        CubicIndexTree primaryIndex = primaryIndexes.get(tableKey);

        if (primaryIndex != null) {
            primaryIndex.remove(whereValue);
            logger.debug("Removed from primary index: {}", whereValue);
        }

        // Remove from secondary indexes
        removeFromSecondaryIndexes(tableKey, whereValue);

        return result;
    }
    
    /**
     * Mirrors a freshly inserted row into the table's primary index and into any
     * secondary indexes on its columns. Does nothing when the table has no
     * primary index, the statement carries no columns, or no key value can be
     * determined.
     *
     * @param parsed the parsed INSERT statement
     */
    private void updateIndexesOnInsert(IndexedParsedSQL parsed) {
        final String tableKey = parsed.getFullTableName();
        final CubicIndexTree primary = primaryIndexes.get(tableKey);
        if (primary == null) {
            return;
        }

        final Map<String, String> columns = parsed.getColumns();
        if (columns.isEmpty()) {
            return;
        }

        // Key value: the declared primary-key column if known, else the first column's value.
        final String pkColumn = parsed.getPrimaryKey();
        final String keyValue;
        if (pkColumn != null) {
            keyValue = columns.get(pkColumn);
        } else {
            keyValue = columns.values().iterator().next();
        }
        if (keyValue == null) {
            return;
        }

        primary.put(keyValue, serializeRow(columns));

        logger.debug("Updated primary index for key: {} at cubic level: {}",
            keyValue, primary.calculateLevel(keyValue));

        updateSecondaryIndexes(tableKey, keyValue, columns);
    }
    
    /**
     * Creates a secondary index for the statement's column and fills it from the
     * rows already held in the table's primary index.
     *
     * NOTE(review): this overload works on {@code SQLParser.ParsedSQL} and
     * returns {@code SQLResult}; it looks like an earlier revision of the
     * Map-returning handler - confirm whether it is still needed.
     *
     * @param parsed the parsed CREATE INDEX statement
     * @return a success result carrying the number of keys indexed
     */
    private SQLResult handleCreateIndex(SQLParser.ParsedSQL parsed) {
        final String indexName = parsed.getIndexName();
        final String table = parsed.getTable();
        final String column = parsed.getColumn();
        final String keyspace = parsed.getKeyspace();

        // Register the new tree under keyspace.table.column.
        final CubicIndexTree tree = new CubicIndexTree(3, 15, true);
        secondaryIndexes.put(keyspace + "." + table + "." + column, tree);

        // Back-fill from existing rows.
        final int indexedCount = populateSecondaryIndex(keyspace, table, column, tree);

        logger.info("Created secondary cubic index '{}' on {}.{} ({} keys indexed)",
            indexName, table, column, indexedCount);

        final String message = "Index created: " + indexName + " on " + table + "(" + column + ")";
        return SQLResult.success(message).withRowsAffected(indexedCount);
    }
    
    /**
     * Handle SELECT with index optimization ({@code SQLParser.ParsedSQL} overload).
     *
     * Looks the table's primary index up in {@code primaryIndexes}; the previous
     * reference to {@code tableIndexes} named a field this class does not
     * declare. The primary-key check now passes {@code null} for the declared
     * key so only the column-name heuristics apply; the table name that was
     * passed before is not a column name.
     *
     * @param parsed the parsed SELECT statement
     * @return the index-backed result, or the regular query result
     */
    private SQLResult handleSelectWithIndex(SQLParser.ParsedSQL parsed) {
        String indexKey = parsed.getKeyspace() + "." + parsed.getTable();
        CubicIndexTree primaryIndex = primaryIndexes.get(indexKey);

        // Check if we can use index
        if (primaryIndex != null && parsed.hasWhereClause()) {
            String whereColumn = parsed.getWhereColumn();

            // Check if querying by primary key; no declared key is available here
            if (isPrimaryKeyColumn(whereColumn, null)) {
                return executeIndexedSelect(parsed, primaryIndex);
            }

            // Check for secondary index
            String secondaryIndexKey = indexKey + "." + whereColumn;
            CubicIndexTree secondaryIndex = secondaryIndexes.get(secondaryIndexKey);

            if (secondaryIndex != null) {
                return executeSecondaryIndexSelect(parsed, secondaryIndex);
            }
        }

        // Fall back to regular query
        indexMisses++;
        return executeSQLParser(parsed.getOriginalSQL(), parsed);
    }
    
    /**
     * Looks the statement's WHERE value up in the given primary index and returns
     * the stored row on a hit, or an empty result on a miss. If the lookup
     * throws, the statement is run through {@code executeSQLParser} instead.
     *
     * @param parsed the parsed SELECT statement
     * @param index  the table's primary index
     * @return the lookup result
     */
    private SQLResult executeIndexedSelect(SQLParser.ParsedSQL parsed, CubicIndexTree index) {
        try {
            final String lookupKey = parsed.getWhereValue();
            final byte[] stored = index.get(lookupKey);

            // Miss: nothing stored under this key.
            if (stored == null) {
                indexMisses++;
                return SQLResult.success("Query executed").withRowCount(0);
            }

            // Hit: decode the row and wrap it as a one-row result.
            indexHits++;
            queriesOptimized++;

            final List<Map<String, String>> rows =
                Collections.singletonList(deserializeRow(stored));

            logger.debug("Index hit for key: {}", lookupKey);
            return SQLResult.success("Query executed (index-optimized)")
                .withRows(rows)
                .withRowCount(1);

        } catch (Exception failure) {
            logger.error("Error in indexed select: {}", failure.getMessage());
            // Fall back to the regular query path.
            return executeSQLParser(parsed.getOriginalSQL(), parsed);
        }
    }
    
    /**
     * Execute SELECT using a secondary index ({@code SQLParser.ParsedSQL} overload).
     *
     * Resolves the WHERE value to a primary key through the secondary index and
     * then reads the row from the table's primary index. The primary index is
     * taken from {@code primaryIndexes} (the previous {@code tableIndexes}
     * reference named a field this class does not declare), and the stored key
     * bytes are decoded as UTF-8 to match how the secondary-index writers in this
     * class encode them.
     *
     * @param parsed         the parsed SELECT statement
     * @param secondaryIndex the secondary index on the WHERE column
     * @return the lookup result, or the regular query result if the lookup throws
     */
    private SQLResult executeSecondaryIndexSelect(SQLParser.ParsedSQL parsed, CubicIndexTree secondaryIndex) {
        try {
            String whereValue = parsed.getWhereValue();

            // Secondary index maps column value -> primary key
            byte[] primaryKeyData = secondaryIndex.get(whereValue);

            if (primaryKeyData != null) {
                String primaryKey = new String(primaryKeyData, StandardCharsets.UTF_8);

                // Now lookup by primary key
                String indexKey = parsed.getKeyspace() + "." + parsed.getTable();
                CubicIndexTree primaryIndex = primaryIndexes.get(indexKey);

                if (primaryIndex != null) {
                    return executeIndexedSelectByPrimaryKey(parsed, primaryIndex, primaryKey);
                }
            }

            return SQLResult.success("Query executed").withRowCount(0);

        } catch (Exception e) {
            logger.error("Error in secondary index select: {}", e.getMessage(), e);
            return executeSQLParser(parsed.getOriginalSQL(), parsed);
        }
    }
    
    /**
     * Reads one row from the primary index by its key and wraps it as a
     * one-row result; returns an empty result when the key is absent.
     *
     * @param parsed       the parsed SELECT statement (not read by this method)
     * @param primaryIndex the table's primary index
     * @param primaryKey   the key to look up
     * @return the lookup result
     */
    private SQLResult executeIndexedSelectByPrimaryKey(SQLParser.ParsedSQL parsed,
                                                       CubicIndexTree primaryIndex,
                                                       String primaryKey) {
        final byte[] stored = primaryIndex.get(primaryKey);

        // No row under this key.
        if (stored == null) {
            return SQLResult.success("Query executed").withRowCount(0);
        }

        indexHits++;
        queriesOptimized++;

        final List<Map<String, String>> rows =
            Collections.singletonList(deserializeRow(stored));

        return SQLResult.success("Query executed (secondary-index-optimized)")
            .withRows(rows)
            .withRowCount(1);
    }
    
    /**
     * Runs the INSERT via {@code executeSQLParser} and, on success, mirrors the
     * row into the indexes.
     *
     * @param parsed the parsed INSERT statement
     * @return the statement's result, unchanged
     */
    private SQLResult handleInsertWithIndex(SQLParser.ParsedSQL parsed) {
        final SQLResult insertOutcome = executeSQLParser(parsed.getOriginalSQL(), parsed);

        // Only index rows that were actually stored.
        if (!insertOutcome.isSuccess()) {
            return insertOutcome;
        }

        updateIndexesOnInsert(parsed);
        return insertOutcome;
    }
    
    /**
     * Runs the UPDATE via {@code executeSQLParser} and, on success, refreshes
     * the indexes.
     *
     * @param parsed the parsed UPDATE statement
     * @return the statement's result, unchanged
     */
    private SQLResult handleUpdateWithIndex(SQLParser.ParsedSQL parsed) {
        final SQLResult updateOutcome = executeSQLParser(parsed.getOriginalSQL(), parsed);

        // Only refresh indexes for changes that were actually applied.
        if (!updateOutcome.isSuccess()) {
            return updateOutcome;
        }

        updateIndexesOnUpdate(parsed);
        return updateOutcome;
    }
    
    /**
     * Handle DELETE with index update ({@code SQLParser.ParsedSQL} overload).
     *
     * The DELETE runs first and the indexes are pruned only when it succeeds and
     * a WHERE value is present, so a failed DELETE cannot leave the indexes
     * missing a row that still exists. The primary index is taken from
     * {@code primaryIndexes}; the previous {@code tableIndexes} reference named a
     * field this class does not declare.
     *
     * @param parsed the parsed DELETE statement
     * @return the statement's result, unchanged
     */
    private SQLResult handleDeleteWithIndex(SQLParser.ParsedSQL parsed) {
        // Execute the DELETE
        SQLResult result = executeSQLParser(parsed.getOriginalSQL(), parsed);

        String whereValue = parsed.getWhereValue();
        if (!result.isSuccess() || whereValue == null) {
            return result;
        }

        String indexKey = parsed.getKeyspace() + "." + parsed.getTable();
        CubicIndexTree primaryIndex = primaryIndexes.get(indexKey);

        if (primaryIndex != null) {
            primaryIndex.remove(whereValue);
        }

        // Remove from secondary indexes
        removeFromSecondaryIndexes(indexKey, whereValue);

        return result;
    }
    
    /**
     * Handle DROP INDEX ({@code SQLParser.ParsedSQL} overload).
     *
     * Uses the name-to-key mapping in {@code indexMetadata} to actually remove
     * and clear the secondary index; previously the method only logged and
     * reported success without removing anything. The return value is unchanged:
     * a success result is returned either way, and an unknown name is reported
     * through a warning log only.
     *
     * @param parsed the parsed DROP INDEX statement
     * @return a success result naming the index
     */
    private SQLResult handleDropIndex(SQLParser.ParsedSQL parsed) {
        String indexName = parsed.getIndexName();

        String indexKey = indexMetadata.remove(indexName);
        if (indexKey != null) {
            CubicIndexTree removed = secondaryIndexes.remove(indexKey);
            if (removed != null) {
                removed.clear();
            }
            logger.info("Dropped index: {}", indexName);
        } else {
            logger.warn("Index not found, nothing dropped: {}", indexName);
        }

        return SQLResult.success("Index dropped: " + indexName);
    }
    
    /**
     * Update indexes after INSERT ({@code SQLParser.ParsedSQL} overload).
     *
     * Stores the serialized row in the table's primary index, keyed by the first
     * inserted value. The index is taken from {@code primaryIndexes}; the
     * previous {@code tableIndexes} reference named a field this class does not
     * declare.
     *
     * NOTE(review): {@code serializeRow(parsed)} needs an overload accepting
     * {@code SQLParser.ParsedSQL}; only the Map-based one is visible in this
     * class - confirm it exists elsewhere.
     *
     * @param parsed the parsed INSERT statement
     */
    private void updateIndexesOnInsert(SQLParser.ParsedSQL parsed) {
        String indexKey = parsed.getKeyspace() + "." + parsed.getTable();
        CubicIndexTree primaryIndex = primaryIndexes.get(indexKey);

        if (primaryIndex != null && parsed.getValues() != null && !parsed.getValues().isEmpty()) {
            // First value is typically the primary key
            String primaryKey = parsed.getValues().get(0);
            byte[] rowData = serializeRow(parsed);

            primaryIndex.put(primaryKey, rowData);
            logger.debug("Updated primary index for key: {}", primaryKey);
        }
    }
    
    
    /**
     * Update indexes after UPDATE.
     *
     * The statement's columns are merged into the row already stored in the
     * primary index instead of replacing it, so columns the statement does not
     * carry are not lost from the indexed copy. Before merging, a secondary-index
     * entry for a column's previous value is removed when it still points at
     * this row, so the old value no longer resolves to it.
     *
     * The WHERE value is used as the primary key, as before. Nothing happens
     * when the table has no primary index or the statement has no WHERE value.
     *
     * @param parsed the parsed UPDATE statement
     */
    private void updateIndexesOnUpdate(IndexedParsedSQL parsed) {
        String whereValue = parsed.getWhereValue();
        String tableKey = parsed.getFullTableName();
        CubicIndexTree primaryIndex = primaryIndexes.get(tableKey);

        if (primaryIndex == null || whereValue == null) {
            return;
        }

        Map<String, String> changes = parsed.getColumns();
        if (changes == null) {
            changes = Collections.emptyMap();
        }

        // Start from the row currently held in the index, if any.
        Map<String, String> merged = new LinkedHashMap<>();
        byte[] existing = primaryIndex.get(whereValue);
        if (existing != null) {
            merged.putAll(deserializeRow(existing));
        }

        // Drop secondary entries that still map a replaced value to this row.
        for (Map.Entry<String, String> change : changes.entrySet()) {
            String previous = merged.get(change.getKey());
            if (previous == null || previous.equals(change.getValue())) {
                continue;
            }
            CubicIndexTree secondaryIndex = secondaryIndexes.get(tableKey + "." + change.getKey());
            if (secondaryIndex == null) {
                continue;
            }
            byte[] owner = secondaryIndex.get(previous);
            if (owner != null && whereValue.equals(new String(owner, StandardCharsets.UTF_8))) {
                secondaryIndex.remove(previous);
            }
        }

        // Re-serialize the merged row and update
        merged.putAll(changes);
        primaryIndex.put(whereValue, serializeRow(merged));

        logger.debug("Updated primary index for key: {}", whereValue);

        // Update secondary indexes
        updateSecondaryIndexes(tableKey, whereValue, changes);
    }
    
    /**
     * Update secondary indexes.
     *
     * For every column that has a secondary index on this table, maps the
     * column's value to the row's primary key (stored as UTF-8 bytes). Columns
     * with a null value are skipped, matching {@code populateSecondaryIndex},
     * which also leaves null values unindexed.
     *
     * @param tableKey   full table name used as the index-key prefix
     * @param primaryKey primary key of the row being indexed
     * @param columns    column name to value pairs for the row
     */
    private void updateSecondaryIndexes(String tableKey, String primaryKey, Map<String, String> columns) {
        for (Map.Entry<String, String> entry : columns.entrySet()) {
            String column = entry.getKey();
            String value = entry.getValue();

            if (value == null) {
                continue;
            }

            String indexKey = tableKey + "." + column;
            CubicIndexTree secondaryIndex = secondaryIndexes.get(indexKey);

            if (secondaryIndex != null) {
                // Map column value -> primary key
                secondaryIndex.put(value, primaryKey.getBytes(StandardCharsets.UTF_8));
                logger.debug("Updated secondary index {}: {} -> {}", column, value, primaryKey);
            }
        }
    }
    
    /**
     * Back-fills a secondary index from the rows held in the table's primary
     * index: each row with a non-null value for the column contributes one
     * value-to-primary-key entry.
     *
     * @param keyspace keyspace of the table
     * @param table    table name
     * @param column   column being indexed
     * @param index    the secondary index to fill
     * @return the number of entries written (0 when the table has no primary index)
     */
    private int populateSecondaryIndex(String keyspace, String table, String column, CubicIndexTree index) {
        final CubicIndexTree primary = primaryIndexes.get(keyspace + "." + table);
        if (primary == null) {
            return 0;
        }

        int written = 0;
        for (String rowKey : primary.getAllKeys()) {
            final byte[] stored = primary.get(rowKey);
            if (stored == null) {
                continue;
            }

            final String cell = deserializeRow(stored).get(column);
            if (cell == null) {
                continue;
            }

            // Entry: column value -> primary key (UTF-8 bytes).
            index.put(cell, rowKey.getBytes(StandardCharsets.UTF_8));
            written++;
        }

        return written;
    }
    
    /**
     * Removes, from every secondary index of the given table, each entry whose
     * stored value decodes (UTF-8) to the given primary key. Each index is
     * scanned in full because entries are keyed by column value, not by
     * primary key.
     *
     * @param tableKey   full table name used as the index-key prefix
     * @param primaryKey primary key whose entries should be removed
     */
    private void removeFromSecondaryIndexes(String tableKey, String primaryKey) {
        final String prefix = tableKey + ".";

        for (Map.Entry<String, CubicIndexTree> registered : secondaryIndexes.entrySet()) {
            if (!registered.getKey().startsWith(prefix)) {
                continue;
            }

            final CubicIndexTree tree = registered.getValue();

            // Collect first, remove afterwards, so the scan is not disturbed.
            final Set<String> doomed = new HashSet<>();
            for (String candidate : tree.getAllKeys()) {
                final byte[] mapped = tree.get(candidate);
                if (mapped == null) {
                    continue;
                }
                if (new String(mapped, StandardCharsets.UTF_8).equals(primaryKey)) {
                    doomed.add(candidate);
                }
            }

            for (String candidate : doomed) {
                tree.remove(candidate);
            }
        }
    }
    
    /**
     * Check if a column is the primary key.
     *
     * When the primary-key column is known, only an exact match counts; the
     * name heuristics are a fallback for when it is unknown. Previously the
     * heuristics also applied alongside a known key, so any column named
     * {@code id}, {@code pk} or ending in {@code _id} was treated as the primary
     * key and its value looked up in the primary index. A null column is never
     * a primary key (it used to throw a NullPointerException).
     *
     * @param column     the column name to test, may be null
     * @param primaryKey the declared primary-key column, or null if unknown
     * @return true if the column should be treated as the primary key
     */
    private boolean isPrimaryKeyColumn(String column, String primaryKey) {
        if (column == null) {
            return false;
        }
        if (primaryKey != null) {
            return primaryKey.equals(column);
        }
        // Primary key unknown: fall back to common primary key column names
        return "id".equals(column) || column.endsWith("_id") || "pk".equals(column);
    }
    
    /**
     * Encodes a row as JSON and returns its UTF-8 bytes.
     *
     * @param columns column name to value pairs for the row
     * @return the encoded bytes, or an empty array if encoding fails (the
     *         failure is logged)
     */
    private byte[] serializeRow(Map<String, String> columns) {
        byte[] encoded;
        try {
            encoded = objectMapper.writeValueAsString(columns).getBytes(StandardCharsets.UTF_8);
        } catch (Exception failure) {
            logger.error("Error serializing row: {}", failure.getMessage());
            encoded = new byte[0];
        }
        return encoded;
    }
    
    /**
     * Decodes UTF-8 JSON bytes back into a row map.
     *
     * @param data the bytes previously produced for a row
     * @return the decoded row, or an empty map if decoding fails (the failure
     *         is logged)
     */
    private Map<String, String> deserializeRow(byte[] data) {
        final TypeReference<Map<String, String>> rowType =
            new TypeReference<Map<String, String>>() {};
        try {
            final String text = new String(data, StandardCharsets.UTF_8);
            return objectMapper.readValue(text, rowType);
        } catch (Exception failure) {
            logger.error("Error deserializing row: {}", failure.getMessage());
            return new LinkedHashMap<>();
        }
    }
    
    /**
     * Tells whether a result map reports success, i.e. it is non-null and its
     * "success" entry is {@code Boolean.TRUE}.
     *
     * @param result the result map to inspect, may be null
     * @return true only for a non-null map whose "success" value is true
     */
    private boolean isSuccess(Map<String, Object> result) {
        if (result == null) {
            return false;
        }
        final Object flag = result.get("success");
        return Boolean.TRUE.equals(flag);
    }
    
    /**
     * Builds the standard failure map: "success" set to false followed by the
     * given text under "error".
     *
     * @param message the error text to report
     * @return a new insertion-ordered map describing the failure
     */
    private Map<String, Object> createErrorResult(String message) {
        final Map<String, Object> failure = new LinkedHashMap<>();
        failure.put("success", Boolean.FALSE);
        failure.put("error", message);
        return failure;
    }
    
    /**
     * Get index statistics.
     *
     * Reports the hit/miss counters, the hit rate as a percentage string, index
     * counts, and per-index details. The hit rate is formatted with
     * {@code Locale.ROOT} so its decimal separator is always "." and agrees with
     * the literal "0.00%" used when no lookups have been recorded; the default
     * locale could otherwise produce e.g. "12,50%".
     *
     * @return an insertion-ordered map of statistics
     */
    public Map<String, Object> getIndexStats() {
        Map<String, Object> stats = new LinkedHashMap<>();

        long lookups = indexHits + indexMisses;

        stats.put("indexHits", indexHits);
        stats.put("indexMisses", indexMisses);
        stats.put("hitRate", lookups > 0 ?
            String.format(Locale.ROOT, "%.2f%%", (double) indexHits / lookups * 100) : "0.00%");
        stats.put("queriesOptimized", queriesOptimized);
        stats.put("primaryIndexes", primaryIndexes.size());
        stats.put("secondaryIndexes", secondaryIndexes.size());
        stats.put("totalIndexes", primaryIndexes.size() + secondaryIndexes.size());
        stats.put("indexCreations", indexCreations);

        // Per-table primary index stats
        Map<String, Object> tableStats = new LinkedHashMap<>();
        for (Map.Entry<String, CubicIndexTree> entry : primaryIndexes.entrySet()) {
            Map<String, Object> indexStats = entry.getValue().getStats();
            tableStats.put(entry.getKey(), indexStats);
        }
        stats.put("primaryIndexDetails", tableStats);

        // Secondary index stats, keyed by index name
        Map<String, Object> secondaryStats = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : indexMetadata.entrySet()) {
            String indexName = entry.getKey();
            String indexKey = entry.getValue();
            CubicIndexTree index = secondaryIndexes.get(indexKey);

            if (index != null) {
                Map<String, Object> indexInfo = new LinkedHashMap<>();
                indexInfo.put("indexKey", indexKey);
                indexInfo.put("totalKeys", index.getTotalSize());
                indexInfo.put("levels", index.getLevelCount());
                secondaryStats.put(indexName, indexInfo);
            }
        }
        stats.put("secondaryIndexDetails", secondaryStats);

        return stats;
    }
    
    /**
     * Clear all indexes and reset the statistics counters.
     *
     * Primary indexes are emptied but stay registered for their tables.
     * Secondary indexes are emptied and unregistered together with their names.
     * Previously only the name mapping was cleared, which left secondary indexes
     * registered without a name: CREATE INDEX on the same column reported
     * "already exists" while DROP INDEX reported "not found".
     */
    public void clearIndexes() {
        for (CubicIndexTree index : primaryIndexes.values()) {
            index.clear();
        }
        for (CubicIndexTree index : secondaryIndexes.values()) {
            index.clear();
        }
        // Keep the registry and the name mapping in sync.
        secondaryIndexes.clear();
        indexMetadata.clear();

        indexHits = 0;
        indexMisses = 0;
        queriesOptimized = 0;
        indexCreations = 0;

        logger.info("Cleared all cubic indexes");
    }
    
    /**
     * Calls {@code rebalance()} on every primary index and then on every
     * secondary index, logging how many were processed.
     */
    public void rebalanceAllIndexes() {
        logger.info("Rebalancing all cubic indexes...");

        int processed = 0;

        // Primary indexes first, then secondary ones.
        final List<Collection<CubicIndexTree>> groups =
            Arrays.asList(primaryIndexes.values(), secondaryIndexes.values());

        for (Collection<CubicIndexTree> group : groups) {
            for (CubicIndexTree tree : group) {
                tree.rebalance();
                processed++;
            }
        }

        logger.info("Rebalanced {} cubic indexes", processed);
    }
    
    /**
     * Returns the full names of all tables that have a primary index.
     *
     * @return a new set copied from the registry, so callers may modify it freely
     */
    public Set<String> getIndexedTables() {
        final Set<String> snapshot = new HashSet<>();
        snapshot.addAll(primaryIndexes.keySet());
        return snapshot;
    }
    
    /**
     * Returns the names of the secondary indexes registered for a table, i.e.
     * those whose index key starts with {@code keyspace.table.}.
     *
     * @param keyspace keyspace of the table
     * @param table    table name
     * @return a new list of matching index names (possibly empty)
     */
    public List<String> getSecondaryIndexes(String keyspace, String table) {
        final String prefix = keyspace + "." + table + ".";
        final List<String> names = new ArrayList<>();

        for (Map.Entry<String, String> named : indexMetadata.entrySet()) {
            if (!named.getValue().startsWith(prefix)) {
                continue;
            }
            names.add(named.getKey());
        }

        return names;
    }
}