package com.cube.storage;
import java.io.*;
import java.nio.file.*;
import java.util.*;
/**
* Write-Ahead Log (WAL) for durability in Cube database.
*/
public class WriteAheadLog implements AutoCloseable {
private static final int MAGIC_NUMBER = 0x43554245; // "CUBE"
private static final int VERSION = 1;
public static final byte[] TOMBSTONE = new byte[]{0x00};
private final Path logPath;
private DataOutputStream output;
private long entriesWritten;
public enum OperationType {
PUT(1),
DELETE(2);
private final int code;
OperationType(int code) {
this.code = code;
}
public int getCode() { return code; }
public static OperationType fromCode(int code) {
for (OperationType type : values()) {
if (type.code == code) {
return type;
}
}
throw new IllegalArgumentException("Unknown operation type: " + code);
}
}
public static class LogEntry {
private final OperationType type;
private final String key;
private final byte[] value;
public LogEntry(OperationType type, String key, byte[] value) {
this.type = type;
this.key = key;
this.value = value;
}
public OperationType getType() { return type; }
public String getKey() { return key; }
public byte[] getValue() { return value; }
}
public WriteAheadLog(Path logPath) throws IOException {
this.logPath = logPath;
this.entriesWritten = 0;
boolean exists = Files.exists(logPath);
this.output = new DataOutputStream(
new BufferedOutputStream(Files.newOutputStream(logPath,
StandardOpenOption.CREATE, StandardOpenOption.APPEND)));
if (!exists) {
output.writeInt(MAGIC_NUMBER);
output.writeInt(VERSION);
output.flush();
}
}
public synchronized void append(LogEntry entry) throws IOException {
output.writeInt(entry.type.getCode());
byte[] keyBytes = entry.key.getBytes("UTF-8");
output.writeInt(keyBytes.length);
output.write(keyBytes);
output.writeInt(entry.value.length);
output.write(entry.value);
output.flush();
entriesWritten++;
}
public List<LogEntry> replay() throws IOException {
List<LogEntry> entries = new ArrayList<>();
if (!Files.exists(logPath) || Files.size(logPath) == 0) {
return entries;
}
try (DataInputStream input = new DataInputStream(
new BufferedInputStream(Files.newInputStream(logPath)))) {
int magic = input.readInt();
if (magic != MAGIC_NUMBER) {
throw new IOException("Invalid WAL file");
}
int version = input.readInt();
while (input.available() > 0) {
try {
int typeCode = input.readInt();
OperationType type = OperationType.fromCode(typeCode);
int keyLen = input.readInt();
byte[] keyBytes = new byte[keyLen];
input.readFully(keyBytes);
String key = new String(keyBytes, "UTF-8");
int valueLen = input.readInt();
byte[] value = new byte[valueLen];
input.readFully(value);
entries.add(new LogEntry(type, key, value));
} catch (EOFException e) {
break;
}
}
}
return entries;
}
public synchronized void rotate() throws IOException {
close();
if (Files.exists(logPath)) {
Path archivePath = logPath.getParent().resolve(
"wal-" + System.currentTimeMillis() + ".archived");
Files.move(logPath, archivePath);
}
this.output = new DataOutputStream(
new BufferedOutputStream(Files.newOutputStream(logPath,
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)));
output.writeInt(MAGIC_NUMBER);
output.writeInt(VERSION);
output.flush();
entriesWritten = 0;
}
public long getEntriesWritten() {
return entriesWritten;
}
@Override
public synchronized void close() throws IOException {
if (output != null) {
output.flush();
output.close();
}
}
}