Skip to content

Commit 137eeff

Browse files
authored
replication base (#146)
* config improvement (#104) * add config improvement * fix unit test fails * fix unit test fails * fix unit test fails * fix unit test fails * fix container name on docs (#106) Co-authored-by: Nguyen Dang Dai Hiep <[email protected]> * #18 add base classes for replication feature * fix bug making server failed to start * add base of communication flow * add partial sync overall logic * add PSYNC command and outline for master logic * update PSYNC impl and add Master Test * update ReplicationBuffer dump method, test cmd replicating * add base for REPLCONF command for master-slave communication * implementing REPLCONF, add slave context * update slave context when REPLCONF and start cmd forwarding thread when receive REPLCONF ACK * working replication with 1 master 1 slave * resolve conflict, update code to match * add 200ms sleep before checking values replicated * fix AOF test assume dump file already present
1 parent 0e807f5 commit 137eeff

File tree

21 files changed

+794
-9
lines changed

21 files changed

+794
-9
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ subprojects {
2828

2929
// SLF4J as a facade
3030
implementation 'org.slf4j:slf4j-api:1.7.2'
31+
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'
3132
}
3233

3334
publishing {

core/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies {
2424
implementation 'org.reflections:reflections:0.10.2'
2525
implementation 'org.apache.commons:commons-lang3:3.12.0'
2626

27-
testImplementation 'redis.clients:jedis:3.7.0'
27+
implementation 'redis.clients:jedis:3.7.0'
2828
}
2929

3030
publishing {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package dev.keva.core.command.impl.replication;
2+
3+
import com.google.common.io.Files;
4+
import dev.keva.core.command.annotation.CommandImpl;
5+
import dev.keva.core.command.annotation.Execute;
6+
import dev.keva.core.command.annotation.Mutate;
7+
import dev.keva.core.command.annotation.ParamLength;
8+
import dev.keva.core.config.KevaConfig;
9+
import dev.keva.core.replication.ConnSlaveMap;
10+
import dev.keva.core.replication.ReplicationBuffer;
11+
import dev.keva.core.replication.SlaveContext;
12+
import dev.keva.ioc.annotation.Autowired;
13+
import dev.keva.ioc.annotation.Component;
14+
import dev.keva.protocol.resp.reply.BulkReply;
15+
import dev.keva.protocol.resp.reply.MultiBulkReply;
16+
import dev.keva.protocol.resp.reply.Reply;
17+
import dev.keva.protocol.resp.reply.StatusReply;
18+
import io.netty.channel.ChannelHandlerContext;
19+
import lombok.SneakyThrows;
20+
21+
import java.io.File;
22+
import java.util.ArrayList;
23+
24+
import static dev.keva.core.command.annotation.ParamLength.Type.EXACT;
25+
26+
@Component
27+
@CommandImpl("psync")
28+
@ParamLength(type = EXACT, value = 2)
29+
@Mutate
30+
public class Psync {
31+
32+
private final ReplicationBuffer repBuffer;
33+
private final String persistenceFilePath;
34+
private final ConnSlaveMap connSlaveMap;
35+
36+
@Autowired
37+
public Psync(KevaConfig kevaConfig, ReplicationBuffer repBuffer, ConnSlaveMap connSlaveMap) {
38+
this.repBuffer = repBuffer;
39+
this.persistenceFilePath = kevaConfig.getWorkDirectory() + "/dump.kdb";
40+
this.connSlaveMap = connSlaveMap;
41+
}
42+
43+
@Execute
44+
public StatusReply execute(byte[] replicationId,
45+
byte[] startingOffset,
46+
ChannelHandlerContext ctx) {
47+
// PSYNC replicationId startingOffset
48+
String repId = new String(replicationId);
49+
String masterRepId = String.valueOf(repBuffer.getReplicationId());
50+
long slaveStartingOffset = Long.parseLong(new String(startingOffset));
51+
boolean needFullResync = !masterRepId.equalsIgnoreCase(repId) || repBuffer.getStartingOffset() > slaveStartingOffset;
52+
ArrayList<String> cmdList = repBuffer.dump();
53+
54+
SlaveContext slaveContext = connSlaveMap.get(SlaveContext.getConnKey(ctx.channel().remoteAddress()));
55+
slaveContext.setStatus(SlaveContext.Status.SYNCING);
56+
57+
if (needFullResync) {
58+
ctx.write(new BulkReply("FULLRESYNC " + repBuffer.getReplicationId() + " " + repBuffer.getStartingOffset()));
59+
60+
// send snapshot
61+
ctx.write(new BulkReply(readSnapshotFileToString()));
62+
63+
// send buffered commands
64+
Reply<?>[] cmdReplies = new Reply[cmdList.size()];
65+
for (int i = 0; i < cmdList.size(); i++) {
66+
String cmd = cmdList.get(i);
67+
cmdReplies[i] = new BulkReply(cmd);
68+
}
69+
ctx.write(new MultiBulkReply(cmdReplies));
70+
} else {
71+
ctx.write(new StatusReply("CONTINUE"));
72+
ctx.write(new BulkReply(cmdList.toString()));
73+
}
74+
75+
return StatusReply.OK;
76+
}
77+
78+
@SneakyThrows
79+
private byte[] readSnapshotFileToString() {
80+
return Files.asByteSource(new File(persistenceFilePath)).read();
81+
}
82+
83+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package dev.keva.core.command.impl.replication;
2+
3+
import dev.keva.core.command.annotation.CommandImpl;
4+
import dev.keva.core.command.annotation.Execute;
5+
import dev.keva.core.command.annotation.Mutate;
6+
import dev.keva.core.command.annotation.ParamLength;
7+
import dev.keva.core.replication.ConnSlaveMap;
8+
import dev.keva.core.replication.ReplConstants;
9+
import dev.keva.core.replication.ReplicationBuffer;
10+
import dev.keva.core.replication.SlaveContext;
11+
import dev.keva.ioc.annotation.Autowired;
12+
import dev.keva.ioc.annotation.Component;
13+
import dev.keva.protocol.resp.reply.Reply;
14+
import dev.keva.protocol.resp.reply.StatusReply;
15+
import io.netty.channel.ChannelHandlerContext;
16+
import lombok.extern.slf4j.Slf4j;
17+
import org.apache.commons.lang3.StringUtils;
18+
19+
import static dev.keva.core.command.annotation.ParamLength.Type.AT_LEAST;
20+
21+
@Component
22+
@CommandImpl("replconf")
23+
@ParamLength(type = AT_LEAST, value = 1)
24+
@Mutate
25+
@Slf4j
26+
public class Replconf {
27+
28+
private final ConnSlaveMap connSlaveMap;
29+
private final ReplicationBuffer repBuffer;
30+
31+
@Autowired
32+
public Replconf(ConnSlaveMap connSlaveMap, ReplicationBuffer repBuffer) {
33+
this.connSlaveMap = connSlaveMap;
34+
this.repBuffer = repBuffer;
35+
}
36+
37+
@Execute
38+
public Reply<String> execute(byte[][] args, ChannelHandlerContext ctx) {
39+
String connKey = SlaveContext.getConnKey(ctx.channel().remoteAddress());
40+
SlaveContext slaveContext;
41+
if (connSlaveMap.contains(connKey)) {
42+
slaveContext = connSlaveMap.get(connKey);
43+
} else {
44+
slaveContext = SlaveContext.builder()
45+
.status(SlaveContext.Status.STARTING)
46+
.build();
47+
}
48+
49+
String[] cmdArgs = StringUtils.split(new String(args[0]), " ");
50+
// REPLCONF ACK offset
51+
if (cmdArgs[0].equalsIgnoreCase("ACK")) {
52+
slaveContext.setOffset(Long.parseLong(cmdArgs[1]));
53+
54+
// mark slave as online
55+
if (slaveContext.getStatus() == SlaveContext.Status.SYNCING) {
56+
slaveContext.setStatus(SlaveContext.Status.ONLINE);
57+
}
58+
// start sending buffered commands periodically
59+
new Thread(() -> slaveContext.startForwardCommandJob(repBuffer)).start();
60+
log.info("Started command forwarding thread for slave {}", slaveContext.slaveName());
61+
62+
return StatusReply.OK;
63+
}
64+
65+
for (int i = 0; i < cmdArgs.length; i++) {
66+
String option = cmdArgs[i];
67+
if (option.equalsIgnoreCase(ReplConstants.IP_ADDRESS)) {
68+
slaveContext.setIpAddress(cmdArgs[i + 1]);
69+
continue;
70+
}
71+
if (option.equalsIgnoreCase(ReplConstants.LISTENING_PORT)) {
72+
slaveContext.setPort(cmdArgs[i + 1]);
73+
}
74+
}
75+
log.info("Slave context {}", slaveContext.toString());
76+
connSlaveMap.put(connKey, slaveContext);
77+
return StatusReply.OK;
78+
}
79+
}

core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import dev.keva.core.command.impl.transaction.manager.TransactionContext;
1010
import dev.keva.core.command.impl.transaction.manager.TransactionManager;
1111
import dev.keva.core.config.KevaConfig;
12+
import dev.keva.core.replication.ReplicationBuffer;
1213
import dev.keva.ioc.KevaIoC;
1314
import dev.keva.ioc.annotation.Autowired;
1415
import dev.keva.ioc.annotation.Component;
@@ -54,6 +55,9 @@ public class CommandMapper {
5455
@Autowired
5556
private AOFContainer aof;
5657

58+
@Autowired
59+
private ReplicationBuffer repBuf;
60+
5761
public void init() {
5862
Reflections reflections = new Reflections("dev.keva.core.command.impl");
5963
Set<Class<?>> annotated = reflections.getTypesAnnotatedWith(CommandImpl.class);
@@ -110,6 +114,7 @@ public void init() {
110114
log.error("Error writing to AOF", e);
111115
}
112116
}
117+
repBuf.buffer(command);
113118
Object[] objects = new Object[types.length];
114119
command.toArguments(objects, types, ctx);
115120
command.recycle();

core/src/main/java/dev/keva/core/config/KevaConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public class KevaConfig {
4848
@CliProp(name = "io-threads", type = CliPropType.VAL)
4949
private Integer ioThreads;
5050

51+
@ConfigProp(name = "replicaof", defaultVal = "")
52+
@CliProp(name = "replicaof", type = CliPropType.VAL)
53+
private String replicaOf;
54+
5155
@Bean
5256
public static KevaConfig ofDefaults() {
5357
return builder()
@@ -57,6 +61,7 @@ public static KevaConfig ofDefaults() {
5761
.persistence(true)
5862
.aof(false)
5963
.aofInterval(1000)
64+
.ioThreads(-1)
6065
.build();
6166
}
6267
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package dev.keva.core.replication;
2+
3+
import dev.keva.ioc.annotation.Component;
4+
import lombok.extern.slf4j.Slf4j;
5+
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
9+
@Component
10+
@Slf4j
11+
public class ConnSlaveMap {
12+
private static final Map<String, SlaveContext> connSlaveMap = new HashMap<>();
13+
14+
public void put(String key, SlaveContext ctx) {
15+
connSlaveMap.put(key, ctx);
16+
}
17+
18+
public SlaveContext get(String key) {
19+
return connSlaveMap.get(key);
20+
}
21+
22+
public boolean contains(String key) {
23+
return connSlaveMap.containsKey(key);
24+
}
25+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package dev.keva.core.replication;
2+
3+
public class ReplConstants {
4+
public static final String IP_ADDRESS = "ip-address";
5+
public static final String LISTENING_PORT = "listening-port";
6+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package dev.keva.core.replication;
2+
3+
import dev.keva.ioc.annotation.Component;
4+
import dev.keva.protocol.resp.Command;
5+
import lombok.Getter;
6+
import lombok.extern.slf4j.Slf4j;
7+
8+
import java.util.ArrayList;
9+
import java.util.Arrays;
10+
import java.util.List;
11+
import java.util.concurrent.ConcurrentLinkedDeque;
12+
import java.util.stream.Collectors;
13+
14+
@Component
15+
@Slf4j
16+
public class ReplicationBuffer {
17+
18+
@Getter
19+
private ConcurrentLinkedDeque<Command> buffer;
20+
@Getter
21+
private long currentOffset = 0;
22+
@Getter
23+
private long startingOffset = 0;
24+
@Getter
25+
private long currentSize = 0;
26+
@Getter
27+
private long limit; // in bytes
28+
@Getter
29+
private long replicationId;
30+
31+
private final List<ConcurrentLinkedDeque<Command>> slaveBuffers = new ArrayList<>();
32+
33+
public void init() {
34+
buffer = new ConcurrentLinkedDeque<>();
35+
replicationId = System.currentTimeMillis();
36+
limit = 1024 * 1024; // default to 1 MB
37+
}
38+
39+
public void rebase(long replicationId, long startingOffset) {
40+
this.startingOffset = startingOffset;
41+
this.currentOffset = startingOffset;
42+
this.replicationId = replicationId;
43+
this.currentSize = 0;
44+
buffer.clear();
45+
}
46+
47+
private boolean isWriteCommand(byte[] cmdName) {
48+
return Arrays.stream(WriteCommand.values()).anyMatch(writeCommand -> writeCommand.name().equalsIgnoreCase(new String(cmdName)));
49+
}
50+
51+
public void buffer(Command command) {
52+
if (!isWriteCommand(command.getName())) {
53+
return;
54+
}
55+
if (currentSize >= limit) {
56+
Command removed = buffer.removeFirst();
57+
startingOffset++;
58+
currentSize = currentSize - removed.getByteSize();
59+
}
60+
// need a new instance because the original object will get recycled
61+
Command cmdInstance = Command.newInstance(command.getObjects(), false);
62+
buffer.add(cmdInstance);
63+
currentOffset++;
64+
currentSize += currentSize + command.getByteSize();
65+
slaveBuffers.forEach(buf -> {
66+
buf.addLast(cmdInstance);
67+
});
68+
log.trace(Arrays.toString(buffer.toArray()));
69+
}
70+
71+
public ArrayList<String> dump() {
72+
return buffer.stream().map(cmd -> cmd.toCommandString(true))
73+
.collect(Collectors.toCollection(ArrayList::new));
74+
}
75+
76+
public void register(ConcurrentLinkedDeque<Command> slaveBuffer) {
77+
slaveBuffers.add(slaveBuffer);
78+
}
79+
80+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package dev.keva.core.replication;
2+
3+
import redis.clients.jedis.commands.ProtocolCommand;
4+
import redis.clients.jedis.util.SafeEncoder;
5+
6+
public enum ReplicationCommand implements ProtocolCommand {
7+
PSYNC, // for master - slave data synchronization
8+
REPLCONF // for master - slave information
9+
;
10+
11+
private final byte[] raw;
12+
13+
ReplicationCommand() {
14+
raw = SafeEncoder.encode(name());
15+
}
16+
17+
@Override
18+
public byte[] getRaw() {
19+
return raw;
20+
21+
}
22+
23+
}

0 commit comments

Comments
 (0)