package com.cube.shell;
import com.cube.cluster.ClusterNode;
import com.cube.consistency.ConsistencyLevel;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.*;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
/**
 * CubeShell - interactive command shell for the Cube database with cluster management.
 *
 * <p>Features:
 * <ul>
 *   <li>Connect to multiple cluster nodes and switch between them</li>
 *   <li>View the list of connected nodes</li>
 *   <li>Select a consistency level</li>
 *   <li>Run key-value operations (PUT / GET / DELETE / SCAN) against the current node</li>
 *   <li>Inspect the current node's status</li>
 *   <li>View cluster-level statistics</li>
 * </ul>
 *
 * <p>Every operation is a plain HTTP request to {@code http://host:port/api/v1/...} on a
 * single node.
 *
 * <p>NOTE(review): the selected consistency level is only echoed in command output; none of
 * the requests built by this class transmit it to the server. Confirm whether that is
 * intended.
 */
public class CubeShell {

    private static final String VERSION = "2.0.0";
    private static final String PROMPT = "cube> ";

    /** Upper bound for establishing a TCP connection to a node. */
    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5);
    /** Upper bound for the health probe issued by CONNECT. */
    private static final Duration HEALTH_TIMEOUT = Duration.ofSeconds(5);
    /** Upper bound for every other request, so an unresponsive node cannot hang the shell. */
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(30);

    private static final int MAX_PORT = 65535;
    /** Number of most recent commands shown by HISTORY. */
    private static final int HISTORY_DISPLAY_LIMIT = 20;

    private static final String BOX_TOP =
            "╔════════════════════════════════════════════════════════════╗";
    private static final String BOX_SEPARATOR =
            "╠════════════════════════════════════════════════════════════╣";
    private static final String BOX_BOTTOM =
            "╚════════════════════════════════════════════════════════════╝";

    private final List<ClusterNode> clusterNodes;
    private ClusterNode currentNode;
    private ConsistencyLevel defaultConsistencyLevel;
    private final HttpClient httpClient;
    private final ObjectMapper objectMapper;
    private final List<String> commandHistory;
    private boolean running;

    /** Creates a shell with no connected nodes and consistency level {@code ONE}. */
    public CubeShell() {
        this.clusterNodes = new ArrayList<>();
        this.defaultConsistencyLevel = ConsistencyLevel.ONE;
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(CONNECT_TIMEOUT)
                .build();
        this.objectMapper = new ObjectMapper();
        this.commandHistory = new ArrayList<>();
        this.running = true;
    }

    /**
     * Entry point. Recognised arguments: {@code --host|-h <host>} (default {@code localhost})
     * and {@code --port|-p <port>} (default {@code 8080}). Unknown arguments are ignored.
     * An invalid port terminates the process with exit status 1.
     *
     * @param args command line arguments
     */
    public static void main(String[] args) {
        CubeShell shell = new CubeShell();
        shell.printBanner();

        String initialHost = "localhost";
        int initialPort = 8080;
        for (int i = 0; i < args.length; i++) {
            String option = args[i];
            boolean isHost = option.equals("--host") || option.equals("-h");
            boolean isPort = option.equals("--port") || option.equals("-p");
            if (!isHost && !isPort) {
                continue;
            }
            if (i + 1 >= args.length) {
                System.err.println("Missing value for " + option + "; using default.");
                continue;
            }
            String value = args[++i];
            if (isHost) {
                initialHost = value;
            } else {
                try {
                    initialPort = parsePort(value);
                } catch (IllegalArgumentException e) {
                    // Fail cleanly instead of dying with a NumberFormatException stack trace.
                    System.err.println(e.getMessage());
                    System.exit(1);
                }
            }
        }

        // Auto-connect to the initial node, then hand control to the interactive loop.
        shell.connectToNode(initialHost, initialPort);
        shell.run();
    }

    /** Prints the start-up banner. */
    private void printBanner() {
        System.out.println("╔══════════════════════════════════════════════════════════╗");
        System.out.println("║ CubeShell v" + VERSION + " ║");
        System.out.println("║ Distributed Database Interactive Shell ║");
        System.out.println("║ Phase 2: Cluster Edition ║");
        System.out.println("╚══════════════════════════════════════════════════════════╝");
        System.out.println();
        System.out.println("Type 'HELP' for available commands, 'EXIT' to quit.");
        System.out.println();
    }

    /**
     * Read-eval loop: reads lines from standard input until EOF or an exit command,
     * records each non-blank line in the history and dispatches it.
     */
    private void run() {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) {
            while (running) {
                System.out.print(PROMPT);
                String line = reader.readLine();
                if (line == null) {
                    break; // EOF (e.g. Ctrl-D or end of piped input)
                }
                line = line.trim();
                if (line.isEmpty()) {
                    continue;
                }
                commandHistory.add(line);
                processCommand(line);
            }
        } catch (IOException e) {
            System.err.println("Error reading input: " + describe(e));
        }
        System.out.println("\nGoodbye!");
    }

    /**
     * Splits a line into a case-insensitive command word and its argument string and
     * dispatches to the matching handler.
     *
     * @param line trimmed, non-empty input line
     */
    private void processCommand(String line) {
        String[] parts = line.split("\\s+", 2);
        // Locale.ROOT: the default locale may map letters unexpectedly (e.g. Turkish 'i'),
        // which would make "exit" fail to match "EXIT".
        String command = parts[0].toUpperCase(Locale.ROOT);
        String args = parts.length > 1 ? parts[1] : "";
        try {
            switch (command) {
                case "HELP":
                case "?":
                    showHelp();
                    break;
                case "CONNECT":
                    handleConnect(args);
                    break;
                case "DISCONNECT":
                    handleDisconnect(args);
                    break;
                case "NODES":
                case "CLUSTER":
                    showClusterInfo();
                    break;
                case "USE":
                    handleUseNode(args);
                    break;
                case "CONSISTENCY":
                case "CL":
                    handleConsistency(args);
                    break;
                case "STATUS":
                    showNodeStatus();
                    break;
                case "STATS":
                case "STATISTICS":
                    showReplicationStats();
                    break;
                case "PUT":
                    handlePut(args);
                    break;
                case "GET":
                    handleGet(args);
                    break;
                case "DELETE":
                case "DEL":
                    handleDelete(args);
                    break;
                case "SCAN":
                    handleScan(args);
                    break;
                case "HISTORY":
                    showHistory();
                    break;
                case "CLEAR":
                case "CLS":
                    clearScreen();
                    break;
                case "EXIT":
                case "QUIT":
                case "BYE":
                    running = false;
                    break;
                default:
                    System.out.println("Unknown command: " + command);
                    System.out.println("Type 'HELP' for available commands.");
            }
        } catch (RuntimeException e) {
            // Last-resort boundary so one failing command never terminates the shell.
            System.err.println("Error: " + describe(e));
        }
    }

    /** Prints the command reference. */
    private void showHelp() {
        System.out.println("\n" + BOX_TOP);
        System.out.println("║ CubeShell Commands ║");
        System.out.println(BOX_SEPARATOR);
        System.out.println("║ Cluster Management: ║");
        System.out.println("║ CONNECT <host> <port> - Add node to cluster ║");
        System.out.println("║ DISCONNECT <node-id> - Remove node from cluster ║");
        System.out.println("║ NODES / CLUSTER - Show all cluster nodes ║");
        System.out.println("║ USE <node-id> - Switch to specific node ║");
        System.out.println("║ STATUS - Show current node status ║");
        System.out.println("║ STATS - Show replication statistics ║");
        System.out.println("║ ║");
        System.out.println("║ Consistency: ║");
        System.out.println("║ CONSISTENCY <level> - Set consistency level ║");
        System.out.println("║ CL <level> - Short form ║");
        System.out.println("║ Levels: ONE, TWO, THREE, QUORUM, ALL, ANY ║");
        System.out.println("║ ║");
        System.out.println("║ Data Operations: ║");
        System.out.println("║ PUT <key> <value> - Write key-value ║");
        System.out.println("║ GET <key> - Read value ║");
        System.out.println("║ DELETE <key> - Delete key ║");
        System.out.println("║ SCAN <prefix> - Scan with prefix ║");
        System.out.println("║ ║");
        System.out.println("║ Shell Commands: ║");
        System.out.println("║ HISTORY - Show command history ║");
        System.out.println("║ CLEAR - Clear screen ║");
        System.out.println("║ HELP / ? - Show this help ║");
        System.out.println("║ EXIT / QUIT - Exit shell ║");
        System.out.println(BOX_BOTTOM + "\n");
    }

    /**
     * CONNECT handler: parses {@code <host> <port>} and attempts to add that node.
     *
     * @param args raw argument string following the command word
     */
    private void handleConnect(String args) {
        String[] parts = args.split("\\s+");
        if (parts.length < 2) {
            System.out.println("Usage: CONNECT <host> <port>");
            return;
        }
        int port;
        try {
            port = parsePort(parts[1]);
        } catch (IllegalArgumentException e) {
            System.out.println("✗ " + e.getMessage());
            return;
        }
        connectToNode(parts[0], port);
    }

    /**
     * Probes {@code /api/v1/health} on the given endpoint and, on HTTP 200, registers the
     * node unless an entry with the same host and port already exists. The first node
     * registered becomes the current node.
     *
     * @param host node host name or address
     * @param port node HTTP port
     */
    private void connectToNode(String host, int port) {
        try {
            String url = String.format("http://%s:%d/api/v1/health", host, port);
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .timeout(HEALTH_TIMEOUT)
                    .GET()
                    .build();
            HttpResponse<String> response = send(request);
            if (response.statusCode() != 200) {
                System.out.println("✗ Failed to connect: HTTP " + response.statusCode());
                return;
            }

            boolean alreadyKnown = clusterNodes.stream()
                    .anyMatch(n -> n.getHost().equals(host) && n.getPort() == port);
            if (alreadyKnown) {
                System.out.println("Already connected to " + host + ":" + port);
                return;
            }

            // The node ID is derived locally from the endpoint; the health response body
            // is not inspected.
            String nodeId = String.format("node-%s-%d", host, port);
            ClusterNode node = new ClusterNode(nodeId, host, port);
            clusterNodes.add(node);
            System.out.println("✓ Connected to " + host + ":" + port);
            System.out.println(" Node ID: " + nodeId);
            if (currentNode == null) {
                currentNode = node;
                System.out.println(" Set as current node");
            }
        } catch (IOException | RuntimeException e) {
            System.out.println("✗ Failed to connect: " + describe(e));
        }
    }

    /**
     * DISCONNECT handler: removes the node with the given ID. If it was the current node,
     * the first remaining node (if any) becomes current.
     *
     * @param args node ID
     */
    private void handleDisconnect(String args) {
        String nodeId = args.trim();
        if (nodeId.isEmpty()) {
            System.out.println("Usage: DISCONNECT <node-id>");
            return;
        }
        boolean removed = clusterNodes.removeIf(n -> n.getNodeId().equals(nodeId));
        if (!removed) {
            System.out.println("✗ Node not found: " + nodeId);
            return;
        }
        System.out.println("✓ Disconnected from " + nodeId);
        if (currentNode != null && currentNode.getNodeId().equals(nodeId)) {
            currentNode = clusterNodes.isEmpty() ? null : clusterNodes.get(0);
            if (currentNode != null) {
                System.out.println(" Switched to " + currentNode.getNodeId());
            }
        }
    }

    /** NODES / CLUSTER handler: lists every registered node and a summary line. */
    private void showClusterInfo() {
        System.out.println("\n" + BOX_TOP);
        System.out.println("║ Cluster Nodes ║");
        System.out.println(BOX_SEPARATOR);
        if (clusterNodes.isEmpty()) {
            System.out.println("║ No nodes connected ║");
            System.out.println(BOX_BOTTOM + "\n");
            return;
        }
        for (ClusterNode node : clusterNodes) {
            String current = node.equals(currentNode) ? "➜ " : " ";
            String status = node.isAlive() ? "✓" : "✗";
            System.out.printf("║ %s%s %-20s %s %-25s ║%n",
                    current, status, node.getNodeId(),
                    node.getEndpoint(),
                    "DC:" + node.getDatacenter());
        }
        System.out.println(BOX_SEPARATOR);
        System.out.printf("║ Total Nodes: %-3d Alive: %-3d Current: %-18s║%n",
                clusterNodes.size(),
                clusterNodes.stream().filter(ClusterNode::isAlive).count(),
                currentNode != null ? currentNode.getNodeId() : "none");
        System.out.println(BOX_BOTTOM + "\n");
    }

    /**
     * USE handler: makes the node with the given ID the current node.
     *
     * @param args node ID
     */
    private void handleUseNode(String args) {
        String nodeId = args.trim();
        if (nodeId.isEmpty()) {
            System.out.println("Usage: USE <node-id>");
            return;
        }
        Optional<ClusterNode> match = clusterNodes.stream()
                .filter(n -> n.getNodeId().equals(nodeId))
                .findFirst();
        if (match.isPresent()) {
            currentNode = match.get();
            System.out.println("✓ Switched to " + nodeId);
        } else {
            System.out.println("✗ Node not found: " + nodeId);
        }
    }

    /**
     * CONSISTENCY / CL handler: with no argument, prints the current level and all
     * available levels; otherwise sets the level (case-insensitive).
     *
     * @param args consistency level name, or empty
     */
    private void handleConsistency(String args) {
        if (args.isEmpty()) {
            System.out.println("\nCurrent consistency level: " + defaultConsistencyLevel);
            System.out.println("\nAvailable levels:");
            for (ConsistencyLevel cl : ConsistencyLevel.values()) {
                System.out.println(" " + cl.name() + " - " + cl.getDescription());
            }
            System.out.println();
            return;
        }
        try {
            // Locale.ROOT keeps enum-name matching independent of the user's locale.
            ConsistencyLevel cl =
                    ConsistencyLevel.valueOf(args.trim().toUpperCase(Locale.ROOT));
            defaultConsistencyLevel = cl;
            System.out.println("✓ Consistency level set to " + cl);
        } catch (IllegalArgumentException e) {
            System.out.println("✗ Invalid consistency level: " + args);
            System.out.println(" Valid: ONE, TWO, THREE, QUORUM, ALL, ANY");
        }
    }

    /** STATUS handler: fetches {@code /api/v1/stats} from the current node and prints it. */
    private void showNodeStatus() {
        if (!requireCurrentNode()) {
            return;
        }
        try {
            HttpRequest request = requestTo(currentNode, "/api/v1/stats").GET().build();
            HttpResponse<String> response = send(request);
            if (response.statusCode() != 200) {
                // Previously a non-200 answer produced no output at all.
                System.out.println("✗ Failed to get status: HTTP " + response.statusCode());
                return;
            }
            Map<String, Object> data = parseJsonObject(response.body());

            System.out.println("\n" + BOX_TOP);
            System.out.println("║ Node Status ║");
            System.out.println(BOX_SEPARATOR);
            System.out.printf("║ Node: %-45s ║%n", currentNode.getNodeId());
            System.out.printf("║ Endpoint: %-45s ║%n", currentNode.getEndpoint());
            System.out.printf("║ Status: %-45s ║%n", "✓ ALIVE");

            // Only render the section when "stats" is really a JSON object.
            Object rawStats = data.get("stats");
            if (rawStats instanceof Map) {
                Map<?, ?> stats = (Map<?, ?>) rawStats;
                System.out.println(BOX_SEPARATOR);
                System.out.println("║ Storage Statistics: ║");
                System.out.printf("║ Total Keys: %-40s ║%n", stats.get("totalKeys"));
                System.out.printf("║ Total Size: %-40s ║%n", stats.get("totalSize") + " bytes");
                System.out.printf("║ MemTable Size: %-40s ║%n", stats.get("memtableSize") + " bytes");
                System.out.printf("║ SSTable Count: %-40s ║%n", stats.get("sstableCount"));
            }
            System.out.println(BOX_BOTTOM + "\n");
        } catch (IOException | RuntimeException e) {
            System.out.println("✗ Failed to get status: " + describe(e));
        }
    }

    /**
     * STATS handler: prints node counts, the current consistency level and the number of
     * registered nodes per datacenter. Uses only locally held node data; no request is sent.
     */
    private void showReplicationStats() {
        System.out.println("\n" + BOX_TOP);
        System.out.println("║ Replication Statistics ║");
        System.out.println(BOX_SEPARATOR);
        System.out.printf("║ Cluster Nodes: %-33d ║%n", clusterNodes.size());
        System.out.printf("║ Alive Nodes: %-33d ║%n",
                clusterNodes.stream().filter(ClusterNode::isAlive).count());
        System.out.printf("║ Default Consistency: %-33s ║%n", defaultConsistencyLevel);
        System.out.println(BOX_SEPARATOR);
        System.out.println("║ Datacenter Distribution: ║");
        Map<String, Long> nodesPerDatacenter = clusterNodes.stream()
                .collect(Collectors.groupingBy(ClusterNode::getDatacenter, Collectors.counting()));
        for (Map.Entry<String, Long> entry : nodesPerDatacenter.entrySet()) {
            System.out.printf("║ %-20s %-33d ║%n", entry.getKey() + ":", entry.getValue());
        }
        System.out.println(BOX_BOTTOM + "\n");
    }

    /**
     * PUT handler: posts {@code {"key":..., "value":...}} as JSON to {@code /api/v1/put}.
     * The value is everything after the first whitespace run following the key.
     *
     * @param args {@code <key> <value>}
     */
    private void handlePut(String args) {
        if (!requireCurrentNode()) {
            return;
        }
        String[] parts = args.split("\\s+", 2);
        if (parts.length < 2) {
            System.out.println("Usage: PUT <key> <value>");
            return;
        }
        String key = parts[0];
        String value = parts[1];
        try {
            Map<String, String> body = new LinkedHashMap<>();
            body.put("key", key);
            body.put("value", value);
            HttpRequest request = requestTo(currentNode, "/api/v1/put")
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(
                            objectMapper.writeValueAsString(body)))
                    .build();
            HttpResponse<String> response = send(request);
            if (response.statusCode() == 200) {
                System.out.println("✓ PUT successful");
                System.out.println(" Key: " + key);
                System.out.println(" Value: " + value);
                System.out.println(" CL: " + defaultConsistencyLevel);
            } else {
                System.out.println("✗ PUT failed: HTTP " + response.statusCode());
            }
        } catch (IOException | RuntimeException e) {
            System.out.println("✗ Error: " + describe(e));
        }
    }

    /**
     * GET handler: requests {@code /api/v1/get/<key>} and prints the value when the
     * response's {@code found} field is {@code true}.
     *
     * @param args key to read
     */
    private void handleGet(String args) {
        if (!requireCurrentNode()) {
            return;
        }
        String key = args.trim();
        if (key.isEmpty()) {
            System.out.println("Usage: GET <key>");
            return;
        }
        try {
            HttpRequest request = requestTo(currentNode, "/api/v1/get/" + encode(key))
                    .GET()
                    .build();
            HttpResponse<String> response = send(request);
            if (response.statusCode() != 200) {
                System.out.println("✗ GET failed: HTTP " + response.statusCode());
                return;
            }
            Map<String, Object> data = parseJsonObject(response.body());
            if (Boolean.TRUE.equals(data.get("found"))) {
                System.out.println("✓ Found");
                System.out.println(" Key: " + key);
                System.out.println(" Value: " + data.get("value"));
                System.out.println(" CL: " + defaultConsistencyLevel);
            } else {
                System.out.println("✗ Not found: " + key);
            }
        } catch (IOException | RuntimeException e) {
            System.out.println("✗ Error: " + describe(e));
        }
    }

    /**
     * DELETE / DEL handler: issues an HTTP DELETE to {@code /api/v1/delete/<key>}.
     *
     * @param args key to delete
     */
    private void handleDelete(String args) {
        if (!requireCurrentNode()) {
            return;
        }
        String key = args.trim();
        if (key.isEmpty()) {
            System.out.println("Usage: DELETE <key>");
            return;
        }
        try {
            HttpRequest request = requestTo(currentNode, "/api/v1/delete/" + encode(key))
                    .DELETE()
                    .build();
            HttpResponse<String> response = send(request);
            if (response.statusCode() == 200) {
                System.out.println("✓ DELETE successful");
                System.out.println(" Key: " + key);
                System.out.println(" CL: " + defaultConsistencyLevel);
            } else {
                System.out.println("✗ DELETE failed: HTTP " + response.statusCode());
            }
        } catch (IOException | RuntimeException e) {
            System.out.println("✗ Error: " + describe(e));
        }
    }

    /**
     * SCAN handler: requests {@code /api/v1/scan?prefix=<prefix>} and prints the returned
     * {@code results} object as a two-column table.
     *
     * @param args key prefix
     */
    private void handleScan(String args) {
        if (!requireCurrentNode()) {
            return;
        }
        String prefix = args.trim();
        if (prefix.isEmpty()) {
            System.out.println("Usage: SCAN <prefix>");
            return;
        }
        try {
            HttpRequest request = requestTo(currentNode, "/api/v1/scan?prefix=" + encode(prefix))
                    .GET()
                    .build();
            HttpResponse<String> response = send(request);
            if (response.statusCode() != 200) {
                System.out.println("✗ SCAN failed: HTTP " + response.statusCode());
                return;
            }
            Map<String, Object> data = parseJsonObject(response.body());

            Object rawResults = data.get("results");
            Map<?, ?> results = rawResults instanceof Map
                    ? (Map<?, ?>) rawResults
                    : Collections.emptyMap();
            // "count" may be absent or deserialised as a non-Integer Number; an unchecked
            // (int) cast would throw in both cases, so fall back to the result size.
            Object rawCount = data.get("count");
            int count = rawCount instanceof Number
                    ? ((Number) rawCount).intValue()
                    : results.size();

            System.out.println("✓ Found " + count + " result(s)");
            if (!results.isEmpty()) {
                System.out.println("\n┌────────────────────────────┬────────────────────────────┐");
                System.out.println("│ Key │ Value │");
                System.out.println("├────────────────────────────┼────────────────────────────┤");
                for (Map.Entry<?, ?> entry : results.entrySet()) {
                    // String.valueOf tolerates non-string JSON values (numbers, null, ...).
                    System.out.printf("│ %-26s │ %-26s │%n",
                            truncate(String.valueOf(entry.getKey()), 26),
                            truncate(String.valueOf(entry.getValue()), 26));
                }
                System.out.println("└────────────────────────────┴────────────────────────────┘");
            }
            System.out.println();
        } catch (IOException | RuntimeException e) {
            System.out.println("✗ Error: " + describe(e));
        }
    }

    /** HISTORY handler: prints the most recent commands with their 1-based positions. */
    private void showHistory() {
        System.out.println("\n" + BOX_TOP);
        System.out.println("║ Command History ║");
        System.out.println(BOX_SEPARATOR);
        if (commandHistory.isEmpty()) {
            System.out.println("║ No commands in history ║");
        } else {
            int start = Math.max(0, commandHistory.size() - HISTORY_DISPLAY_LIMIT);
            for (int i = start; i < commandHistory.size(); i++) {
                System.out.printf("║ %3d: %-53s ║%n", i + 1,
                        truncate(commandHistory.get(i), 53));
            }
        }
        System.out.println(BOX_BOTTOM + "\n");
    }

    /** CLEAR / CLS handler: emits ANSI "cursor home" + "erase display" and reprints the banner. */
    private void clearScreen() {
        System.out.print("\033[H\033[2J");
        System.out.flush();
        printBanner();
    }

    /**
     * Shortens {@code str} to at most {@code maxLen} characters, ending in {@code "..."}
     * when something was cut off and there is room for the ellipsis.
     *
     * @param str text to shorten; {@code null} yields an empty string
     * @param maxLen maximum length of the result
     * @return {@code str} itself if it already fits, otherwise a shortened copy
     */
    private static String truncate(String str, int maxLen) {
        if (str == null) {
            return "";
        }
        if (str.length() <= maxLen) {
            return str;
        }
        if (maxLen <= 3) {
            // No room for an ellipsis; a plain cut avoids a negative substring index.
            return str.substring(0, Math.max(0, maxLen));
        }
        return str.substring(0, maxLen - 3) + "...";
    }

    /** Prints the "not connected" message and returns {@code false} when no node is selected. */
    private boolean requireCurrentNode() {
        if (currentNode == null) {
            System.out.println("✗ Not connected to any node");
            return false;
        }
        return true;
    }

    /**
     * Starts a request for {@code pathAndQuery} on {@code node} with the standard timeout.
     * Callers must pass an already percent-encoded path/query.
     */
    private HttpRequest.Builder requestTo(ClusterNode node, String pathAndQuery) {
        String url = String.format("http://%s:%d%s",
                node.getHost(), node.getPort(), pathAndQuery);
        return HttpRequest.newBuilder()
                .uri(URI.create(url))
                .timeout(REQUEST_TIMEOUT);
    }

    /**
     * Sends a request and returns the response with its body as a string.
     *
     * @throws IOException on I/O failure, or if the thread is interrupted while waiting
     *         (the interrupt flag is restored before throwing)
     */
    private HttpResponse<String> send(HttpRequest request) throws IOException {
        try {
            return httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for response", e);
        }
    }

    /**
     * Parses a response body as a JSON object.
     *
     * @return the parsed map, or an empty map when the body is the JSON literal {@code null}
     * @throws IOException if the body is not valid JSON or not a JSON object
     */
    @SuppressWarnings("unchecked") // Map.class is raw; Jackson yields String keys for JSON objects.
    private Map<String, Object> parseJsonObject(String body) throws IOException {
        Map<String, Object> parsed = objectMapper.readValue(body, Map.class);
        return parsed != null ? parsed : Collections.emptyMap();
    }

    /**
     * Percent-encodes user input for use as a URL path segment or query value.
     * {@link URLEncoder} produces form encoding, where a space becomes {@code '+'}; a
     * literal plus is emitted as {@code %2B}, so every remaining {@code '+'} is a space and
     * can safely be rewritten to {@code %20}, which is valid in both path and query.
     */
    private static String encode(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /**
     * Parses a TCP port number.
     *
     * @throws IllegalArgumentException if {@code text} is not an integer in 1..65535
     */
    private static int parsePort(String text) {
        String message = "Invalid port '" + text + "': expected an integer between 1 and "
                + MAX_PORT;
        int port;
        try {
            port = Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(message, e);
        }
        if (port < 1 || port > MAX_PORT) {
            throw new IllegalArgumentException(message);
        }
        return port;
    }

    /**
     * Returns a printable description of an exception: its message, or its simple class
     * name when the message is missing (e.g. connection failures often carry none).
     */
    private static String describe(Exception e) {
        String message = e.getMessage();
        return (message == null || message.isBlank()) ? e.getClass().getSimpleName() : message;
    }
}