Skip to content

Commit 9a80c8d

Browse files
authored
uses FileSystem sync for AOF (#91)
1 parent 3289ff1 commit 9a80c8d

File tree

4 files changed

+24
-55
lines changed

4 files changed

+24
-55
lines changed

core/src/main/java/dev/keva/core/aof/AOFContainer.java

Lines changed: 15 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,21 @@
1818
@Component
1919
public class AOFContainer {
2020
private ReentrantLock bufferLock;
21-
private List<Command> buffer;
2221
private ObjectOutputStream output;
22+
private FileDescriptor fd;
23+
private boolean alwaysFlush;
2324

2425
@Autowired
2526
private KevaConfig kevaConfig;
2627

2728
public void init() {
29+
alwaysFlush = kevaConfig.getAofInterval() == 0;
2830
bufferLock = new ReentrantLock();
29-
buffer = new ArrayList<>(64);
3031

3132
try {
32-
boolean isExists = new File(getWorkingDir() + "keva.aof").exists();
3333
FileOutputStream fos = new FileOutputStream(getWorkingDir() + "keva.aof", true);
34-
output = isExists ? new AppendOnlyObjectOutputStream(fos) : new ObjectOutputStream(fos);
34+
fd = fos.getFD();
35+
output = new ObjectOutputStream(fos);
3536
} catch (IOException e) {
3637
if (e instanceof FileNotFoundException) {
3738
log.info("AOF file not found, creating new file...");
@@ -45,11 +46,11 @@ public void init() {
4546
}
4647
}
4748

48-
if (kevaConfig.getAofInterval() != 0) {
49+
if (!alwaysFlush) {
4950
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
5051
executorService.scheduleAtFixedRate(() -> {
5152
try {
52-
sync();
53+
flush();
5354
} catch (IOException e) {
5455
log.error("Error writing AOF file", e);
5556
}
@@ -61,51 +62,28 @@ public void init() {
6162
}
6263

6364
public void write(Command command) {
64-
if (kevaConfig.getAofInterval() == 0) {
65-
syncPerMutation(command);
66-
return;
67-
}
68-
69-
bufferLock.lock();
70-
try {
71-
buffer.add(command);
72-
} finally {
73-
bufferLock.unlock();
74-
}
75-
}
76-
77-
public void sync() throws IOException {
78-
if (buffer.isEmpty()) {
79-
return;
80-
}
8165
bufferLock.lock();
8266
try {
83-
for (Command command : buffer) {
84-
output.writeObject(command.getObjects());
67+
output.writeObject(command.getObjects());
68+
if (alwaysFlush) {
69+
flush();
8570
}
71+
} catch (IOException e) {
72+
log.error("Error writing AOF file", e);
8673
} finally {
87-
output.flush();
88-
for (Command command : buffer) {
89-
command.recycle();
90-
}
91-
buffer.clear();
9274
bufferLock.unlock();
9375
}
9476
}
9577

96-
public void syncPerMutation(Command command) {
97-
try {
98-
output.writeObject(command.getObjects());
99-
output.flush();
100-
} catch (IOException e) {
101-
log.error("Error writing AOF file", e);
102-
}
78+
private void flush() throws IOException {
79+
fd.sync();
10380
}
10481

10582
public List<Command> read() throws IOException {
10683
try {
10784
List<Command> commands = new ArrayList<>(100);
10885
FileInputStream fis = new FileInputStream(getWorkingDir() + "keva.aof");
86+
log.info("AOF size is: {}", fis.getChannel().size());
10987
ObjectInputStream input = new ObjectInputStream(fis);
11088
while (true) {
11189
try {
@@ -129,15 +107,4 @@ private String getWorkingDir() {
129107
String workingDir = kevaConfig.getWorkDirectory();
130108
return workingDir.equals("./") ? "" : workingDir + "/";
131109
}
132-
133-
private static class AppendOnlyObjectOutputStream extends ObjectOutputStream {
134-
public AppendOnlyObjectOutputStream(OutputStream out) throws IOException {
135-
super(out);
136-
}
137-
138-
@Override
139-
protected void writeStreamHeader() throws IOException {
140-
reset();
141-
}
142-
}
143110
}

core/src/main/java/dev/keva/core/aof/AOFManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void init() {
3434
try {
3535
List<Command> commands = aof.read();
3636
if (commands != null) {
37-
log.info("Loading AOF file");
37+
log.info("Processing {} commands from AOF file", commands.size());
3838
for (Command command : commands) {
3939
val name = command.getName();
4040
val commandWrapper = commandMapper.getMethods().get(new BytesKey(name));

core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,7 @@ public void init() {
109109
}
110110
Object[] objects = new Object[types.length];
111111
command.toArguments(objects, types, ctx);
112-
// If not in AOF mode, then recycle(),
113-
// else, the command will be recycled in the AOF dump
114-
if (!kevaConfig.getAof() || kevaConfig.getAofInterval() == 0) {
115-
command.recycle();
116-
}
112+
command.recycle();
117113
return (Reply<?>) method.invoke(instance, objects);
118114
} finally {
119115
lock.unlock();

core/src/test/java/dev/keva/core/server/AOFTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
import lombok.val;
66
import org.junit.jupiter.api.Test;
77
import org.junit.jupiter.api.Timeout;
8+
import org.junit.jupiter.api.io.TempDir;
89
import redis.clients.jedis.Jedis;
910

11+
import java.io.IOException;
12+
import java.nio.file.Path;
1013
import java.util.concurrent.TimeUnit;
1114

1215
import static dev.keva.core.utils.PortUtil.getAvailablePort;
@@ -17,14 +20,17 @@
1720
public class AOFTest {
1821
static String host = "localhost";
1922

23+
@TempDir
24+
static Path tempDir;
25+
2026
Server startServer(int port) throws Exception {
2127
val config = KevaConfig.builder()
2228
.hostname(host)
2329
.port(port)
2430
.persistence(false)
2531
.aof(true)
2632
.aofInterval(1000)
27-
.workDirectory(System.getProperty("java.io.tmpdir"))
33+
.workDirectory(tempDir.toString())
2834
.build();
2935
val server = KevaServer.of(config);
3036
new Thread(() -> {

0 commit comments

Comments
 (0)