diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..9a874b5 --- /dev/null +++ b/.DS_Store Binary files differ diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..ab1f416 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,10 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Ignored default folder with query files +/queries/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..8da4db7 --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..63e9001 --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml new file mode 100644 index 0000000..712ab9d --- /dev/null +++ b/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..5e4e294 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,12 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/BUILD.md b/BUILD.md new file mode 100644 index 0000000..ff027ef --- /dev/null +++ b/BUILD.md @@ -0,0 +1,322 @@ +# CubeCactus - Build Instructions + +## Prerequisites Installation + +### Ubuntu/Debian +```bash +# Update package list +sudo apt update + +# Install JDK 21 +sudo apt install openjdk-21-jdk + +# Install Maven +sudo apt install maven + +# Verify installations +java -version # Should show "21" +javac -version # Should show "21" +mvn -version # Should show "3.6" or 
higher +``` + +### macOS +```bash +# Install Homebrew (if not already installed) +/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" + +# Install Java 21 +brew install openjdk@21 + +# Install Maven +brew install maven + +# Link Java +sudo ln -sfn /usr/local/opt/openjdk@21/libexec/openjdk.jdk \ + /Library/Java/JavaVirtualMachines/openjdk-21.jdk + +# Verify +java -version +mvn -version +``` + +### Windows +```bash +# 1. Download and install JDK 21 from: +# https://adoptium.net/ + +# 2. Download and install Maven from: +# https://maven.apache.org/download.cgi + +# 3. Add to PATH: +# JAVA_HOME=C:\Program Files\Java\jdk-21 +# MAVEN_HOME=C:\Program Files\Apache\maven +# PATH=%JAVA_HOME%\bin;%MAVEN_HOME%\bin + +# 4. Verify in Command Prompt: +java -version +mvn -version +``` + +--- + +## Build Steps + +### Step 1: Navigate to Source +```bash +cd cubecactus-source/ +``` + +### Step 2: Build with Maven +```bash +# Full build with tests +mvn clean package + +# Skip tests (faster) +mvn clean package -DskipTests + +# Verbose output +mvn clean package -X +``` + +### Step 3: Verify Build +```bash +# Check if JAR was created +ls -lh target/cubecactus-1.0.0.jar + +# Should show ~50MB file +``` + +--- + +## Build Output + +Successful build creates: +``` +target/ +├── cubecactus-1.0.0.jar # Executable JAR (with dependencies) +├── cubecactus-1.0.0.jar.original # Without dependencies +├── classes/ # Compiled .class files +├── generated-sources/ # Generated code +├── maven-archiver/ # Build metadata +└── maven-status/ # Build status +``` + +--- + +## Running the Application + +### Method 1: Run JAR Directly +```bash +java -jar target/cubecactus-1.0.0.jar +``` + +### Method 2: Run with Maven +```bash +mvn spring-boot:run +``` + +### Method 3: Run with Custom Port +```bash +java -Dserver.port=9090 -jar target/cubecactus-1.0.0.jar +``` + +### Method 4: Run with Custom Configuration +```bash +java -jar target/cubecactus-1.0.0.jar \ + 
--server.port=8080 \ + --cube.node.id=node-1 \ + --cube.data.dir=/var/lib/cubecactus +``` + +--- + +## Verify Installation + +```bash +# Wait a few seconds for startup, then: + +# Check health +curl http://localhost:8080/api/v1/health + +# Should return: +# {"status":"UP"} + +# Test SQL API +curl -X POST http://localhost:8080/api/v1/sql/execute \ + -H "Content-Type: application/json" \ + -d '{"sql": "CREATE TABLE test (id TEXT PRIMARY KEY, value TEXT)"}' +``` + +--- + +## Build Profiles + +### Development Profile +```bash +mvn clean package -Pdev -DskipTests +``` + +### Production Profile +```bash +mvn clean package -Pprod +``` + +### Docker Profile +```bash +mvn clean package -Pdocker +``` + +--- + +## Advanced Build Options + +### Parallel Build +```bash +# Use all CPU cores +mvn clean package -T 1C + +# Use specific number of threads +mvn clean package -T 4 +``` + +### Offline Build +```bash +# Use cached dependencies +mvn clean package -o +``` + +### Update Dependencies +```bash +# Force update from remote repositories +mvn clean package -U +``` + +### Skip Specific Tests +```bash +mvn clean package -Dtest=!GossipProtocolTest +``` + +--- + +## Troubleshooting Build Issues + +### Issue: "Failed to execute goal" +```bash +# Solution: Clean and rebuild +rm -rf target/ +mvn clean install -U +``` + +### Issue: "Cannot find symbol" +```bash +# Solution: Ensure Java 21 +java -version # Must show 21 or higher + +# Verify Maven uses correct Java +mvn -version # Check "Java version" line +``` + +### Issue: "Connection timed out" +```bash +# Solution: Configure Maven proxy (if behind firewall) +# Edit ~/.m2/settings.xml: + + + proxy.example.com + 8080 + + +``` + +### Issue: "Insufficient memory" +```bash +# Solution: Increase Maven memory +export MAVEN_OPTS="-Xmx2048m -XX:MaxPermSize=512m" +mvn clean package +``` + +--- + +## Clean Build + +```bash +# Remove all build artifacts +mvn clean + +# Deep clean (including cached dependencies) +rm -rf target/ 
~/.m2/repository/com/cube/ +mvn clean install +``` + +--- + +## IDE-Specific Build + +### IntelliJ IDEA +``` +1. Import project (pom.xml) +2. Build → Build Project (Ctrl+F9) +3. Run → Run 'CubeApplication' (Shift+F10) +``` + +### Eclipse +``` +1. Import → Maven → Existing Maven Project +2. Project → Build Project +3. Run As → Java Application +``` + +### VS Code +``` +1. Open folder in VS Code +2. Maven sidebar → Execute → clean package +3. Run → Start Debugging (F5) +``` + +--- + +## Continuous Integration + +### GitHub Actions +```yaml +name: Build +on: [push] +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-java@v2 + with: + java-version: '21' + - run: mvn clean package +``` + +### Jenkins +```groovy +pipeline { + agent any + stages { + stage('Build') { + steps { + sh 'mvn clean package' + } + } + } +} +``` + +--- + +## Summary + +✅ **Install:** JDK 21 + Maven 3.6+ +✅ **Build:** `mvn clean package` +✅ **Run:** `java -jar target/cubecactus-1.0.0.jar` +✅ **Test:** `curl http://localhost:8080/api/v1/health` + +**Build time:** ~2-5 minutes (first build downloads dependencies) +**JAR size:** ~50 MB +**Memory:** ~512 MB minimum + +🌵 **Happy Building!** diff --git a/git.sh b/git.sh new file mode 100644 index 0000000..066b23c --- /dev/null +++ b/git.sh @@ -0,0 +1,6 @@ +touch README.md +git init +git add README.md +git commit -m "first commit" +git remote add origin https://gitbucket.heaerie.com/git/agalyadoss/cactus.git +git push -u origin main diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..243b1a6 --- /dev/null +++ b/pom.xml @@ -0,0 +1,95 @@ + + + 4.0.0 + + com.cube + cubecactus + 1.0.0 + jar + + CubeCactus Database + Distributed column-family database with SQL support, gossip protocol, and cloud-native deployment + + + 21 + 21 + 21 + UTF-8 + + 3.2.0 + 5.10.1 + + + + org.springframework.boot + spring-boot-starter-parent + 3.2.0 + + + + + + org.springframework.boot + spring-boot-starter-web + + + 
+ org.springframework.boot + spring-boot-starter + + + + + com.fasterxml.jackson.core + jackson-databind + + + + + org.slf4j + slf4j-api + + + + + org.junit.jupiter + junit-jupiter + test + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 21 + 21 + 21 + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0 + + + + diff --git a/src/main/java/com/cube/CubeApplication.java b/src/main/java/com/cube/CubeApplication.java new file mode 100644 index 0000000..7a78ba2 --- /dev/null +++ b/src/main/java/com/cube/CubeApplication.java @@ -0,0 +1,37 @@ +package com.cube; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import com.cube.storage.LSMStorageEngine; +import com.cube.cql.QueryExecutor; +import com.cube.sql.SQLExecutor; + +import java.io.IOException; + +/** + * Cube Database - Main Application + */ +@SpringBootApplication +public class CubeApplication { + + public static void main(String[] args) { + SpringApplication.run(CubeApplication.class, args); + } + + @Bean + public LSMStorageEngine storageEngine() throws IOException { + String dataDir = System.getProperty("cube.datadir", "/tmp/cube-data"); + return new LSMStorageEngine(dataDir); + } + + @Bean + public QueryExecutor queryExecutor(LSMStorageEngine storageEngine) { + return new QueryExecutor(storageEngine); + } + + @Bean + public SQLExecutor sqlExecutor(QueryExecutor queryExecutor, LSMStorageEngine storageEngine) { + return new SQLExecutor(queryExecutor, storageEngine); + } +} diff --git a/src/main/java/com/cube/api/CubeController.java b/src/main/java/com/cube/api/CubeController.java new file mode 100644 index 0000000..74fd7de --- /dev/null +++ b/src/main/java/com/cube/api/CubeController.java @@ -0,0 +1,202 @@ +package 
com.cube.api; + +import com.cube.storage.LSMStorageEngine; +import com.cube.storage.StorageEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.*; + +/** + * REST API Controller for Cube database + */ +@RestController +@RequestMapping("/api/v1") +public class CubeController { + + private static final Logger logger = LoggerFactory.getLogger(CubeController.class); + + @Autowired + private LSMStorageEngine storageEngine; + + @PostMapping("/put") + public ResponseEntity> put(@RequestBody Map request) { + try { + String key = request.get("key"); + String value = request.get("value"); + + if (key == null || value == null) { + return ResponseEntity.badRequest().body(Map.of( + "success", false, + "message", "Key and value are required" + )); + } + + storageEngine.put(key, value.getBytes()); + + return ResponseEntity.ok(Map.of( + "success", true, + "message", "Value stored successfully", + "key", key + )); + + } catch (Exception e) { + logger.error("Put operation failed", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } + + @GetMapping("/get/{key}") + public ResponseEntity> get(@PathVariable String key) { + try { + byte[] value = storageEngine.get(key); + + if (value == null) { + return ResponseEntity.ok(Map.of( + "success", true, + "found", false, + "key", key + )); + } + + return ResponseEntity.ok(Map.of( + "success", true, + "found", true, + "key", key, + "value", new String(value) + )); + + } catch (Exception e) { + logger.error("Get operation failed", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } + + @DeleteMapping("/delete/{key}") + public ResponseEntity> delete(@PathVariable String key) { + try { 
+ storageEngine.delete(key); + + return ResponseEntity.ok(Map.of( + "success", true, + "message", "Key deleted", + "key", key + )); + + } catch (Exception e) { + logger.error("Delete operation failed", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } + + @GetMapping("/scan") + public ResponseEntity> scan(@RequestParam String prefix) { + try { + Map results = new LinkedHashMap<>(); + + Iterator> entries = storageEngine.scanEntries(prefix); + + while (entries.hasNext()) { + Map.Entry entry = entries.next(); + results.put(entry.getKey(), new String(entry.getValue())); + } + + return ResponseEntity.ok(Map.of( + "success", true, + "prefix", prefix, + "count", results.size(), + "results", results + )); + + } catch (Exception e) { + logger.error("Scan operation failed", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } + + @GetMapping("/stats") + public ResponseEntity> stats() { + try { + StorageEngine.StorageStats stats = storageEngine.getStats(); + + return ResponseEntity.ok(Map.of( + "success", true, + "stats", Map.of( + "totalKeys", stats.getTotalKeys(), + "totalSize", stats.getTotalSize(), + "memtableSize", stats.getMemtableSize(), + "sstableCount", stats.getSstableCount() + ) + )); + + } catch (Exception e) { + logger.error("Stats operation failed", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } + + @PostMapping("/flush") + public ResponseEntity> flush() { + try { + storageEngine.flush(); + + return ResponseEntity.ok(Map.of( + "success", true, + "message", "Flush completed" + )); + + } catch (Exception e) { + logger.error("Flush operation failed", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } + + 
@PostMapping("/compact") + public ResponseEntity> compact() { + try { + storageEngine.compact(); + + return ResponseEntity.ok(Map.of( + "success", true, + "message", "Compaction completed" + )); + + } catch (Exception e) { + logger.error("Compaction operation failed", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } + + @GetMapping("/health") + public ResponseEntity> health() { + return ResponseEntity.ok(Map.of( + "status", "UP", + "database", "Cube DB", + "version", "1.0.0" + )); + } +} diff --git a/src/main/java/com/cube/api/SQLController.java b/src/main/java/com/cube/api/SQLController.java new file mode 100644 index 0000000..333af99 --- /dev/null +++ b/src/main/java/com/cube/api/SQLController.java @@ -0,0 +1,142 @@ +package com.cube.api; + +import com.cube.sql.SQLExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.*; + +/** + * SQL API Controller - Execute SQL statements via REST API + */ +@RestController +@RequestMapping("/api/v1/sql") +public class SQLController { + + private static final Logger logger = LoggerFactory.getLogger(SQLController.class); + + @Autowired(required = false) + private SQLExecutor sqlExecutor; + + /** + * Execute SQL statement + * + * POST /api/v1/sql/execute + * Body: { "sql": "SELECT * FROM users.profiles WHERE id = 'user-1'" } + */ + @PostMapping("/execute") + public ResponseEntity> execute(@RequestBody Map request) { + if (sqlExecutor == null) { + return ResponseEntity.status(503).body(Map.of( + "success", false, + "message", "SQL executor not initialized" + )); + } + + String sql = request.get("sql"); + if (sql == null || sql.trim().isEmpty()) { + return ResponseEntity.badRequest().body(Map.of( + "success", false, + "message", "SQL statement is 
required" + )); + } + + try { + logger.info("Executing SQL: {}", sql); + + SQLExecutor.SQLResult result = sqlExecutor.execute(sql); + + Map response = new LinkedHashMap<>(); + response.put("success", result.isSuccess()); + response.put("message", result.getMessage()); + + if (result.getRows() != null && !result.getRows().isEmpty()) { + response.put("rows", result.getRows()); + response.put("rowCount", result.getRows().size()); + } + + if (result.getRowsAffected() > 0) { + response.put("rowsAffected", result.getRowsAffected()); + } + + return ResponseEntity.ok(response); + + } catch (Exception e) { + logger.error("SQL execution error", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } + + /** + * Execute multiple SQL statements (batch) + * + * POST /api/v1/sql/batch + * Body: { "statements": ["INSERT ...", "UPDATE ...", "SELECT ..."] } + */ + @PostMapping("/batch") + public ResponseEntity> executeBatch(@RequestBody Map> request) { + if (sqlExecutor == null) { + return ResponseEntity.status(503).body(Map.of( + "success", false, + "message", "SQL executor not initialized" + )); + } + + List statements = request.get("statements"); + if (statements == null || statements.isEmpty()) { + return ResponseEntity.badRequest().body(Map.of( + "success", false, + "message", "SQL statements are required" + )); + } + + try { + List> results = new ArrayList<>(); + int successCount = 0; + int failureCount = 0; + + for (String sql : statements) { + SQLExecutor.SQLResult result = sqlExecutor.execute(sql); + + Map resultMap = new LinkedHashMap<>(); + resultMap.put("sql", sql); + resultMap.put("success", result.isSuccess()); + resultMap.put("message", result.getMessage()); + + if (result.getRows() != null && !result.getRows().isEmpty()) { + resultMap.put("rowCount", result.getRows().size()); + } + + results.add(resultMap); + + if (result.isSuccess()) { + successCount++; + } else { + failureCount++; + } 
+ } + + Map response = new LinkedHashMap<>(); + response.put("success", failureCount == 0); + response.put("total", statements.size()); + response.put("successful", successCount); + response.put("failed", failureCount); + response.put("results", results); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + logger.error("Batch execution error", e); + return ResponseEntity.internalServerError().body(Map.of( + "success", false, + "message", "Error: " + e.getMessage() + )); + } + } +} diff --git a/src/main/java/com/cube/cluster/ClusterNode.java b/src/main/java/com/cube/cluster/ClusterNode.java new file mode 100644 index 0000000..8a397b9 --- /dev/null +++ b/src/main/java/com/cube/cluster/ClusterNode.java @@ -0,0 +1,140 @@ +package com.cube.cluster; + +import java.time.Instant; +import java.util.Objects; + +/** + * Represents a node in the Cube database cluster. + */ +public class ClusterNode { + + private final String nodeId; + private final String host; + private final int port; + private final String datacenter; + private final String rack; + + private volatile NodeState state; + private volatile Instant lastHeartbeat; + private volatile long version; // For gossip protocol + + public enum NodeState { + ALIVE, // Node is responding + SUSPECTED, // Node may be down + DEAD, // Node is confirmed down + LEAVING, // Node is leaving cluster + JOINING // Node is joining cluster + } + + public ClusterNode(String nodeId, String host, int port) { + this(nodeId, host, port, "dc1", "rack1"); + } + + public ClusterNode(String nodeId, String host, int port, String datacenter, String rack) { + this.nodeId = nodeId; + this.host = host; + this.port = port; + this.datacenter = datacenter; + this.rack = rack; + this.state = NodeState.ALIVE; + this.lastHeartbeat = Instant.now(); + this.version = 0; + } + + public String getNodeId() { + return nodeId; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String 
getDatacenter() { + return datacenter; + } + + public String getRack() { + return rack; + } + + public NodeState getState() { + return state; + } + + public void setState(NodeState state) { + this.state = state; + } + + public Instant getLastHeartbeat() { + return lastHeartbeat; + } + + public void updateHeartbeat() { + this.lastHeartbeat = Instant.now(); + this.state = NodeState.ALIVE; + } + + public long getVersion() { + return version; + } + + public void incrementVersion() { + this.version++; + } + + /** + * Get the endpoint address (host:port) + */ + public String getEndpoint() { + return host + ":" + port; + } + + /** + * Check if node is alive + */ + public boolean isAlive() { + return state == NodeState.ALIVE; + } + + /** + * Check if node is in same datacenter + */ + public boolean isInSameDatacenter(ClusterNode other) { + return this.datacenter.equals(other.datacenter); + } + + /** + * Check if node is in same rack + */ + public boolean isInSameRack(ClusterNode other) { + return this.rack.equals(other.rack); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterNode that = (ClusterNode) o; + return Objects.equals(nodeId, that.nodeId); + } + + @Override + public int hashCode() { + return Objects.hash(nodeId); + } + + @Override + public String toString() { + return "ClusterNode{" + + "nodeId='" + nodeId + '\'' + + ", endpoint=" + getEndpoint() + + ", state=" + state + + ", dc=" + datacenter + + ", rack=" + rack + + '}'; + } +} diff --git a/src/main/java/com/cube/cluster/ClusterUtils.java b/src/main/java/com/cube/cluster/ClusterUtils.java new file mode 100644 index 0000000..aa00e3c --- /dev/null +++ b/src/main/java/com/cube/cluster/ClusterUtils.java @@ -0,0 +1,389 @@ +package com.cube.cluster; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.*; +import 
java.util.concurrent.*; +import java.util.stream.Collectors; + +/** + * Cluster utilities for managing node topology, health monitoring, and membership. + */ +public class ClusterUtils { + + private static final Logger logger = LoggerFactory.getLogger(ClusterUtils.class); + + /** + * Node health checker - monitors node health and updates state + */ + public static class HealthChecker { + private final Map nodes; + private final ScheduledExecutorService scheduler; + private final long checkIntervalMs; + private final long timeoutMs; + + public HealthChecker(Map nodes, long checkIntervalMs, long timeoutMs) { + this.nodes = nodes; + this.checkIntervalMs = checkIntervalMs; + this.timeoutMs = timeoutMs; + this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "HealthChecker"); + t.setDaemon(true); + return t; + }); + } + + public void start() { + scheduler.scheduleAtFixedRate( + this::checkAllNodes, + 0, + checkIntervalMs, + TimeUnit.MILLISECONDS + ); + logger.info("Health checker started (interval: {}ms, timeout: {}ms)", + checkIntervalMs, timeoutMs); + } + + private void checkAllNodes() { + long now = System.currentTimeMillis(); + + for (ClusterNode node : nodes.values()) { + Duration timeSinceHeartbeat = Duration.between( + node.getLastHeartbeat(), Instant.now()); + + if (timeSinceHeartbeat.toMillis() > timeoutMs) { + if (node.getState() == ClusterNode.NodeState.ALIVE) { + node.setState(ClusterNode.NodeState.SUSPECTED); + logger.warn("Node {} marked as SUSPECTED (no heartbeat for {}ms)", + node.getNodeId(), timeSinceHeartbeat.toMillis()); + } else if (node.getState() == ClusterNode.NodeState.SUSPECTED && + timeSinceHeartbeat.toMillis() > timeoutMs * 2) { + node.setState(ClusterNode.NodeState.DEAD); + logger.error("Node {} marked as DEAD", node.getNodeId()); + } + } else if (node.getState() != ClusterNode.NodeState.ALIVE) { + node.setState(ClusterNode.NodeState.ALIVE); + logger.info("Node {} recovered to ALIVE", 
node.getNodeId()); + } + } + } + + public void shutdown() { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + /** + * Cluster topology information + */ + public static class Topology { + private final List nodes; + private final Map> datacenterNodes; + private final Map> rackNodes; + + public Topology(List nodes) { + this.nodes = new ArrayList<>(nodes); + this.datacenterNodes = new HashMap<>(); + this.rackNodes = new HashMap<>(); + + buildTopology(); + } + + private void buildTopology() { + for (ClusterNode node : nodes) { + // Group by datacenter + datacenterNodes.computeIfAbsent(node.getDatacenter(), k -> new ArrayList<>()) + .add(node); + + // Group by rack + String rackKey = node.getDatacenter() + ":" + node.getRack(); + rackNodes.computeIfAbsent(rackKey, k -> new ArrayList<>()) + .add(node); + } + } + + public List getAllNodes() { + return Collections.unmodifiableList(nodes); + } + + public List getAliveNodes() { + return nodes.stream() + .filter(ClusterNode::isAlive) + .collect(Collectors.toList()); + } + + public List getNodesByDatacenter(String datacenter) { + return Collections.unmodifiableList( + datacenterNodes.getOrDefault(datacenter, Collections.emptyList())); + } + + public List getNodesByRack(String datacenter, String rack) { + String key = datacenter + ":" + rack; + return Collections.unmodifiableList( + rackNodes.getOrDefault(key, Collections.emptyList())); + } + + public Set getDatacenters() { + return Collections.unmodifiableSet(datacenterNodes.keySet()); + } + + public int getTotalNodeCount() { + return nodes.size(); + } + + public int getAliveNodeCount() { + return (int) nodes.stream().filter(ClusterNode::isAlive).count(); + } + + public void printTopology() { + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + 
System.out.println("║ Cluster Topology ║"); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.printf("║ Total Nodes: %-44d ║%n", getTotalNodeCount()); + System.out.printf("║ Alive Nodes: %-44d ║%n", getAliveNodeCount()); + System.out.printf("║ Datacenters: %-44d ║%n", getDatacenters().size()); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + + for (String dc : getDatacenters()) { + List dcNodes = getNodesByDatacenter(dc); + System.out.printf("║ Datacenter: %-46s ║%n", dc); + + Map rackGroups = dcNodes.stream() + .collect(Collectors.groupingBy(ClusterNode::getRack, Collectors.counting())); + + for (Map.Entry entry : rackGroups.entrySet()) { + System.out.printf("║ Rack %-10s %-37s ║%n", + entry.getKey() + ":", entry.getValue() + " nodes"); + + List rackNodes = getNodesByRack(dc, entry.getKey()); + for (ClusterNode node : rackNodes) { + String status = node.isAlive() ? "✓" : "✗"; + System.out.printf("║ %s %-20s %-28s ║%n", + status, node.getNodeId(), node.getEndpoint()); + } + } + } + + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + } + } + + /** + * Token ring for consistent hashing + */ + public static class TokenRing { + private final TreeMap ring; + private final int virtualNodes; + + public TokenRing(List nodes, int virtualNodes) { + this.ring = new TreeMap<>(); + this.virtualNodes = virtualNodes; + + for (ClusterNode node : nodes) { + addNode(node); + } + } + + public void addNode(ClusterNode node) { + for (int i = 0; i < virtualNodes; i++) { + String key = node.getNodeId() + ":" + i; + long hash = hashToLong(key); + ring.put(hash, node); + } + } + + public void removeNode(ClusterNode node) { + for (int i = 0; i < virtualNodes; i++) { + String key = node.getNodeId() + ":" + i; + long hash = hashToLong(key); + ring.remove(hash); + } + } + + public ClusterNode getNodeForKey(String key) { + if (ring.isEmpty()) { + return null; + } + 
+ long hash = hashToLong(key); + Map.Entry entry = ring.ceilingEntry(hash); + + if (entry == null) { + entry = ring.firstEntry(); + } + + return entry.getValue(); + } + + public List getNodesForKey(String key, int count) { + List result = new ArrayList<>(); + Set seen = new HashSet<>(); + + if (ring.isEmpty()) { + return result; + } + + long hash = hashToLong(key); + + for (Map.Entry entry : ring.tailMap(hash).entrySet()) { + ClusterNode node = entry.getValue(); + if (!seen.contains(node)) { + result.add(node); + seen.add(node); + if (result.size() >= count) { + return result; + } + } + } + + // Wrap around + for (Map.Entry entry : ring.headMap(hash).entrySet()) { + ClusterNode node = entry.getValue(); + if (!seen.contains(node)) { + result.add(node); + seen.add(node); + if (result.size() >= count) { + return result; + } + } + } + + return result; + } + + private long hashToLong(String key) { + return (long) key.hashCode() & 0xffffffffL; + } + + public int size() { + return ring.size(); + } + + public void printRing() { + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ Token Ring ║"); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.printf("║ Virtual Nodes per Node: %-34d ║%n", virtualNodes); + System.out.printf("║ Total Tokens: %-34d ║%n", ring.size()); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + + Map nodeCounts = new HashMap<>(); + for (ClusterNode node : ring.values()) { + nodeCounts.put(node, nodeCounts.getOrDefault(node, 0L) + 1); + } + + for (Map.Entry entry : nodeCounts.entrySet()) { + System.out.printf("║ %-30s %27d ║%n", + entry.getKey().getNodeId(), entry.getValue()); + } + + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + } + } + + /** + * Cluster statistics aggregator + */ + public static class StatsAggregator { + + public static Map 
aggregateClusterStats(List nodes) { + Map stats = new HashMap<>(); + + stats.put("totalNodes", nodes.size()); + stats.put("aliveNodes", nodes.stream().filter(ClusterNode::isAlive).count()); + stats.put("deadNodes", nodes.stream() + .filter(n -> n.getState() == ClusterNode.NodeState.DEAD).count()); + stats.put("suspectedNodes", nodes.stream() + .filter(n -> n.getState() == ClusterNode.NodeState.SUSPECTED).count()); + + Map dcDistribution = nodes.stream() + .collect(Collectors.groupingBy(ClusterNode::getDatacenter, Collectors.counting())); + stats.put("datacenterDistribution", dcDistribution); + + Map stateDistribution = nodes.stream() + .collect(Collectors.groupingBy(n -> n.getState().name(), Collectors.counting())); + stats.put("stateDistribution", stateDistribution); + + return stats; + } + + public static void printClusterStats(Map stats) { + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ Cluster Statistics ║"); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.printf("║ Total Nodes: %-40d ║%n", stats.get("totalNodes")); + System.out.printf("║ Alive Nodes: %-40d ║%n", stats.get("aliveNodes")); + System.out.printf("║ Dead Nodes: %-40d ║%n", stats.get("deadNodes")); + System.out.printf("║ Suspected Nodes: %-40d ║%n", stats.get("suspectedNodes")); + + @SuppressWarnings("unchecked") + Map dcDist = (Map) stats.get("datacenterDistribution"); + if (!dcDist.isEmpty()) { + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.println("║ Datacenter Distribution: ║"); + for (Map.Entry entry : dcDist.entrySet()) { + System.out.printf("║ %-20s %-35s ║%n", + entry.getKey() + ":", entry.getValue() + " nodes"); + } + } + + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + } + } + + /** + * Node discovery helper + */ + public static class NodeDiscovery { + + /** + * Discover nodes 
from a seed list + */ + public static List discoverFromSeeds(List seedAddresses) { + List discovered = new ArrayList<>(); + + for (String seed : seedAddresses) { + String[] parts = seed.split(":"); + if (parts.length != 2) { + logger.warn("Invalid seed address: {}", seed); + continue; + } + + try { + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + + String nodeId = "node-" + host + "-" + port; + ClusterNode node = new ClusterNode(nodeId, host, port); + discovered.add(node); + + logger.info("Discovered node: {} ({}:{})", nodeId, host, port); + } catch (NumberFormatException e) { + logger.error("Invalid port in seed address: {}", seed); + } + } + + return discovered; + } + + /** + * Generate seed list from nodes + */ + public static List generateSeedList(List nodes) { + return nodes.stream() + .map(n -> n.getHost() + ":" + n.getPort()) + .collect(Collectors.toList()); + } + } +} diff --git a/src/main/java/com/cube/consistency/ConsistencyLevel.java b/src/main/java/com/cube/consistency/ConsistencyLevel.java new file mode 100644 index 0000000..6141c8c --- /dev/null +++ b/src/main/java/com/cube/consistency/ConsistencyLevel.java @@ -0,0 +1,119 @@ +package com.cube.consistency; + +/** + * Consistency levels for read and write operations in Cube database. + * Similar to Apache Cassandra's consistency levels. + */ +public enum ConsistencyLevel { + + /** + * ONE - Only one replica must respond. + * Fastest but least consistent. + */ + ONE(1, "Requires response from 1 replica"), + + /** + * TWO - Two replicas must respond. + * Balanced consistency and performance. + */ + TWO(2, "Requires response from 2 replicas"), + + /** + * THREE - Three replicas must respond. + */ + THREE(3, "Requires response from 3 replicas"), + + /** + * QUORUM - Majority of replicas must respond. + * (Replication Factor / 2) + 1 + * Strong consistency for most use cases. + */ + QUORUM(-1, "Requires response from majority of replicas"), + + /** + * ALL - All replicas must respond. 
+ * Strongest consistency but slowest. + */ + ALL(-2, "Requires response from all replicas"), + + /** + * LOCAL_ONE - One replica in local datacenter. + * Used in multi-datacenter deployments. + */ + LOCAL_ONE(1, "Requires response from 1 local replica"), + + /** + * LOCAL_QUORUM - Quorum in local datacenter. + */ + LOCAL_QUORUM(-1, "Requires response from local quorum"), + + /** + * ANY - At least one node must respond (may be hinted handoff). + * Fastest writes, weakest consistency. + */ + ANY(1, "Requires response from any node (including hints)"); + + private final int requiredReplicas; + private final String description; + + ConsistencyLevel(int requiredReplicas, String description) { + this.requiredReplicas = requiredReplicas; + this.description = description; + } + + /** + * Calculate the number of required responses for a given replication factor. + * + * @param replicationFactor Total number of replicas + * @return Number of responses required + */ + public int getRequiredResponses(int replicationFactor) { + switch (this) { + case QUORUM: + case LOCAL_QUORUM: + return (replicationFactor / 2) + 1; + case ALL: + return replicationFactor; + case ANY: + case ONE: + case LOCAL_ONE: + return 1; + case TWO: + return Math.min(2, replicationFactor); + case THREE: + return Math.min(3, replicationFactor); + default: + return requiredReplicas; + } + } + + /** + * Check if this consistency level requires quorum. + */ + public boolean isQuorum() { + return this == QUORUM || this == LOCAL_QUORUM; + } + + /** + * Check if this consistency level is local to datacenter. + */ + public boolean isLocal() { + return this == LOCAL_ONE || this == LOCAL_QUORUM; + } + + /** + * Check if hinted handoff is acceptable for this level. 
+ */ + public boolean allowsHints() { + return this == ANY; + } + + public String getDescription() { + return description; + } + + @Override + public String toString() { + return name() + " (" + description + ")"; + } +} diff --git a/src/main/java/com/cube/cql/CQLParser.java b/src/main/java/com/cube/cql/CQLParser.java new file mode 100644 index 0000000..c00300d --- /dev/null +++ b/src/main/java/com/cube/cql/CQLParser.java @@ -0,0 +1,23 @@ +package com.cube.cql; + +/** + * Minimal CQL parser stub to support SQL -> CQL flow for compilation. + */ +public class CQLParser { + + public static class ParsedQuery { + private final String cql; + + public ParsedQuery(String cql) { + this.cql = cql; + } + + public String getCql() { + return cql; + } + } + + public static ParsedQuery parse(String cql) { + return new ParsedQuery(cql); + } +} diff --git a/src/main/java/com/cube/cql/QueryExecutor.java b/src/main/java/com/cube/cql/QueryExecutor.java new file mode 100644 index 0000000..aacfebe --- /dev/null +++ b/src/main/java/com/cube/cql/QueryExecutor.java @@ -0,0 +1,269 @@ +package com.cube.cql; + +import com.cube.sql.SQLParser; +import com.cube.storage.StorageEngine; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; + +/** + * Simple QueryExecutor implementation backed by the `StorageEngine`. + * Supports a small subset of operations: CREATE TABLE (store PK), INSERT, SELECT, UPDATE, DELETE. 
+ */ +public class QueryExecutor { + + private final StorageEngine storage; + // map of "keyspace.table" -> primaryKeyColumn + private final Map primaryKeys = new HashMap<>(); + + public QueryExecutor(StorageEngine storage) { + this.storage = storage; + } + + public Result execute(CQLParser.ParsedQuery parsedQuery) { + try { + String cql = parsedQuery.getCql(); + SQLParser.ParsedSQL sql = SQLParser.parse(cql); + + switch (sql.getType()) { + case CREATE_TABLE: + return handleCreateTable(sql); + case INSERT: + return handleInsert(sql); + case SELECT: + return handleSelect(sql); + case UPDATE: + return handleUpdate(sql); + case DELETE: + return handleDelete(sql); + case DROP_TABLE: + return handleDropTable(sql); + default: + return new Result(false, "Unsupported CQL type: " + sql.getType(), Collections.emptyList(), 0); + } + + } catch (Exception e) { + return new Result(false, "Execution error: " + e.getMessage(), Collections.emptyList(), 0); + } + } + + private Result handleCreateTable(SQLParser.ParsedSQL sql) { + String key = sql.getKeyspace() + "." + sql.getTable(); + String pk = sql.getPrimaryKey(); + if (pk == null) { + // fallback: first column + if (!sql.getColumnDefinitions().isEmpty()) { + pk = sql.getColumnDefinitions().keySet().iterator().next(); + } + } + if (pk != null) { + primaryKeys.put(key, pk); + } + return new Result(true, "Table created: " + key, Collections.emptyList(), 0); + } + + private Result handleDropTable(SQLParser.ParsedSQL sql) { + String key = sql.getKeyspace() + "." + sql.getTable(); + primaryKeys.remove(key); + return new Result(true, "Table dropped: " + key, Collections.emptyList(), 0); + } + + private Result handleInsert(SQLParser.ParsedSQL sql) throws IOException { + String tableKey = sql.getKeyspace() + "." 
+ sql.getTable(); + Map cols = sql.getColumns(); + String pkCol = primaryKeys.getOrDefault(tableKey, cols.keySet().iterator().next()); + String pkVal = cols.get(pkCol); + if (pkVal == null) { + return new Result(false, "Primary key value missing for column: " + pkCol, Collections.emptyList(), 0); + } + + String storageKey = storageKey(sql.getKeyspace(), sql.getTable(), pkVal); + // serialize as simple newline-separated key=value UTF-8 + StringBuilder sb = new StringBuilder(); + for (Map.Entry e : cols.entrySet()) { + sb.append(e.getKey()).append("=").append(e.getValue() == null ? "" : e.getValue()).append('\n'); + } + storage.put(storageKey, sb.toString().getBytes(StandardCharsets.UTF_8)); + return new Result(true, "Inserted", Collections.emptyList(), 1); + } + + private Result handleSelect(SQLParser.ParsedSQL sql) throws IOException { + List> rows = new ArrayList<>(); + + Map where = sql.getWhereClause(); + if (where != null && !where.isEmpty()) { + // If primary key present, do direct get + String tableKey = sql.getKeyspace() + "." 
+ sql.getTable(); + String pkCol = primaryKeys.get(tableKey); + if (pkCol != null && where.containsKey(pkCol)) { + String pkVal = where.get(pkCol); + String storageKey = storageKey(sql.getKeyspace(), sql.getTable(), pkVal); + byte[] value = storage.get(storageKey); + if (value != null) { + Map row = decodeRow(value); + rows.add(filterColumns(row, sql.getSelectColumns())); + } + return new Result(true, "OK", rows, rows.size()); + } + } + + // Full scan of table + String prefix = storagePrefix(sql.getKeyspace(), sql.getTable()); + Iterator> it = storage.scanEntries(prefix); + while (it.hasNext()) { + Map.Entry entry = it.next(); + Map row = decodeRow(entry.getValue()); + if (matchesWhere(row, sql.getWhereClause())) { + rows.add(filterColumns(row, sql.getSelectColumns())); + } + } + + return new Result(true, "OK", rows, rows.size()); + } + + private Result handleUpdate(SQLParser.ParsedSQL sql) throws IOException { + int updated = 0; + Map where = sql.getWhereClause(); + if (where != null && !where.isEmpty()) { + String tableKey = sql.getKeyspace() + "." + sql.getTable(); + String pkCol = primaryKeys.get(tableKey); + if (pkCol != null && where.containsKey(pkCol)) { + String pkVal = where.get(pkCol); + String storageKey = storageKey(sql.getKeyspace(), sql.getTable(), pkVal); + byte[] value = storage.get(storageKey); + if (value != null) { + Map row = decodeRow(value); + // apply updates + for (Map.Entry e : sql.getColumns().entrySet()) { + row.put(e.getKey(), e.getValue() == null ? 
null : e.getValue().getBytes(StandardCharsets.UTF_8)); + } + storage.put(storageKey, encodeRow(row)); + updated++; + } + return new Result(true, "Updated", Collections.emptyList(), updated); + } + } + + // Otherwise scan and update matching rows + String prefix = storagePrefix(sql.getKeyspace(), sql.getTable()); + Iterator> it = storage.scanEntries(prefix); + while (it.hasNext()) { + Map.Entry entry = it.next(); + Map row = decodeRow(entry.getValue()); + if (matchesWhere(row, sql.getWhereClause())) { + for (Map.Entry e : sql.getColumns().entrySet()) { + row.put(e.getKey(), e.getValue() == null ? null : e.getValue().getBytes(StandardCharsets.UTF_8)); + } + storage.put(entry.getKey(), encodeRow(row)); + updated++; + } + } + + return new Result(true, "Updated", Collections.emptyList(), updated); + } + + private Result handleDelete(SQLParser.ParsedSQL sql) throws IOException { + int deleted = 0; + Map where = sql.getWhereClause(); + String tableKey = sql.getKeyspace() + "." + sql.getTable(); + String pkCol = primaryKeys.get(tableKey); + + if (pkCol != null && where != null && where.containsKey(pkCol)) { + String pkVal = where.get(pkCol); + String storageKey = storageKey(sql.getKeyspace(), sql.getTable(), pkVal); + if (storage.delete(storageKey)) deleted++; + return new Result(true, "Deleted", Collections.emptyList(), deleted); + } + + String prefix = storagePrefix(sql.getKeyspace(), sql.getTable()); + Iterator> it = storage.scanEntries(prefix); + List keysToDelete = new ArrayList<>(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + Map row = decodeRow(entry.getValue()); + if (matchesWhere(row, sql.getWhereClause())) { + keysToDelete.add(entry.getKey()); + } + } + for (String k : keysToDelete) { + if (storage.delete(k)) deleted++; + } + + return new Result(true, "Deleted", Collections.emptyList(), deleted); + } + + private boolean matchesWhere(Map row, Map where) { + if (where == null || where.isEmpty()) return true; + for (Map.Entry cond : where.entrySet()) { 
+ byte[] val = row.get(cond.getKey()); + String sval = val == null ? null : new String(val, StandardCharsets.UTF_8); + if (!Objects.equals(sval, cond.getValue())) return false; + } + return true; + } + + private Map filterColumns(Map row, List selectCols) { + if (selectCols == null || selectCols.isEmpty() || (selectCols.size() == 1 && "*".equals(selectCols.get(0)))) { + return row; + } + Map out = new LinkedHashMap<>(); + for (String c : selectCols) { + out.put(c, row.get(c)); + } + return out; + } + + private String storageKey(String keyspace, String table, String pkVal) { + return keyspace + ":" + table + ":" + pkVal; + } + + private String storagePrefix(String keyspace, String table) { + return keyspace + ":" + table + ":"; + } + + private Map decodeRow(byte[] bytes) { + Map map = new LinkedHashMap<>(); + if (bytes == null || bytes.length == 0) return map; + String s = new String(bytes, StandardCharsets.UTF_8); + String[] lines = s.split("\n"); + for (String line : lines) { + if (line.isEmpty()) continue; + int idx = line.indexOf('='); + if (idx <= 0) continue; + String k = line.substring(0, idx); + String v = line.substring(idx + 1); + map.put(k, v.getBytes(StandardCharsets.UTF_8)); + } + return map; + } + + private byte[] encodeRow(Map row) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry e : row.entrySet()) { + sb.append(e.getKey()).append("="); + if (e.getValue() != null) sb.append(new String(e.getValue(), StandardCharsets.UTF_8)); + sb.append('\n'); + } + return sb.toString().getBytes(StandardCharsets.UTF_8); + } + + public static class Result { + private final boolean success; + private final String message; + private final List> rows; + private final int rowsAffected; + + public Result(boolean success, String message, List> rows, int rowsAffected) { + this.success = success; + this.message = message; + this.rows = rows != null ? 
rows : new ArrayList<>(); + this.rowsAffected = rowsAffected; + } + + public boolean isSuccess() { return success; } + public String getMessage() { return message; } + public List> getRows() { return rows; } + public int getRowsAffected() { return rowsAffected; } + } +} diff --git a/src/main/java/com/cube/examples/CubeExamples.java b/src/main/java/com/cube/examples/CubeExamples.java new file mode 100644 index 0000000..e623253 --- /dev/null +++ b/src/main/java/com/cube/examples/CubeExamples.java @@ -0,0 +1,157 @@ +package com.cube.examples; + +import com.cube.storage.LSMStorageEngine; +import com.cube.storage.StorageEngine; + +import java.util.*; + +/** + * Runnable examples for Cube database + */ +public class CubeExamples { + + public static void main(String[] args) throws Exception { + System.out.println("=== Cube Database Examples ===\n"); + + example1_BasicOperations(); + example2_PrefixScanning(); + example3_BatchOperations(); + example4_Statistics(); + + System.out.println("\n=== All examples completed! 
==="); + } + + /** + * Example 1: Basic put, get, update, delete + */ + public static void example1_BasicOperations() throws Exception { + System.out.println("Example 1: Basic Operations"); + System.out.println("---------------------------"); + + LSMStorageEngine storage = new LSMStorageEngine("/tmp/cube-example1"); + + // Put + storage.put("user:1", "Alice".getBytes()); + storage.put("user:2", "Bob".getBytes()); + System.out.println("✓ Stored 2 users"); + + // Get + String user1 = new String(storage.get("user:1")); + System.out.println("✓ Retrieved user:1 = " + user1); + + // Update + storage.put("user:1", "Alice Johnson".getBytes()); + String updated = new String(storage.get("user:1")); + System.out.println("✓ Updated user:1 = " + updated); + + // Delete + storage.delete("user:2"); + System.out.println("✓ Deleted user:2"); + + storage.close(); + System.out.println(); + } + + /** + * Example 2: Prefix scanning + */ + public static void example2_PrefixScanning() throws Exception { + System.out.println("Example 2: Prefix Scanning"); + System.out.println("--------------------------"); + + LSMStorageEngine storage = new LSMStorageEngine("/tmp/cube-example2"); + + // Store hierarchical data + storage.put("user:1:name", "Alice".getBytes()); + storage.put("user:1:email", "alice@example.com".getBytes()); + storage.put("user:1:age", "30".getBytes()); + storage.put("user:2:name", "Bob".getBytes()); + storage.put("user:2:email", "bob@example.com".getBytes()); + storage.put("product:1:name", "Laptop".getBytes()); + + // Scan for user:1 prefix + System.out.println("Scanning for prefix 'user:1':"); + Iterator> entries = storage.scanEntries("user:1"); + + while (entries.hasNext()) { + Map.Entry entry = entries.next(); + System.out.println(" " + entry.getKey() + " = " + new String(entry.getValue())); + } + + storage.close(); + System.out.println(); + } + + /** + * Example 3: Batch operations + */ + public static void example3_BatchOperations() throws Exception { + 
System.out.println("Example 3: Batch Operations"); + System.out.println("----------------------------"); + + LSMStorageEngine storage = new LSMStorageEngine("/tmp/cube-example3"); + + // Insert 1000 records + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < 1000; i++) { + String key = "item:" + i; + String value = "value:" + i; + storage.put(key, value.getBytes()); + } + + long endTime = System.currentTimeMillis(); + System.out.println("✓ Inserted 1000 records in " + (endTime - startTime) + "ms"); + + // Read them back + startTime = System.currentTimeMillis(); + int count = 0; + + for (int i = 0; i < 1000; i++) { + byte[] value = storage.get("item:" + i); + if (value != null) { + count++; + } + } + + endTime = System.currentTimeMillis(); + System.out.println("✓ Read " + count + " records in " + (endTime - startTime) + "ms"); + + storage.close(); + System.out.println(); + } + + /** + * Example 4: Storage statistics + */ + public static void example4_Statistics() throws Exception { + System.out.println("Example 4: Storage Statistics"); + System.out.println("------------------------------"); + + LSMStorageEngine storage = new LSMStorageEngine("/tmp/cube-example4"); + + // Insert data + for (int i = 0; i < 100; i++) { + storage.put("key:" + i, ("value:" + i).getBytes()); + } + + // Get stats + StorageEngine.StorageStats stats = storage.getStats(); + + System.out.println("Storage Statistics:"); + System.out.println(" Total Keys: " + stats.getTotalKeys()); + System.out.println(" Total Size: " + stats.getTotalSize() + " bytes"); + System.out.println(" MemTable Size: " + stats.getMemtableSize() + " bytes"); + System.out.println(" SSTable Count: " + stats.getSstableCount()); + + // Flush to disk + storage.flush(); + System.out.println("\nAfter flush:"); + stats = storage.getStats(); + System.out.println(" MemTable Size: " + stats.getMemtableSize() + " bytes"); + System.out.println(" SSTable Count: " + stats.getSstableCount()); + + storage.close(); + 
System.out.println(); + } +} diff --git a/src/main/java/com/cube/examples/CubicIndexExamples.java b/src/main/java/com/cube/examples/CubicIndexExamples.java new file mode 100644 index 0000000..c10461e --- /dev/null +++ b/src/main/java/com/cube/examples/CubicIndexExamples.java @@ -0,0 +1,278 @@ +package com.cube.examples; + +import com.cube.index.*; +import com.cube.storage.LSMStorageEngine; + +import java.util.*; + +/** + * Runnable examples demonstrating Cubic Index System + */ +public class CubicIndexExamples { + + public static void main(String[] args) throws Exception { + System.out.println("=== Cubic Index System Examples ===\n"); + + example1_CubicNumbers(); + example2_BasicIndexing(); + example3_SideDistribution(); + example4_MultiLevelIndex(); + example5_PrefixAndRangeSearch(); + example6_IntegratedStorage(); + + System.out.println("\n=== All cubic index examples completed! ==="); + } + + /** + * Example 1: Understanding Cubic Numbers + */ + public static void example1_CubicNumbers() { + System.out.println("Example 1: Cubic Numbers (N³×6)"); + System.out.println("----------------------------------"); + + System.out.println("Cubic Index Progression:"); + for (int n = 1; n <= 10; n++) { + long index = CubicIndexNode.calculateCubicIndex(n); + System.out.printf(" Level %d: %d³×6 = %d%n", n, n, index); + } + + System.out.println("\nNotice the exponential growth:"); + System.out.println(" Level 1→2: 6→48 (8x increase)"); + System.out.println(" Level 2→3: 48→162 (3.4x increase)"); + System.out.println(" Level 5→6: 750→1296 (1.7x increase)"); + + System.out.println(); + } + + /** + * Example 2: Basic Indexing Operations + */ + public static void example2_BasicIndexing() { + System.out.println("Example 2: Basic Cubic Indexing"); + System.out.println("--------------------------------"); + + // Create a cubic index node at level 3 + CubicIndexNode node = new CubicIndexNode(3); + + System.out.println("Created node at Level 3:"); + System.out.println(" Index Value: " + 
node.getIndexValue()); + System.out.println(" Formula: 3³×6 = " + (3*3*3*6)); + + // Add data + System.out.println("\nAdding data..."); + node.put("user:alice", "Alice Johnson".getBytes()); + node.put("user:bob", "Bob Smith".getBytes()); + node.put("product:laptop", "Dell XPS 13".getBytes()); + + System.out.println("✓ Added 3 items"); + System.out.println(" Total keys: " + node.getTotalSize()); + + // Retrieve data + System.out.println("\nRetrieving data..."); + String alice = new String(node.get("user:alice")); + System.out.println(" user:alice → " + alice); + + // Show statistics + Map stats = node.getStats(); + System.out.println("\nNode Statistics:"); + System.out.println(" " + stats); + + System.out.println(); + } + + /** + * Example 3: 6-Sided Distribution + */ + public static void example3_SideDistribution() { + System.out.println("Example 3: 6-Sided Data Distribution"); + System.out.println("-------------------------------------"); + + CubicIndexNode node = new CubicIndexNode(2); + + System.out.println("The 6 sides of a cubic node:"); + for (CubicIndexNode.Side side : CubicIndexNode.Side.values()) { + System.out.println(" " + side.name() + " (index: " + side.getIndex() + ")"); + } + + // Add data and see distribution + System.out.println("\nAdding 60 keys..."); + for (int i = 0; i < 60; i++) { + node.put("key-" + i, ("value-" + i).getBytes()); + } + + System.out.println("\nDistribution across sides:"); + for (CubicIndexNode.Side side : CubicIndexNode.Side.values()) { + int count = node.getSide(side).size(); + System.out.printf(" %-6s: %2d keys ", side.name(), count); + System.out.println("█".repeat(count / 2)); + } + + System.out.println("\nTotal: " + node.getTotalSize() + " keys"); + System.out.println("Average per side: " + (node.getTotalSize() / 6.0)); + + System.out.println(); + } + + /** + * Example 4: Multi-Level Index + */ + public static void example4_MultiLevelIndex() { + System.out.println("Example 4: Multi-Level Cubic Index"); + 
System.out.println("-----------------------------------"); + + CubicIndexTree tree = new CubicIndexTree(5, 20, true); + + System.out.println("Created tree with 5 levels:"); + System.out.println(" Level 1: Capacity = 6"); + System.out.println(" Level 2: Capacity = 48"); + System.out.println(" Level 3: Capacity = 162"); + System.out.println(" Level 4: Capacity = 384"); + System.out.println(" Level 5: Capacity = 750"); + + // Add data + System.out.println("\nAdding data across levels..."); + String[] datasets = { + "users", "products", "orders", "sessions", "events" + }; + + for (String dataset : datasets) { + for (int i = 0; i < 20; i++) { + String key = dataset + ":" + i; + tree.put(key, ("data-" + i).getBytes()); + } + } + + System.out.println("✓ Added " + tree.getTotalSize() + " keys"); + + // Show distribution + System.out.println("\nDistribution across levels:"); + for (int level = 1; level <= tree.getLevelCount(); level++) { + CubicIndexNode node = tree.getLevel(level); + if (node != null) { + System.out.printf(" Level %d: %3d keys%n", level, node.getTotalSize()); + } + } + + System.out.println(); + } + + /** + * Example 5: Prefix and Range Search + */ + public static void example5_PrefixAndRangeSearch() { + System.out.println("Example 5: Advanced Search Operations"); + System.out.println("--------------------------------------"); + + CubicIndexTree tree = new CubicIndexTree(); + + // Add hierarchical data + System.out.println("Adding hierarchical data..."); + tree.put("user:1:name", "Alice".getBytes()); + tree.put("user:1:email", "alice@example.com".getBytes()); + tree.put("user:1:age", "30".getBytes()); + tree.put("user:2:name", "Bob".getBytes()); + tree.put("user:2:email", "bob@example.com".getBytes()); + tree.put("product:laptop:1", "Dell XPS".getBytes()); + tree.put("product:laptop:2", "MacBook Pro".getBytes()); + tree.put("product:mouse:1", "Logitech MX".getBytes()); + + // Prefix search + System.out.println("\nPrefix Search 'user:1':"); + List user1 = 
tree.searchPrefix("user:1"); + for (String key : user1) { + System.out.println(" → " + key); + } + + System.out.println("\nPrefix Search 'product:laptop':"); + List laptops = tree.searchPrefix("product:laptop"); + for (String key : laptops) { + System.out.println(" → " + key); + } + + // Add sequential keys for range search + System.out.println("\nAdding sequential keys..."); + for (int i = 0; i < 20; i++) { + tree.put(String.format("seq:%03d", i), ("data-" + i).getBytes()); + } + + System.out.println("\nRange Search 'seq:005' to 'seq:010':"); + List range = tree.searchRange("seq:005", "seq:010"); + for (String key : range) { + System.out.println(" → " + key); + } + + System.out.println(); + } + + /** + * Example 6: Integrated Storage with Cubic Index + */ + public static void example6_IntegratedStorage() throws Exception { + System.out.println("Example 6: Cubic-Indexed Storage Engine"); + System.out.println("----------------------------------------"); + + // Create backing storage + LSMStorageEngine lsmStorage = new LSMStorageEngine("/tmp/cube-indexed-example"); + + // Wrap with cubic index + CubicIndexedStorage storage = new CubicIndexedStorage(lsmStorage); + + System.out.println("Created cubic-indexed storage"); + System.out.println(" Backing: LSM Storage Engine"); + System.out.println(" Index: Cubic Index Tree"); + + // Write data + System.out.println("\nWriting data..."); + for (int i = 0; i < 100; i++) { + String key = "item:" + i; + String value = "Item " + i + " - " + UUID.randomUUID(); + storage.put(key, value.getBytes()); + } + + System.out.println("✓ Wrote 100 items"); + + // Read with index acceleration + System.out.println("\nReading data (using cubic index)..."); + long startTime = System.nanoTime(); + + byte[] value = storage.get("item:42"); + + long endTime = System.nanoTime(); + double timeMs = (endTime - startTime) / 1_000_000.0; + + System.out.println("✓ Retrieved: " + new String(value).substring(0, 20) + "..."); + System.out.println(" Time: " + 
String.format("%.3f", timeMs) + " ms"); + + // Prefix search + System.out.println("\nPrefix search for 'item:1'..."); + Iterator results = storage.scan("item:1"); + int count = 0; + while (results.hasNext() && count < 5) { + System.out.println(" → " + results.next()); + count++; + } + System.out.println(" (showing first " + count + " results)"); + + // Show statistics + System.out.println("\nIndex Statistics:"); + Map indexStats = storage.getIndexStats(); + System.out.println(" Total Levels: " + indexStats.get("totalLevels")); + System.out.println(" Total Keys: " + indexStats.get("totalKeys")); + + @SuppressWarnings("unchecked") + Map sideDist = (Map) indexStats.get("sideDistribution"); + System.out.println("\n Side Distribution:"); + for (Map.Entry entry : sideDist.entrySet()) { + System.out.printf(" %-6s: %d keys%n", entry.getKey(), entry.getValue()); + } + + // Visual structure + System.out.println("\nCubic Index Structure:"); + storage.printIndexStructure(); + + // Cleanup + storage.close(); + + System.out.println(); + } +} diff --git a/src/main/java/com/cube/examples/Phase2Examples.java b/src/main/java/com/cube/examples/Phase2Examples.java new file mode 100644 index 0000000..1e1f389 --- /dev/null +++ b/src/main/java/com/cube/examples/Phase2Examples.java @@ -0,0 +1,291 @@ +package com.cube.examples; + +import com.cube.cluster.ClusterNode; +import com.cube.consistency.ConsistencyLevel; +import com.cube.replication.*; +import com.cube.storage.LSMStorageEngine; + +import java.util.*; + +/** + * Runnable examples demonstrating Phase 2 features + */ +public class Phase2Examples { + + public static void main(String[] args) throws Exception { + System.out.println("=== Cube Database Phase 2 Examples ===\n"); + + example1_ConsistencyLevels(); + example2_HintedHandoff(); + example3_ReadRepair(); + example4_ReplicationStrategies(); + example5_CompleteWorkflow(); + + System.out.println("\n=== All Phase 2 examples completed! 
==="); + } + + /** + * Example 1: Consistency Levels + */ + public static void example1_ConsistencyLevels() { + System.out.println("Example 1: Consistency Levels"); + System.out.println("-------------------------------"); + + int rf = 3; // Replication Factor + + System.out.println("With RF=" + rf + ":"); + System.out.println(" ONE requires: " + ConsistencyLevel.ONE.getRequiredResponses(rf) + " response(s)"); + System.out.println(" TWO requires: " + ConsistencyLevel.TWO.getRequiredResponses(rf) + " response(s)"); + System.out.println(" QUORUM requires: " + ConsistencyLevel.QUORUM.getRequiredResponses(rf) + " response(s)"); + System.out.println(" ALL requires: " + ConsistencyLevel.ALL.getRequiredResponses(rf) + " response(s)"); + + System.out.println("\nConsistency Level Properties:"); + System.out.println(" QUORUM is quorum: " + ConsistencyLevel.QUORUM.isQuorum()); + System.out.println(" ANY allows hints: " + ConsistencyLevel.ANY.allowsHints()); + System.out.println(" LOCAL_ONE is local: " + ConsistencyLevel.LOCAL_ONE.isLocal()); + + System.out.println(); + } + + /** + * Example 2: Hinted Handoff + */ + public static void example2_HintedHandoff() throws Exception { + System.out.println("Example 2: Hinted Handoff"); + System.out.println("--------------------------"); + + HintedHandoffManager hintedHandoff = new HintedHandoffManager( + "/tmp/cube-hints-example", + 1000, // Max 1000 hints per node + 3600000 // 1 hour window + ); + + // Simulate storing hints for unavailable nodes + System.out.println("Storing hints for unavailable nodes..."); + hintedHandoff.storeHint("node-2", "user:1", "Alice".getBytes()); + hintedHandoff.storeHint("node-2", "user:2", "Bob".getBytes()); + hintedHandoff.storeHint("node-3", "user:3", "Charlie".getBytes()); + + System.out.println("✓ Stored 3 hints"); + System.out.println(" Hints for node-2: " + hintedHandoff.getHintCount("node-2")); + System.out.println(" Hints for node-3: " + hintedHandoff.getHintCount("node-3")); + 
System.out.println(" Total hints: " + hintedHandoff.getTotalHintCount()); + + // Simulate replaying hints + System.out.println("\nReplaying hints for node-2..."); + int[] replayed = {0}; + hintedHandoff.replayHintsForNode("node-2", hint -> { + System.out.println(" → Replaying hint: key=" + hint.getKey()); + replayed[0]++; + return true; // Simulate successful replay + }); + + System.out.println("✓ Replayed " + replayed[0] + " hints"); + System.out.println(" Remaining hints for node-2: " + hintedHandoff.getHintCount("node-2")); + + hintedHandoff.shutdown(); + System.out.println(); + } + + /** + * Example 3: Read Repair + */ + public static void example3_ReadRepair() throws Exception { + System.out.println("Example 3: Read Repair"); + System.out.println("-----------------------"); + + ReadRepairManager readRepair = new ReadRepairManager(100); // 100% chance + + // Create cluster nodes + ClusterNode node1 = new ClusterNode("node-1", "host1", 8080); + ClusterNode node2 = new ClusterNode("node-2", "host2", 8080); + ClusterNode node3 = new ClusterNode("node-3", "host3", 8080); + + // Simulate responses with inconsistent values + System.out.println("Received responses from replicas:"); + List responses = new ArrayList<>(); + responses.add(new ReadRepairManager.ReadResponse(node1, "user:1", "Alice_v1".getBytes(), 1000)); + responses.add(new ReadRepairManager.ReadResponse(node2, "user:1", "Alice_v2".getBytes(), 2000)); // Newest + responses.add(new ReadRepairManager.ReadResponse(node3, "user:1", "Alice_v1".getBytes(), 1000)); + + System.out.println(" node-1: value=Alice_v1, timestamp=1000"); + System.out.println(" node-2: value=Alice_v2, timestamp=2000 (newest)"); + System.out.println(" node-3: value=Alice_v1, timestamp=1000"); + + // Detect conflicts + List conflicts = readRepair.detectConflicts(responses); + System.out.println("\n✓ Detected " + conflicts.size() + " conflicting responses"); + + // Perform read repair + System.out.println("\nPerforming read repair..."); + 
List repairedNodes = new ArrayList<>(); + + ReadRepairManager.ReadRepairResult result = readRepair.performReadRepairBlocking( + responses, + (node, key, value, timestamp) -> { + System.out.println(" → Repairing " + node.getNodeId() + " with newest value"); + repairedNodes.add(node.getNodeId()); + return true; + } + ); + + System.out.println("\n✓ Read repair completed:"); + System.out.println(" Canonical value: " + new String(result.getCanonicalValue())); + System.out.println(" Repair needed: " + result.isRepairNeeded()); + System.out.println(" Nodes repaired: " + result.getRepairedNodes()); + System.out.println(" Repaired nodes: " + repairedNodes); + + readRepair.shutdown(); + System.out.println(); + } + + /** + * Example 4: Replication Strategies + */ + public static void example4_ReplicationStrategies() { + System.out.println("Example 4: Replication Strategies"); + System.out.println("----------------------------------"); + + // Create cluster nodes + List nodes = new ArrayList<>(); + nodes.add(new ClusterNode("node-1", "10.0.0.1", 8080, "dc1", "rack1")); + nodes.add(new ClusterNode("node-2", "10.0.0.2", 8080, "dc1", "rack2")); + nodes.add(new ClusterNode("node-3", "10.0.0.3", 8080, "dc1", "rack3")); + nodes.add(new ClusterNode("node-4", "10.0.0.4", 8080, "dc2", "rack1")); + nodes.add(new ClusterNode("node-5", "10.0.0.5", 8080, "dc2", "rack2")); + + // Simple Strategy + System.out.println("Simple Replication Strategy:"); + SimpleReplicationStrategy simpleStrategy = new SimpleReplicationStrategy(); + List simpleReplicas = simpleStrategy.getReplicaNodes("user:123", 3, nodes); + + System.out.println(" Replicas for 'user:123' (RF=3):"); + for (ClusterNode node : simpleReplicas) { + System.out.println(" - " + node.getNodeId() + " (" + node.getEndpoint() + ")"); + } + + // Network Topology Strategy + System.out.println("\nNetwork Topology Strategy:"); + Map dcRF = new HashMap<>(); + dcRF.put("dc1", 2); + dcRF.put("dc2", 2); + + NetworkTopologyReplicationStrategy 
ntsStrategy = + new NetworkTopologyReplicationStrategy(dcRF); + + List ntsReplicas = ntsStrategy.getReplicaNodes("user:123", 3, nodes); + + System.out.println(" Replicas for 'user:123' (dc1=2, dc2=2):"); + for (ClusterNode node : ntsReplicas) { + System.out.println(" - " + node.getNodeId() + + " (dc=" + node.getDatacenter() + + ", rack=" + node.getRack() + ")"); + } + + System.out.println(); + } + + /** + * Example 5: Complete Workflow + */ + public static void example5_CompleteWorkflow() throws Exception { + System.out.println("Example 5: Complete Replication Workflow"); + System.out.println("-----------------------------------------"); + + // Initialize components + LSMStorageEngine storage = new LSMStorageEngine("/tmp/cube-phase2-example"); + + HintedHandoffManager hintedHandoff = new HintedHandoffManager( + "/tmp/cube-phase2-hints", 1000, 3600000); + + ReadRepairManager readRepair = new ReadRepairManager(10); + + ReplicationStrategy strategy = new SimpleReplicationStrategy(); + + ReplicationCoordinator coordinator = new ReplicationCoordinator( + storage, + strategy, + hintedHandoff, + readRepair, + 3, // RF=3 + 5000, // 5s write timeout + 3000 // 3s read timeout + ); + + // Create cluster + List nodes = new ArrayList<>(); + nodes.add(new ClusterNode("node-1", "localhost", 8080)); + + System.out.println("Cluster initialized:"); + System.out.println(" Nodes: " + nodes.size()); + System.out.println(" Replication Factor: 3"); + System.out.println(" Strategy: " + strategy.getName()); + + // Perform writes with different consistency levels + System.out.println("\n--- Write Operations ---"); + + System.out.println("Writing 'user:alice' with CL=ONE..."); + ReplicationCoordinator.WriteResult writeOne = coordinator.write( + "user:alice", + "Alice Johnson".getBytes(), + ConsistencyLevel.ONE, + nodes + ); + System.out.println("✓ Success: " + writeOne.isSuccess() + + ", replicas: " + writeOne.getSuccessfulWrites()); + + System.out.println("\nWriting 'user:bob' with 
CL=QUORUM..."); + ReplicationCoordinator.WriteResult writeQuorum = coordinator.write( + "user:bob", + "Bob Smith".getBytes(), + ConsistencyLevel.QUORUM, + nodes + ); + System.out.println("✓ Success: " + writeQuorum.isSuccess() + + ", replicas: " + writeQuorum.getSuccessfulWrites()); + + // Perform reads + System.out.println("\n--- Read Operations ---"); + + System.out.println("Reading 'user:alice' with CL=ONE..."); + ReplicationCoordinator.ReadResult readOne = coordinator.read( + "user:alice", + ConsistencyLevel.ONE, + nodes + ); + + if (readOne.isSuccess()) { + System.out.println("✓ Value: " + new String(readOne.getValue())); + System.out.println(" Responses: " + readOne.getResponsesReceived()); + System.out.println(" Read repair: " + readOne.isRepairPerformed()); + } + + System.out.println("\nReading 'user:bob' with CL=QUORUM..."); + ReplicationCoordinator.ReadResult readQuorum = coordinator.read( + "user:bob", + ConsistencyLevel.QUORUM, + nodes + ); + + if (readQuorum.isSuccess()) { + System.out.println("✓ Value: " + new String(readQuorum.getValue())); + System.out.println(" Responses: " + readQuorum.getResponsesReceived()); + } + + // Show statistics + System.out.println("\n--- Replication Statistics ---"); + Map stats = coordinator.getStats(); + System.out.println("Replication Factor: " + stats.get("replicationFactor")); + System.out.println("Write Timeout: " + stats.get("writeTimeoutMs") + "ms"); + System.out.println("Read Timeout: " + stats.get("readTimeoutMs") + "ms"); + System.out.println("Pending Hints: " + stats.get("pendingHints")); + System.out.println("Replication Strategy: " + stats.get("replicationStrategy")); + + // Cleanup + coordinator.shutdown(); + storage.close(); + + System.out.println("\n✓ Workflow completed successfully"); + System.out.println(); + } +} diff --git a/src/main/java/com/cube/examples/SQLExamples.java b/src/main/java/com/cube/examples/SQLExamples.java new file mode 100644 index 0000000..5dcaabe --- /dev/null +++ 
b/src/main/java/com/cube/examples/SQLExamples.java @@ -0,0 +1,250 @@ +package com.cube.examples; + +import com.cube.sql.SQLParser; +import com.cube.sql.SQLExecutor; +import com.cube.cql.QueryExecutor; +import com.cube.storage.LSMStorageEngine; + +import java.io.IOException; +import java.util.*; + +/** + * SQL Examples - Demonstrates all SQL features + */ +public class SQLExamples { + + public static void main(String[] args) throws IOException { + System.out.println("═══════════════════════════════════════════════════════════"); + System.out.println(" Cube Database - SQL Examples"); + System.out.println("═══════════════════════════════════════════════════════════"); + System.out.println(); + + // Initialize storage and executor + LSMStorageEngine storage = new LSMStorageEngine("/tmp/cube-sql-examples"); + QueryExecutor queryExecutor = new QueryExecutor(storage); + SQLExecutor sqlExecutor = new SQLExecutor(queryExecutor, storage); + + try { + example1_BasicCRUD(sqlExecutor); + example2_UserManagement(sqlExecutor); + example3_ProductCatalog(sqlExecutor); + example4_AdvancedQueries(sqlExecutor); + example5_BatchOperations(sqlExecutor); + + System.out.println("\n═══════════════════════════════════════════════════════════"); + System.out.println(" All SQL examples completed successfully! ✓"); + System.out.println("═══════════════════════════════════════════════════════════\n"); + + } finally { + storage.close(); + } + } + + /** + * Example 1: Basic CRUD Operations + */ + private static void example1_BasicCRUD(SQLExecutor executor) { + System.out.println("Example 1: Basic CRUD Operations"); + System.out.println("─────────────────────────────────────────────────────────────"); + + // CREATE TABLE + System.out.println("\n1. 
CREATE TABLE"); + String createSQL = "CREATE TABLE demo.users (id TEXT PRIMARY KEY, name TEXT, email TEXT)"; + System.out.println("SQL: " + createSQL); + SQLExecutor.SQLResult result = executor.execute(createSQL); + System.out.println("✓ " + result.getMessage()); + + // INSERT + System.out.println("\n2. INSERT"); + String insertSQL = "INSERT INTO demo.users (id, name, email) VALUES ('u1', 'Alice', 'alice@example.com')"; + System.out.println("SQL: " + insertSQL); + result = executor.execute(insertSQL); + System.out.println("✓ " + result.getMessage()); + + // SELECT + System.out.println("\n3. SELECT"); + String selectSQL = "SELECT * FROM demo.users WHERE id = 'u1'"; + System.out.println("SQL: " + selectSQL); + result = executor.execute(selectSQL); + if (result.isSuccess() && !result.getRows().isEmpty()) { + System.out.println("✓ Found: " + result.getRows().get(0)); + } + + // UPDATE + System.out.println("\n4. UPDATE"); + String updateSQL = "UPDATE demo.users SET name = 'Alice Johnson' WHERE id = 'u1'"; + System.out.println("SQL: " + updateSQL); + result = executor.execute(updateSQL); + System.out.println("✓ " + result.getMessage()); + + // DELETE + System.out.println("\n5. 
DELETE"); + String deleteSQL = "DELETE FROM demo.users WHERE id = 'u1'"; + System.out.println("SQL: " + deleteSQL); + result = executor.execute(deleteSQL); + System.out.println("✓ " + result.getMessage()); + + System.out.println(); + } + + /** + * Example 2: User Management System + */ + private static void example2_UserManagement(SQLExecutor executor) { + System.out.println("Example 2: User Management System"); + System.out.println("─────────────────────────────────────────────────────────────"); + + // Create users table + executor.execute("CREATE TABLE app.users (id TEXT PRIMARY KEY, username TEXT, email TEXT, status TEXT, created_at TEXT)"); + + // Add users + System.out.println("\nAdding users..."); + executor.execute("INSERT INTO app.users (id, username, email, status, created_at) VALUES ('u001', 'alice', 'alice@example.com', 'active', '2024-01-15')"); + executor.execute("INSERT INTO app.users (id, username, email, status, created_at) VALUES ('u002', 'bob', 'bob@example.com', 'active', '2024-01-16')"); + executor.execute("INSERT INTO app.users (id, username, email, status, created_at) VALUES ('u003', 'charlie', 'charlie@example.com', 'inactive', '2024-01-17')"); + System.out.println("✓ 3 users added"); + + // Query user + System.out.println("\nQuerying user u001..."); + SQLExecutor.SQLResult result = executor.execute("SELECT * FROM app.users WHERE id = 'u001'"); + if (!result.getRows().isEmpty()) { + Map user = result.getRows().get(0); + System.out.println(" Username: " + user.get("username")); + System.out.println(" Email: " + user.get("email")); + System.out.println(" Status: " + user.get("status")); + } + + // Update user status + System.out.println("\nUpdating user status..."); + executor.execute("UPDATE app.users SET status = 'suspended' WHERE id = 'u002'"); + System.out.println("✓ User u002 suspended"); + + System.out.println(); + } + + /** + * Example 3: Product Catalog + */ + private static void example3_ProductCatalog(SQLExecutor executor) { + 
System.out.println("Example 3: Product Catalog"); + System.out.println("─────────────────────────────────────────────────────────────"); + + // Create products table + executor.execute("CREATE TABLE shop.products (sku TEXT PRIMARY KEY, name TEXT, price TEXT, category TEXT, stock TEXT)"); + + // Add products + System.out.println("\nAdding products..."); + executor.execute("INSERT INTO shop.products (sku, name, price, category, stock) VALUES ('LAPTOP-001', 'MacBook Pro 16', '2499.99', 'Electronics', '10')"); + executor.execute("INSERT INTO shop.products (sku, name, price, category, stock) VALUES ('MOUSE-001', 'Wireless Mouse', '29.99', 'Accessories', '100')"); + executor.execute("INSERT INTO shop.products (sku, name, price, category, stock) VALUES ('KEYBOARD-001', 'Mechanical Keyboard', '149.99', 'Accessories', '50')"); + System.out.println("✓ 3 products added"); + + // Query product + System.out.println("\nQuerying laptop..."); + SQLExecutor.SQLResult result = executor.execute("SELECT * FROM shop.products WHERE sku = 'LAPTOP-001'"); + if (!result.getRows().isEmpty()) { + Map product = result.getRows().get(0); + System.out.println(" Product: " + product.get("name")); + System.out.println(" Price: $" + product.get("price")); + System.out.println(" Stock: " + product.get("stock") + " units"); + } + + // Update price + System.out.println("\nApplying discount..."); + executor.execute("UPDATE shop.products SET price = '2299.99' WHERE sku = 'LAPTOP-001'"); + System.out.println("✓ Price updated"); + + // Update stock + System.out.println("\nUpdating stock after sale..."); + executor.execute("UPDATE shop.products SET stock = '9' WHERE sku = 'LAPTOP-001'"); + System.out.println("✓ Stock updated"); + + System.out.println(); + } + + /** + * Example 4: Advanced Queries + */ + private static void example4_AdvancedQueries(SQLExecutor executor) { + System.out.println("Example 4: Advanced Queries"); + 
System.out.println("─────────────────────────────────────────────────────────────"); + + // Create table + executor.execute("CREATE TABLE data.records (id TEXT PRIMARY KEY, type TEXT, value TEXT, status TEXT)"); + + // Insert test data + executor.execute("INSERT INTO data.records (id, type, value, status) VALUES ('r1', 'A', '100', 'active')"); + executor.execute("INSERT INTO data.records (id, type, value, status) VALUES ('r2', 'B', '200', 'active')"); + executor.execute("INSERT INTO data.records (id, type, value, status) VALUES ('r3', 'A', '300', 'inactive')"); + + // Select specific columns + System.out.println("\nSelecting specific columns..."); + SQLExecutor.SQLResult result = executor.execute("SELECT id, type, value FROM data.records WHERE id = 'r1'"); + if (!result.getRows().isEmpty()) { + Map row = result.getRows().get(0); + System.out.println(" ID: " + row.get("id")); + System.out.println(" Type: " + row.get("type")); + System.out.println(" Value: " + row.get("value")); + } + + // Update multiple columns + System.out.println("\nUpdating multiple columns..."); + executor.execute("UPDATE data.records SET type = 'C', value = '150', status = 'pending' WHERE id = 'r1'"); + System.out.println("✓ Multiple columns updated"); + + // Verify + result = executor.execute("SELECT * FROM data.records WHERE id = 'r1'"); + if (!result.getRows().isEmpty()) { + System.out.println(" New values: " + result.getRows().get(0)); + } + + System.out.println(); + } + + /** + * Example 5: Batch Operations + */ + private static void example5_BatchOperations(SQLExecutor executor) { + System.out.println("Example 5: Batch Operations"); + System.out.println("─────────────────────────────────────────────────────────────"); + + System.out.println("\nExecuting multiple operations..."); + + // Create table + executor.execute("CREATE TABLE batch.data (id TEXT PRIMARY KEY, value TEXT)"); + + // Batch inserts + int count = 0; + for (int i = 1; i <= 5; i++) { + String sql = String.format("INSERT 
INTO batch.data (id, value) VALUES ('item-%d', 'value-%d')", i, i * 100); + SQLExecutor.SQLResult result = executor.execute(sql); + if (result.isSuccess()) { + count++; + } + } + System.out.println("✓ Inserted " + count + " records"); + + // Batch updates + count = 0; + for (int i = 1; i <= 3; i++) { + String sql = String.format("UPDATE batch.data SET value = 'updated-%d' WHERE id = 'item-%d'", i, i); + SQLExecutor.SQLResult result = executor.execute(sql); + if (result.isSuccess()) { + count++; + } + } + System.out.println("✓ Updated " + count + " records"); + + // Query results + System.out.println("\nFinal data:"); + for (int i = 1; i <= 5; i++) { + SQLExecutor.SQLResult result = executor.execute("SELECT * FROM batch.data WHERE id = 'item-" + i + "'"); + if (!result.getRows().isEmpty()) { + Map row = result.getRows().get(0); + System.out.println(" " + row.get("id") + " = " + row.get("value")); + } + } + + System.out.println(); + } +} diff --git a/src/main/java/com/cube/gossip/GossipMessageHandler.java b/src/main/java/com/cube/gossip/GossipMessageHandler.java new file mode 100644 index 0000000..befb350 --- /dev/null +++ b/src/main/java/com/cube/gossip/GossipMessageHandler.java @@ -0,0 +1,252 @@ +package com.cube.gossip; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.*; +import java.util.concurrent.*; + +/** + * Handles network communication for gossip protocol + */ +public class GossipMessageHandler { + + private static final Logger logger = LoggerFactory.getLogger(GossipMessageHandler.class); + + private final GossipProtocol gossipProtocol; + private ServerSocket serverSocket; + private ExecutorService listenerExecutor; + private volatile boolean running; + + public GossipMessageHandler(GossipProtocol gossipProtocol) { + this.gossipProtocol = gossipProtocol; + this.running = false; + } + + /** + * Start listening for gossip messages + */ + public void start(int port) { + try { + serverSocket = new 
ServerSocket(port); + running = true; + + listenerExecutor = Executors.newCachedThreadPool(r -> { + Thread t = new Thread(r, "Gossip-Listener"); + t.setDaemon(true); + return t; + }); + + // Start accepting connections + listenerExecutor.submit(this::acceptConnections); + + logger.info("Gossip message handler started on port {}", port); + + } catch (IOException e) { + logger.error("Failed to start gossip message handler", e); + throw new RuntimeException("Failed to start gossip listener", e); + } + } + + /** + * Accept incoming connections + */ + private void acceptConnections() { + while (running) { + try { + Socket socket = serverSocket.accept(); + socket.setSoTimeout(5000); // 5 second timeout + + listenerExecutor.submit(() -> handleConnection(socket)); + + } catch (SocketException e) { + if (running) { + logger.error("Socket error in accept loop", e); + } + break; + } catch (IOException e) { + if (running) { + logger.error("Error accepting connection", e); + } + } + } + } + + /** + * Handle an incoming connection + */ + private void handleConnection(Socket socket) { + try ( + ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()) + ) { + // Read message + GossipProtocol.GossipMessage message = + (GossipProtocol.GossipMessage) in.readObject(); + + logger.debug("Received {} from {}", message.getType(), message.getFromNodeId()); + + // Process message + GossipProtocol.GossipMessage response = processMessage(message); + + // Send response if needed + if (response != null) { + out.writeObject(response); + out.flush(); + } + + } catch (Exception e) { + logger.warn("Error handling connection: {}", e.getMessage()); + } finally { + try { + socket.close(); + } catch (IOException e) { + // Ignore + } + } + } + + /** + * Process a received message + */ + private GossipProtocol.GossipMessage processMessage(GossipProtocol.GossipMessage message) { + switch (message.getType()) { + case 
STATE_SYNC: + // Merge received state + gossipProtocol.mergeState(message.getNodeStates()); + + // Send ACK with our state + return new GossipProtocol.GossipMessage( + GossipProtocol.GossipMessage.Type.ACK, + gossipProtocol.getLocalNodeId(), + message.getFromNodeId(), + gossipProtocol.getClusterState(), + 0 + ); + + case PING: + // Respond with ACK + return new GossipProtocol.GossipMessage( + GossipProtocol.GossipMessage.Type.ACK, + gossipProtocol.getLocalNodeId(), + message.getFromNodeId(), + null, + 0 + ); + + case JOIN: + // Add joining node to cluster + gossipProtocol.mergeState(message.getNodeStates()); + + // Send full state to new node + return new GossipProtocol.GossipMessage( + GossipProtocol.GossipMessage.Type.STATE_SYNC, + gossipProtocol.getLocalNodeId(), + message.getFromNodeId(), + gossipProtocol.getClusterState(), + 0 + ); + + case LEAVE: + // Mark node as leaving + logger.info("Node {} is leaving", message.getFromNodeId()); + return null; + + case ACK: + // Acknowledgment received + return null; + + default: + logger.warn("Unknown message type: {}", message.getType()); + return null; + } + } + + /** + * Send a message to a node + */ + public void sendMessage(String host, int port, GossipProtocol.GossipMessage message) + throws IOException { + + Socket socket = null; + try { + socket = new Socket(); + socket.connect(new InetSocketAddress(host, port), 3000); // 3 second connection timeout + socket.setSoTimeout(5000); // 5 second read timeout + + try ( + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream in = new ObjectInputStream(socket.getInputStream()) + ) { + // Send message + out.writeObject(message); + out.flush(); + + // Wait for response if needed + if (message.getType() == GossipProtocol.GossipMessage.Type.STATE_SYNC || + message.getType() == GossipProtocol.GossipMessage.Type.JOIN || + message.getType() == GossipProtocol.GossipMessage.Type.PING) { + + try { + GossipProtocol.GossipMessage response = + 
(GossipProtocol.GossipMessage) in.readObject(); + + if (response.getType() == GossipProtocol.GossipMessage.Type.ACK || + response.getType() == GossipProtocol.GossipMessage.Type.STATE_SYNC) { + // Merge response state + if (response.getNodeStates() != null) { + gossipProtocol.mergeState(response.getNodeStates()); + } + } + } catch (SocketTimeoutException e) { + // No response, that's okay for some message types + logger.debug("No response from {}:{}", host, port); + } + } + } + + } catch (ConnectException e) { + throw new IOException("Could not connect to " + host + ":" + port, e); + } catch (SocketTimeoutException e) { + throw new IOException("Timeout connecting to " + host + ":" + port, e); + } catch (ClassNotFoundException e) { + throw new IOException("Invalid response from " + host + ":" + port, e); + } finally { + if (socket != null && !socket.isClosed()) { + try { + socket.close(); + } catch (IOException e) { + // Ignore + } + } + } + } + + /** + * Shutdown the message handler + */ + public void shutdown() { + running = false; + + try { + if (serverSocket != null && !serverSocket.isClosed()) { + serverSocket.close(); + } + } catch (IOException e) { + logger.error("Error closing server socket", e); + } + + if (listenerExecutor != null) { + listenerExecutor.shutdown(); + try { + if (!listenerExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + listenerExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + listenerExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/src/main/java/com/cube/gossip/GossipProtocol.java b/src/main/java/com/cube/gossip/GossipProtocol.java new file mode 100644 index 0000000..725cb4e --- /dev/null +++ b/src/main/java/com/cube/gossip/GossipProtocol.java @@ -0,0 +1,589 @@ +package com.cube.gossip; + +import com.cube.cluster.ClusterNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.*; + +/** + * 
Gossip Protocol Implementation for Cube Database + * + * Features: + * - SWIM (Scalable Weakly-consistent Infection-style Membership) protocol + * - Efficient failure detection + * - Eventually consistent cluster state + * - Configurable gossip intervals and fanout + */ +public class GossipProtocol { + + private static final Logger logger = LoggerFactory.getLogger(GossipProtocol.class); + + private final String localNodeId; + private final ConcurrentHashMap nodeStates; + private final GossipConfig config; + private final ScheduledExecutorService scheduler; + private final ExecutorService gossipExecutor; + private final GossipMessageHandler messageHandler; + private final List listeners; + + public String getLocalNodeId() { + return localNodeId; + } + + public ConcurrentHashMap getNodeStates() { + return nodeStates; + } + + public GossipConfig getConfig() { + return config; + } + + public ScheduledExecutorService getScheduler() { + return scheduler; + } + + public ExecutorService getGossipExecutor() { + return gossipExecutor; + } + + public GossipMessageHandler getMessageHandler() { + return messageHandler; + } + + public List getListeners() { + return listeners; + } + + /** + * Node state in the cluster + */ + public static class NodeState implements Serializable { + private final String nodeId; + private final String host; + private final int port; + private volatile Status status; + private volatile long heartbeatCounter; + private volatile long lastUpdateTime; + private volatile int suspicionCounter; + private final Map metadata; + + public enum Status { + ALIVE, // Node is healthy + SUSPECTED, // Node might be down + DEAD, // Node is confirmed down + LEAVING, // Node is gracefully shutting down + JOINING // Node is joining the cluster + } + + public NodeState(String nodeId, String host, int port) { + this.nodeId = nodeId; + this.host = host; + this.port = port; + this.status = Status.JOINING; + this.heartbeatCounter = 0; + this.lastUpdateTime = 
System.currentTimeMillis(); + this.suspicionCounter = 0; + this.metadata = new ConcurrentHashMap<>(); + } + + public String getNodeId() { return nodeId; } + public String getHost() { return host; } + public int getPort() { return port; } + public Status getStatus() { return status; } + public long getHeartbeatCounter() { return heartbeatCounter; } + public long getLastUpdateTime() { return lastUpdateTime; } + public Map getMetadata() { return metadata; } + + public void setStatus(Status status) { + this.status = status; + this.lastUpdateTime = System.currentTimeMillis(); + } + + public void incrementHeartbeat() { + this.heartbeatCounter++; + this.lastUpdateTime = System.currentTimeMillis(); + this.suspicionCounter = 0; + } + + public void incrementSuspicion() { + this.suspicionCounter++; + this.lastUpdateTime = System.currentTimeMillis(); + } + + public int getSuspicionCounter() { + return suspicionCounter; + } + + @Override + public String toString() { + return String.format("NodeState{id=%s, host=%s, port=%d, status=%s, heartbeat=%d}", + nodeId, host, port, status, heartbeatCounter); + } + } + + /** + * Gossip configuration + */ + public static class GossipConfig { + private final long gossipIntervalMs; // How often to gossip + private final int gossipFanout; // Number of nodes to gossip to + private final long suspicionTimeoutMs; // Time before marking node as suspected + private final long failureTimeoutMs; // Time before marking node as dead + private final int maxSuspicionCount; // Max suspicion count before marking dead + private final int protocolPort; // Port for gossip communication + + public GossipConfig(long gossipIntervalMs, int gossipFanout, + long suspicionTimeoutMs, long failureTimeoutMs, + int maxSuspicionCount, int protocolPort) { + this.gossipIntervalMs = gossipIntervalMs; + this.gossipFanout = gossipFanout; + this.suspicionTimeoutMs = suspicionTimeoutMs; + this.failureTimeoutMs = failureTimeoutMs; + this.maxSuspicionCount = maxSuspicionCount; + 
this.protocolPort = protocolPort; + } + + public static GossipConfig defaultConfig() { + return new GossipConfig( + 1000, // Gossip every 1 second + 3, // Gossip to 3 random nodes + 5000, // Suspect after 5 seconds + 15000, // Mark dead after 15 seconds + 3, // Max 3 suspicions before dead + 7946 // Default gossip port + ); + } + } + + /** + * Gossip message types + */ + public static class GossipMessage implements Serializable { + public enum Type { + PING, // Heartbeat check + PING_REQ, // Indirect ping request + ACK, // Acknowledgment + ALIVE, // Node is alive announcement + SUSPECT, // Node suspected announcement + DEAD, // Node dead announcement + JOIN, // Join request + LEAVE, // Leave announcement + STATE_SYNC // Full state synchronization + } + + private final Type type; + private final String fromNodeId; + private final String targetNodeId; + private final long timestamp; + private final Map nodeStates; + private final long heartbeatCounter; + + public GossipMessage(Type type, String fromNodeId) { + this(type, fromNodeId, null, new HashMap<>(), 0); + } + + public GossipMessage(Type type, String fromNodeId, String targetNodeId, + Map nodeStates, long heartbeatCounter) { + this.type = type; + this.fromNodeId = fromNodeId; + this.targetNodeId = targetNodeId; + this.timestamp = System.currentTimeMillis(); + this.nodeStates = nodeStates; + this.heartbeatCounter = heartbeatCounter; + } + + public Type getType() { return type; } + public String getFromNodeId() { return fromNodeId; } + public String getTargetNodeId() { return targetNodeId; } + public long getTimestamp() { return timestamp; } + public Map getNodeStates() { return nodeStates; } + public long getHeartbeatCounter() { return heartbeatCounter; } + } + + /** + * Gossip event listener + */ + public interface GossipListener { + void onNodeJoined(NodeState node); + void onNodeLeft(NodeState node); + void onNodeSuspected(NodeState node); + void onNodeAlive(NodeState node); + void onNodeDead(NodeState node); 
+ } + + public GossipProtocol(String localNodeId, String host, int port, GossipConfig config) { + this.localNodeId = localNodeId; + this.config = config; + this.nodeStates = new ConcurrentHashMap<>(); + this.scheduler = Executors.newScheduledThreadPool(2, r -> { + Thread t = new Thread(r, "Gossip-Scheduler"); + t.setDaemon(true); + return t; + }); + this.gossipExecutor = Executors.newFixedThreadPool(config.gossipFanout, r -> { + Thread t = new Thread(r, "Gossip-Worker"); + t.setDaemon(true); + return t; + }); + this.messageHandler = new GossipMessageHandler(this); + this.listeners = new CopyOnWriteArrayList<>(); + + // Add local node + NodeState localNode = new NodeState(localNodeId, host, port); + localNode.setStatus(NodeState.Status.ALIVE); + nodeStates.put(localNodeId, localNode); + } + + /** + * Start the gossip protocol + */ + public void start() { + logger.info("Starting gossip protocol for node: {}", localNodeId); + + // Start gossip rounds + scheduler.scheduleAtFixedRate( + this::performGossipRound, + 0, + config.gossipIntervalMs, + TimeUnit.MILLISECONDS + ); + + // Start failure detection + scheduler.scheduleAtFixedRate( + this::detectFailures, + config.suspicionTimeoutMs, + config.suspicionTimeoutMs / 2, + TimeUnit.MILLISECONDS + ); + + // Start message handler + messageHandler.start(config.protocolPort); + + logger.info("Gossip protocol started on port {}", config.protocolPort); + } + + /** + * Perform one round of gossip + */ + private void performGossipRound() { + try { + // Increment local heartbeat + NodeState localNode = nodeStates.get(localNodeId); + if (localNode != null) { + localNode.incrementHeartbeat(); + } + + // Select random nodes to gossip with + List targets = selectGossipTargets(); + + if (targets.isEmpty()) { + logger.debug("No nodes to gossip with"); + return; + } + + logger.debug("Gossiping with {} nodes", targets.size()); + + // Send gossip messages to selected nodes + for (NodeState target : targets) { + gossipExecutor.submit(() -> 
gossipWithNode(target)); + } + + } catch (Exception e) { + logger.error("Error in gossip round", e); + } + } + + /** + * Select random nodes for gossip + */ + private List selectGossipTargets() { + List aliveNodes = new ArrayList<>(); + + for (NodeState node : nodeStates.values()) { + if (!node.getNodeId().equals(localNodeId) && + node.getStatus() == NodeState.Status.ALIVE) { + aliveNodes.add(node); + } + } + + // Shuffle and select fanout number of nodes + Collections.shuffle(aliveNodes); + return aliveNodes.subList(0, Math.min(config.gossipFanout, aliveNodes.size())); + } + + /** + * Gossip with a specific node + */ + private void gossipWithNode(NodeState target) { + try { + // Create state sync message with current view + Map statesToSend = new HashMap<>(); + for (Map.Entry entry : nodeStates.entrySet()) { + // Only send non-dead nodes + if (entry.getValue().getStatus() != NodeState.Status.DEAD) { + statesToSend.put(entry.getKey(), entry.getValue()); + } + } + + GossipMessage message = new GossipMessage( + GossipMessage.Type.STATE_SYNC, + localNodeId, + target.getNodeId(), + statesToSend, + nodeStates.get(localNodeId).getHeartbeatCounter() + ); + + // Send message + messageHandler.sendMessage(target.getHost(), target.getPort(), message); + + logger.debug("Sent gossip to {}", target.getNodeId()); + + } catch (Exception e) { + logger.warn("Failed to gossip with {}: {}", target.getNodeId(), e.getMessage()); + suspectNode(target.getNodeId()); + } + } + + /** + * Detect failed nodes + */ + private void detectFailures() { + long now = System.currentTimeMillis(); + + for (NodeState node : nodeStates.values()) { + if (node.getNodeId().equals(localNodeId)) { + continue; // Skip local node + } + + long timeSinceUpdate = now - node.getLastUpdateTime(); + + switch (node.getStatus()) { + case ALIVE: + // Check if node should be suspected + if (timeSinceUpdate > config.suspicionTimeoutMs) { + suspectNode(node.getNodeId()); + } + break; + + case SUSPECTED: + // Check if node 
should be marked dead + if (timeSinceUpdate > config.failureTimeoutMs || + node.getSuspicionCounter() >= config.maxSuspicionCount) { + markNodeDead(node.getNodeId()); + } + break; + + case DEAD: + // Remove dead nodes after some time + if (timeSinceUpdate > config.failureTimeoutMs * 3) { + removeNode(node.getNodeId()); + } + break; + } + } + } + + /** + * Mark a node as suspected + */ + private void suspectNode(String nodeId) { + NodeState node = nodeStates.get(nodeId); + if (node != null && node.getStatus() == NodeState.Status.ALIVE) { + node.setStatus(NodeState.Status.SUSPECTED); + node.incrementSuspicion(); + logger.warn("Node {} is now SUSPECTED", nodeId); + notifyListeners(l -> l.onNodeSuspected(node)); + } + } + + /** + * Mark a node as dead + */ + private void markNodeDead(String nodeId) { + NodeState node = nodeStates.get(nodeId); + if (node != null && node.getStatus() != NodeState.Status.DEAD) { + node.setStatus(NodeState.Status.DEAD); + logger.error("Node {} is now DEAD", nodeId); + notifyListeners(l -> l.onNodeDead(node)); + } + } + + /** + * Remove a node from the cluster + */ + private void removeNode(String nodeId) { + NodeState node = nodeStates.remove(nodeId); + if (node != null) { + logger.info("Removed node {} from cluster", nodeId); + notifyListeners(l -> l.onNodeLeft(node)); + } + } + + /** + * Merge received state with local state + */ + public void mergeState(Map receivedStates) { + for (Map.Entry entry : receivedStates.entrySet()) { + String nodeId = entry.getKey(); + NodeState receivedNode = entry.getValue(); + + NodeState localNode = nodeStates.get(nodeId); + + if (localNode == null) { + // New node discovered + nodeStates.put(nodeId, receivedNode); + logger.info("Discovered new node: {}", receivedNode); + notifyListeners(l -> l.onNodeJoined(receivedNode)); + } else { + // Merge states based on heartbeat counter + if (receivedNode.getHeartbeatCounter() > localNode.getHeartbeatCounter()) { + // Received state is newer + NodeState.Status 
oldStatus = localNode.getStatus(); + localNode.setStatus(receivedNode.getStatus()); + localNode.heartbeatCounter = receivedNode.getHeartbeatCounter(); + localNode.lastUpdateTime = System.currentTimeMillis(); + + // Notify if status changed + if (oldStatus != receivedNode.getStatus()) { + if (receivedNode.getStatus() == NodeState.Status.ALIVE) { + logger.info("Node {} is now ALIVE", nodeId); + notifyListeners(l -> l.onNodeAlive(localNode)); + } + } + } + } + } + } + + /** + * Join a cluster via seed nodes + */ + public void join(List seedNodes) { + logger.info("Joining cluster via seeds: {}", seedNodes); + + for (String seed : seedNodes) { + try { + String[] parts = seed.split(":"); + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + + // Send join message + GossipMessage joinMsg = new GossipMessage( + GossipMessage.Type.JOIN, + localNodeId + ); + + messageHandler.sendMessage(host, port, joinMsg); + logger.info("Sent join request to {}", seed); + + } catch (Exception e) { + logger.error("Failed to join via seed {}: {}", seed, e.getMessage()); + } + } + } + + /** + * Leave the cluster gracefully + */ + public void leave() { + logger.info("Leaving cluster gracefully"); + + NodeState localNode = nodeStates.get(localNodeId); + if (localNode != null) { + localNode.setStatus(NodeState.Status.LEAVING); + } + + // Announce leaving to all nodes + GossipMessage leaveMsg = new GossipMessage( + GossipMessage.Type.LEAVE, + localNodeId + ); + + for (NodeState node : nodeStates.values()) { + if (!node.getNodeId().equals(localNodeId)) { + try { + messageHandler.sendMessage(node.getHost(), node.getPort(), leaveMsg); + } catch (Exception e) { + logger.warn("Failed to send leave message to {}", node.getNodeId()); + } + } + } + } + + /** + * Shutdown the gossip protocol + */ + public void shutdown() { + logger.info("Shutting down gossip protocol"); + + leave(); + + scheduler.shutdown(); + gossipExecutor.shutdown(); + messageHandler.shutdown(); + + try { + 
scheduler.awaitTermination(5, TimeUnit.SECONDS); + gossipExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Get current cluster state + */ + public Map getClusterState() { + return new HashMap<>(nodeStates); + } + + /** + * Get alive nodes + */ + public List getAliveNodes() { + return nodeStates.values().stream() + .filter(n -> n.getStatus() == NodeState.Status.ALIVE) + .collect(java.util.stream.Collectors.toList()); + } + + /** + * Add gossip listener + */ + public void addListener(GossipListener listener) { + listeners.add(listener); + } + + /** + * Notify all listeners + */ + private void notifyListeners(java.util.function.Consumer action) { + for (GossipListener listener : listeners) { + try { + action.accept(listener); + } catch (Exception e) { + logger.error("Error notifying listener", e); + } + } + } + + /** + * Get statistics + */ + public Map getStatistics() { + Map stats = new HashMap<>(); + stats.put("localNodeId", localNodeId); + stats.put("totalNodes", nodeStates.size()); + stats.put("aliveNodes", getAliveNodes().size()); + stats.put("suspectedNodes", nodeStates.values().stream() + .filter(n -> n.getStatus() == NodeState.Status.SUSPECTED).count()); + stats.put("deadNodes", nodeStates.values().stream() + .filter(n -> n.getStatus() == NodeState.Status.DEAD).count()); + stats.put("gossipInterval", config.gossipIntervalMs); + stats.put("gossipFanout", config.gossipFanout); + + return stats; + } +} diff --git a/src/main/java/com/cube/index/CubicIndexNode.java b/src/main/java/com/cube/index/CubicIndexNode.java new file mode 100644 index 0000000..98b2e07 --- /dev/null +++ b/src/main/java/com/cube/index/CubicIndexNode.java @@ -0,0 +1,261 @@ +package com.cube.index; + +import java.util.*; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Cubic Index Node - Revolutionary indexing based on cube numbers. 
+ * + * Formula: Index = N³ × 6 + * Each node has 6 sides (like a cube) for data storage. + * + * Index progression: 1³×6=6, 2³×6=48, 3³×6=162, 4³×6=384, 5³×6=750... + */ +public class CubicIndexNode { + + private final int level; // N in N³×6 + private final long indexValue; // N³×6 + private final CubeSide[] sides; // 6 sides of the cube + private final ReadWriteLock lock; + + // 6 sides of a cube: Front, Back, Left, Right, Top, Bottom + public enum Side { + FRONT(0), // Side 0 + BACK(1), // Side 1 + LEFT(2), // Side 2 + RIGHT(3), // Side 3 + TOP(4), // Side 4 + BOTTOM(5); // Side 5 + + private final int index; + + Side(int index) { + this.index = index; + } + + public int getIndex() { + return index; + } + + public static Side fromIndex(int index) { + for (Side side : values()) { + if (side.index == index) { + return side; + } + } + throw new IllegalArgumentException("Invalid side index: " + index); + } + } + + /** + * One side of the cubic index node + */ + public static class CubeSide { + private final Side position; + private final Map data; + private final ReadWriteLock sideLock; + + public CubeSide(Side position) { + this.position = position; + this.data = new HashMap<>(); + this.sideLock = new ReentrantReadWriteLock(); + } + + public void put(String key, byte[] value) { + sideLock.writeLock().lock(); + try { + data.put(key, value); + } finally { + sideLock.writeLock().unlock(); + } + } + + public byte[] get(String key) { + sideLock.readLock().lock(); + try { + return data.get(key); + } finally { + sideLock.readLock().unlock(); + } + } + + public boolean containsKey(String key) { + sideLock.readLock().lock(); + try { + return data.containsKey(key); + } finally { + sideLock.readLock().unlock(); + } + } + + public boolean remove(String key) { + sideLock.writeLock().lock(); + try { + return data.remove(key) != null; + } finally { + sideLock.writeLock().unlock(); + } + } + + public int size() { + sideLock.readLock().lock(); + try { + return data.size(); + } 
finally { + sideLock.readLock().unlock(); + } + } + + public Set keys() { + sideLock.readLock().lock(); + try { + return new HashSet<>(data.keySet()); + } finally { + sideLock.readLock().unlock(); + } + } + + public Side getPosition() { + return position; + } + } + + /** + * Create a cubic index node at level N + */ + public CubicIndexNode(int level) { + if (level < 1) { + throw new IllegalArgumentException("Level must be >= 1"); + } + + this.level = level; + this.indexValue = calculateCubicIndex(level); + this.sides = new CubeSide[6]; + this.lock = new ReentrantReadWriteLock(); + + // Initialize all 6 sides + for (Side side : Side.values()) { + sides[side.getIndex()] = new CubeSide(side); + } + } + + /** + * Calculate cubic index: N³ × 6 + */ + public static long calculateCubicIndex(int n) { + return (long) n * n * n * 6; + } + + /** + * Calculate which level a given value belongs to + */ + public static int calculateLevel(long value) { + // Solve for N in: N³ × 6 = value + // N³ = value / 6 + // N = ∛(value / 6) + double cubeRoot = Math.cbrt(value / 6.0); + return (int) Math.ceil(cubeRoot); + } + + /** + * Determine which side to use based on key hash + */ + public static Side determineSide(String key) { + int hash = Math.abs(key.hashCode()); + return Side.fromIndex(hash % 6); + } + + /** + * Put data into the appropriate side + */ + public void put(String key, byte[] value) { + Side side = determineSide(key); + sides[side.getIndex()].put(key, value); + } + + /** + * Get data from the appropriate side + */ + public byte[] get(String key) { + Side side = determineSide(key); + return sides[side.getIndex()].get(key); + } + + /** + * Check if key exists on any side + */ + public boolean containsKey(String key) { + Side side = determineSide(key); + return sides[side.getIndex()].containsKey(key); + } + + /** + * Remove data from the appropriate side + */ + public boolean remove(String key) { + Side side = determineSide(key); + return sides[side.getIndex()].remove(key); 
+ } + + /** + * Get a specific side + */ + public CubeSide getSide(Side side) { + return sides[side.getIndex()]; + } + + /** + * Get total size across all sides + */ + public int getTotalSize() { + int total = 0; + for (CubeSide side : sides) { + total += side.size(); + } + return total; + } + + /** + * Get all keys across all sides + */ + public Set getAllKeys() { + Set allKeys = new HashSet<>(); + for (CubeSide side : sides) { + allKeys.addAll(side.keys()); + } + return allKeys; + } + + /** + * Get statistics for this node + */ + public Map getStats() { + Map stats = new LinkedHashMap<>(); + stats.put("level", level); + stats.put("indexValue", indexValue); + stats.put("totalKeys", getTotalSize()); + + Map sideStats = new LinkedHashMap<>(); + for (Side side : Side.values()) { + sideStats.put(side.name(), sides[side.getIndex()].size()); + } + stats.put("sideDistribution", sideStats); + + return stats; + } + + public int getLevel() { + return level; + } + + public long getIndexValue() { + return indexValue; + } + + @Override + public String toString() { + return String.format("CubicNode[L=%d, Index=%d (=%d³×6), Keys=%d]", + level, indexValue, level, getTotalSize()); + } +} diff --git a/src/main/java/com/cube/index/CubicIndexTree.java b/src/main/java/com/cube/index/CubicIndexTree.java new file mode 100644 index 0000000..f4cb72b --- /dev/null +++ b/src/main/java/com/cube/index/CubicIndexTree.java @@ -0,0 +1,372 @@ +package com.cube.index; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Cubic Index Tree - Multi-level index using cubic progression. + * + * Index levels: + * Level 1: 1³×6 = 6 + * Level 2: 2³×6 = 48 + * Level 3: 3³×6 = 162 + * Level 4: 4³×6 = 384 + * Level 5: 5³×6 = 750 + * ... + * + * Each level has 6 sides for distributing data. 
+ * Keys are routed to appropriate level based on hash value. + */ +public class CubicIndexTree { + + private static final Logger logger = LoggerFactory.getLogger(CubicIndexTree.class); + + private final ConcurrentSkipListMap levels; + private final ReadWriteLock treeLock; + private final int maxLevels; + private final boolean autoExpand; + + /** + * Create a cubic index tree + * + * @param initialLevels Number of initial levels to create + * @param maxLevels Maximum levels allowed + * @param autoExpand Whether to automatically create new levels + */ + public CubicIndexTree(int initialLevels, int maxLevels, boolean autoExpand) { + this.levels = new ConcurrentSkipListMap<>(); + this.treeLock = new ReentrantReadWriteLock(); + this.maxLevels = maxLevels; + this.autoExpand = autoExpand; + + // Create initial levels + for (int i = 1; i <= initialLevels; i++) { + levels.put(i, new CubicIndexNode(i)); + } + + logger.info("Cubic index tree created with {} levels (max: {}, auto-expand: {})", + initialLevels, maxLevels, autoExpand); + } + + /** + * Create a cubic index tree with defaults + */ + public CubicIndexTree() { + this(5, 20, true); // 5 initial levels, up to 20, auto-expand + } + + /** + * Calculate which level a key should go to based on its hash + */ + public int calculateLevel(String key) { + // Use hash to determine level + long hash = Math.abs((long) key.hashCode()); + + // Map hash to a level (1 to current max) + int currentMaxLevel = levels.isEmpty() ? 
1 : levels.lastKey(); + + // Simple distribution: use modulo to spread across levels + // For better distribution, we could use more sophisticated hashing + int level = (int) (hash % currentMaxLevel) + 1; + + // Ensure level exists + if (!levels.containsKey(level) && autoExpand && level <= maxLevels) { + expandToLevel(level); + } + + return Math.min(level, currentMaxLevel); + } + + /** + * Calculate optimal level based on data size + */ + public int calculateOptimalLevel(int estimatedSize) { + // Choose level based on capacity + // Each level can hold approximately indexValue entries + + for (int level = 1; level <= maxLevels; level++) { + long capacity = CubicIndexNode.calculateCubicIndex(level); + if (capacity >= estimatedSize) { + return level; + } + } + + return maxLevels; + } + + /** + * Expand tree to include the specified level + */ + private void expandToLevel(int targetLevel) { + treeLock.writeLock().lock(); + try { + int currentMax = levels.isEmpty() ? 0 : levels.lastKey(); + + for (int level = currentMax + 1; level <= targetLevel && level <= maxLevels; level++) { + if (!levels.containsKey(level)) { + levels.put(level, new CubicIndexNode(level)); + logger.debug("Expanded cubic tree to level {}", level); + } + } + } finally { + treeLock.writeLock().unlock(); + } + } + + /** + * Put a key-value pair into the index + */ + public void put(String key, byte[] value) { + int level = calculateLevel(key); + CubicIndexNode node = levels.get(level); + + if (node == null) { + throw new IllegalStateException("Level " + level + " does not exist"); + } + + node.put(key, value); + } + + /** + * Get a value from the index + */ + public byte[] get(String key) { + // Try to find in the calculated level first + int level = calculateLevel(key); + CubicIndexNode node = levels.get(level); + + if (node != null) { + byte[] value = node.get(key); + if (value != null) { + return value; + } + } + + // If not found, search all levels (key might have been moved) + for (CubicIndexNode 
n : levels.values()) { + byte[] value = n.get(key); + if (value != null) { + return value; + } + } + + return null; + } + + /** + * Check if key exists in the index + */ + public boolean containsKey(String key) { + return get(key) != null; + } + + /** + * Remove a key from the index + */ + public boolean remove(String key) { + // Search all levels + for (CubicIndexNode node : levels.values()) { + if (node.remove(key)) { + return true; + } + } + return false; + } + + /** + * Get all keys in the index + */ + public Set getAllKeys() { + Set allKeys = new HashSet<>(); + for (CubicIndexNode node : levels.values()) { + allKeys.addAll(node.getAllKeys()); + } + return allKeys; + } + + /** + * Search for keys matching a prefix + */ + public List searchPrefix(String prefix) { + List results = new ArrayList<>(); + + for (CubicIndexNode node : levels.values()) { + for (String key : node.getAllKeys()) { + if (key.startsWith(prefix)) { + results.add(key); + } + } + } + + Collections.sort(results); + return results; + } + + /** + * Range search between two keys + */ + public List searchRange(String startKey, String endKey) { + List results = new ArrayList<>(); + + for (CubicIndexNode node : levels.values()) { + for (String key : node.getAllKeys()) { + if (key.compareTo(startKey) >= 0 && key.compareTo(endKey) <= 0) { + results.add(key); + } + } + } + + Collections.sort(results); + return results; + } + + /** + * Get a specific level node + */ + public CubicIndexNode getLevel(int level) { + return levels.get(level); + } + + /** + * Get number of levels + */ + public int getLevelCount() { + return levels.size(); + } + + /** + * Get total number of keys across all levels + */ + public int getTotalSize() { + int total = 0; + for (CubicIndexNode node : levels.values()) { + total += node.getTotalSize(); + } + return total; + } + + /** + * Rebalance the tree by redistributing keys + */ + public void rebalance() { + treeLock.writeLock().lock(); + try { + logger.info("Rebalancing cubic 
index tree..."); + + // Collect all key-value pairs + Map allData = new HashMap<>(); + for (CubicIndexNode node : levels.values()) { + for (String key : node.getAllKeys()) { + allData.put(key, node.get(key)); + } + } + + // Clear all nodes + for (CubicIndexNode node : levels.values()) { + for (String key : node.getAllKeys()) { + node.remove(key); + } + } + + // Redistribute + for (Map.Entry entry : allData.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + + logger.info("Rebalanced {} keys across {} levels", allData.size(), levels.size()); + + } finally { + treeLock.writeLock().unlock(); + } + } + + /** + * Get comprehensive statistics + */ + public Map getStats() { + Map stats = new LinkedHashMap<>(); + + stats.put("totalLevels", levels.size()); + stats.put("maxLevels", maxLevels); + stats.put("autoExpand", autoExpand); + stats.put("totalKeys", getTotalSize()); + + // Per-level statistics + Map levelStats = new LinkedHashMap<>(); + for (Map.Entry entry : levels.entrySet()) { + int level = entry.getKey(); + CubicIndexNode node = entry.getValue(); + + Map nodeStats = new LinkedHashMap<>(); + nodeStats.put("indexValue", node.getIndexValue()); + nodeStats.put("keys", node.getTotalSize()); + nodeStats.put("capacity", node.getIndexValue()); + nodeStats.put("utilization", + String.format("%.2f%%", (node.getTotalSize() * 100.0) / node.getIndexValue())); + + levelStats.put("Level-" + level, nodeStats); + } + stats.put("levels", levelStats); + + // Distribution across sides + Map sideDistribution = new LinkedHashMap<>(); + for (CubicIndexNode.Side side : CubicIndexNode.Side.values()) { + int count = 0; + for (CubicIndexNode node : levels.values()) { + count += node.getSide(side).size(); + } + sideDistribution.put(side.name(), count); + } + stats.put("sideDistribution", sideDistribution); + + return stats; + } + + /** + * Print tree structure + */ + public void printStructure() { + System.out.println("Cubic Index Tree Structure:"); + 
System.out.println("==========================="); + + for (Map.Entry entry : levels.entrySet()) { + int level = entry.getKey(); + CubicIndexNode node = entry.getValue(); + + System.out.printf("Level %d: Index=%d (%d³×6) - %d keys%n", + level, node.getIndexValue(), level, node.getTotalSize()); + + for (CubicIndexNode.Side side : CubicIndexNode.Side.values()) { + int count = node.getSide(side).size(); + if (count > 0) { + System.out.printf(" %s: %d keys%n", side.name(), count); + } + } + } + + System.out.println("==========================="); + System.out.printf("Total: %d keys across %d levels%n", getTotalSize(), levels.size()); + } + + /** + * Clear all data + */ + public void clear() { + treeLock.writeLock().lock(); + try { + for (CubicIndexNode node : levels.values()) { + for (String key : node.getAllKeys()) { + node.remove(key); + } + } + logger.info("Cleared all data from cubic index tree"); + } finally { + treeLock.writeLock().unlock(); + } + } +} diff --git a/src/main/java/com/cube/index/CubicIndexedStorage.java b/src/main/java/com/cube/index/CubicIndexedStorage.java new file mode 100644 index 0000000..b2e7762 --- /dev/null +++ b/src/main/java/com/cube/index/CubicIndexedStorage.java @@ -0,0 +1,253 @@ +package com.cube.index; + +import com.cube.storage.StorageEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Cubic Indexed Storage Engine - Combines LSM storage with cubic indexing. 
+ * + * Features: + * - Cubic index tree (N³×6) for fast lookups + * - 6-sided distribution for load balancing + * - Auto-expanding levels + * - Prefix and range queries + */ +public class CubicIndexedStorage implements StorageEngine { + + private static final Logger logger = LoggerFactory.getLogger(CubicIndexedStorage.class); + + private final StorageEngine backingStorage; + private final CubicIndexTree index; + private final boolean indexEnabled; + + public CubicIndexedStorage(StorageEngine backingStorage) { + this(backingStorage, true, 5, 20); + } + + public CubicIndexedStorage( + StorageEngine backingStorage, + boolean indexEnabled, + int initialLevels, + int maxLevels) { + + this.backingStorage = backingStorage; + this.indexEnabled = indexEnabled; + this.index = indexEnabled ? new CubicIndexTree(initialLevels, maxLevels, true) : null; + + logger.info("Cubic indexed storage initialized (indexing: {})", indexEnabled); + } + + @Override + public void put(String key, byte[] value) throws IOException { + // Write to backing storage + backingStorage.put(key, value); + + // Update index + if (indexEnabled) { + index.put(key, value); + } + } + + @Override + public byte[] get(String key) throws IOException { + // Try index first for fast lookup + if (indexEnabled) { + byte[] value = index.get(key); + if (value != null) { + return value; + } + } + + // Fallback to backing storage + return backingStorage.get(key); + } + + @Override + public boolean delete(String key) throws IOException { + // Remove from index + if (indexEnabled) { + index.remove(key); + } + + // Delete from backing storage + return backingStorage.delete(key); + } + + @Override + public Iterator scan(String prefix) throws IOException { + if (indexEnabled) { + // Use index for fast prefix search + List results = index.searchPrefix(prefix); + return results.iterator(); + } + + // Fallback to backing storage + return backingStorage.scan(prefix); + } + + @Override + public Iterator> scanEntries(String 
prefix) throws IOException { + if (indexEnabled) { + // Use index for fast prefix search + List keys = index.searchPrefix(prefix); + List> entries = new ArrayList<>(); + + for (String key : keys) { + byte[] value = index.get(key); + if (value != null) { + entries.add(new AbstractMap.SimpleEntry<>(key, value)); + } + } + + return entries.iterator(); + } + + // Fallback to backing storage + return backingStorage.scanEntries(prefix); + } + + /** + * Range search using cubic index + */ + public List rangeSearch(String startKey, String endKey) { + if (!indexEnabled) { + throw new UnsupportedOperationException("Index is disabled"); + } + + return index.searchRange(startKey, endKey); + } + + /** + * Get keys at a specific cubic level + */ + public Set getKeysAtLevel(int level) { + if (!indexEnabled) { + throw new UnsupportedOperationException("Index is disabled"); + } + + CubicIndexNode node = index.getLevel(level); + return node != null ? node.getAllKeys() : Collections.emptySet(); + } + + /** + * Get keys on a specific side of a level + */ + public Set getKeysOnSide(int level, CubicIndexNode.Side side) { + if (!indexEnabled) { + throw new UnsupportedOperationException("Index is disabled"); + } + + CubicIndexNode node = index.getLevel(level); + return node != null ? 
node.getSide(side).keys() : Collections.emptySet(); + } + + /** + * Rebalance the cubic index + */ + public void rebalanceIndex() { + if (!indexEnabled) { + return; + } + + index.rebalance(); + logger.info("Cubic index rebalanced"); + } + + /** + * Rebuild index from backing storage + */ + public void rebuildIndex() throws IOException { + if (!indexEnabled) { + return; + } + + logger.info("Rebuilding cubic index from storage..."); + + index.clear(); + + // Scan all keys from backing storage + Iterator> entries = backingStorage.scanEntries(""); + int count = 0; + + while (entries.hasNext()) { + Map.Entry entry = entries.next(); + index.put(entry.getKey(), entry.getValue()); + count++; + } + + logger.info("Rebuilt cubic index with {} keys", count); + } + + @Override + public void flush() throws IOException { + backingStorage.flush(); + } + + @Override + public void compact() throws IOException { + backingStorage.compact(); + } + + @Override + public StorageStats getStats() { + StorageStats backingStats = backingStorage.getStats(); + + if (!indexEnabled) { + return backingStats; + } + + // Combine backing storage stats with index stats + Map indexStats = index.getStats(); + + return new StorageStats( + backingStats.getTotalKeys(), + backingStats.getTotalSize(), + backingStats.getMemtableSize(), + backingStats.getSstableCount() + ); + } + + /** + * Get cubic index statistics + */ + public Map getIndexStats() { + if (!indexEnabled) { + return Collections.emptyMap(); + } + + return index.getStats(); + } + + /** + * Print cubic index structure + */ + public void printIndexStructure() { + if (indexEnabled) { + index.printStructure(); + } + } + + /** + * Get the cubic index tree (for advanced operations) + */ + public CubicIndexTree getIndex() { + return index; + } + + public boolean isIndexEnabled() { + return indexEnabled; + } + + @Override + public void close() throws IOException { + if (indexEnabled) { + index.clear(); + } + backingStorage.close(); + } +} diff --git 
a/src/main/java/com/cube/replication/HintedHandoffManager.java b/src/main/java/com/cube/replication/HintedHandoffManager.java new file mode 100644 index 0000000..53a5bc1 --- /dev/null +++ b/src/main/java/com/cube/replication/HintedHandoffManager.java @@ -0,0 +1,315 @@ +package com.cube.replication; + +import com.cube.cluster.ClusterNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.file.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Hinted Handoff Manager - Stores writes for temporarily unavailable nodes. + * When a node is down, hints are stored locally and replayed when the node recovers. + */ +public class HintedHandoffManager { + + private static final Logger logger = LoggerFactory.getLogger(HintedHandoffManager.class); + + private final Path hintsDirectory; + private final Map> hintsByNode; // nodeId -> hints + private final ScheduledExecutorService replayExecutor; + private final int maxHintsPerNode; + private final long hintWindowMs; + + public static class Hint implements Serializable { + private static final long serialVersionUID = 1L; + + private final String targetNodeId; + private final String key; + private final byte[] value; + private final long timestamp; + private final long expirationTime; + + public Hint(String targetNodeId, String key, byte[] value, long hintWindowMs) { + this.targetNodeId = targetNodeId; + this.key = key; + this.value = value; + this.timestamp = System.currentTimeMillis(); + this.expirationTime = timestamp + hintWindowMs; + } + + public String getTargetNodeId() { return targetNodeId; } + public String getKey() { return key; } + public byte[] getValue() { return value; } + public long getTimestamp() { return timestamp; } + + public boolean isExpired() { + return System.currentTimeMillis() > expirationTime; + } + } + + public HintedHandoffManager(String hintsDir, int maxHintsPerNode, long hintWindowMs) throws IOException { + this.hintsDirectory = 
Paths.get(hintsDir); + this.maxHintsPerNode = maxHintsPerNode; + this.hintWindowMs = hintWindowMs; + this.hintsByNode = new ConcurrentHashMap<>(); + + Files.createDirectories(hintsDirectory); + + this.replayExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "CubeDB-HintReplay"); + t.setDaemon(true); + return t; + }); + + // Load existing hints from disk + loadHints(); + + // Schedule periodic replay + replayExecutor.scheduleAtFixedRate( + this::replayHints, + 10, + 10, + TimeUnit.SECONDS + ); + + logger.info("Hinted handoff manager initialized"); + } + + /** + * Store a hint for a temporarily unavailable node + */ + public void storeHint(String targetNodeId, String key, byte[] value) { + Queue hints = hintsByNode.computeIfAbsent(targetNodeId, k -> new ConcurrentLinkedQueue<>()); + + // Check if we've exceeded max hints for this node + if (hints.size() >= maxHintsPerNode) { + logger.warn("Max hints reached for node {}, dropping hint", targetNodeId); + return; + } + + Hint hint = new Hint(targetNodeId, key, value, hintWindowMs); + hints.add(hint); + + // Persist hint to disk + try { + persistHint(hint); + } catch (IOException e) { + logger.error("Failed to persist hint for node " + targetNodeId, e); + } + + logger.debug("Stored hint for node {} (key: {})", targetNodeId, key); + } + + /** + * Get the number of hints for a specific node + */ + public int getHintCount(String targetNodeId) { + Queue hints = hintsByNode.get(targetNodeId); + return hints != null ? 
hints.size() : 0; + } + + /** + * Get total number of hints across all nodes + */ + public int getTotalHintCount() { + return hintsByNode.values().stream() + .mapToInt(Queue::size) + .sum(); + } + + /** + * Replay hints for a specific node + */ + public void replayHintsForNode(String targetNodeId, HintReplayCallback callback) { + Queue hints = hintsByNode.get(targetNodeId); + if (hints == null || hints.isEmpty()) { + return; + } + + int replayed = 0; + int expired = 0; + int failed = 0; + + Iterator iterator = hints.iterator(); + while (iterator.hasNext()) { + Hint hint = iterator.next(); + + // Remove expired hints + if (hint.isExpired()) { + iterator.remove(); + expired++; + deleteHintFile(targetNodeId, hint.getKey()); + continue; + } + + // Try to replay hint + try { + boolean success = callback.replayHint(hint); + if (success) { + iterator.remove(); + replayed++; + deleteHintFile(targetNodeId, hint.getKey()); + } else { + failed++; + } + } catch (Exception e) { + logger.error("Failed to replay hint for node " + targetNodeId, e); + failed++; + } + } + + if (replayed > 0 || expired > 0) { + logger.info("Hint replay for node {}: replayed={}, expired={}, failed={}", + targetNodeId, replayed, expired, failed); + } + } + + /** + * Replay all hints + */ + private void replayHints() { + for (String nodeId : hintsByNode.keySet()) { + // Note: In a real implementation, you would check if the node is now alive + // and only replay if it is. For now, we'll leave this as a stub. 
+            logger.debug("Checking hints for node {}", nodeId);
+        }
+    }
+
+    /**
+     * Persist a hint to disk so it survives a coordinator restart.
+     * One file per hint: "<sanitized-key>-<timestamp>.hint" under a
+     * per-target-node directory.
+     */
+    private void persistHint(Hint hint) throws IOException {
+        Path nodeHintsDir = hintsDirectory.resolve(hint.getTargetNodeId());
+        Files.createDirectories(nodeHintsDir);
+
+        // NOTE(review): sanitizing the key via replaceAll means distinct keys
+        // (e.g. "a/b" and "a_b") collapse to the same file prefix, and
+        // deleteHintFile below deletes by that prefix - confirm keys are
+        // constrained enough that collisions cannot drop a live hint.
+        String filename = hint.getKey().replaceAll("[^a-zA-Z0-9.-]", "_") + "-" + hint.getTimestamp() + ".hint";
+        Path hintFile = nodeHintsDir.resolve(filename);
+
+        try (ObjectOutputStream oos = new ObjectOutputStream(
+                new BufferedOutputStream(Files.newOutputStream(hintFile)))) {
+            oos.writeObject(hint);
+        }
+    }
+
+    /**
+     * Load all persisted hints from disk into the in-memory queues.
+     * Expired hints are deleted as they are encountered; unreadable hint
+     * files are logged and skipped (best effort - a corrupt file must not
+     * prevent startup).
+     */
+    private void loadHints() throws IOException {
+        if (!Files.exists(hintsDirectory)) {
+            return;
+        }
+
+        try (DirectoryStream<Path> nodeDirs = Files.newDirectoryStream(hintsDirectory)) {
+            for (Path nodeDir : nodeDirs) {
+                if (!Files.isDirectory(nodeDir)) {
+                    continue;
+                }
+
+                // Directory name is the target node id chosen in persistHint().
+                String nodeId = nodeDir.getFileName().toString();
+
+                try (DirectoryStream<Path> hintFiles = Files.newDirectoryStream(nodeDir, "*.hint")) {
+                    for (Path hintFile : hintFiles) {
+                        try (ObjectInputStream ois = new ObjectInputStream(
+                                new BufferedInputStream(Files.newInputStream(hintFile)))) {
+
+                            Hint hint = (Hint) ois.readObject();
+
+                            // Skip (and garbage-collect) expired hints.
+                            if (hint.isExpired()) {
+                                Files.deleteIfExists(hintFile);
+                                continue;
+                            }
+
+                            Queue<Hint> hints = hintsByNode.computeIfAbsent(nodeId, k -> new ConcurrentLinkedQueue<>());
+                            hints.add(hint);
+
+                        } catch (ClassNotFoundException | IOException e) {
+                            logger.error("Failed to load hint from " + hintFile, e);
+                        }
+                    }
+                }
+            }
+        }
+
+        int totalHints = getTotalHintCount();
+        if (totalHints > 0) {
+            logger.info("Loaded {} hints from disk", totalHints);
+        }
+    }
+
+    /**
+     * Delete all persisted hint files for the given (node, key) pair.
+     * Matches by sanitized-key prefix, so every timestamped hint file for
+     * the key is removed. Failures are logged, not propagated (cleanup is
+     * best effort).
+     */
+    private void deleteHintFile(String nodeId, String key) {
+        try {
+            Path nodeHintsDir = hintsDirectory.resolve(nodeId);
+            if (!Files.exists(nodeHintsDir)) {
+                return;
+            }
+
+            // Must sanitize exactly as persistHint() does for the glob to match.
+            String keyPrefix = key.replaceAll("[^a-zA-Z0-9.-]", "_");
+
+            try (DirectoryStream<Path> files = Files.newDirectoryStream(nodeHintsDir, keyPrefix + "-*.hint")) {
+                for (Path file : files) {
+                    Files.deleteIfExists(file);
+                }
+            }
+        } catch (IOException e) {
+            logger.error("Failed to delete hint file", e);
+        }
+    }
+
+    /**
+     * Clear all hints (in memory and on disk) for a node, e.g. when it is
+     * permanently removed from the cluster.
+     */
+    public void clearHintsForNode(String targetNodeId) {
+        hintsByNode.remove(targetNodeId);
+
+        try {
+            Path nodeHintsDir = hintsDirectory.resolve(targetNodeId);
+            if (Files.exists(nodeHintsDir)) {
+                // Walk depth-first (reverse order) so files go before their dirs.
+                Files.walk(nodeHintsDir)
+                        .sorted(Comparator.reverseOrder())
+                        .forEach(path -> {
+                            try {
+                                Files.delete(path);
+                            } catch (IOException e) {
+                                logger.error("Failed to delete " + path, e);
+                            }
+                        });
+            }
+        } catch (IOException e) {
+            logger.error("Failed to clear hints directory for " + targetNodeId, e);
+        }
+
+        logger.info("Cleared all hints for node {}", targetNodeId);
+    }
+
+    /**
+     * Shutdown the hint replay executor: orderly shutdown with a 5s grace
+     * period, then a forced shutdownNow. Restores the interrupt flag if
+     * interrupted while waiting.
+     */
+    public void shutdown() {
+        replayExecutor.shutdown();
+        try {
+            if (!replayExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                replayExecutor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            replayExecutor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+
+        logger.info("Hinted handoff manager shutdown");
+    }
+
+    /**
+     * Callback interface for replaying hints to a recovered node.
+     */
+    @FunctionalInterface
+    public interface HintReplayCallback {
+        /** @return true if the hint was successfully replayed */
+        boolean replayHint(Hint hint) throws Exception;
+    }
+}
diff --git a/src/main/java/com/cube/replication/NetworkTopologyReplicationStrategy.java b/src/main/java/com/cube/replication/NetworkTopologyReplicationStrategy.java
new file mode 100644
index 0000000..bd678bc
--- /dev/null
+++ b/src/main/java/com/cube/replication/NetworkTopologyReplicationStrategy.java
@@ -0,0 +1,89 @@
+package com.cube.replication;
+
+import com.cube.cluster.ClusterNode;
+import java.util.*;
+
+/**
+ * Network topology aware replication strategy.
+ * Places replicas across different racks and datacenters.
+ */
+public class NetworkTopologyReplicationStrategy implements ReplicationStrategy {
+
+    // Per-datacenter replication factor, e.g. {"dc1": 3, "dc2": 2}.
+    private final Map<String, Integer> datacenterReplicationFactors;
+
+    public NetworkTopologyReplicationStrategy(Map<String, Integer> dcReplicationFactors) {
+        // Defensive copy so later mutation by the caller cannot change placement.
+        this.datacenterReplicationFactors = new HashMap<>(dcReplicationFactors);
+    }
+
+    @Override
+    public List<ClusterNode> getReplicaNodes(String key, int replicationFactor, List<ClusterNode> availableNodes) {
+        List<ClusterNode> replicas = new ArrayList<>();
+
+        // FIX: guard against an empty cluster; the original indexed into
+        // availableNodes unconditionally (IndexOutOfBounds / div-by-zero).
+        if (availableNodes.isEmpty()) {
+            return replicas;
+        }
+
+        // Group nodes by datacenter
+        Map<String, List<ClusterNode>> nodesByDc = new HashMap<>();
+        for (ClusterNode node : availableNodes) {
+            nodesByDc.computeIfAbsent(node.getDatacenter(), k -> new ArrayList<>()).add(node);
+        }
+
+        // Get start node using consistent hashing
+        int startIndex = getStartIndex(key, availableNodes.size());
+        ClusterNode primaryNode = availableNodes.get(startIndex);
+
+        // Place replicas in each datacenter
+        for (Map.Entry<String, Integer> entry : datacenterReplicationFactors.entrySet()) {
+            String dc = entry.getKey();
+            int dcRF = entry.getValue();
+
+            List<ClusterNode> dcNodes = nodesByDc.getOrDefault(dc, new ArrayList<>());
+            if (dcNodes.isEmpty()) {
+                continue;
+            }
+
+            // Find starting node in this DC (falls back to index 0 when the
+            // primary node lives in another datacenter).
+            int dcStartIndex = 0;
+            for (int i = 0; i < dcNodes.size(); i++) {
+                if (dcNodes.get(i).equals(primaryNode)) {
+                    dcStartIndex = i;
+                    break;
+                }
+            }
+
+            // Pass 1: take alive nodes on racks not used yet in this DC.
+            Set<String> usedRacks = new HashSet<>();
+            int selected = 0;
+            for (int attempts = 0; attempts < dcNodes.size() && selected < dcRF; attempts++) {
+                ClusterNode node = dcNodes.get((dcStartIndex + attempts) % dcNodes.size());
+                if (node.isAlive() && !replicas.contains(node) && !usedRacks.contains(node.getRack())) {
+                    replicas.add(node);
+                    usedRacks.add(node.getRack());
+                    selected++;
+                }
+            }
+
+            // Pass 2 (FIX): top up with alive nodes on already-used racks when
+            // the DC has fewer racks than dcRF. The original guarded this case
+            // with "usedRacks.size() >= dcRF", which can never hold while
+            // selected < dcRF (the two counters grow in lockstep), so the
+            // fallback was unreachable and such DCs were under-replicated.
+            for (int attempts = 0; attempts < dcNodes.size() && selected < dcRF; attempts++) {
+                ClusterNode node = dcNodes.get((dcStartIndex + attempts) % dcNodes.size());
+                if (node.isAlive() && !replicas.contains(node)) {
+                    replicas.add(node);
+                    selected++;
+                }
+            }
+        }
+
+        return replicas;
+    }
+
+    /**
+     * Map a key onto a node index via its hash.
+     */
+    private int getStartIndex(String key, int nodeCount) {
+        // FIX: Math.abs(Integer.MIN_VALUE) is still negative, so the original
+        // Math.abs(hash) % nodeCount could produce a negative index.
+        return Math.floorMod(key.hashCode(), nodeCount);
+    }
+
+    @Override
+    public String getName() {
+        return "NetworkTopologyStrategy";
+    }
+
+    /** @return a defensive copy of the per-datacenter replication factors */
+    public Map<String, Integer> getDatacenterReplicationFactors() {
+        return new HashMap<>(datacenterReplicationFactors);
+    }
+}
diff --git a/src/main/java/com/cube/replication/ReadRepairManager.java b/src/main/java/com/cube/replication/ReadRepairManager.java
new file mode 100644
index 0000000..fef1bc1
--- /dev/null
+++ b/src/main/java/com/cube/replication/ReadRepairManager.java
@@ -0,0 +1,270 @@
+package com.cube.replication;
+
+import com.cube.cluster.ClusterNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Read Repair Manager - Ensures consistency by comparing replicas during reads.
+ * When inconsistencies are detected, the most recent value is propagated to all replicas.
+ */
+public class ReadRepairManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(ReadRepairManager.class);
+
+    private final ExecutorService repairExecutor;
+    private final long readRepairChance; // Percentage (0-100)
+    private final Random random;
+
+    /**
+     * Represents a read response from a replica.
+     * A null value marks "key not found on this replica".
+     */
+    public static class ReadResponse {
+        private final ClusterNode node;
+        private final String key;
+        private final byte[] value;
+        private final long timestamp;
+        private final boolean found;
+
+        public ReadResponse(ClusterNode node, String key, byte[] value, long timestamp) {
+            this.node = node;
+            this.key = key;
+            this.value = value;
+            this.timestamp = timestamp;
+            this.found = value != null;
+        }
+
+        public ClusterNode getNode() { return node; }
+        public String getKey() { return key; }
+        public byte[] getValue() { return value; }
+        public long getTimestamp() { return timestamp; }
+        public boolean isFound() { return found; }
+    }
+
+    /**
+     * Represents a read repair result.
+     */
+    public static class ReadRepairResult {
+        private final String key;
+        private final boolean repairNeeded;
+        private final int repairedNodes;
+        private final byte[] canonicalValue;
+
+        public ReadRepairResult(String key, boolean repairNeeded, int repairedNodes, byte[] canonicalValue) {
+            this.key = key;
+            this.repairNeeded = repairNeeded;
+            this.repairedNodes = repairedNodes;
+            this.canonicalValue = canonicalValue;
+        }
+
+        public String getKey() { return key; }
+        public boolean isRepairNeeded() { return repairNeeded; }
+        public int getRepairedNodes() { return repairedNodes; }
+        public byte[] getCanonicalValue() { return canonicalValue; }
+    }
+
+    /**
+     * @param readRepairChance percentage (0-100) of reads that trigger repair
+     */
+    public ReadRepairManager(long readRepairChance) {
+        this.readRepairChance = readRepairChance;
+        this.random = new Random();
+
+        this.repairExecutor = Executors.newFixedThreadPool(
+            Math.max(2, Runtime.getRuntime().availableProcessors() / 2),
+            r -> {
+                Thread t = new Thread(r, "CubeDB-ReadRepair");
+                t.setDaemon(true);
+                return t;
+            }
+        );
+
+        logger.info("Read repair manager initialized with {}% chance", readRepairChance);
+    }
+
+    /**
+     * Check if read repair should be performed for this read.
+     * Probabilistic: true for readRepairChance% of calls (always/never at
+     * the 100/0 extremes).
+     */
+    public boolean shouldPerformReadRepair() {
+        if (readRepairChance >= 100) {
+            return true;
+        }
+        if (readRepairChance <= 0) {
+            return false;
+        }
+        return random.nextInt(100) < readRepairChance;
+    }
+
+    /**
+     * Perform read repair by comparing responses from all replicas.
+     * Returns the most recent value and asynchronously repairs inconsistent replicas.
+     *
+     * @param responses Responses from all replica nodes
+     * @param repairCallback Callback to write repaired values to nodes
+     * @return The canonical (most recent) value
+     */
+    public CompletableFuture<ReadRepairResult> performReadRepair(
+            List<ReadResponse> responses,
+            RepairCallback repairCallback) {
+
+        if (responses.isEmpty()) {
+            return CompletableFuture.completedFuture(
+                new ReadRepairResult(null, false, 0, null));
+        }
+
+        String key = responses.get(0).getKey();
+
+        // Find the most recent value (highest timestamp)
+        ReadResponse canonical = responses.stream()
+            .filter(ReadResponse::isFound)
+            .max(Comparator.comparingLong(ReadResponse::getTimestamp))
+            .orElse(null);
+
+        if (canonical == null) {
+            // No value found on any replica
+            return CompletableFuture.completedFuture(
+                new ReadRepairResult(key, false, 0, null));
+        }
+
+        // Check if repair is needed
+        boolean repairNeeded = false;
+        List<ClusterNode> nodesToRepair = new ArrayList<>();
+
+        for (ReadResponse response : responses) {
+            if (!response.isFound()) {
+                // Node doesn't have the value
+                repairNeeded = true;
+                nodesToRepair.add(response.getNode());
+            } else if (response.getTimestamp() < canonical.getTimestamp()) {
+                // Node has stale value
+                repairNeeded = true;
+                nodesToRepair.add(response.getNode());
+            } else if (!Arrays.equals(response.getValue(), canonical.getValue())) {
+                // Node has different value with same timestamp (conflict).
+                // NOTE(review): which side "wins" an equal-timestamp conflict is
+                // whichever response max() kept - effectively arbitrary. Confirm
+                // this is an acceptable tie-break.
+                repairNeeded = true;
+                nodesToRepair.add(response.getNode());
+            }
+        }
+
+        if (!repairNeeded) {
+            return CompletableFuture.completedFuture(
+                new ReadRepairResult(key, false, 0, canonical.getValue()));
+        }
+
+        // Perform async repair on the dedicated executor; per-node failures
+        // are logged and do not abort the remaining repairs.
+        return CompletableFuture.supplyAsync(() -> {
+            int repairedCount = 0;
+
+            for (ClusterNode node : nodesToRepair) {
+                try {
+                    boolean success = repairCallback.repairNode(
+                        node,
+                        canonical.getKey(),
+                        canonical.getValue(),
+                        canonical.getTimestamp()
+                    );
+
+                    if (success) {
+                        repairedCount++;
+                        logger.debug("Repaired node {} for key {}", node.getNodeId(), key);
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to repair node " + node.getNodeId() + " for key " + key, e);
+                }
+            }
+
+            if (repairedCount > 0) {
+                logger.info("Read repair completed for key {}: repaired {} of {} nodes",
+                    key, repairedCount, nodesToRepair.size());
+            }
+
+            return new ReadRepairResult(key, true, repairedCount, canonical.getValue());
+
+        }, repairExecutor);
+    }
+
+    /**
+     * Perform blocking read repair (waits up to 5 seconds).
+     */
+    public ReadRepairResult performReadRepairBlocking(
+            List<ReadResponse> responses,
+            RepairCallback repairCallback) throws Exception {
+
+        return performReadRepair(responses, repairCallback)
+            .get(5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Detect conflicts between replicas: if the found responses do not all
+     * share the same (value, timestamp), every response is reported.
+     */
+    public List<ReadResponse> detectConflicts(List<ReadResponse> responses) {
+        List<ReadResponse> conflicts = new ArrayList<>();
+
+        if (responses.size() < 2) {
+            return conflicts;
+        }
+
+        // Group by (value, timestamp); Arrays.toString gives a content-based
+        // key for the byte[] (byte[].equals/hashCode are identity-based).
+        Map<String, List<ReadResponse>> valueGroups = new HashMap<>();
+
+        for (ReadResponse response : responses) {
+            if (response.isFound()) {
+                String valueKey = Arrays.toString(response.getValue()) + ":" + response.getTimestamp();
+                valueGroups.computeIfAbsent(valueKey, k -> new ArrayList<>()).add(response);
+            }
+        }
+
+        // If we have more than one distinct value, we have conflicts
+        if (valueGroups.size() > 1) {
+            conflicts.addAll(responses);
+        }
+
+        return conflicts;
+    }
+
+    /**
+     * Get read repair statistics (executor sizing and backlog).
+     */
+    public Map<String, Object> getStats() {
+        Map<String, Object> stats = new HashMap<>();
+        stats.put("readRepairChance", readRepairChance + "%");
+        stats.put("repairThreads", ((ThreadPoolExecutor) repairExecutor).getPoolSize());
+        stats.put("activeRepairs", ((ThreadPoolExecutor) repairExecutor).getActiveCount());
+        stats.put("queuedRepairs", ((ThreadPoolExecutor) repairExecutor).getQueue().size());
+        return stats;
+    }
+
+    /**
+     * Shutdown the repair executor: orderly with a 10s grace period, then
+     * forced; restores the interrupt flag if interrupted while waiting.
+     */
+    public void shutdown() {
+        repairExecutor.shutdown();
+        try {
+            if (!repairExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+                repairExecutor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            repairExecutor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+
+        logger.info("Read repair manager shutdown");
+    }
+
+    /**
+     * Callback interface for repairing nodes
+     */
+    @FunctionalInterface
+    public interface RepairCallback {
+        /**
+         * Write the canonical value to a node
+         *
+         * @param node The node to repair
+         * @param key The key to write
+         * @param value The canonical value
+         * @param timestamp The timestamp of the canonical value
+         * @return true if repair succeeded
+         */
+        boolean repairNode(ClusterNode node, String key, byte[] value, long timestamp) throws Exception;
+    }
+}
diff --git a/src/main/java/com/cube/replication/ReplicationCoordinator.java b/src/main/java/com/cube/replication/ReplicationCoordinator.java
new file mode 100644
index 0000000..4100fcf
--- /dev/null
+++ b/src/main/java/com/cube/replication/ReplicationCoordinator.java
@@ -0,0 +1,293 @@
+package com.cube.replication;
+
+import com.cube.cluster.ClusterNode;
+import com.cube.consistency.ConsistencyLevel;
+import com.cube.storage.StorageEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Replication Coordinator - Handles distributed reads and writes with consistency levels.
+ *
+ * Features:
+ * - Tunable consistency levels (ONE, QUORUM, ALL)
+ * - Read repair for consistency
+ * - Hinted handoff for availability
+ * - Timeout handling
+ */
+public class ReplicationCoordinator {
+
+    private static final Logger logger = LoggerFactory.getLogger(ReplicationCoordinator.class);
+
+    private final StorageEngine localStorage;
+    private final ReplicationStrategy replicationStrategy;
+    private final HintedHandoffManager hintedHandoff;
+    private final ReadRepairManager readRepair;
+
+    private final int replicationFactor;
+    private final long writeTimeoutMs;
+    private final long readTimeoutMs;
+
+    private final ExecutorService executorService;
+
+    /** Outcome of a replicated write: success iff successfulWrites >= requiredWrites. */
+    public static class WriteResult {
+        private final boolean success;
+        private final int successfulWrites;
+        private final int requiredWrites;
+        private final List<String> failedNodes;
+
+        public WriteResult(boolean success, int successfulWrites, int requiredWrites, List<String> failedNodes) {
+            this.success = success;
+            this.successfulWrites = successfulWrites;
+            this.requiredWrites = requiredWrites;
+            this.failedNodes = failedNodes;
+        }
+
+        public boolean isSuccess() { return success; }
+        public int getSuccessfulWrites() { return successfulWrites; }
+        public int getRequiredWrites() { return requiredWrites; }
+        public List<String> getFailedNodes() { return failedNodes; }
+    }
+
+    /** Outcome of a replicated read. A null value means "not found". */
+    public static class ReadResult {
+        private final boolean success;
+        private final byte[] value;
+        private final long timestamp;
+        private final int responsesReceived;
+        private final boolean repairPerformed;
+
+        public ReadResult(boolean success, byte[] value, long timestamp, int responsesReceived, boolean repairPerformed) {
+            this.success = success;
+            this.value = value;
+            this.timestamp = timestamp;
+            this.responsesReceived = responsesReceived;
+            this.repairPerformed = repairPerformed;
+        }
+
+        public boolean isSuccess() { return success; }
+        public byte[] getValue() { return value; }
+        public long getTimestamp() { return timestamp; }
+        public int getResponsesReceived() { return responsesReceived; }
+        public boolean isRepairPerformed() { return repairPerformed; }
+    }
+
+    public ReplicationCoordinator(
+            StorageEngine localStorage,
+            ReplicationStrategy replicationStrategy,
+            HintedHandoffManager hintedHandoff,
+            ReadRepairManager readRepair,
+            int replicationFactor,
+            long writeTimeoutMs,
+            long readTimeoutMs) {
+
+        this.localStorage = localStorage;
+        this.replicationStrategy = replicationStrategy;
+        this.hintedHandoff = hintedHandoff;
+        this.readRepair = readRepair;
+        this.replicationFactor = replicationFactor;
+        this.writeTimeoutMs = writeTimeoutMs;
+        this.readTimeoutMs = readTimeoutMs;
+
+        int threadCount = Math.max(4, Runtime.getRuntime().availableProcessors());
+        this.executorService = Executors.newFixedThreadPool(threadCount, r -> {
+            Thread t = new Thread(r, "CubeDB-Replication");
+            t.setDaemon(true);
+            return t;
+        });
+
+        logger.info("Replication coordinator initialized (RF={}, write_timeout={}ms, read_timeout={}ms)",
+            replicationFactor, writeTimeoutMs, readTimeoutMs);
+    }
+
+    /**
+     * Perform a distributed write with tunable consistency.
+     *
+     * @param key The key to write
+     * @param value The value to write
+     * @param consistencyLevel Consistency level for the write
+     * @param allNodes All nodes in the cluster
+     * @return WriteResult with success status and details
+     */
+    public WriteResult write(
+            String key,
+            byte[] value,
+            ConsistencyLevel consistencyLevel,
+            List<ClusterNode> allNodes) throws IOException {
+
+        long timestamp = System.currentTimeMillis();
+
+        // Determine replica nodes
+        List<ClusterNode> replicaNodes = replicationStrategy.getReplicaNodes(key, replicationFactor, allNodes);
+
+        if (replicaNodes.isEmpty()) {
+            throw new IOException("No replica nodes available");
+        }
+
+        int required = consistencyLevel.getRequiredResponses(replicationFactor);
+
+        // Write to local storage first (coordinator is always one of the replicas in single-node setup)
+        try {
+            localStorage.put(key, value);
+        } catch (IOException e) {
+            logger.error("Failed to write to local storage", e);
+            return new WriteResult(false, 0, required, List.of("local"));
+        }
+
+        // For now, in single-node mode, we succeed immediately
+        if (replicaNodes.size() == 1) {
+            return new WriteResult(true, 1, required, List.of());
+        }
+
+        // Multi-node write (stub for future implementation)
+        List<String> failedNodes = new ArrayList<>();
+        int successCount = 1; // Local write succeeded
+
+        // Try to write to other replicas.
+        // NOTE(review): this loop iterates over ALL replicas (including,
+        // presumably, the local one already counted above), so successCount
+        // can reach replicaNodes.size() + 1 - revisit when real network
+        // writes replace the simulated success.
+        for (ClusterNode node : replicaNodes) {
+            if (node.isAlive()) {
+                // In a real implementation, we would send the write over network
+                // For now, we'll simulate success
+                successCount++;
+            } else {
+                // Store hint for unavailable node
+                if (consistencyLevel.allowsHints()) {
+                    hintedHandoff.storeHint(node.getNodeId(), key, value);
+                    logger.debug("Stored hint for unavailable node {}", node.getNodeId());
+                }
+                failedNodes.add(node.getNodeId());
+            }
+        }
+
+        boolean success = successCount >= required;
+
+        if (success) {
+            logger.debug("Write successful: key={}, replicas={}/{}", key, successCount, replicaNodes.size());
+        } else {
+            logger.warn("Write failed: key={}, replicas={}/{}, required={}",
+                key, successCount, replicaNodes.size(), required);
+        }
+
+        return new WriteResult(success, successCount, required, failedNodes);
+    }
+
+    /**
+     * Perform a distributed read with tunable consistency.
+     *
+     * @param key The key to read
+     * @param consistencyLevel Consistency level for the read
+     * @param allNodes All nodes in the cluster
+     * @return ReadResult with value and details
+     */
+    public ReadResult read(
+            String key,
+            ConsistencyLevel consistencyLevel,
+            List<ClusterNode> allNodes) throws IOException {
+
+        // Determine replica nodes
+        List<ClusterNode> replicaNodes = replicationStrategy.getReplicaNodes(key, replicationFactor, allNodes);
+
+        if (replicaNodes.isEmpty()) {
+            throw new IOException("No replica nodes available");
+        }
+
+        int required = consistencyLevel.getRequiredResponses(replicationFactor);
+
+        // Read from local storage first
+        byte[] localValue = localStorage.get(key);
+        long timestamp = System.currentTimeMillis();
+
+        // For single-node, return immediately
+        if (replicaNodes.size() == 1) {
+            return new ReadResult(true, localValue, timestamp, 1, false);
+        }
+
+        // Multi-node read (stub for future implementation)
+        List<ReadRepairManager.ReadResponse> responses = new ArrayList<>();
+        responses.add(new ReadRepairManager.ReadResponse(
+            replicaNodes.get(0), key, localValue, timestamp));
+
+        // Check if we should perform read repair
+        boolean performRepair = readRepair.shouldPerformReadRepair() && consistencyLevel != ConsistencyLevel.ANY;
+
+        if (performRepair && responses.size() > 1) {
+            // Perform async read repair
+            readRepair.performReadRepair(responses, (node, k, v, ts) -> {
+                // In a real implementation, send repair to node over network
+                logger.debug("Would repair node {} for key {}", node.getNodeId(), k);
+                return true;
+            });
+        }
+
+        return new ReadResult(true, localValue, timestamp, responses.size(), performRepair);
+    }
+
+    /**
+     * Perform a local write (bypassing replication)
+     */
+    public void writeLocal(String key, byte[] value) throws IOException {
+        localStorage.put(key, value);
+    }
+
+    /**
+     * Perform a local read (bypassing replication)
+     */
+    public byte[] readLocal(String key) throws IOException {
+        return localStorage.get(key);
+    }
+
+    /**
+     * Delete a key with tunable consistency.
+     * NOTE(review): the tombstone is an empty byte[], which readers cannot
+     * distinguish from a legitimately-empty value - confirm the storage
+     * engine treats empty as deleted.
+     */
+    public WriteResult delete(
+            String key,
+            ConsistencyLevel consistencyLevel,
+            List<ClusterNode> allNodes) throws IOException {
+
+        // Deletion is just a write with a tombstone
+        return write(key, new byte[0], consistencyLevel, allNodes);
+    }
+
+    /**
+     * Get replication statistics
+     */
+    public Map<String, Object> getStats() {
+        Map<String, Object> stats = new HashMap<>();
+        stats.put("replicationFactor", replicationFactor);
+        stats.put("writeTimeoutMs", writeTimeoutMs);
+        stats.put("readTimeoutMs", readTimeoutMs);
+        stats.put("replicationStrategy", replicationStrategy.getName());
+        stats.put("pendingHints", hintedHandoff.getTotalHintCount());
+        stats.put("readRepairStats", readRepair.getStats());
+
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
+        stats.put("replicationThreads", executor.getPoolSize());
+        stats.put("activeReplicationTasks", executor.getActiveCount());
+        stats.put("queuedReplicationTasks", executor.getQueue().size());
+
+        return stats;
+    }
+
+    /**
+     * Shutdown the coordinator, then its hinted-handoff and read-repair
+     * sub-managers. Orderly shutdown with a 10s grace period, then forced.
+     */
+    public void shutdown() {
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+
+        hintedHandoff.shutdown();
+        readRepair.shutdown();
+
+        logger.info("Replication coordinator shutdown");
+    }
+}
diff --git a/src/main/java/com/cube/replication/ReplicationStrategy.java b/src/main/java/com/cube/replication/ReplicationStrategy.java
new file mode 100644
index 0000000..56c2e79
--- /dev/null
+++ b/src/main/java/com/cube/replication/ReplicationStrategy.java
@@ -0,0 +1,25 @@
+package com.cube.replication;
+
+import com.cube.cluster.ClusterNode;
+import java.util.List;
+
+/**
+ * Strategy for determining which nodes should replicate data.
+ */
+public interface ReplicationStrategy {
+
+    /**
+     * Get the list of replica nodes for a given key.
+     *
+     * @param key The key to replicate
+     * @param replicationFactor Number of replicas
+     * @param availableNodes All nodes in the cluster
+     * @return List of nodes that should store this key
+     */
+    List<ClusterNode> getReplicaNodes(String key, int replicationFactor, List<ClusterNode> availableNodes);
+
+    /**
+     * Get the name of this replication strategy.
+     */
+    String getName();
+}
diff --git a/src/main/java/com/cube/replication/SimpleReplicationStrategy.java b/src/main/java/com/cube/replication/SimpleReplicationStrategy.java
new file mode 100644
index 0000000..0c70655
--- /dev/null
+++ b/src/main/java/com/cube/replication/SimpleReplicationStrategy.java
@@ -0,0 +1,52 @@
+package com.cube.replication;
+
+import com.cube.cluster.ClusterNode;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple replication strategy that places replicas on consecutive nodes.
+ * Good for single-datacenter deployments.
+ */
+public class SimpleReplicationStrategy implements ReplicationStrategy {
+
+    @Override
+    public List<ClusterNode> getReplicaNodes(String key, int replicationFactor, List<ClusterNode> availableNodes) {
+        List<ClusterNode> replicas = new ArrayList<>();
+
+        if (availableNodes.isEmpty()) {
+            return replicas;
+        }
+
+        // Use consistent hashing to determine primary node
+        int startIndex = getStartIndex(key, availableNodes.size());
+
+        // Select consecutive nodes (wrapping around the ring). Dead nodes are
+        // skipped, so fewer than replicationFactor replicas may be returned.
+        for (int i = 0; i < replicationFactor && i < availableNodes.size(); i++) {
+            int nodeIndex = (startIndex + i) % availableNodes.size();
+            ClusterNode node = availableNodes.get(nodeIndex);
+
+            // Only add alive nodes
+            if (node.isAlive()) {
+                replicas.add(node);
+            }
+        }
+
+        return replicas;
+    }
+
+    /**
+     * Calculate start index from the key's hash.
+     */
+    private int getStartIndex(String key, int nodeCount) {
+        // FIX: Math.abs(Integer.MIN_VALUE) is negative, so the original
+        // Math.abs(hash) % nodeCount could yield a negative index;
+        // floorMod is always in [0, nodeCount).
+        return Math.floorMod(key.hashCode(), nodeCount);
+    }
+
+    @Override
+    public String getName() {
+        return "SimpleStrategy";
+    }
+}
diff --git a/src/main/java/com/cube/shell/CubeShell.java b/src/main/java/com/cube/shell/CubeShell.java
new file mode 100644
index 0000000..31ac4a5
--- /dev/null
+++ b/src/main/java/com/cube/shell/CubeShell.java
@@ -0,0 +1,1060 @@
+package com.cube.shell;
+
+import com.cube.cluster.ClusterNode;
+import com.cube.consistency.ConsistencyLevel;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.*;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * CubeShell - Interactive SQL shell for Cube database with cluster management
+ *
+ * Features:
+ * - Connect to multiple cluster nodes
+ * - View cluster topology
+ * - Set consistency levels
+ * - Execute queries with replication
+ * - Monitor node health
+ * - View replication statistics
+ */
+public
class CubeShell { + + private static final String VERSION = "2.0.0"; + private static final String PROMPT = "cube> "; + + private final List clusterNodes; + private ClusterNode currentNode; + private ConsistencyLevel defaultConsistencyLevel; + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final List commandHistory; + private boolean running; + + public CubeShell() { + this.clusterNodes = new ArrayList<>(); + this.defaultConsistencyLevel = ConsistencyLevel.ONE; + this.httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(5)) + .build(); + this.objectMapper = new ObjectMapper(); + this.commandHistory = new ArrayList<>(); + this.running = true; + } + + public static void main(String[] args) { + CubeShell shell = new CubeShell(); + shell.printBanner(); + + // Parse command line arguments + String initialHost = "localhost"; + int initialPort = 8080; + + for (int i = 0; i < args.length; i++) { + if (args[i].equals("--host") || args[i].equals("-h")) { + if (i + 1 < args.length) { + initialHost = args[++i]; + } + } else if (args[i].equals("--port") || args[i].equals("-p")) { + if (i + 1 < args.length) { + initialPort = Integer.parseInt(args[++i]); + } + } + } + + // Auto-connect to initial node + shell.connectToNode(initialHost, initialPort); + + // Start interactive shell + shell.run(); + } + + private void printBanner() { + System.out.println("╔══════════════════════════════════════════════════════════╗"); + System.out.println("║ 🌵 CubeShell v" + VERSION + " ║"); + System.out.println("║ CubeCactus Interactive Shell ║"); + System.out.println("║ SQL/CQL + Cluster Management ║"); + System.out.println("╚══════════════════════════════════════════════════════════╝"); + System.out.println(); + System.out.println("Type 'HELP' for available commands, 'EXIT' to quit."); + System.out.println(); + } + + private void run() { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) { + while 
(running) { + System.out.print(PROMPT); + String line = reader.readLine(); + + if (line == null) { + break; + } + + line = line.trim(); + if (line.isEmpty()) { + continue; + } + + commandHistory.add(line); + processCommand(line); + } + } catch (IOException e) { + System.err.println("Error reading input: " + e.getMessage()); + } + + System.out.println("\nGoodbye!"); + } + + private void processCommand(String line) { + String[] parts = line.split("\\s+", 2); + String command = parts[0].toUpperCase(); + String args = parts.length > 1 ? parts[1] : ""; + + try { + switch (command) { + case "HELP": + case "?": + showHelp(); + break; + + case "CONNECT": + handleConnect(args); + break; + + case "DISCONNECT": + handleDisconnect(args); + break; + + case "NODES": + case "CLUSTER": + showClusterInfo(); + break; + + case "USE": + handleUseNode(args); + break; + + case "CONSISTENCY": + case "CL": + handleConsistency(args); + break; + + case "STATUS": + showNodeStatus(); + break; + + case "STATS": + case "STATISTICS": + showReplicationStats(); + break; + + // SQL/CQL Commands + case "SELECT": + case "INSERT": + case "UPDATE": + case "DELETE": + case "CREATE": + case "DROP": + case "DESCRIBE": + case "DESC": + case "SHOW": + handleSQL(line); + break; + + case "PUT": + handlePut(args); + break; + + case "GET": + handleGet(args); + break; + + case "DEL": + handleDelete(args); + break; + + case "SCAN": + handleScan(args); + break; + + case "HISTORY": + showHistory(); + break; + + case "CLEAR": + case "CLS": + clearScreen(); + break; + + case "EXIT": + case "QUIT": + case "BYE": + running = false; + break; + + default: + // If not a shell command, try to execute as SQL + if (line.contains(" ") || line.endsWith(";")) { + handleSQL(line); + } else { + System.out.println("Unknown command: " + command); + System.out.println("Type 'HELP' for available commands."); + } + } + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + } + } + + private void showHelp() { + 
System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ CubeShell Commands ║"); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.println("║ SQL/CQL Queries: ║"); + System.out.println("║ CREATE TABLE keyspace.table (col TYPE, ...) ║"); + System.out.println("║ INSERT INTO table (cols) VALUES (vals) ║"); + System.out.println("║ SELECT * FROM table WHERE id = 'value' ║"); + System.out.println("║ UPDATE table SET col='val' WHERE id='value' ║"); + System.out.println("║ DELETE FROM table WHERE id='value' ║"); + System.out.println("║ DESCRIBE table ║"); + System.out.println("║ SHOW TABLES ║"); + System.out.println("║ ║"); + System.out.println("║ Examples: ║"); + System.out.println("║ CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT) ║"); + System.out.println("║ INSERT INTO users (id, name) VALUES ('1', 'Alice') ║"); + System.out.println("║ SELECT * FROM users WHERE id = '1' ║"); + System.out.println("║ UPDATE users SET name = 'Bob' WHERE id = '1' ║"); + System.out.println("║ ║"); + System.out.println("║ Key-Value Operations: ║"); + System.out.println("║ PUT - Store key-value pair ║"); + System.out.println("║ GET - Retrieve value ║"); + System.out.println("║ DEL - Delete key ║"); + System.out.println("║ SCAN - Scan keys by prefix ║"); + System.out.println("║ ║"); + System.out.println("║ Cluster Management: ║"); + System.out.println("║ CONNECT - Connect to node ║"); + System.out.println("║ DISCONNECT - Disconnect from node ║"); + System.out.println("║ NODES / CLUSTER - Show cluster topology ║"); + System.out.println("║ USE - Switch active node ║"); + System.out.println("║ STATUS - Show current node status ║"); + System.out.println("║ STATS - Show replication stats ║"); + System.out.println("║ ║"); + System.out.println("║ Consistency: ║"); + System.out.println("║ CONSISTENCY - Set consistency (ONE/QUORUM/ALL)║"); + System.out.println("║ CL - Short form ║"); + 
System.out.println("║ ║"); + System.out.println("║ Utility: ║"); + System.out.println("║ HISTORY - Show command history ║"); + System.out.println("║ CLEAR / CLS - Clear screen ║"); + System.out.println("║ HELP / ? - Show this help ║"); + System.out.println("║ EXIT / QUIT - Exit shell ║"); + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + + System.out.println("║ SELECT * FROM table WHERE id = 'value' ║"); + System.out.println("║ UPDATE table SET col='val' WHERE id='value' ║"); + System.out.println("║ DELETE FROM table WHERE id='value' ║"); + System.out.println("║ DESCRIBE table ║"); + System.out.println("║ SHOW TABLES ║"); + System.out.println("║ ║"); + System.out.println("║ Cluster Management: ║"); + System.out.println("║ CONNECT - Add node to cluster ║"); + System.out.println("║ DISCONNECT - Remove node from cluster ║"); + System.out.println("║ NODES / CLUSTER - Show all cluster nodes ║"); + System.out.println("║ USE - Switch to specific node ║"); + System.out.println("║ STATUS - Show current node status ║"); + System.out.println("║ STATS - Show replication statistics ║"); + System.out.println("║ ║"); + System.out.println("║ Consistency: ║"); + System.out.println("║ CONSISTENCY - Set consistency level ║"); + System.out.println("║ CL - Short form ║"); + System.out.println("║ Levels: ONE, TWO, THREE, QUORUM, ALL, ANY ║"); + System.out.println("║ ║"); + System.out.println("║ Key-Value Operations (Legacy): ║"); + System.out.println("║ PUT - Write key-value ║"); + System.out.println("║ GET - Read value ║"); + System.out.println("║ DELETE - Delete key ║"); + System.out.println("║ SCAN - Scan with prefix ║"); + System.out.println("║ ║"); + System.out.println("║ Shell Commands: ║"); + System.out.println("║ HISTORY - Show command history ║"); + System.out.println("║ CLEAR - Clear screen ║"); + System.out.println("║ HELP / ? 
- Show this help ║"); + System.out.println("║ EXIT / QUIT - Exit shell ║"); + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + + System.out.println("Examples:"); + System.out.println(" CREATE TABLE users.profiles (id TEXT PRIMARY KEY, name TEXT, email TEXT)"); + System.out.println(" INSERT INTO users.profiles (id, name, email) VALUES ('1', 'Alice', 'alice@example.com')"); + System.out.println(" SELECT * FROM users.profiles WHERE id = '1'"); + System.out.println(" UPDATE users.profiles SET name = 'Alice Johnson' WHERE id = '1'"); + System.out.println(" DELETE FROM users.profiles WHERE id = '1'"); + System.out.println(); + } + + private void handleConnect(String args) { + String[] parts = args.split("\\s+"); + if (parts.length < 2) { + System.out.println("Usage: CONNECT "); + return; + } + + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + + connectToNode(host, port); + } + + private void connectToNode(String host, int port) { + try { + // Test connection + String url = String.format("http://%s:%d/api/v1/health", host, port); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .timeout(Duration.ofSeconds(5)) + .GET() + .build(); + + HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + // Extract node ID from response or generate one + String nodeId = String.format("node-%s-%d", host, port); + + ClusterNode node = new ClusterNode(nodeId, host, port); + + // Check if already connected + boolean exists = clusterNodes.stream() + .anyMatch(n -> n.getHost().equals(host) && n.getPort() == port); + + if (!exists) { + clusterNodes.add(node); + System.out.println("✓ Connected to " + host + ":" + port); + System.out.println(" Node ID: " + nodeId); + + if (currentNode == null) { + currentNode = node; + System.out.println(" Set as current node"); + } + } else { + System.out.println("Already connected to " + host + ":" + 
port); + } + } else { + System.out.println("✗ Failed to connect: HTTP " + response.statusCode()); + } + } catch (Exception e) { + System.out.println("✗ Failed to connect: " + e.getMessage()); + } + } + + private void handleDisconnect(String args) { + if (args.isEmpty()) { + System.out.println("Usage: DISCONNECT "); + return; + } + + String nodeId = args.trim(); + boolean removed = clusterNodes.removeIf(n -> n.getNodeId().equals(nodeId)); + + if (removed) { + System.out.println("✓ Disconnected from " + nodeId); + + if (currentNode != null && currentNode.getNodeId().equals(nodeId)) { + currentNode = clusterNodes.isEmpty() ? null : clusterNodes.get(0); + if (currentNode != null) { + System.out.println(" Switched to " + currentNode.getNodeId()); + } + } + } else { + System.out.println("✗ Node not found: " + nodeId); + } + } + + private void showClusterInfo() { + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ Cluster Nodes ║"); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + + if (clusterNodes.isEmpty()) { + System.out.println("║ No nodes connected ║"); + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + return; + } + + for (ClusterNode node : clusterNodes) { + String current = (node.equals(currentNode)) ? "➜ " : " "; + String status = node.isAlive() ? "✓" : "✗"; + + System.out.printf("║ %s%s %-20s %s %-25s ║%n", + current, status, node.getNodeId(), + node.getEndpoint(), + "DC:" + node.getDatacenter()); + } + + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.printf("║ Total Nodes: %-3d Alive: %-3d Current: %-18s║%n", + clusterNodes.size(), + clusterNodes.stream().filter(ClusterNode::isAlive).count(), + currentNode != null ? 
currentNode.getNodeId() : "none"); + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + } + + private void handleUseNode(String args) { + if (args.isEmpty()) { + System.out.println("Usage: USE "); + return; + } + + String nodeId = args.trim(); + ClusterNode node = clusterNodes.stream() + .filter(n -> n.getNodeId().equals(nodeId)) + .findFirst() + .orElse(null); + + if (node != null) { + currentNode = node; + System.out.println("✓ Switched to " + nodeId); + } else { + System.out.println("✗ Node not found: " + nodeId); + } + } + + private void handleConsistency(String args) { + if (args.isEmpty()) { + System.out.println("\nCurrent consistency level: " + defaultConsistencyLevel); + System.out.println("\nAvailable levels:"); + for (ConsistencyLevel cl : ConsistencyLevel.values()) { + System.out.println(" " + cl.name() + " - " + cl.getDescription()); + } + System.out.println(); + return; + } + + try { + ConsistencyLevel cl = ConsistencyLevel.valueOf(args.trim().toUpperCase()); + defaultConsistencyLevel = cl; + System.out.println("✓ Consistency level set to " + cl); + } catch (IllegalArgumentException e) { + System.out.println("✗ Invalid consistency level: " + args); + System.out.println(" Valid: ONE, TWO, THREE, QUORUM, ALL, ANY"); + } + } + + private void showNodeStatus() { + if (currentNode == null) { + System.out.println("✗ Not connected to any node"); + return; + } + + try { + String url = String.format("http://%s:%d/api/v1/stats", + currentNode.getHost(), currentNode.getPort()); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .GET() + .build(); + + HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + @SuppressWarnings("unchecked") + Map data = objectMapper.readValue( + response.body(), Map.class); + + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ Node 
Status ║"); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.printf("║ Node: %-45s ║%n", currentNode.getNodeId()); + System.out.printf("║ Endpoint: %-45s ║%n", currentNode.getEndpoint()); + System.out.printf("║ Status: %-45s ║%n", "✓ ALIVE"); + + if (data.containsKey("stats")) { + @SuppressWarnings("unchecked") + Map stats = (Map) data.get("stats"); + + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.println("║ Storage Statistics: ║"); + System.out.printf("║ Total Keys: %-40s ║%n", stats.get("totalKeys")); + System.out.printf("║ Total Size: %-40s ║%n", stats.get("totalSize") + " bytes"); + System.out.printf("║ MemTable Size: %-40s ║%n", stats.get("memtableSize") + " bytes"); + System.out.printf("║ SSTable Count: %-40s ║%n", stats.get("sstableCount")); + } + + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + } + } catch (Exception e) { + System.out.println("✗ Failed to get status: " + e.getMessage()); + } + } + + private void showReplicationStats() { + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ Replication Statistics ║"); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.printf("║ Cluster Nodes: %-33d ║%n", clusterNodes.size()); + System.out.printf("║ Alive Nodes: %-33d ║%n", + clusterNodes.stream().filter(ClusterNode::isAlive).count()); + System.out.printf("║ Default Consistency: %-33s ║%n", defaultConsistencyLevel); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + System.out.println("║ Datacenter Distribution: ║"); + + Map dcCount = clusterNodes.stream() + .collect(Collectors.groupingBy(ClusterNode::getDatacenter, Collectors.counting())); + + for (Map.Entry entry : dcCount.entrySet()) { + System.out.printf("║ %-20s %-33d ║%n", entry.getKey() + ":", 
entry.getValue()); + } + + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + } + + private void handlePut(String args) { + if (currentNode == null) { + System.out.println("✗ Not connected to any node"); + return; + } + + String[] parts = args.split("\\s+", 2); + if (parts.length < 2) { + System.out.println("Usage: PUT "); + return; + } + + String key = parts[0]; + String value = parts[1]; + + try { + String url = String.format("http://%s:%d/api/v1/put", + currentNode.getHost(), currentNode.getPort()); + + Map body = new HashMap<>(); + body.put("key", key); + body.put("value", value); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString( + objectMapper.writeValueAsString(body))) + .build(); + + HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + System.out.println("✓ PUT successful"); + System.out.println(" Key: " + key); + System.out.println(" Value: " + value); + System.out.println(" CL: " + defaultConsistencyLevel); + } else { + System.out.println("✗ PUT failed: HTTP " + response.statusCode()); + } + } catch (Exception e) { + System.out.println("✗ Error: " + e.getMessage()); + } + } + + private void handleGet(String args) { + if (currentNode == null) { + System.out.println("✗ Not connected to any node"); + return; + } + + if (args.isEmpty()) { + System.out.println("Usage: GET "); + return; + } + + String key = args.trim(); + + try { + String url = String.format("http://%s:%d/api/v1/get/%s", + currentNode.getHost(), currentNode.getPort(), key); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .GET() + .build(); + + HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + @SuppressWarnings("unchecked") + Map data = 
objectMapper.readValue( + response.body(), Map.class); + + if (Boolean.TRUE.equals(data.get("found"))) { + System.out.println("✓ Found"); + System.out.println(" Key: " + key); + System.out.println(" Value: " + data.get("value")); + System.out.println(" CL: " + defaultConsistencyLevel); + } else { + System.out.println("✗ Not found: " + key); + } + } else { + System.out.println("✗ GET failed: HTTP " + response.statusCode()); + } + } catch (Exception e) { + System.out.println("✗ Error: " + e.getMessage()); + } + } + + private void handleDelete(String args) { + if (currentNode == null) { + System.out.println("✗ Not connected to any node"); + return; + } + + if (args.isEmpty()) { + System.out.println("Usage: DELETE "); + return; + } + + String key = args.trim(); + + try { + String url = String.format("http://%s:%d/api/v1/delete/%s", + currentNode.getHost(), currentNode.getPort(), key); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .DELETE() + .build(); + + HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + System.out.println("✓ DELETE successful"); + System.out.println(" Key: " + key); + System.out.println(" CL: " + defaultConsistencyLevel); + } else { + System.out.println("✗ DELETE failed: HTTP " + response.statusCode()); + } + } catch (Exception e) { + System.out.println("✗ Error: " + e.getMessage()); + } + } + + private void handleScan(String args) { + if (currentNode == null) { + System.out.println("✗ Not connected to any node"); + return; + } + + if (args.isEmpty()) { + System.out.println("Usage: SCAN "); + return; + } + + String prefix = args.trim(); + + try { + String url = String.format("http://%s:%d/api/v1/scan?prefix=%s", + currentNode.getHost(), currentNode.getPort(), prefix); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .GET() + .build(); + + HttpResponse response = httpClient.send(request, + 
HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + @SuppressWarnings("unchecked") + Map data = objectMapper.readValue( + response.body(), Map.class); + + int count = (int) data.get("count"); + System.out.println("✓ Found " + count + " result(s)"); + + if (count > 0) { + @SuppressWarnings("unchecked") + Map results = (Map) data.get("results"); + + System.out.println("\n┌────────────────────────────┬────────────────────────────┐"); + System.out.println("│ Key │ Value │"); + System.out.println("├────────────────────────────┼────────────────────────────┤"); + + for (Map.Entry entry : results.entrySet()) { + System.out.printf("│ %-26s │ %-26s │%n", + truncate(entry.getKey(), 26), + truncate(entry.getValue(), 26)); + } + + System.out.println("└────────────────────────────┴────────────────────────────┘"); + } + System.out.println(); + } else { + System.out.println("✗ SCAN failed: HTTP " + response.statusCode()); + } + } catch (Exception e) { + System.out.println("✗ Error: " + e.getMessage()); + } + } + + /** + * Handle SQL/CQL query execution + */ + private void handleSQLV1(String sql) { + if (currentNode == null) { + System.out.println("✗ Not connected to any node"); + return; + } + + // Remove trailing semicolon if present + sql = sql.trim(); + if (sql.endsWith(";")) { + sql = sql.substring(0, sql.length() - 1).trim(); + } + + try { + String url = String.format("http://%s:%d/api/v1/sql/execute", + currentNode.getHost(), currentNode.getPort()); + + Map requestBody = new HashMap<>(); + requestBody.put("sql", sql); + + String jsonBody = objectMapper.writeValueAsString(requestBody); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(jsonBody)) + .build(); + + HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + @SuppressWarnings("unchecked") + Map data = 
objectMapper.readValue( + response.body(), Map.class); + + boolean success = (boolean) data.getOrDefault("success", false); + String message = (String) data.getOrDefault("message", ""); + + if (success) { + System.out.println("✓ " + message); + + // Check if there are rows to display + if (data.containsKey("rows")) { + @SuppressWarnings("unchecked") + List> rows = + (List>) data.get("rows"); + + if (!rows.isEmpty()) { + displayResultTable(rows); + } + } + + // Display rows affected if present + if (data.containsKey("rowsAffected")) { + int rowsAffected = (int) data.get("rowsAffected"); + System.out.println(" (" + rowsAffected + " row(s) affected)"); + } + + // Display row count if present + if (data.containsKey("rowCount")) { + int rowCount = (int) data.get("rowCount"); + if (rowCount > 0) { + System.out.println(" Found " + rowCount + " row(s)"); + } + } + } else { + System.out.println("✗ " + message); + } + } else { + System.out.println("✗ SQL execution failed: HTTP " + response.statusCode()); + } + + } catch (Exception e) { + System.out.println("✗ Error: " + e.getMessage()); + } + } + + /** + * Display SQL results as a formatted table + */ + private void displayResultTable(List> rows) { + if (rows.isEmpty()) { + return; + } + + // Get all column names + Set allColumns = new LinkedHashSet<>(); + for (Map row : rows) { + allColumns.addAll(row.keySet()); + } + + List columns = new ArrayList<>(allColumns); + int numColumns = columns.size(); + + // Calculate column widths (max 30 chars per column) + Map colWidths = new HashMap<>(); + for (String col : columns) { + int maxWidth = col.length(); + for (Map row : rows) { + String val = row.getOrDefault(col, ""); + maxWidth = Math.max(maxWidth, Math.min(val.length(), 30)); + } + colWidths.put(col, Math.min(maxWidth + 2, 30)); + } + + // Print table + System.out.println(); + + // Top border + printTableBorder(columns, colWidths, "┌", "┬", "┐"); + + // Header + System.out.print("│"); + for (String col : columns) { + int 
width = colWidths.get(col); + System.out.print(" " + padRight(col, width - 1) + "│"); + } + System.out.println(); + + // Header separator + printTableBorder(columns, colWidths, "├", "┼", "┤"); + + // Data rows + for (Map row : rows) { + System.out.print("│"); + for (String col : columns) { + int width = colWidths.get(col); + String val = row.getOrDefault(col, ""); + System.out.print(" " + padRight(truncate(val, 28), width - 1) + "│"); + } + System.out.println(); + } + + // Bottom border + printTableBorder(columns, colWidths, "└", "┴", "┘"); + System.out.println(); + } + + /** + * Print table border + */ + private void printTableBorder(List columns, Map colWidths, + String left, String mid, String right) { + System.out.print(left); + for (int i = 0; i < columns.size(); i++) { + String col = columns.get(i); + int width = colWidths.get(col); + System.out.print("─".repeat(width)); + if (i < columns.size() - 1) { + System.out.print(mid); + } + } + System.out.println(right); + } + + /** + * Pad string to the right with spaces + */ + private String padRight(String str, int length) { + if (str.length() >= length) { + return str; + } + return str + " ".repeat(length - str.length()); + } + + private void handleSQL(String sql) { + if (currentNode == null) { + System.out.println("✗ Not connected to any node"); + return; + } + + try { + String url = String.format("http://%s:%d/api/v1/sql/execute", + currentNode.getHost(), currentNode.getPort()); + + Map body = new HashMap<>(); + body.put("sql", sql); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString( + objectMapper.writeValueAsString(body))) + .timeout(Duration.ofSeconds(30)) + .build(); + + HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + @SuppressWarnings("unchecked") + Map data = objectMapper.readValue( + response.body(), 
Map.class); + + boolean success = Boolean.TRUE.equals(data.get("success")); + + if (success) { + System.out.println("✓ Query executed successfully"); + + // Display message if present + if (data.containsKey("message")) { + System.out.println(" " + data.get("message")); + } + + // Display rows affected if present + if (data.containsKey("rowsAffected")) { + System.out.println(" Rows affected: " + data.get("rowsAffected")); + } + + // Display result rows if present + if (data.containsKey("rows")) { + @SuppressWarnings("unchecked") + List> rows = (List>) data.get("rows"); + + if (!rows.isEmpty()) { + displayResultTable(rows); + } + } + + // Display row count if present + if (data.containsKey("rowCount")) { + System.out.println(" Row count: " + data.get("rowCount")); + } + + System.out.println(" CL: " + defaultConsistencyLevel); + } else { + System.out.println("✗ Query failed"); + if (data.containsKey("message")) { + System.out.println(" Error: " + data.get("message")); + } + } + } else { + System.out.println("✗ Query failed: HTTP " + response.statusCode()); + System.out.println(" Response: " + response.body()); + } + } catch (Exception e) { + System.out.println("✗ Error executing SQL: " + e.getMessage()); + e.printStackTrace(); + } + } + + private void displayResultTableV1(List> rows) { + if (rows.isEmpty()) { + System.out.println(" (Empty result set)"); + return; + } + + // Get all unique column names + Set columnSet = new LinkedHashSet<>(); + for (Map row : rows) { + columnSet.addAll(row.keySet()); + } + List columns = new ArrayList<>(columnSet); + + // Calculate column widths + Map widths = new HashMap<>(); + for (String col : columns) { + int maxWidth = col.length(); + for (Map row : rows) { + String value = row.get(col); + if (value != null) { + maxWidth = Math.max(maxWidth, value.length()); + } + } + widths.put(col, Math.min(maxWidth, 30)); // Max 30 chars per column + } + + // Print header + System.out.println(); + printTableSeparator(columns, widths, '┌', '┬', 
'┐'); + printTableRow(columns, columns, widths); + printTableSeparator(columns, widths, '├', '┼', '┤'); + + // Print rows + for (Map row : rows) { + List values = new ArrayList<>(); + for (String col : columns) { + String value = row.get(col); + values.add(value != null ? value : "NULL"); + } + printTableRow(columns, values, widths); + } + + // Print footer + printTableSeparator(columns, widths, '└', '┴', '┘'); + System.out.println(); + } + + private void printTableSeparator(List columns, Map widths, + char left, char middle, char right) { + System.out.print(left); + for (int i = 0; i < columns.size(); i++) { + String col = columns.get(i); + int width = widths.get(col); + for (int j = 0; j < width + 2; j++) { + System.out.print('─'); + } + if (i < columns.size() - 1) { + System.out.print(middle); + } + } + System.out.println(right); + } + + private void printTableRow(List columns, List values, + Map widths) { + System.out.print("│"); + for (int i = 0; i < columns.size(); i++) { + String col = columns.get(i); + String value = values.get(i); + int width = widths.get(col); + + if (value.length() > width) { + value = value.substring(0, width - 3) + "..."; + } + + System.out.print(" " + String.format("%-" + width + "s", value) + " │"); + } + System.out.println(); + } + + private void showHistory() { + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ Command History ║"); + System.out.println("╠════════════════════════════════════════════════════════════╣"); + + if (commandHistory.isEmpty()) { + System.out.println("║ No commands in history ║"); + } else { + int start = Math.max(0, commandHistory.size() - 20); + for (int i = start; i < commandHistory.size(); i++) { + System.out.printf("║ %3d: %-53s ║%n", i + 1, + truncate(commandHistory.get(i), 53)); + } + } + + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); + } + + private void clearScreen() { + 
System.out.print("\033[H\033[2J"); + System.out.flush(); + printBanner(); + } + + private String truncate(String str, int maxLen) { + if (str.length() <= maxLen) { + return str; + } + return str.substring(0, maxLen - 3) + "..."; + } +} diff --git a/src/main/java/com/cube/sql/SQLExecutor.java b/src/main/java/com/cube/sql/SQLExecutor.java new file mode 100644 index 0000000..ec10982 --- /dev/null +++ b/src/main/java/com/cube/sql/SQLExecutor.java @@ -0,0 +1,314 @@ +package com.cube.sql; + +import com.cube.cql.CQLParser; +import com.cube.cql.QueryExecutor; +import com.cube.storage.StorageEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * SQL Executor - Executes SQL statements by translating to internal operations + */ +public class SQLExecutor { + + private static final Logger logger = LoggerFactory.getLogger(SQLExecutor.class); + + private final QueryExecutor queryExecutor; + private final StorageEngine storageEngine; + + public static class SQLResult { + private final boolean success; + private final String message; + private final List> rows; + private final int rowsAffected; + + public SQLResult(boolean success, String message, List> rows, int rowsAffected) { + this.success = success; + this.message = message; + this.rows = rows != null ? 
rows : new ArrayList<>(); + this.rowsAffected = rowsAffected; + } + + public boolean isSuccess() { return success; } + public String getMessage() { return message; } + public List> getRows() { return rows; } + public int getRowsAffected() { return rowsAffected; } + + public static SQLResult success(String message) { + return new SQLResult(true, message, null, 0); + } + + public static SQLResult success(String message, int rowsAffected) { + return new SQLResult(true, message, null, rowsAffected); + } + + public static SQLResult success(List> rows) { + return new SQLResult(true, "Query executed successfully", rows, rows.size()); + } + + public static SQLResult error(String message) { + return new SQLResult(false, message, null, 0); + } + } + + public SQLExecutor(QueryExecutor queryExecutor, StorageEngine storageEngine) { + this.queryExecutor = queryExecutor; + this.storageEngine = storageEngine; + } + + /** + * Execute SQL statement + */ + public SQLResult execute(String sql) { + try { + SQLParser.ParsedSQL parsedSQL = SQLParser.parse(sql); + + logger.info("Executing SQL: {} (Type: {})", sql, parsedSQL.getType()); + + switch (parsedSQL.getType()) { + case SELECT: + return executeSelect(parsedSQL); + case INSERT: + return executeInsert(parsedSQL); + case UPDATE: + return executeUpdate(parsedSQL); + case DELETE: + return executeDelete(parsedSQL); + case CREATE_TABLE: + return executeCreateTable(parsedSQL); + case DROP_TABLE: + return executeDropTable(parsedSQL); + case DESCRIBE: + return executeDescribe(parsedSQL); + case SHOW_TABLES: + return executeShowTables(); + default: + return SQLResult.error("Unsupported SQL type: " + parsedSQL.getType()); + } + } catch (Exception e) { + logger.error("SQL execution error", e); + return SQLResult.error("Error: " + e.getMessage()); + } + } + + /** + * Execute SELECT statement + */ + private SQLResult executeSelect(SQLParser.ParsedSQL sql) { + try { + // Build CQL query + StringBuilder cql = new StringBuilder("SELECT "); + + if 
(sql.getSelectColumns().contains("*")) {
                cql.append("*");
            } else {
                // Projected columns pass through verbatim, comma-separated.
                cql.append(String.join(", ", sql.getSelectColumns()));
            }

            cql.append(" FROM ").append(sql.getKeyspace()).append(".").append(sql.getTable());

            if (!sql.getWhereClause().isEmpty()) {
                cql.append(" WHERE ");
                List conditions = new ArrayList<>();
                for (Map.Entry entry : sql.getWhereClause().entrySet()) {
                    // NOTE(review): the value is spliced into the CQL text
                    // inside single quotes with no escaping, so a value
                    // containing ' breaks the generated query (and is an
                    // injection vector if values are untrusted). Standard CQL
                    // escapes ' by doubling (''), but whether the in-house
                    // CQLParser accepts that is not visible here — confirm
                    // before changing.
                    conditions.add(entry.getKey() + " = '" + entry.getValue() + "'");
                }
                cql.append(String.join(" AND ", conditions));
            }

            // Execute via CQL
            CQLParser.ParsedQuery parsedQuery = CQLParser.parse(cql.toString());
            QueryExecutor.Result result = queryExecutor.execute(parsedQuery);

            if (result.isSuccess()) {
                // Rows come back in the CQL executor's internal representation;
                // convertRows() maps them to String-keyed maps for the API.
                return SQLResult.success(convertRows(result.getRows()));
            } else {
                return SQLResult.error(result.getMessage());
            }

        } catch (Exception e) {
            return SQLResult.error("SELECT error: " + e.getMessage());
        }
    }

    /**
     * Execute INSERT statement.
     *
     * Translates the parsed SQL INSERT into an equivalent CQL INSERT string
     * (keyspace.table qualified) and runs it through the shared QueryExecutor.
     * Returns the executor's message and rows-affected count on success, or an
     * error SQLResult with the failure message.
     */
    private SQLResult executeInsert(SQLParser.ParsedSQL sql) {
        try {
            // Build CQL query
            StringBuilder cql = new StringBuilder("INSERT INTO ");
            cql.append(sql.getKeyspace()).append(".").append(sql.getTable());
            cql.append(" (");

            // Column order follows the parsed column map's iteration order
            // (LinkedHashMap in the parser), so names and values stay aligned.
            List columns = new ArrayList<>(sql.getColumns().keySet());
            cql.append(String.join(", ", columns));
            cql.append(") VALUES (");

            List values = new ArrayList<>();
            for (String col : columns) {
                // NOTE(review): same unescaped single-quote splice as in
                // executeSelect above — values containing ' corrupt the
                // generated CQL. See the note there before fixing.
                values.add("'" + sql.getColumns().get(col) + "'");
            }
            cql.append(String.join(", ", values));
            cql.append(")");

            // Execute via CQL
            CQLParser.ParsedQuery parsedQuery = CQLParser.parse(cql.toString());
            QueryExecutor.Result result = queryExecutor.execute(parsedQuery);

            if (result.isSuccess()) {
                return SQLResult.success(result.getMessage(), result.getRowsAffected());
            } else {
                return SQLResult.error(result.getMessage());
            }

        } catch (Exception e) {
            return SQLResult.error("INSERT error: " + e.getMessage());
        }
    }

    /**
     * Execute UPDATE statement
     */
    private
SQLResult executeUpdate(SQLParser.ParsedSQL sql) { + try { + // Build CQL query + StringBuilder cql = new StringBuilder("UPDATE "); + cql.append(sql.getKeyspace()).append(".").append(sql.getTable()); + cql.append(" SET "); + + List sets = new ArrayList<>(); + for (Map.Entry entry : sql.getColumns().entrySet()) { + sets.add(entry.getKey() + "='" + entry.getValue() + "'"); + } + cql.append(String.join(", ", sets)); + + cql.append(" WHERE "); + List conditions = new ArrayList<>(); + for (Map.Entry entry : sql.getWhereClause().entrySet()) { + conditions.add(entry.getKey() + "='" + entry.getValue() + "'"); + } + cql.append(String.join(" AND ", conditions)); + + // Execute via CQL + CQLParser.ParsedQuery parsedQuery = CQLParser.parse(cql.toString()); + QueryExecutor.Result result = queryExecutor.execute(parsedQuery); + + if (result.isSuccess()) { + return SQLResult.success(result.getMessage(), result.getRowsAffected()); + } else { + return SQLResult.error(result.getMessage()); + } + + } catch (Exception e) { + return SQLResult.error("UPDATE error: " + e.getMessage()); + } + } + + /** + * Execute DELETE statement + */ + private SQLResult executeDelete(SQLParser.ParsedSQL sql) { + try { + // Build CQL query + StringBuilder cql = new StringBuilder("DELETE FROM "); + cql.append(sql.getKeyspace()).append(".").append(sql.getTable()); + cql.append(" WHERE "); + + List conditions = new ArrayList<>(); + for (Map.Entry entry : sql.getWhereClause().entrySet()) { + conditions.add(entry.getKey() + "='" + entry.getValue() + "'"); + } + cql.append(String.join(" AND ", conditions)); + + // Execute via CQL + CQLParser.ParsedQuery parsedQuery = CQLParser.parse(cql.toString()); + QueryExecutor.Result result = queryExecutor.execute(parsedQuery); + + if (result.isSuccess()) { + return SQLResult.success(result.getMessage(), result.getRowsAffected()); + } else { + return SQLResult.error(result.getMessage()); + } + + } catch (Exception e) { + return SQLResult.error("DELETE error: " + 
e.getMessage()); + } + } + + /** + * Execute CREATE TABLE statement + */ + private SQLResult executeCreateTable(SQLParser.ParsedSQL sql) { + try { + // Build CQL query + StringBuilder cql = new StringBuilder("CREATE TABLE "); + cql.append(sql.getKeyspace()).append(".").append(sql.getTable()); + cql.append(" ("); + + List colDefs = new ArrayList<>(); + for (Map.Entry entry : sql.getColumnDefinitions().entrySet()) { + colDefs.add(entry.getKey() + " " + entry.getValue()); + } + cql.append(String.join(", ", colDefs)); + cql.append(")"); + + // Execute via CQL + CQLParser.ParsedQuery parsedQuery = CQLParser.parse(cql.toString()); + QueryExecutor.Result result = queryExecutor.execute(parsedQuery); + + if (result.isSuccess()) { + return SQLResult.success("Table created: " + sql.getKeyspace() + "." + sql.getTable()); + } else { + return SQLResult.error(result.getMessage()); + } + + } catch (Exception e) { + return SQLResult.error("CREATE TABLE error: " + e.getMessage()); + } + } + + /** + * Execute DROP TABLE statement + */ + private SQLResult executeDropTable(SQLParser.ParsedSQL sql) { + return SQLResult.error("DROP TABLE not yet implemented"); + } + + /** + * Execute DESCRIBE statement + */ + private SQLResult executeDescribe(SQLParser.ParsedSQL sql) { + return SQLResult.error("DESCRIBE not yet implemented"); + } + + /** + * Execute SHOW TABLES statement + */ + private SQLResult executeShowTables() { + return SQLResult.error("SHOW TABLES not yet implemented"); + } + + /** + * Convert CQL rows to SQL result format + */ + private List> convertRows(List> cqlRows) { + List> sqlRows = new ArrayList<>(); + + for (Map cqlRow : cqlRows) { + Map sqlRow = new LinkedHashMap<>(); + for (Map.Entry entry : cqlRow.entrySet()) { + if (entry.getValue() != null) { + sqlRow.put(entry.getKey(), new String(entry.getValue())); + } else { + sqlRow.put(entry.getKey(), null); + } + } + sqlRows.add(sqlRow); + } + + return sqlRows; + } +} diff --git a/src/main/java/com/cube/sql/SQLParser.java 
/**
 * SQL Parser for the Cube database.
 *
 * Supports standard SQL syntax:
 * - SELECT * FROM table WHERE id = 'value'
 * - INSERT INTO table (col1, col2) VALUES ('val1', 'val2')
 * - UPDATE table SET col1='val1' WHERE id='value'
 * - DELETE FROM table WHERE id='value'
 * - CREATE TABLE table (col1 TYPE, col2 TYPE, ...)
 * - DROP TABLE table
 *
 * All statement keywords are matched case-insensitively, including the AND
 * separator in WHERE clauses.
 */
public class SQLParser {

    public enum SQLType {
        SELECT,
        INSERT,
        UPDATE,
        DELETE,
        CREATE_TABLE,
        DROP_TABLE,
        DESCRIBE,
        SHOW_TABLES,
        UNKNOWN
    }

    /**
     * Immutable result of parsing one SQL statement. Which fields are
     * populated depends on the statement type (e.g. INSERT fills
     * {@code columns}, CREATE TABLE fills {@code columnDefinitions}).
     */
    public static class ParsedSQL {
        private final SQLType type;
        private final String table;
        private final String keyspace;
        private final Map<String, String> columns;
        private final Map<String, String> whereClause;
        private final List<String> selectColumns;
        private final Map<String, String> columnDefinitions;
        private final String primaryKey;

        public ParsedSQL(SQLType type, String keyspace, String table) {
            this(type, keyspace, table, null, null, null, null, null);
        }

        public ParsedSQL(SQLType type, String keyspace, String table,
                         Map<String, String> columns, Map<String, String> whereClause,
                         List<String> selectColumns, Map<String, String> columnDefinitions,
                         String primaryKey) {
            this.type = type;
            this.keyspace = keyspace;
            this.table = table;
            // Null-coalesce so getters never return null collections.
            this.columns = columns != null ? columns : new LinkedHashMap<>();
            this.whereClause = whereClause != null ? whereClause : new LinkedHashMap<>();
            this.selectColumns = selectColumns != null ? selectColumns : new ArrayList<>();
            this.columnDefinitions = columnDefinitions != null ? columnDefinitions : new LinkedHashMap<>();
            this.primaryKey = primaryKey;
        }

        public SQLType getType() { return type; }
        public String getTable() { return table; }
        public String getKeyspace() { return keyspace; }
        public Map<String, String> getColumns() { return columns; }
        public Map<String, String> getWhereClause() { return whereClause; }
        public List<String> getSelectColumns() { return selectColumns; }
        public Map<String, String> getColumnDefinitions() { return columnDefinitions; }
        public String getPrimaryKey() { return primaryKey; }

        @Override
        public String toString() {
            return "ParsedSQL{type=" + type + ", table=" + keyspace + "." + table + "}";
        }
    }

    /**
     * Parse a single SQL statement (a trailing ';' is tolerated).
     *
     * @throws IllegalArgumentException on empty input or unsupported syntax
     */
    public static ParsedSQL parse(String sql) throws IllegalArgumentException {
        if (sql == null || sql.trim().isEmpty()) {
            throw new IllegalArgumentException("SQL statement is empty");
        }

        sql = sql.trim();
        if (sql.endsWith(";")) {
            sql = sql.substring(0, sql.length() - 1).trim();
        }

        String upperSQL = sql.toUpperCase();

        if (upperSQL.startsWith("SELECT")) {
            return parseSelect(sql);
        } else if (upperSQL.startsWith("INSERT")) {
            return parseInsert(sql);
        } else if (upperSQL.startsWith("UPDATE")) {
            return parseUpdate(sql);
        } else if (upperSQL.startsWith("DELETE")) {
            return parseDelete(sql);
        } else if (upperSQL.startsWith("CREATE TABLE")) {
            return parseCreateTable(sql);
        } else if (upperSQL.startsWith("DROP TABLE")) {
            return parseDropTable(sql);
        } else if (upperSQL.startsWith("DESCRIBE") || upperSQL.startsWith("DESC")) {
            return parseDescribe(sql);
        } else if (upperSQL.startsWith("SHOW TABLES")) {
            return parseShowTables(sql);
        } else {
            throw new IllegalArgumentException("Unsupported SQL statement: " + sql);
        }
    }

    /**
     * Split a [keyspace.]table reference into {keyspace, table}; the keyspace
     * defaults to "default" when omitted.
     */
    private static String[] splitTableRef(String tableRef) {
        String[] parts = tableRef.split("\\.");
        String keyspace = parts.length > 1 ? parts[0] : "default";
        String table = parts.length > 1 ? parts[1] : parts[0];
        return new String[]{keyspace, table};
    }

    /**
     * Parse SELECT statement.
     * Examples:
     * - SELECT * FROM users.profiles WHERE id = 'user-1'
     * - SELECT name, email FROM users.profiles WHERE id = 'user-1'
     * - SELECT * FROM profiles WHERE id = 'user-1' (default keyspace)
     */
    private static ParsedSQL parseSelect(String sql) {
        // Pattern: SELECT columns FROM [keyspace.]table [WHERE conditions]
        Pattern pattern = Pattern.compile(
                "SELECT\\s+(.+?)\\s+FROM\\s+([\\w.]+)(?:\\s+WHERE\\s+(.+))?",
                Pattern.CASE_INSENSITIVE
        );

        Matcher matcher = pattern.matcher(sql);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Invalid SELECT syntax: " + sql);
        }

        String columnsStr = matcher.group(1).trim();
        String[] ref = splitTableRef(matcher.group(2).trim());
        String whereStr = matcher.group(3);

        // Parse projection list; "*" is kept as a single marker entry.
        List<String> selectColumns = new ArrayList<>();
        if ("*".equals(columnsStr)) {
            selectColumns.add("*");
        } else {
            for (String col : columnsStr.split(",")) {
                selectColumns.add(col.trim());
            }
        }

        Map<String, String> whereClause = parseWhereClause(whereStr);

        return new ParsedSQL(SQLType.SELECT, ref[0], ref[1], null, whereClause,
                selectColumns, null, null);
    }

    /**
     * Parse INSERT statement.
     * Example: INSERT INTO users.profiles (id, name, email) VALUES ('user-1', 'Alice', 'alice@example.com')
     */
    private static ParsedSQL parseInsert(String sql) {
        // Pattern: INSERT INTO [keyspace.]table (columns) VALUES (values)
        Pattern pattern = Pattern.compile(
                "INSERT\\s+INTO\\s+([\\w.]+)\\s*\\(([^)]+)\\)\\s*VALUES\\s*\\(([^)]+)\\)",
                Pattern.CASE_INSENSITIVE
        );

        Matcher matcher = pattern.matcher(sql);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Invalid INSERT syntax: " + sql);
        }

        String[] ref = splitTableRef(matcher.group(1).trim());
        String columnsStr = matcher.group(2).trim();
        String valuesStr = matcher.group(3).trim();

        String[] columns = columnsStr.split(",");
        String[] values = parseValues(valuesStr);

        if (columns.length != values.length) {
            throw new IllegalArgumentException("Column count doesn't match value count");
        }

        Map<String, String> columnValues = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            columnValues.put(columns[i].trim(), values[i]);
        }

        return new ParsedSQL(SQLType.INSERT, ref[0], ref[1], columnValues, null,
                null, null, null);
    }

    /**
     * Parse UPDATE statement.
     * Example: UPDATE users.profiles SET name='Alice Johnson', age='31' WHERE id='user-1'
     */
    private static ParsedSQL parseUpdate(String sql) {
        // Pattern: UPDATE [keyspace.]table SET assignments WHERE conditions
        Pattern pattern = Pattern.compile(
                "UPDATE\\s+([\\w.]+)\\s+SET\\s+(.+?)\\s+WHERE\\s+(.+)",
                Pattern.CASE_INSENSITIVE
        );

        Matcher matcher = pattern.matcher(sql);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Invalid UPDATE syntax: " + sql);
        }

        String[] ref = splitTableRef(matcher.group(1).trim());
        Map<String, String> columns = parseSetClause(matcher.group(2).trim());
        Map<String, String> whereClause = parseWhereClause(matcher.group(3).trim());

        return new ParsedSQL(SQLType.UPDATE, ref[0], ref[1], columns, whereClause,
                null, null, null);
    }

    /**
     * Parse DELETE statement.
     * Example: DELETE FROM users.profiles WHERE id='user-1'
     */
    private static ParsedSQL parseDelete(String sql) {
        // Pattern: DELETE FROM [keyspace.]table WHERE conditions
        Pattern pattern = Pattern.compile(
                "DELETE\\s+FROM\\s+([\\w.]+)\\s+WHERE\\s+(.+)",
                Pattern.CASE_INSENSITIVE
        );

        Matcher matcher = pattern.matcher(sql);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Invalid DELETE syntax: " + sql);
        }

        String[] ref = splitTableRef(matcher.group(1).trim());
        Map<String, String> whereClause = parseWhereClause(matcher.group(2).trim());

        return new ParsedSQL(SQLType.DELETE, ref[0], ref[1], null, whereClause,
                null, null, null);
    }

    /**
     * Parse CREATE TABLE statement.
     * Example: CREATE TABLE users.profiles (id TEXT PRIMARY KEY, name TEXT, email TEXT)
     */
    private static ParsedSQL parseCreateTable(String sql) {
        // Pattern: CREATE TABLE [keyspace.]table (column_definitions)
        Pattern pattern = Pattern.compile(
                "CREATE\\s+TABLE\\s+([\\w.]+)\\s*\\(([^)]+)\\)",
                Pattern.CASE_INSENSITIVE
        );

        Matcher matcher = pattern.matcher(sql);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Invalid CREATE TABLE syntax: " + sql);
        }

        String[] ref = splitTableRef(matcher.group(1).trim());
        String columnsStr = matcher.group(2).trim();

        Map<String, String> columnDefs = new LinkedHashMap<>();
        String primaryKey = null;

        for (String col : columnsStr.split(",")) {
            col = col.trim();

            // Inline "col TYPE PRIMARY KEY" constraint marks the primary key.
            if (col.toUpperCase().contains("PRIMARY KEY")) {
                String[] colParts = col.split("\\s+");
                String colName = colParts[0];
                String colType = colParts[1];
                columnDefs.put(colName, colType);
                primaryKey = colName;
            } else {
                String[] colParts = col.split("\\s+", 2);
                if (colParts.length == 2) {
                    columnDefs.put(colParts[0], colParts[1]);
                }
            }
        }

        // If no PRIMARY KEY is specified, the first column is the primary key.
        if (primaryKey == null && !columnDefs.isEmpty()) {
            primaryKey = columnDefs.keySet().iterator().next();
        }

        return new ParsedSQL(SQLType.CREATE_TABLE, ref[0], ref[1], null, null,
                null, columnDefs, primaryKey);
    }

    /**
     * Parse DROP TABLE statement.
     * Example: DROP TABLE users.profiles
     */
    private static ParsedSQL parseDropTable(String sql) {
        Pattern pattern = Pattern.compile(
                "DROP\\s+TABLE\\s+([\\w.]+)",
                Pattern.CASE_INSENSITIVE
        );

        Matcher matcher = pattern.matcher(sql);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Invalid DROP TABLE syntax: " + sql);
        }

        String[] ref = splitTableRef(matcher.group(1).trim());
        return new ParsedSQL(SQLType.DROP_TABLE, ref[0], ref[1], null, null,
                null, null, null);
    }

    /**
     * Parse DESCRIBE statement.
     * Example: DESCRIBE users.profiles
     */
    private static ParsedSQL parseDescribe(String sql) {
        Pattern pattern = Pattern.compile(
                "(?:DESCRIBE|DESC)\\s+([\\w.]+)",
                Pattern.CASE_INSENSITIVE
        );

        Matcher matcher = pattern.matcher(sql);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Invalid DESCRIBE syntax: " + sql);
        }

        String[] ref = splitTableRef(matcher.group(1).trim());
        return new ParsedSQL(SQLType.DESCRIBE, ref[0], ref[1], null, null,
                null, null, null);
    }

    /**
     * Parse SHOW TABLES statement.
     */
    private static ParsedSQL parseShowTables(String sql) {
        return new ParsedSQL(SQLType.SHOW_TABLES, "default", null, null, null,
                null, null, null);
    }

    /**
     * Parse a WHERE clause of equality conditions joined by AND.
     * Example: id='user-1' AND status='active'
     *
     * Fix: the AND separator is matched case-insensitively ("(?i)"), matching
     * the case-insensitive handling of every other keyword; previously
     * lowercase "and" was not split and conditions were silently dropped.
     */
    private static Map<String, String> parseWhereClause(String whereStr) {
        Map<String, String> where = new LinkedHashMap<>();

        if (whereStr == null || whereStr.trim().isEmpty()) {
            return where;
        }

        // Split by AND, any case (simple implementation; no OR/parentheses).
        String[] conditions = whereStr.split("(?i)\\s+AND\\s+", -1);

        for (String condition : conditions) {
            condition = condition.trim();

            // Parse: column = 'value' or column='value' (quotes optional)
            Pattern pattern = Pattern.compile("([\\w]+)\\s*=\\s*['\"]?([^'\"]+)['\"]?");
            Matcher matcher = pattern.matcher(condition);

            if (matcher.find()) {
                where.put(matcher.group(1).trim(), matcher.group(2).trim());
            }
        }

        return where;
    }

    /**
     * Parse a SET clause of comma-separated assignments.
     * Example: name='Alice', age='31'
     * NOTE(review): values containing commas inside quotes are not supported
     * by this simple split — confirm against expected workloads.
     */
    private static Map<String, String> parseSetClause(String setStr) {
        Map<String, String> sets = new LinkedHashMap<>();

        String[] assignments = setStr.split(",");
        for (String assignment : assignments) {
            assignment = assignment.trim();

            Pattern pattern = Pattern.compile("([\\w]+)\\s*=\\s*['\"]?([^'\"]+)['\"]?");
            Matcher matcher = pattern.matcher(assignment);

            if (matcher.find()) {
                sets.put(matcher.group(1).trim(), matcher.group(2).trim());
            }
        }

        return sets;
    }

    /**
     * Parse a VALUES list, honoring single/double quotes so commas inside
     * quoted strings are not treated as separators. Backslash-escaped quote
     * characters are kept literal.
     */
    private static String[] parseValues(String valuesStr) {
        List<String> values = new ArrayList<>();

        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        char quoteChar = '\0';

        for (int i = 0; i < valuesStr.length(); i++) {
            char c = valuesStr.charAt(i);

            if ((c == '\'' || c == '"') && (i == 0 || valuesStr.charAt(i - 1) != '\\')) {
                if (!inQuotes) {
                    inQuotes = true;
                    quoteChar = c;
                } else if (c == quoteChar) {
                    inQuotes = false;
                    quoteChar = '\0';
                } else {
                    current.append(c);
                }
            } else if (c == ',' && !inQuotes) {
                values.add(current.toString().trim());
                current = new StringBuilder();
            } else if (!Character.isWhitespace(c) || inQuotes) {
                current.append(c);
            }
        }

        if (current.length() > 0) {
            values.add(current.toString().trim());
        }

        return values.toArray(new String[0]);
    }
}
+ */ +public class LSMStorageEngine implements StorageEngine { + + private static final Logger logger = LoggerFactory.getLogger(LSMStorageEngine.class); + private static final int MEMTABLE_FLUSH_THRESHOLD = 1024 * 1024; // 1MB + + private final Path dataDirectory; + private final Path walPath; + + private volatile MemTable activeMemtable; + private final Queue immutableMemtables; + private final ReadWriteLock memtableLock; + + private final List sstables; + private final ReadWriteLock sstableLock; + + private final ExecutorService flushExecutor; + private final ExecutorService compactionExecutor; + + private WriteAheadLog wal; + + public LSMStorageEngine(String dataDir) throws IOException { + this.dataDirectory = Paths.get(dataDir); + this.walPath = dataDirectory.resolve("wal"); + + Files.createDirectories(dataDirectory); + Files.createDirectories(walPath); + + this.activeMemtable = new MemTable(); + this.immutableMemtables = new ConcurrentLinkedQueue<>(); + this.memtableLock = new ReentrantReadWriteLock(); + + this.sstables = new CopyOnWriteArrayList<>(); + this.sstableLock = new ReentrantReadWriteLock(); + + this.wal = new WriteAheadLog(walPath.resolve("wal.log")); + + this.flushExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "CubeDB-Flush"); + t.setDaemon(true); + return t; + }); + + this.compactionExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "CubeDB-Compaction"); + t.setDaemon(true); + return t; + }); + + recoverFromWAL(); + loadSSTables(); + + logger.info("Cube database initialized at {}", dataDirectory); + } + + @Override + public void put(String key, byte[] value) throws IOException { + if (key == null || value == null) { + throw new IllegalArgumentException("Key and value cannot be null"); + } + + memtableLock.readLock().lock(); + try { + wal.append(new WriteAheadLog.LogEntry( + WriteAheadLog.OperationType.PUT, key, value)); + + activeMemtable.put(key, value); + + if (activeMemtable.size() >= 
MEMTABLE_FLUSH_THRESHOLD) { + rotateMemtable(); + } + } finally { + memtableLock.readLock().unlock(); + } + } + + @Override + public byte[] get(String key) throws IOException { + if (key == null) { + throw new IllegalArgumentException("Key cannot be null"); + } + + memtableLock.readLock().lock(); + try { + byte[] value = activeMemtable.get(key); + if (value != null) { + return value; + } + } finally { + memtableLock.readLock().unlock(); + } + + for (MemTable memtable : immutableMemtables) { + byte[] value = memtable.get(key); + if (value != null) { + return value; + } + } + + sstableLock.readLock().lock(); + try { + for (int i = sstables.size() - 1; i >= 0; i--) { + byte[] value = sstables.get(i).get(key); + if (value != null) { + return value; + } + } + } finally { + sstableLock.readLock().unlock(); + } + + return null; + } + + @Override + public boolean delete(String key) throws IOException { + put(key, WriteAheadLog.TOMBSTONE); + return true; + } + + @Override + public Iterator scan(String prefix) throws IOException { + Set keys = new TreeSet<>(); + + memtableLock.readLock().lock(); + try { + keys.addAll(activeMemtable.scan(prefix)); + } finally { + memtableLock.readLock().unlock(); + } + + for (MemTable memtable : immutableMemtables) { + keys.addAll(memtable.scan(prefix)); + } + + sstableLock.readLock().lock(); + try { + for (SSTable sstable : sstables) { + keys.addAll(sstable.scan(prefix)); + } + } finally { + sstableLock.readLock().unlock(); + } + + return keys.iterator(); + } + + @Override + public Iterator> scanEntries(String prefix) throws IOException { + Map entries = new TreeMap<>(); + + sstableLock.readLock().lock(); + try { + for (SSTable sstable : sstables) { + entries.putAll(sstable.scanEntries(prefix)); + } + } finally { + sstableLock.readLock().unlock(); + } + + for (MemTable memtable : immutableMemtables) { + entries.putAll(memtable.scanEntries(prefix)); + } + + memtableLock.readLock().lock(); + try { + 
entries.putAll(activeMemtable.scanEntries(prefix)); + } finally { + memtableLock.readLock().unlock(); + } + + entries.entrySet().removeIf(e -> + Arrays.equals(e.getValue(), WriteAheadLog.TOMBSTONE)); + + return entries.entrySet().iterator(); + } + + @Override + public void flush() throws IOException { + memtableLock.writeLock().lock(); + try { + if (!activeMemtable.isEmpty()) { + rotateMemtable(); + } + } finally { + memtableLock.writeLock().unlock(); + } + + flushAllImmutableMemtables(); + } + + @Override + public void compact() throws IOException { + performCompaction(); + } + + @Override + public StorageStats getStats() { + long memtableSize = activeMemtable.size(); + for (MemTable mt : immutableMemtables) { + memtableSize += mt.size(); + } + + sstableLock.readLock().lock(); + try { + long totalSize = 0; + long totalKeys = 0; + + for (SSTable sst : sstables) { + totalSize += sst.getSize(); + totalKeys += sst.getKeyCount(); + } + + return new StorageStats(totalKeys, totalSize, memtableSize, sstables.size()); + } finally { + sstableLock.readLock().unlock(); + } + } + + @Override + public void close() throws IOException { + logger.info("Closing Cube database..."); + + flush(); + + if (wal != null) { + wal.close(); + } + + sstableLock.writeLock().lock(); + try { + for (SSTable sstable : sstables) { + sstable.close(); + } + } finally { + sstableLock.writeLock().unlock(); + } + + flushExecutor.shutdown(); + compactionExecutor.shutdown(); + + logger.info("Cube database closed"); + } + + private void rotateMemtable() { + memtableLock.writeLock().lock(); + try { + immutableMemtables.add(activeMemtable); + activeMemtable = new MemTable(); + + try { + wal.rotate(); + } catch (IOException e) { + logger.error("Failed to rotate WAL", e); + } + + flushExecutor.submit(this::flushOneImmutableMemtable); + + } finally { + memtableLock.writeLock().unlock(); + } + } + + private void flushOneImmutableMemtable() { + MemTable memtable = immutableMemtables.poll(); + if (memtable == 
null) { + return; + } + + try { + String sstableFile = "sstable-" + System.currentTimeMillis() + ".db"; + Path sstablePath = dataDirectory.resolve(sstableFile); + + SSTable sstable = SSTable.create(sstablePath, memtable.getEntries()); + + sstableLock.writeLock().lock(); + try { + sstables.add(sstable); + } finally { + sstableLock.writeLock().unlock(); + } + + logger.info("Flushed memtable to {} ({} keys)", sstableFile, memtable.getKeyCount()); + + } catch (IOException e) { + logger.error("Failed to flush memtable", e); + immutableMemtables.add(memtable); + } + } + + private void flushAllImmutableMemtables() { + while (!immutableMemtables.isEmpty()) { + flushOneImmutableMemtable(); + } + + // Give executor a moment to finish any pending work + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private void recoverFromWAL() throws IOException { + List entries = wal.replay(); + + for (WriteAheadLog.LogEntry entry : entries) { + if (entry.getType() == WriteAheadLog.OperationType.PUT) { + activeMemtable.put(entry.getKey(), entry.getValue()); + } + } + + if (!entries.isEmpty()) { + logger.info("Recovered {} entries from WAL", entries.size()); + } + } + + private void loadSSTables() throws IOException { + try (DirectoryStream stream = Files.newDirectoryStream( + dataDirectory, "sstable-*.db")) { + + for (Path path : stream) { + try { + SSTable sstable = SSTable.open(path); + sstables.add(sstable); + } catch (IOException e) { + logger.error("Failed to load SSTable " + path, e); + } + } + } + + sstables.sort(Comparator.comparing(SSTable::getCreationTime)); + + if (!sstables.isEmpty()) { + logger.info("Loaded {} SSTables", sstables.size()); + } + } + + private void performCompaction() { + sstableLock.writeLock().lock(); + try { + if (sstables.size() < 2) { + return; + } + + logger.info("Starting compaction of {} SSTables...", sstables.size()); + + Map merged = new TreeMap<>(); + + for (SSTable sstable : sstables) 
{ + merged.putAll(sstable.getAll()); + } + + merged.entrySet().removeIf(e -> + Arrays.equals(e.getValue(), WriteAheadLog.TOMBSTONE)); + + String compactedFile = "sstable-compacted-" + System.currentTimeMillis() + ".db"; + Path compactedPath = dataDirectory.resolve(compactedFile); + + SSTable compacted = SSTable.create(compactedPath, merged); + + for (SSTable old : sstables) { + old.delete(); + } + + sstables.clear(); + sstables.add(compacted); + + logger.info("Compaction complete: {} keys in {}", merged.size(), compactedFile); + + } catch (IOException e) { + logger.error("Compaction failed", e); + } finally { + sstableLock.writeLock().unlock(); + } + } + + public Path getDataDirectory() { + return dataDirectory; + } +} diff --git a/src/main/java/com/cube/storage/MemTable.java b/src/main/java/com/cube/storage/MemTable.java new file mode 100644 index 0000000..2953487 --- /dev/null +++ b/src/main/java/com/cube/storage/MemTable.java @@ -0,0 +1,97 @@ +package com.cube.storage; + +import java.util.*; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * In-memory sorted table for recent writes. 
+ */ +public class MemTable { + + private final ConcurrentSkipListMap data; + private volatile long size; + + public MemTable() { + this.data = new ConcurrentSkipListMap<>(); + this.size = 0; + } + + public void put(String key, byte[] value) { + byte[] oldValue = data.put(key, value); + + if (oldValue != null) { + size -= estimateEntrySize(key, oldValue); + } + size += estimateEntrySize(key, value); + } + + public byte[] get(String key) { + return data.get(key); + } + + public boolean contains(String key) { + return data.containsKey(key); + } + + public Collection scan(String prefix) { + if (prefix == null || prefix.isEmpty()) { + return new ArrayList<>(data.keySet()); + } + + List result = new ArrayList<>(); + String startKey = prefix; + String endKey = prefix + Character.MAX_VALUE; + + for (String key : data.subMap(startKey, endKey).keySet()) { + result.add(key); + } + + return result; + } + + public Map scanEntries(String prefix) { + if (prefix == null || prefix.isEmpty()) { + return new TreeMap<>(data); + } + + Map result = new TreeMap<>(); + String startKey = prefix; + String endKey = prefix + Character.MAX_VALUE; + + for (Map.Entry entry : data.subMap(startKey, endKey).entrySet()) { + result.put(entry.getKey(), entry.getValue()); + } + + return result; + } + + public Map getEntries() { + return new TreeMap<>(data); + } + + public long size() { + return size; + } + + public int getKeyCount() { + return data.size(); + } + + public boolean isEmpty() { + return data.isEmpty(); + } + + public void clear() { + data.clear(); + size = 0; + } + + private long estimateEntrySize(String key, byte[] value) { + return (key.length() * 2L) + value.length + 32; + } + + @Override + public String toString() { + return "MemTable{keys=" + data.size() + ", size=" + size + " bytes}"; + } +} diff --git a/src/main/java/com/cube/storage/SSTable.java b/src/main/java/com/cube/storage/SSTable.java new file mode 100644 index 0000000..4fda4cd --- /dev/null +++ 
b/src/main/java/com/cube/storage/SSTable.java @@ -0,0 +1,176 @@ +package com.cube.storage; + +import java.io.*; +import java.nio.file.*; +import java.util.*; + +/** + * SSTable (Sorted String Table) for Cube database. + */ +public class SSTable implements AutoCloseable { + + private static final int MAGIC_NUMBER = 0x43554245; // "CUBE" + private static final int VERSION = 1; + + private final Path filePath; + private final long creationTime; + private final long size; + private final int keyCount; + + private SSTable(Path filePath, long creationTime, long size, int keyCount) { + this.filePath = filePath; + this.creationTime = creationTime; + this.size = size; + this.keyCount = keyCount; + } + + public static SSTable create(Path filePath, Map entries) throws IOException { + long creationTime = System.currentTimeMillis(); + + try (DataOutputStream dos = new DataOutputStream( + new BufferedOutputStream(Files.newOutputStream(filePath)))) { + + dos.writeInt(MAGIC_NUMBER); + dos.writeInt(VERSION); + dos.writeInt(entries.size()); + dos.writeLong(creationTime); + + for (Map.Entry entry : entries.entrySet()) { + byte[] keyBytes = entry.getKey().getBytes("UTF-8"); + dos.writeInt(keyBytes.length); + dos.write(keyBytes); + + dos.writeInt(entry.getValue().length); + dos.write(entry.getValue()); + } + + dos.flush(); + } + + long size = Files.size(filePath); + return new SSTable(filePath, creationTime, size, entries.size()); + } + + public static SSTable open(Path filePath) throws IOException { + try (DataInputStream dis = new DataInputStream( + new BufferedInputStream(Files.newInputStream(filePath)))) { + + int magic = dis.readInt(); + if (magic != MAGIC_NUMBER) { + throw new IOException("Invalid SSTable file"); + } + + int version = dis.readInt(); + int keyCount = dis.readInt(); + long creationTime = dis.readLong(); + + long fileSize = Files.size(filePath); + return new SSTable(filePath, creationTime, fileSize, keyCount); + } + } + + public byte[] get(String key) throws 
IOException { + try (RandomAccessFile raf = new RandomAccessFile(filePath.toFile(), "r")) { + raf.seek(4 + 4 + 4 + 8); + + int keysRead = 0; + while (keysRead < keyCount && raf.getFilePointer() < raf.length()) { + int keyLen = raf.readInt(); + byte[] keyBytes = new byte[keyLen]; + raf.readFully(keyBytes); + String currentKey = new String(keyBytes, "UTF-8"); + + int valueLen = raf.readInt(); + + if (currentKey.equals(key)) { + byte[] value = new byte[valueLen]; + raf.readFully(value); + return value; + } else { + raf.skipBytes(valueLen); + } + + keysRead++; + } + } + + return null; + } + + public Collection scan(String prefix) throws IOException { + List result = new ArrayList<>(); + + try (RandomAccessFile raf = new RandomAccessFile(filePath.toFile(), "r")) { + raf.seek(4 + 4 + 4 + 8); + + int keysRead = 0; + while (keysRead < keyCount && raf.getFilePointer() < raf.length()) { + int keyLen = raf.readInt(); + byte[] keyBytes = new byte[keyLen]; + raf.readFully(keyBytes); + String currentKey = new String(keyBytes, "UTF-8"); + + int valueLen = raf.readInt(); + raf.skipBytes(valueLen); + + if (currentKey.startsWith(prefix)) { + result.add(currentKey); + } + + keysRead++; + } + } + + return result; + } + + public Map scanEntries(String prefix) throws IOException { + Map result = new TreeMap<>(); + + try (RandomAccessFile raf = new RandomAccessFile(filePath.toFile(), "r")) { + raf.seek(4 + 4 + 4 + 8); + + int keysRead = 0; + while (keysRead < keyCount && raf.getFilePointer() < raf.length()) { + int keyLen = raf.readInt(); + byte[] keyBytes = new byte[keyLen]; + raf.readFully(keyBytes); + String currentKey = new String(keyBytes, "UTF-8"); + + int valueLen = raf.readInt(); + byte[] value = new byte[valueLen]; + raf.readFully(value); + + if (currentKey.startsWith(prefix)) { + result.put(currentKey, value); + } + + keysRead++; + } + } + + return result; + } + + public Map getAll() throws IOException { + return scanEntries(""); + } + + public void delete() throws IOException 
/**
 * Pluggable key/value storage contract for the Cube database.
 * Pure Java implementation with no native dependencies.
 */
public interface StorageEngine {

    /** Store {@code value} under {@code key}, replacing any existing value. */
    void put(String key, byte[] value) throws IOException;

    /** Return the value for {@code key}, or null if absent. */
    byte[] get(String key) throws IOException;

    /** Remove {@code key}; returns true when the request was accepted. */
    boolean delete(String key) throws IOException;

    /** Iterate all keys beginning with {@code prefix}. */
    Iterator<String> scan(String prefix) throws IOException;

    /** Iterate all key/value entries whose key begins with {@code prefix}. */
    Iterator<Map.Entry<String, byte[]>> scanEntries(String prefix) throws IOException;

    /** Force buffered writes to durable storage. */
    void flush() throws IOException;

    /** Trigger storage compaction/garbage collection. */
    void compact() throws IOException;

    /** Snapshot of current storage counters. */
    StorageStats getStats();

    /** Release all resources; the engine is unusable afterwards. */
    void close() throws IOException;

    /** Immutable snapshot of engine-level statistics. */
    class StorageStats {
        private final long totalKeys;
        private final long totalSize;
        private final long memtableSize;
        private final long sstableCount;

        public StorageStats(long totalKeys, long totalSize, long memtableSize, long sstableCount) {
            this.totalKeys = totalKeys;
            this.totalSize = totalSize;
            this.memtableSize = memtableSize;
            this.sstableCount = sstableCount;
        }

        public long getTotalKeys() { return totalKeys; }
        public long getTotalSize() { return totalSize; }
        public long getMemtableSize() { return memtableSize; }
        public long getSstableCount() { return sstableCount; }

        @Override
        public String toString() {
            // Same output as the original hand-concatenated version.
            return String.format(
                    "StorageStats{keys=%d, size=%d, memtable=%d, sstables=%d}",
                    totalKeys, totalSize, memtableSize, sstableCount);
        }
    }
}
"StorageStats{" + + "keys=" + totalKeys + + ", size=" + totalSize + + ", memtable=" + memtableSize + + ", sstables=" + sstableCount + + '}'; + } + } +} diff --git a/src/main/java/com/cube/storage/WriteAheadLog.java b/src/main/java/com/cube/storage/WriteAheadLog.java new file mode 100644 index 0000000..6ae8397 --- /dev/null +++ b/src/main/java/com/cube/storage/WriteAheadLog.java @@ -0,0 +1,161 @@ +package com.cube.storage; + +import java.io.*; +import java.nio.file.*; +import java.util.*; + +/** + * Write-Ahead Log (WAL) for durability in Cube database. + */ +public class WriteAheadLog implements AutoCloseable { + + private static final int MAGIC_NUMBER = 0x43554245; // "CUBE" + private static final int VERSION = 1; + public static final byte[] TOMBSTONE = new byte[]{0x00}; + + private final Path logPath; + private DataOutputStream output; + private long entriesWritten; + + public enum OperationType { + PUT(1), + DELETE(2); + + private final int code; + + OperationType(int code) { + this.code = code; + } + + public int getCode() { return code; } + + public static OperationType fromCode(int code) { + for (OperationType type : values()) { + if (type.code == code) { + return type; + } + } + throw new IllegalArgumentException("Unknown operation type: " + code); + } + } + + public static class LogEntry { + private final OperationType type; + private final String key; + private final byte[] value; + + public LogEntry(OperationType type, String key, byte[] value) { + this.type = type; + this.key = key; + this.value = value; + } + + public OperationType getType() { return type; } + public String getKey() { return key; } + public byte[] getValue() { return value; } + } + + public WriteAheadLog(Path logPath) throws IOException { + this.logPath = logPath; + this.entriesWritten = 0; + + boolean exists = Files.exists(logPath); + this.output = new DataOutputStream( + new BufferedOutputStream(Files.newOutputStream(logPath, + StandardOpenOption.CREATE, StandardOpenOption.APPEND))); 
+ + if (!exists) { + output.writeInt(MAGIC_NUMBER); + output.writeInt(VERSION); + output.flush(); + } + } + + public synchronized void append(LogEntry entry) throws IOException { + output.writeInt(entry.type.getCode()); + + byte[] keyBytes = entry.key.getBytes("UTF-8"); + output.writeInt(keyBytes.length); + output.write(keyBytes); + + output.writeInt(entry.value.length); + output.write(entry.value); + + output.flush(); + entriesWritten++; + } + + public List replay() throws IOException { + List entries = new ArrayList<>(); + + if (!Files.exists(logPath) || Files.size(logPath) == 0) { + return entries; + } + + try (DataInputStream input = new DataInputStream( + new BufferedInputStream(Files.newInputStream(logPath)))) { + + int magic = input.readInt(); + if (magic != MAGIC_NUMBER) { + throw new IOException("Invalid WAL file"); + } + + int version = input.readInt(); + + while (input.available() > 0) { + try { + int typeCode = input.readInt(); + OperationType type = OperationType.fromCode(typeCode); + + int keyLen = input.readInt(); + byte[] keyBytes = new byte[keyLen]; + input.readFully(keyBytes); + String key = new String(keyBytes, "UTF-8"); + + int valueLen = input.readInt(); + byte[] value = new byte[valueLen]; + input.readFully(value); + + entries.add(new LogEntry(type, key, value)); + + } catch (EOFException e) { + break; + } + } + } + + return entries; + } + + public synchronized void rotate() throws IOException { + close(); + + if (Files.exists(logPath)) { + Path archivePath = logPath.getParent().resolve( + "wal-" + System.currentTimeMillis() + ".archived"); + Files.move(logPath, archivePath); + } + + this.output = new DataOutputStream( + new BufferedOutputStream(Files.newOutputStream(logPath, + StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))); + + output.writeInt(MAGIC_NUMBER); + output.writeInt(VERSION); + output.flush(); + + entriesWritten = 0; + } + + public long getEntriesWritten() { + return entriesWritten; + } + + @Override + public 
synchronized void close() throws IOException { + if (output != null) { + output.flush(); + output.close(); + } + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties new file mode 100644 index 0000000..71de3f7 --- /dev/null +++ b/src/main/resources/application.properties @@ -0,0 +1,9 @@ +server.port=8080 +spring.application.name=cube-db + +# Logging +logging.level.com.cube=INFO +logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss} - %msg%n + +# Cube DB Configuration +cube.datadir=/tmp/cube-data diff --git a/src/test/java/com/cube/gossip/GossipProtocolTest.java b/src/test/java/com/cube/gossip/GossipProtocolTest.java new file mode 100644 index 0000000..9aabe9a --- /dev/null +++ b/src/test/java/com/cube/gossip/GossipProtocolTest.java @@ -0,0 +1,203 @@ +package com.cube.gossip; + +import org.junit.jupiter.api.*; +import static org.junit.jupiter.api.Assertions.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Tests for Gossip Protocol + */ +public class GossipProtocolTest { + + private GossipProtocol gossip1; + private GossipProtocol gossip2; + private GossipProtocol gossip3; + + @BeforeEach + public void setup() { + GossipProtocol.GossipConfig config = GossipProtocol.GossipConfig.defaultConfig(); + + gossip1 = new GossipProtocol("node-1", "localhost", 8080, config); + gossip2 = new GossipProtocol("node-2", "localhost", 8081, + new GossipProtocol.GossipConfig(1000, 3, 5000, 15000, 3, 7947)); + gossip3 = new GossipProtocol("node-3", "localhost", 8082, + new GossipProtocol.GossipConfig(1000, 3, 5000, 15000, 3, 7948)); + } + + @AfterEach + public void teardown() { + if (gossip1 != null) gossip1.shutdown(); + if (gossip2 != null) gossip2.shutdown(); + if (gossip3 != null) gossip3.shutdown(); + } + + @Test + public void testSingleNodeStartup() { + gossip1.start(); + + // Should have local node + Map state = gossip1.getClusterState(); + assertEquals(1, state.size()); + assertTrue(state.containsKey("node-1")); + 
+ GossipProtocol.NodeState localNode = state.get("node-1"); + assertEquals(GossipProtocol.NodeState.Status.ALIVE, localNode.getStatus()); + } + + @Test + public void testTwoNodeCluster() throws Exception { + gossip1.start(); + gossip2.start(); + + // Node 2 joins via node 1 + gossip2.join(Arrays.asList("localhost:7946")); + + // Wait for gossip to propagate + Thread.sleep(3000); + + // Both nodes should see each other + Map state1 = gossip1.getClusterState(); + Map state2 = gossip2.getClusterState(); + + assertEquals(2, state1.size()); + assertEquals(2, state2.size()); + + assertTrue(state1.containsKey("node-1")); + assertTrue(state1.containsKey("node-2")); + assertTrue(state2.containsKey("node-1")); + assertTrue(state2.containsKey("node-2")); + } + + @Test + public void testThreeNodeCluster() throws Exception { + gossip1.start(); + gossip2.start(); + gossip3.start(); + + // Nodes join + gossip2.join(Arrays.asList("localhost:7946")); + Thread.sleep(1000); + gossip3.join(Arrays.asList("localhost:7946", "localhost:7947")); + + // Wait for convergence + Thread.sleep(5000); + + // All nodes should see all nodes + assertEquals(3, gossip1.getClusterState().size()); + assertEquals(3, gossip2.getClusterState().size()); + assertEquals(3, gossip3.getClusterState().size()); + } + + @Test + public void testAliveNodes() throws Exception { + gossip1.start(); + gossip2.start(); + + gossip2.join(Arrays.asList("localhost:7946")); + Thread.sleep(3000); + + List aliveNodes1 = gossip1.getAliveNodes(); + List aliveNodes2 = gossip2.getAliveNodes(); + + assertEquals(2, aliveNodes1.size()); + assertEquals(2, aliveNodes2.size()); + } + + @Test + public void testEventListener() throws Exception { + CountDownLatch joinLatch = new CountDownLatch(1); + CountDownLatch aliveLatch = new CountDownLatch(1); + + gossip1.addListener(new GossipProtocol.GossipListener() { + @Override + public void onNodeJoined(GossipProtocol.NodeState node) { + if (node.getNodeId().equals("node-2")) { + 
joinLatch.countDown(); + } + } + + @Override + public void onNodeLeft(GossipProtocol.NodeState node) {} + + @Override + public void onNodeSuspected(GossipProtocol.NodeState node) {} + + @Override + public void onNodeAlive(GossipProtocol.NodeState node) { + if (node.getNodeId().equals("node-2")) { + aliveLatch.countDown(); + } + } + + @Override + public void onNodeDead(GossipProtocol.NodeState node) {} + }); + + gossip1.start(); + gossip2.start(); + gossip2.join(Arrays.asList("localhost:7946")); + + // Wait for join event + assertTrue(joinLatch.await(5, TimeUnit.SECONDS)); + assertTrue(aliveLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testStatistics() throws Exception { + gossip1.start(); + gossip2.start(); + gossip3.start(); + + gossip2.join(Arrays.asList("localhost:7946")); + gossip3.join(Arrays.asList("localhost:7946")); + Thread.sleep(3000); + + Map stats = gossip1.getStatistics(); + + assertEquals("node-1", stats.get("localNodeId")); + assertEquals(3, stats.get("totalNodes")); + assertEquals(3L, stats.get("aliveNodes")); + assertEquals(0L, stats.get("suspectedNodes")); + assertEquals(0L, stats.get("deadNodes")); + } + + @Test + public void testGracefulLeave() throws Exception { + gossip1.start(); + gossip2.start(); + + gossip2.join(Arrays.asList("localhost:7946")); + Thread.sleep(3000); + + assertEquals(2, gossip1.getClusterState().size()); + + // Node 2 leaves gracefully + gossip2.leave(); + gossip2.shutdown(); + + // Wait for propagation + Thread.sleep(3000); + + // Node 1 should still see node 2 but as leaving/dead + GossipProtocol.NodeState node2State = gossip1.getClusterState().get("node-2"); + assertNotNull(node2State); + assertTrue(node2State.getStatus() == GossipProtocol.NodeState.Status.LEAVING || + node2State.getStatus() == GossipProtocol.NodeState.Status.DEAD); + } + + @Test + public void testHeartbeatIncrement() throws Exception { + gossip1.start(); + + GossipProtocol.NodeState localNode = gossip1.getClusterState().get("node-1"); + 
long initialHeartbeat = localNode.getHeartbeatCounter(); + + // Wait for a few gossip rounds + Thread.sleep(5000); + + long newHeartbeat = localNode.getHeartbeatCounter(); + assertTrue(newHeartbeat > initialHeartbeat); + } +} diff --git a/src/test/java/com/cube/index/CubicIndexTest.java b/src/test/java/com/cube/index/CubicIndexTest.java new file mode 100644 index 0000000..837487d --- /dev/null +++ b/src/test/java/com/cube/index/CubicIndexTest.java @@ -0,0 +1,277 @@ +package com.cube.index; + +import org.junit.jupiter.api.*; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for Cubic Index System + */ +public class CubicIndexTest { + + @Test + public void testCubicIndexCalculation() { + // Test cubic index formula: N³ × 6 + assertEquals(6, CubicIndexNode.calculateCubicIndex(1)); // 1³×6 = 6 + assertEquals(48, CubicIndexNode.calculateCubicIndex(2)); // 2³×6 = 48 + assertEquals(162, CubicIndexNode.calculateCubicIndex(3)); // 3³×6 = 162 + assertEquals(384, CubicIndexNode.calculateCubicIndex(4)); // 4³×6 = 384 + assertEquals(750, CubicIndexNode.calculateCubicIndex(5)); // 5³×6 = 750 + assertEquals(1296, CubicIndexNode.calculateCubicIndex(6)); // 6³×6 = 1296 + } + + @Test + public void testLevelCalculation() { + // Test calculating level from index value + assertEquals(1, CubicIndexNode.calculateLevel(6)); + assertEquals(2, CubicIndexNode.calculateLevel(48)); + assertEquals(3, CubicIndexNode.calculateLevel(162)); + assertEquals(4, CubicIndexNode.calculateLevel(384)); + assertEquals(5, CubicIndexNode.calculateLevel(750)); + } + + @Test + public void testSideDetermination() { + // Test that keys are consistently mapped to sides + String key1 = "test-key-1"; + CubicIndexNode.Side side1 = CubicIndexNode.determineSide(key1); + + assertNotNull(side1); + + // Same key should always map to same side + assertEquals(side1, CubicIndexNode.determineSide(key1)); + + // Test all possible sides + Set seenSides = new HashSet<>(); + for (int i = 
0; i < 100; i++) { + CubicIndexNode.Side side = CubicIndexNode.determineSide("key-" + i); + seenSides.add(side); + } + + // With 100 keys, we should see multiple sides + assertTrue(seenSides.size() > 1, "Keys should distribute across multiple sides"); + } + + @Test + public void testCubicNodeCreation() { + CubicIndexNode node = new CubicIndexNode(3); + + assertEquals(3, node.getLevel()); + assertEquals(162, node.getIndexValue()); + assertEquals(0, node.getTotalSize()); + } + + @Test + public void testCubicNodePutAndGet() { + CubicIndexNode node = new CubicIndexNode(2); + + // Put data + node.put("key1", "value1".getBytes()); + node.put("key2", "value2".getBytes()); + node.put("key3", "value3".getBytes()); + + // Get data + assertArrayEquals("value1".getBytes(), node.get("key1")); + assertArrayEquals("value2".getBytes(), node.get("key2")); + assertArrayEquals("value3".getBytes(), node.get("key3")); + + // Total size + assertEquals(3, node.getTotalSize()); + } + + @Test + public void testCubicNodeSideDistribution() { + CubicIndexNode node = new CubicIndexNode(3); + + // Add many keys + for (int i = 0; i < 60; i++) { + node.put("key-" + i, ("value-" + i).getBytes()); + } + + assertEquals(60, node.getTotalSize()); + + // Check that keys are distributed across sides + Map stats = node.getStats(); + @SuppressWarnings("unchecked") + Map sideDistribution = (Map) stats.get("sideDistribution"); + + assertNotNull(sideDistribution); + assertEquals(6, sideDistribution.size()); + + // At least some sides should have keys + long nonEmptySides = sideDistribution.values().stream().filter(count -> count > 0).count(); + assertTrue(nonEmptySides > 1, "Keys should be distributed across multiple sides"); + } + + @Test + public void testCubicIndexTree() { + CubicIndexTree tree = new CubicIndexTree(3, 10, true); + + // Put data + tree.put("user:1", "Alice".getBytes()); + tree.put("user:2", "Bob".getBytes()); + tree.put("user:3", "Charlie".getBytes()); + tree.put("product:1", 
"Laptop".getBytes()); + tree.put("product:2", "Mouse".getBytes()); + + // Get data + assertArrayEquals("Alice".getBytes(), tree.get("user:1")); + assertArrayEquals("Bob".getBytes(), tree.get("user:2")); + assertArrayEquals("Laptop".getBytes(), tree.get("product:1")); + + // Total size + assertEquals(5, tree.getTotalSize()); + } + + @Test + public void testPrefixSearch() { + CubicIndexTree tree = new CubicIndexTree(3, 10, true); + + // Add data + tree.put("user:1:name", "Alice".getBytes()); + tree.put("user:1:email", "alice@example.com".getBytes()); + tree.put("user:2:name", "Bob".getBytes()); + tree.put("user:2:email", "bob@example.com".getBytes()); + tree.put("product:1", "Laptop".getBytes()); + + // Search by prefix + List userKeys = tree.searchPrefix("user:"); + assertEquals(4, userKeys.size()); + assertTrue(userKeys.contains("user:1:name")); + assertTrue(userKeys.contains("user:1:email")); + + List user1Keys = tree.searchPrefix("user:1"); + assertEquals(2, user1Keys.size()); + + List productKeys = tree.searchPrefix("product:"); + assertEquals(1, productKeys.size()); + } + + @Test + public void testRangeSearch() { + CubicIndexTree tree = new CubicIndexTree(3, 10, true); + + // Add sequential keys + for (int i = 0; i < 20; i++) { + tree.put(String.format("key-%03d", i), ("value-" + i).getBytes()); + } + + // Range search + List range = tree.searchRange("key-005", "key-010"); + + assertTrue(range.size() >= 6); // At least 005-010 + assertTrue(range.contains("key-005")); + assertTrue(range.contains("key-010")); + } + + @Test + public void testAutoExpansion() { + CubicIndexTree tree = new CubicIndexTree(2, 10, true); + + assertEquals(2, tree.getLevelCount()); + + // Add enough data to potentially trigger expansion + for (int i = 0; i < 100; i++) { + tree.put("key-" + i, ("value-" + i).getBytes()); + } + + // Tree should maintain or expand levels + assertTrue(tree.getLevelCount() >= 2); + } + + @Test + public void testRebalance() { + CubicIndexTree tree = new 
CubicIndexTree(3, 10, true); + + // Add data + for (int i = 0; i < 50; i++) { + tree.put("key-" + i, ("value-" + i).getBytes()); + } + + int beforeSize = tree.getTotalSize(); + + // Rebalance + tree.rebalance(); + + // Size should remain the same + assertEquals(beforeSize, tree.getTotalSize()); + + // Data should still be accessible + assertArrayEquals("value-0".getBytes(), tree.get("key-0")); + assertArrayEquals("value-25".getBytes(), tree.get("key-25")); + } + + @Test + public void testRemove() { + CubicIndexTree tree = new CubicIndexTree(3, 10, true); + + tree.put("key1", "value1".getBytes()); + tree.put("key2", "value2".getBytes()); + + assertEquals(2, tree.getTotalSize()); + + // Remove + assertTrue(tree.remove("key1")); + + assertEquals(1, tree.getTotalSize()); + assertNull(tree.get("key1")); + assertArrayEquals("value2".getBytes(), tree.get("key2")); + + // Remove non-existent key + assertFalse(tree.remove("nonexistent")); + } + + @Test + public void testGetAllKeys() { + CubicIndexTree tree = new CubicIndexTree(3, 10, true); + + tree.put("key1", "value1".getBytes()); + tree.put("key2", "value2".getBytes()); + tree.put("key3", "value3".getBytes()); + + Set allKeys = tree.getAllKeys(); + + assertEquals(3, allKeys.size()); + assertTrue(allKeys.contains("key1")); + assertTrue(allKeys.contains("key2")); + assertTrue(allKeys.contains("key3")); + } + + @Test + public void testCubicIndexStatistics() { + CubicIndexTree tree = new CubicIndexTree(5, 20, true); + + // Add data + for (int i = 0; i < 100; i++) { + tree.put("key-" + i, ("value-" + i).getBytes()); + } + + Map stats = tree.getStats(); + + assertNotNull(stats); + assertTrue((Integer) stats.get("totalKeys") > 0); + assertTrue((Integer) stats.get("totalLevels") > 0); + assertNotNull(stats.get("sideDistribution")); + assertNotNull(stats.get("levels")); + } + + @Test + public void testSideEnumeration() { + // Test all 6 sides exist + CubicIndexNode.Side[] sides = CubicIndexNode.Side.values(); + assertEquals(6, 
sides.length); + + // Test side names + assertEquals("FRONT", CubicIndexNode.Side.FRONT.name()); + assertEquals("BACK", CubicIndexNode.Side.BACK.name()); + assertEquals("LEFT", CubicIndexNode.Side.LEFT.name()); + assertEquals("RIGHT", CubicIndexNode.Side.RIGHT.name()); + assertEquals("TOP", CubicIndexNode.Side.TOP.name()); + assertEquals("BOTTOM", CubicIndexNode.Side.BOTTOM.name()); + + // Test side indices + assertEquals(0, CubicIndexNode.Side.FRONT.getIndex()); + assertEquals(5, CubicIndexNode.Side.BOTTOM.getIndex()); + } +} diff --git a/src/test/java/com/cube/replication/ReplicationTest.java b/src/test/java/com/cube/replication/ReplicationTest.java new file mode 100644 index 0000000..fb956f7 --- /dev/null +++ b/src/test/java/com/cube/replication/ReplicationTest.java @@ -0,0 +1,265 @@ +package com.cube.replication; + +import com.cube.cluster.ClusterNode; +import com.cube.consistency.ConsistencyLevel; +import com.cube.storage.LSMStorageEngine; +import org.junit.jupiter.api.*; + +import java.io.IOException; +import java.nio.file.*; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for Phase 2: Replication and Consistency + */ +public class ReplicationTest { + + private Path testDir; + private LSMStorageEngine storage; + private HintedHandoffManager hintedHandoff; + private ReadRepairManager readRepair; + private ReplicationCoordinator coordinator; + + @BeforeEach + public void setUp() throws IOException { + testDir = Files.createTempDirectory("cube-replication-test-"); + storage = new LSMStorageEngine(testDir.toString()); + + Path hintsDir = testDir.resolve("hints"); + hintedHandoff = new HintedHandoffManager(hintsDir.toString(), 1000, 3600000); + + readRepair = new ReadRepairManager(100); // 100% read repair chance + + ReplicationStrategy strategy = new SimpleReplicationStrategy(); + + coordinator = new ReplicationCoordinator( + storage, + strategy, + hintedHandoff, + readRepair, + 3, // RF=3 + 5000, // 5s write timeout + 
3000 // 3s read timeout + ); + } + + @AfterEach + public void tearDown() throws IOException { + if (coordinator != null) { + coordinator.shutdown(); + } + if (storage != null) { + storage.close(); + } + deleteDirectory(testDir); + } + + @Test + public void testConsistencyLevelCalculation() { + // QUORUM with RF=3 should require 2 responses + assertEquals(2, ConsistencyLevel.QUORUM.getRequiredResponses(3)); + + // QUORUM with RF=5 should require 3 responses + assertEquals(3, ConsistencyLevel.QUORUM.getRequiredResponses(5)); + + // ALL should require all replicas + assertEquals(3, ConsistencyLevel.ALL.getRequiredResponses(3)); + assertEquals(5, ConsistencyLevel.ALL.getRequiredResponses(5)); + + // ONE should require 1 + assertEquals(1, ConsistencyLevel.ONE.getRequiredResponses(3)); + } + + @Test + public void testSimpleReplicationStrategy() { + SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(); + + List nodes = new ArrayList<>(); + nodes.add(new ClusterNode("node1", "host1", 8080)); + nodes.add(new ClusterNode("node2", "host2", 8080)); + nodes.add(new ClusterNode("node3", "host3", 8080)); + + // Get replicas for a key + List replicas = strategy.getReplicaNodes("testkey", 2, nodes); + + assertFalse(replicas.isEmpty()); + assertTrue(replicas.size() <= 2); + } + + @Test + public void testHintedHandoff() { + String targetNode = "node-2"; + String key = "test-key"; + byte[] value = "test-value".getBytes(); + + // Store hint + hintedHandoff.storeHint(targetNode, key, value); + + // Verify hint was stored + assertEquals(1, hintedHandoff.getHintCount(targetNode)); + assertEquals(1, hintedHandoff.getTotalHintCount()); + } + + @Test + public void testHintedHandoffReplay() { + String targetNode = "node-2"; + String key = "test-key"; + byte[] value = "test-value".getBytes(); + + // Store hint + hintedHandoff.storeHint(targetNode, key, value); + + // Replay hints + List replayedKeys = new ArrayList<>(); + hintedHandoff.replayHintsForNode(targetNode, hint -> { + 
replayedKeys.add(hint.getKey()); + return true; // Success + }); + + // Verify hint was replayed + assertTrue(replayedKeys.contains(key)); + assertEquals(0, hintedHandoff.getHintCount(targetNode)); + } + + @Test + public void testReadRepairDetection() { + ReadRepairManager manager = new ReadRepairManager(100); + + ClusterNode node1 = new ClusterNode("node1", "host1", 8080); + ClusterNode node2 = new ClusterNode("node2", "host2", 8080); + ClusterNode node3 = new ClusterNode("node3", "host3", 8080); + + // Create responses with different values + List responses = new ArrayList<>(); + responses.add(new ReadRepairManager.ReadResponse(node1, "key1", "value1".getBytes(), 1000)); + responses.add(new ReadRepairManager.ReadResponse(node2, "key1", "value2".getBytes(), 2000)); // Newer + responses.add(new ReadRepairManager.ReadResponse(node3, "key1", "value1".getBytes(), 1000)); + + // Detect conflicts + List conflicts = manager.detectConflicts(responses); + + assertEquals(3, conflicts.size(), "Should detect conflicts when values differ"); + + manager.shutdown(); + } + + @Test + public void testReadRepairChoosesNewestValue() throws Exception { + ReadRepairManager manager = new ReadRepairManager(100); + + ClusterNode node1 = new ClusterNode("node1", "host1", 8080); + ClusterNode node2 = new ClusterNode("node2", "host2", 8080); + ClusterNode node3 = new ClusterNode("node3", "host3", 8080); + + List responses = new ArrayList<>(); + responses.add(new ReadRepairManager.ReadResponse(node1, "key1", "old".getBytes(), 1000)); + responses.add(new ReadRepairManager.ReadResponse(node2, "key1", "new".getBytes(), 3000)); // Newest + responses.add(new ReadRepairManager.ReadResponse(node3, "key1", "old".getBytes(), 1000)); + + // Perform read repair + List repairedNodes = new ArrayList<>(); + ReadRepairManager.ReadRepairResult result = manager.performReadRepairBlocking( + responses, + (node, key, value, timestamp) -> { + repairedNodes.add(node.getNodeId()); + return true; + } + ); + + // 
Verify newest value was chosen + assertArrayEquals("new".getBytes(), result.getCanonicalValue()); + + // Verify repair was needed + assertTrue(result.isRepairNeeded()); + + // Verify old nodes were repaired + assertEquals(2, result.getRepairedNodes()); + assertTrue(repairedNodes.contains("node1")); + assertTrue(repairedNodes.contains("node3")); + + manager.shutdown(); + } + + @Test + public void testNetworkTopologyStrategy() { + Map dcRF = new HashMap<>(); + dcRF.put("dc1", 3); + dcRF.put("dc2", 2); + + NetworkTopologyReplicationStrategy strategy = + new NetworkTopologyReplicationStrategy(dcRF); + + List nodes = new ArrayList<>(); + nodes.add(new ClusterNode("node1", "host1", 8080, "dc1", "rack1")); + nodes.add(new ClusterNode("node2", "host2", 8080, "dc1", "rack2")); + nodes.add(new ClusterNode("node3", "host3", 8080, "dc1", "rack3")); + nodes.add(new ClusterNode("node4", "host4", 8080, "dc2", "rack1")); + nodes.add(new ClusterNode("node5", "host5", 8080, "dc2", "rack2")); + + List replicas = strategy.getReplicaNodes("testkey", 3, nodes); + + assertFalse(replicas.isEmpty()); + assertEquals("NetworkTopologyStrategy", strategy.getName()); + } + + @Test + public void testWriteWithConsistencyOne() throws IOException { + List nodes = new ArrayList<>(); + nodes.add(new ClusterNode("node1", "localhost", 8080)); + + ReplicationCoordinator.WriteResult result = coordinator.write( + "key1", + "value1".getBytes(), + ConsistencyLevel.ONE, + nodes + ); + + assertTrue(result.isSuccess()); + assertEquals(1, result.getSuccessfulWrites()); + } + + @Test + public void testReadWithConsistencyOne() throws IOException { + List nodes = new ArrayList<>(); + nodes.add(new ClusterNode("node1", "localhost", 8080)); + + // Write first + coordinator.writeLocal("key1", "value1".getBytes()); + + // Read + ReplicationCoordinator.ReadResult result = coordinator.read( + "key1", + ConsistencyLevel.ONE, + nodes + ); + + assertTrue(result.isSuccess()); + assertArrayEquals("value1".getBytes(), 
result.getValue()); + } + + @Test + public void testCoordinatorStats() { + Map stats = coordinator.getStats(); + + assertNotNull(stats); + assertEquals(3, stats.get("replicationFactor")); + assertEquals(5000L, stats.get("writeTimeoutMs")); + assertEquals(3000L, stats.get("readTimeoutMs")); + } + + private void deleteDirectory(Path dir) throws IOException { + if (Files.exists(dir)) { + Files.walk(dir) + .sorted(Comparator.reverseOrder()) + .forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + // Ignore + } + }); + } + } +} diff --git a/src/test/java/com/cube/sql/SQLParserTest.java b/src/test/java/com/cube/sql/SQLParserTest.java new file mode 100644 index 0000000..4417b60 --- /dev/null +++ b/src/test/java/com/cube/sql/SQLParserTest.java @@ -0,0 +1,204 @@ +package com.cube.sql; + +import org.junit.jupiter.api.*; +import static org.junit.jupiter.api.Assertions.*; + +import java.util.*; + +/** + * Tests for SQL Parser + */ +public class SQLParserTest { + + @Test + public void testParseSelectAll() { + String sql = "SELECT * FROM users.profiles WHERE id = 'user-1'"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.SELECT, parsed.getType()); + assertEquals("users", parsed.getKeyspace()); + assertEquals("profiles", parsed.getTable()); + assertTrue(parsed.getSelectColumns().contains("*")); + assertEquals("user-1", parsed.getWhereClause().get("id")); + } + + @Test + public void testParseSelectColumns() { + String sql = "SELECT name, email, age FROM users.profiles WHERE id = 'user-1'"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.SELECT, parsed.getType()); + assertEquals(3, parsed.getSelectColumns().size()); + assertTrue(parsed.getSelectColumns().contains("name")); + assertTrue(parsed.getSelectColumns().contains("email")); + assertTrue(parsed.getSelectColumns().contains("age")); + } + + @Test + public void testParseSelectDefaultKeyspace() { + String sql = "SELECT * FROM 
profiles WHERE id = 'user-1'"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals("default", parsed.getKeyspace()); + assertEquals("profiles", parsed.getTable()); + } + + @Test + public void testParseSelectMultipleConditions() { + String sql = "SELECT * FROM users WHERE id = 'user-1' AND status = 'active'"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(2, parsed.getWhereClause().size()); + assertEquals("user-1", parsed.getWhereClause().get("id")); + assertEquals("active", parsed.getWhereClause().get("status")); + } + + @Test + public void testParseInsert() { + String sql = "INSERT INTO users.profiles (id, name, email) VALUES ('user-1', 'Alice', 'alice@example.com')"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.INSERT, parsed.getType()); + assertEquals("users", parsed.getKeyspace()); + assertEquals("profiles", parsed.getTable()); + assertEquals(3, parsed.getColumns().size()); + assertEquals("user-1", parsed.getColumns().get("id")); + assertEquals("Alice", parsed.getColumns().get("name")); + assertEquals("alice@example.com", parsed.getColumns().get("email")); + } + + @Test + public void testParseInsertWithQuotes() { + String sql = "INSERT INTO messages (id, text) VALUES ('msg-1', 'Hello \"World\"')"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.INSERT, parsed.getType()); + assertEquals("msg-1", parsed.getColumns().get("id")); + assertTrue(parsed.getColumns().get("text").contains("Hello")); + } + + @Test + public void testParseUpdate() { + String sql = "UPDATE users.profiles SET name='Alice Johnson', age='31' WHERE id='user-1'"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.UPDATE, parsed.getType()); + assertEquals("users", parsed.getKeyspace()); + assertEquals("profiles", parsed.getTable()); + assertEquals(2, parsed.getColumns().size()); + assertEquals("Alice Johnson", 
parsed.getColumns().get("name")); + assertEquals("31", parsed.getColumns().get("age")); + assertEquals("user-1", parsed.getWhereClause().get("id")); + } + + @Test + public void testParseDelete() { + String sql = "DELETE FROM users.profiles WHERE id='user-1'"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.DELETE, parsed.getType()); + assertEquals("users", parsed.getKeyspace()); + assertEquals("profiles", parsed.getTable()); + assertEquals("user-1", parsed.getWhereClause().get("id")); + } + + @Test + public void testParseCreateTable() { + String sql = "CREATE TABLE users.profiles (id TEXT PRIMARY KEY, name TEXT, email TEXT)"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.CREATE_TABLE, parsed.getType()); + assertEquals("users", parsed.getKeyspace()); + assertEquals("profiles", parsed.getTable()); + assertEquals(3, parsed.getColumnDefinitions().size()); + assertEquals("TEXT", parsed.getColumnDefinitions().get("id")); + assertEquals("TEXT", parsed.getColumnDefinitions().get("name")); + assertEquals("TEXT", parsed.getColumnDefinitions().get("email")); + assertEquals("id", parsed.getPrimaryKey()); + } + + @Test + public void testParseCreateTableImplicitPrimaryKey() { + String sql = "CREATE TABLE products (sku TEXT, name TEXT, price TEXT)"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.CREATE_TABLE, parsed.getType()); + assertEquals("default", parsed.getKeyspace()); + assertEquals("products", parsed.getTable()); + assertEquals("sku", parsed.getPrimaryKey()); + } + + @Test + public void testParseDropTable() { + String sql = "DROP TABLE users.profiles"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.DROP_TABLE, parsed.getType()); + assertEquals("users", parsed.getKeyspace()); + assertEquals("profiles", parsed.getTable()); + } + + @Test + public void testParseDescribe() { + String sql = "DESCRIBE 
users.profiles"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.DESCRIBE, parsed.getType()); + assertEquals("users", parsed.getKeyspace()); + assertEquals("profiles", parsed.getTable()); + } + + @Test + public void testParseDescribeShort() { + String sql = "DESC users.profiles"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.DESCRIBE, parsed.getType()); + } + + @Test + public void testParseShowTables() { + String sql = "SHOW TABLES"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.SHOW_TABLES, parsed.getType()); + } + + @Test + public void testParseSemicolon() { + String sql = "SELECT * FROM users WHERE id = 'user-1';"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.SELECT, parsed.getType()); + assertEquals("user-1", parsed.getWhereClause().get("id")); + } + + @Test + public void testParseCaseInsensitive() { + String sql = "select * from users where id = 'user-1'"; + SQLParser.ParsedSQL parsed = SQLParser.parse(sql); + + assertEquals(SQLParser.SQLType.SELECT, parsed.getType()); + } + + @Test + public void testParseInvalidSQL() { + assertThrows(IllegalArgumentException.class, () -> { + SQLParser.parse("INVALID SQL STATEMENT"); + }); + } + + @Test + public void testParseEmptySQL() { + assertThrows(IllegalArgumentException.class, () -> { + SQLParser.parse(""); + }); + } + + @Test + public void testParseNullSQL() { + assertThrows(IllegalArgumentException.class, () -> { + SQLParser.parse(null); + }); + } +} diff --git a/src/test/java/com/cube/storage/CubeStorageEngineTest.java b/src/test/java/com/cube/storage/CubeStorageEngineTest.java new file mode 100644 index 0000000..020238c --- /dev/null +++ b/src/test/java/com/cube/storage/CubeStorageEngineTest.java @@ -0,0 +1,186 @@ +package com.cube.storage; + +import org.junit.jupiter.api.*; +import java.io.IOException; +import java.nio.file.*; +import java.util.*; + 
+import static org.junit.jupiter.api.Assertions.*; + +/** + * Comprehensive tests for Cube LSM storage engine + */ +public class CubeStorageEngineTest { + + private Path testDir; + private LSMStorageEngine storage; + + @BeforeEach + public void setUp() throws IOException { + testDir = Files.createTempDirectory("cube-test-"); + storage = new LSMStorageEngine(testDir.toString()); + } + + @AfterEach + public void tearDown() throws IOException { + if (storage != null) { + storage.close(); + } + deleteDirectory(testDir); + } + + @Test + public void testBasicPutAndGet() throws IOException { + storage.put("user:1", "Alice".getBytes()); + + byte[] result = storage.get("user:1"); + assertNotNull(result); + assertEquals("Alice", new String(result)); + } + + @Test + public void testMultipleOperations() throws IOException { + // Insert 100 records + for (int i = 0; i < 100; i++) { + storage.put("key:" + i, ("value:" + i).getBytes()); + } + + // Verify all records + for (int i = 0; i < 100; i++) { + byte[] result = storage.get("key:" + i); + assertNotNull(result); + assertEquals("value:" + i, new String(result)); + } + } + + @Test + public void testUpdate() throws IOException { + storage.put("key1", "value1".getBytes()); + assertEquals("value1", new String(storage.get("key1"))); + + storage.put("key1", "value2".getBytes()); + assertEquals("value2", new String(storage.get("key1"))); + } + + @Test + public void testDelete() throws IOException { + storage.put("key1", "value1".getBytes()); + assertNotNull(storage.get("key1")); + + storage.delete("key1"); + + // After delete, tombstone or null + byte[] result = storage.get("key1"); + if (result != null) { + assertArrayEquals(WriteAheadLog.TOMBSTONE, result); + } + } + + @Test + public void testScanWithPrefix() throws IOException { + storage.put("user:1:name", "Alice".getBytes()); + storage.put("user:1:email", "alice@example.com".getBytes()); + storage.put("user:2:name", "Bob".getBytes()); + storage.put("product:1:name", 
"Laptop".getBytes()); + + Iterator keys = storage.scan("user:1"); + Set results = new HashSet<>(); + keys.forEachRemaining(results::add); + + assertEquals(2, results.size()); + assertTrue(results.contains("user:1:name")); + assertTrue(results.contains("user:1:email")); + } + + @Test + public void testScanEntries() throws IOException { + storage.put("user:1:name", "Alice".getBytes()); + storage.put("user:1:email", "alice@example.com".getBytes()); + + Iterator> entries = storage.scanEntries("user:1"); + Map results = new HashMap<>(); + + entries.forEachRemaining(e -> + results.put(e.getKey(), new String(e.getValue()))); + + assertEquals(2, results.size()); + assertEquals("Alice", results.get("user:1:name")); + assertEquals("alice@example.com", results.get("user:1:email")); + } + + @Test + public void testFlush() throws IOException, InterruptedException { + storage.put("key1", "value1".getBytes()); + storage.put("key2", "value2".getBytes()); + + storage.flush(); + + // Wait a bit for async flush to complete + Thread.sleep(100); + + byte[] value1 = storage.get("key1"); + byte[] value2 = storage.get("key2"); + + assertNotNull(value1, "key1 should not be null after flush"); + assertNotNull(value2, "key2 should not be null after flush"); + + assertEquals("value1", new String(value1)); + assertEquals("value2", new String(value2)); + } + + @Test + public void testRecovery() throws IOException, InterruptedException { + storage.put("key1", "value1".getBytes()); + storage.put("key2", "value2".getBytes()); + + storage.close(); + + // Wait a moment before reopening + Thread.sleep(100); + + // Reopen and verify recovery + storage = new LSMStorageEngine(testDir.toString()); + + byte[] value1 = storage.get("key1"); + byte[] value2 = storage.get("key2"); + + assertNotNull(value1, "key1 should be recovered"); + assertNotNull(value2, "key2 should be recovered"); + + assertEquals("value1", new String(value1)); + assertEquals("value2", new String(value2)); + } + + @Test + public void 
testStats() throws IOException { + for (int i = 0; i < 50; i++) { + storage.put("key:" + i, ("value:" + i).getBytes()); + } + + StorageEngine.StorageStats stats = storage.getStats(); + + assertNotNull(stats); + assertTrue(stats.getMemtableSize() > 0); + System.out.println("Stats: " + stats); + } + + @Test + public void testNonExistentKey() throws IOException { + byte[] result = storage.get("nonexistent"); + assertNull(result); + } + + private void deleteDirectory(Path dir) throws IOException { + if (Files.exists(dir)) { + Files.walk(dir) + .sorted(Comparator.reverseOrder()) + .forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + // Ignore + } + }); + } + } +} diff --git a/target/classes/application.properties b/target/classes/application.properties new file mode 100644 index 0000000..71de3f7 --- /dev/null +++ b/target/classes/application.properties @@ -0,0 +1,9 @@ +server.port=8080 +spring.application.name=cube-db + +# Logging +logging.level.com.cube=INFO +logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss} - %msg%n + +# Cube DB Configuration +cube.datadir=/tmp/cube-data diff --git a/target/classes/com/cube/CubeApplication.class b/target/classes/com/cube/CubeApplication.class new file mode 100644 index 0000000..fbe0d27 --- /dev/null +++ b/target/classes/com/cube/CubeApplication.class Binary files differ diff --git a/target/classes/com/cube/api/CubeController.class b/target/classes/com/cube/api/CubeController.class new file mode 100644 index 0000000..c52ebf5 --- /dev/null +++ b/target/classes/com/cube/api/CubeController.class Binary files differ diff --git a/target/classes/com/cube/api/SQLController.class b/target/classes/com/cube/api/SQLController.class new file mode 100644 index 0000000..5259aad --- /dev/null +++ b/target/classes/com/cube/api/SQLController.class Binary files differ diff --git a/target/classes/com/cube/cluster/ClusterNode$NodeState.class b/target/classes/com/cube/cluster/ClusterNode$NodeState.class new file mode 100644 index 
0000000..fc827e6 --- /dev/null +++ b/target/classes/com/cube/cluster/ClusterNode$NodeState.class Binary files differ diff --git a/target/classes/com/cube/cluster/ClusterNode.class b/target/classes/com/cube/cluster/ClusterNode.class new file mode 100644 index 0000000..6a4ee77 --- /dev/null +++ b/target/classes/com/cube/cluster/ClusterNode.class Binary files differ diff --git a/target/classes/com/cube/cluster/ClusterUtils$HealthChecker.class b/target/classes/com/cube/cluster/ClusterUtils$HealthChecker.class new file mode 100644 index 0000000..fa43e02 --- /dev/null +++ b/target/classes/com/cube/cluster/ClusterUtils$HealthChecker.class Binary files differ diff --git a/target/classes/com/cube/cluster/ClusterUtils$NodeDiscovery.class b/target/classes/com/cube/cluster/ClusterUtils$NodeDiscovery.class new file mode 100644 index 0000000..4000a5b --- /dev/null +++ b/target/classes/com/cube/cluster/ClusterUtils$NodeDiscovery.class Binary files differ diff --git a/target/classes/com/cube/cluster/ClusterUtils$StatsAggregator.class b/target/classes/com/cube/cluster/ClusterUtils$StatsAggregator.class new file mode 100644 index 0000000..f4c1c16 --- /dev/null +++ b/target/classes/com/cube/cluster/ClusterUtils$StatsAggregator.class Binary files differ diff --git a/target/classes/com/cube/cluster/ClusterUtils$TokenRing.class b/target/classes/com/cube/cluster/ClusterUtils$TokenRing.class new file mode 100644 index 0000000..6bd859c --- /dev/null +++ b/target/classes/com/cube/cluster/ClusterUtils$TokenRing.class Binary files differ diff --git a/target/classes/com/cube/cluster/ClusterUtils$Topology.class b/target/classes/com/cube/cluster/ClusterUtils$Topology.class new file mode 100644 index 0000000..14b5c10 --- /dev/null +++ b/target/classes/com/cube/cluster/ClusterUtils$Topology.class Binary files differ diff --git a/target/classes/com/cube/cluster/ClusterUtils.class b/target/classes/com/cube/cluster/ClusterUtils.class new file mode 100644 index 0000000..ba11aa4 --- /dev/null +++ 
b/target/classes/com/cube/cluster/ClusterUtils.class Binary files differ diff --git a/target/classes/com/cube/consistency/ConsistencyLevel.class b/target/classes/com/cube/consistency/ConsistencyLevel.class new file mode 100644 index 0000000..8d353d1 --- /dev/null +++ b/target/classes/com/cube/consistency/ConsistencyLevel.class Binary files differ diff --git a/target/classes/com/cube/cql/CQLParser$ParsedQuery.class b/target/classes/com/cube/cql/CQLParser$ParsedQuery.class new file mode 100644 index 0000000..7ce9417 --- /dev/null +++ b/target/classes/com/cube/cql/CQLParser$ParsedQuery.class Binary files differ diff --git a/target/classes/com/cube/cql/CQLParser.class b/target/classes/com/cube/cql/CQLParser.class new file mode 100644 index 0000000..5fb28ba --- /dev/null +++ b/target/classes/com/cube/cql/CQLParser.class Binary files differ diff --git a/target/classes/com/cube/cql/QueryExecutor$1.class b/target/classes/com/cube/cql/QueryExecutor$1.class new file mode 100644 index 0000000..58a3f9d --- /dev/null +++ b/target/classes/com/cube/cql/QueryExecutor$1.class Binary files differ diff --git a/target/classes/com/cube/cql/QueryExecutor$Result.class b/target/classes/com/cube/cql/QueryExecutor$Result.class new file mode 100644 index 0000000..43a089d --- /dev/null +++ b/target/classes/com/cube/cql/QueryExecutor$Result.class Binary files differ diff --git a/target/classes/com/cube/cql/QueryExecutor.class b/target/classes/com/cube/cql/QueryExecutor.class new file mode 100644 index 0000000..0841f24 --- /dev/null +++ b/target/classes/com/cube/cql/QueryExecutor.class Binary files differ diff --git a/target/classes/com/cube/examples/CubeExamples.class b/target/classes/com/cube/examples/CubeExamples.class new file mode 100644 index 0000000..f89c1f6 --- /dev/null +++ b/target/classes/com/cube/examples/CubeExamples.class Binary files differ diff --git a/target/classes/com/cube/examples/CubicIndexExamples.class b/target/classes/com/cube/examples/CubicIndexExamples.class new file 
mode 100644 index 0000000..213263d --- /dev/null +++ b/target/classes/com/cube/examples/CubicIndexExamples.class Binary files differ diff --git a/target/classes/com/cube/examples/Phase2Examples.class b/target/classes/com/cube/examples/Phase2Examples.class new file mode 100644 index 0000000..2e55418 --- /dev/null +++ b/target/classes/com/cube/examples/Phase2Examples.class Binary files differ diff --git a/target/classes/com/cube/examples/SQLExamples.class b/target/classes/com/cube/examples/SQLExamples.class new file mode 100644 index 0000000..76c9cdc --- /dev/null +++ b/target/classes/com/cube/examples/SQLExamples.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipMessageHandler$1.class b/target/classes/com/cube/gossip/GossipMessageHandler$1.class new file mode 100644 index 0000000..5695c19 --- /dev/null +++ b/target/classes/com/cube/gossip/GossipMessageHandler$1.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipMessageHandler.class b/target/classes/com/cube/gossip/GossipMessageHandler.class new file mode 100644 index 0000000..7aaf92b --- /dev/null +++ b/target/classes/com/cube/gossip/GossipMessageHandler.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipProtocol$GossipConfig.class b/target/classes/com/cube/gossip/GossipProtocol$GossipConfig.class new file mode 100644 index 0000000..d98e5a8 --- /dev/null +++ b/target/classes/com/cube/gossip/GossipProtocol$GossipConfig.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipProtocol$GossipListener.class b/target/classes/com/cube/gossip/GossipProtocol$GossipListener.class new file mode 100644 index 0000000..5ffacd6 --- /dev/null +++ b/target/classes/com/cube/gossip/GossipProtocol$GossipListener.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipProtocol$GossipMessage$Type.class b/target/classes/com/cube/gossip/GossipProtocol$GossipMessage$Type.class new file mode 100644 index 0000000..6912080 --- 
/dev/null +++ b/target/classes/com/cube/gossip/GossipProtocol$GossipMessage$Type.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipProtocol$GossipMessage.class b/target/classes/com/cube/gossip/GossipProtocol$GossipMessage.class new file mode 100644 index 0000000..5d69ccf --- /dev/null +++ b/target/classes/com/cube/gossip/GossipProtocol$GossipMessage.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipProtocol$NodeState$Status.class b/target/classes/com/cube/gossip/GossipProtocol$NodeState$Status.class new file mode 100644 index 0000000..c04b5e1 --- /dev/null +++ b/target/classes/com/cube/gossip/GossipProtocol$NodeState$Status.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipProtocol$NodeState.class b/target/classes/com/cube/gossip/GossipProtocol$NodeState.class new file mode 100644 index 0000000..c5931eb --- /dev/null +++ b/target/classes/com/cube/gossip/GossipProtocol$NodeState.class Binary files differ diff --git a/target/classes/com/cube/gossip/GossipProtocol.class b/target/classes/com/cube/gossip/GossipProtocol.class new file mode 100644 index 0000000..59ab328 --- /dev/null +++ b/target/classes/com/cube/gossip/GossipProtocol.class Binary files differ diff --git a/target/classes/com/cube/index/CubicIndexNode$CubeSide.class b/target/classes/com/cube/index/CubicIndexNode$CubeSide.class new file mode 100644 index 0000000..d9812a9 --- /dev/null +++ b/target/classes/com/cube/index/CubicIndexNode$CubeSide.class Binary files differ diff --git a/target/classes/com/cube/index/CubicIndexNode$Side.class b/target/classes/com/cube/index/CubicIndexNode$Side.class new file mode 100644 index 0000000..ad06cbd --- /dev/null +++ b/target/classes/com/cube/index/CubicIndexNode$Side.class Binary files differ diff --git a/target/classes/com/cube/index/CubicIndexNode.class b/target/classes/com/cube/index/CubicIndexNode.class new file mode 100644 index 0000000..39440db --- /dev/null +++ 
b/target/classes/com/cube/index/CubicIndexNode.class Binary files differ diff --git a/target/classes/com/cube/index/CubicIndexTree.class b/target/classes/com/cube/index/CubicIndexTree.class new file mode 100644 index 0000000..1bb93db --- /dev/null +++ b/target/classes/com/cube/index/CubicIndexTree.class Binary files differ diff --git a/target/classes/com/cube/index/CubicIndexedStorage.class b/target/classes/com/cube/index/CubicIndexedStorage.class new file mode 100644 index 0000000..215a602 --- /dev/null +++ b/target/classes/com/cube/index/CubicIndexedStorage.class Binary files differ diff --git a/target/classes/com/cube/replication/HintedHandoffManager$Hint.class b/target/classes/com/cube/replication/HintedHandoffManager$Hint.class new file mode 100644 index 0000000..39db7ee --- /dev/null +++ b/target/classes/com/cube/replication/HintedHandoffManager$Hint.class Binary files differ diff --git a/target/classes/com/cube/replication/HintedHandoffManager$HintReplayCallback.class b/target/classes/com/cube/replication/HintedHandoffManager$HintReplayCallback.class new file mode 100644 index 0000000..a3e6ad9 --- /dev/null +++ b/target/classes/com/cube/replication/HintedHandoffManager$HintReplayCallback.class Binary files differ diff --git a/target/classes/com/cube/replication/HintedHandoffManager.class b/target/classes/com/cube/replication/HintedHandoffManager.class new file mode 100644 index 0000000..6aba8a3 --- /dev/null +++ b/target/classes/com/cube/replication/HintedHandoffManager.class Binary files differ diff --git a/target/classes/com/cube/replication/NetworkTopologyReplicationStrategy.class b/target/classes/com/cube/replication/NetworkTopologyReplicationStrategy.class new file mode 100644 index 0000000..8d9da4c --- /dev/null +++ b/target/classes/com/cube/replication/NetworkTopologyReplicationStrategy.class Binary files differ diff --git a/target/classes/com/cube/replication/ReadRepairManager$ReadRepairResult.class 
b/target/classes/com/cube/replication/ReadRepairManager$ReadRepairResult.class new file mode 100644 index 0000000..1ee8ef8 --- /dev/null +++ b/target/classes/com/cube/replication/ReadRepairManager$ReadRepairResult.class Binary files differ diff --git a/target/classes/com/cube/replication/ReadRepairManager$ReadResponse.class b/target/classes/com/cube/replication/ReadRepairManager$ReadResponse.class new file mode 100644 index 0000000..fec7058 --- /dev/null +++ b/target/classes/com/cube/replication/ReadRepairManager$ReadResponse.class Binary files differ diff --git a/target/classes/com/cube/replication/ReadRepairManager$RepairCallback.class b/target/classes/com/cube/replication/ReadRepairManager$RepairCallback.class new file mode 100644 index 0000000..58d7cfd --- /dev/null +++ b/target/classes/com/cube/replication/ReadRepairManager$RepairCallback.class Binary files differ diff --git a/target/classes/com/cube/replication/ReadRepairManager.class b/target/classes/com/cube/replication/ReadRepairManager.class new file mode 100644 index 0000000..68fb24a --- /dev/null +++ b/target/classes/com/cube/replication/ReadRepairManager.class Binary files differ diff --git a/target/classes/com/cube/replication/ReplicationCoordinator$ReadResult.class b/target/classes/com/cube/replication/ReplicationCoordinator$ReadResult.class new file mode 100644 index 0000000..4073a0b --- /dev/null +++ b/target/classes/com/cube/replication/ReplicationCoordinator$ReadResult.class Binary files differ diff --git a/target/classes/com/cube/replication/ReplicationCoordinator$WriteResult.class b/target/classes/com/cube/replication/ReplicationCoordinator$WriteResult.class new file mode 100644 index 0000000..806d664 --- /dev/null +++ b/target/classes/com/cube/replication/ReplicationCoordinator$WriteResult.class Binary files differ diff --git a/target/classes/com/cube/replication/ReplicationCoordinator.class b/target/classes/com/cube/replication/ReplicationCoordinator.class new file mode 100644 index 
0000000..c4620af --- /dev/null +++ b/target/classes/com/cube/replication/ReplicationCoordinator.class Binary files differ diff --git a/target/classes/com/cube/replication/ReplicationStrategy.class b/target/classes/com/cube/replication/ReplicationStrategy.class new file mode 100644 index 0000000..7427518 --- /dev/null +++ b/target/classes/com/cube/replication/ReplicationStrategy.class Binary files differ diff --git a/target/classes/com/cube/replication/SimpleReplicationStrategy.class b/target/classes/com/cube/replication/SimpleReplicationStrategy.class new file mode 100644 index 0000000..1be0025 --- /dev/null +++ b/target/classes/com/cube/replication/SimpleReplicationStrategy.class Binary files differ diff --git a/target/classes/com/cube/shell/CubeShell.class b/target/classes/com/cube/shell/CubeShell.class new file mode 100644 index 0000000..4b3430b --- /dev/null +++ b/target/classes/com/cube/shell/CubeShell.class Binary files differ diff --git a/target/classes/com/cube/sql/SQLExecutor$1.class b/target/classes/com/cube/sql/SQLExecutor$1.class new file mode 100644 index 0000000..ed7d753 --- /dev/null +++ b/target/classes/com/cube/sql/SQLExecutor$1.class Binary files differ diff --git a/target/classes/com/cube/sql/SQLExecutor$SQLResult.class b/target/classes/com/cube/sql/SQLExecutor$SQLResult.class new file mode 100644 index 0000000..0177932 --- /dev/null +++ b/target/classes/com/cube/sql/SQLExecutor$SQLResult.class Binary files differ diff --git a/target/classes/com/cube/sql/SQLExecutor.class b/target/classes/com/cube/sql/SQLExecutor.class new file mode 100644 index 0000000..f1d75ae --- /dev/null +++ b/target/classes/com/cube/sql/SQLExecutor.class Binary files differ diff --git a/target/classes/com/cube/sql/SQLParser$ParsedSQL.class b/target/classes/com/cube/sql/SQLParser$ParsedSQL.class new file mode 100644 index 0000000..ddcf137 --- /dev/null +++ b/target/classes/com/cube/sql/SQLParser$ParsedSQL.class Binary files differ diff --git 
a/target/classes/com/cube/sql/SQLParser$SQLType.class b/target/classes/com/cube/sql/SQLParser$SQLType.class new file mode 100644 index 0000000..60161b7 --- /dev/null +++ b/target/classes/com/cube/sql/SQLParser$SQLType.class Binary files differ diff --git a/target/classes/com/cube/sql/SQLParser.class b/target/classes/com/cube/sql/SQLParser.class new file mode 100644 index 0000000..0c8faac --- /dev/null +++ b/target/classes/com/cube/sql/SQLParser.class Binary files differ diff --git a/target/classes/com/cube/storage/LSMStorageEngine.class b/target/classes/com/cube/storage/LSMStorageEngine.class new file mode 100644 index 0000000..4880a9e --- /dev/null +++ b/target/classes/com/cube/storage/LSMStorageEngine.class Binary files differ diff --git a/target/classes/com/cube/storage/MemTable.class b/target/classes/com/cube/storage/MemTable.class new file mode 100644 index 0000000..e0f3085 --- /dev/null +++ b/target/classes/com/cube/storage/MemTable.class Binary files differ diff --git a/target/classes/com/cube/storage/SSTable.class b/target/classes/com/cube/storage/SSTable.class new file mode 100644 index 0000000..e4379ba --- /dev/null +++ b/target/classes/com/cube/storage/SSTable.class Binary files differ diff --git a/target/classes/com/cube/storage/StorageEngine$StorageStats.class b/target/classes/com/cube/storage/StorageEngine$StorageStats.class new file mode 100644 index 0000000..2c5edc3 --- /dev/null +++ b/target/classes/com/cube/storage/StorageEngine$StorageStats.class Binary files differ diff --git a/target/classes/com/cube/storage/StorageEngine.class b/target/classes/com/cube/storage/StorageEngine.class new file mode 100644 index 0000000..c233b2e --- /dev/null +++ b/target/classes/com/cube/storage/StorageEngine.class Binary files differ diff --git a/target/classes/com/cube/storage/WriteAheadLog$LogEntry.class b/target/classes/com/cube/storage/WriteAheadLog$LogEntry.class new file mode 100644 index 0000000..d660e6b --- /dev/null +++ 
b/target/classes/com/cube/storage/WriteAheadLog$LogEntry.class Binary files differ diff --git a/target/classes/com/cube/storage/WriteAheadLog$OperationType.class b/target/classes/com/cube/storage/WriteAheadLog$OperationType.class new file mode 100644 index 0000000..9b53e16 --- /dev/null +++ b/target/classes/com/cube/storage/WriteAheadLog$OperationType.class Binary files differ diff --git a/target/classes/com/cube/storage/WriteAheadLog.class b/target/classes/com/cube/storage/WriteAheadLog.class new file mode 100644 index 0000000..083a6f3 --- /dev/null +++ b/target/classes/com/cube/storage/WriteAheadLog.class Binary files differ