Skip to content

Commit 0bb4732

Browse files
author
haiyang.zhou
committed
feat: add configurable parallel execution mode
Add a parallelExecution() option to RespServer.Builder that controls the command execution threading model: - Serial mode (default): commands are enqueued to a single-thread RxJava scheduler, providing global serialization. This preserves backward compatibility. - Parallel mode: commands execute directly on Netty I/O threads, giving per-connection serialization via EventLoop binding. This yields higher throughput for stateless or thread-safe command implementations. StateHolder is changed from HashMap to ConcurrentHashMap to ensure correctness when parallel execution is enabled and multiple I/O threads may access shared state concurrently. Made-with: Cursor
1 parent 160b490 commit 0bb4732

File tree

3 files changed

+38
-9
lines changed

3 files changed

+38
-9
lines changed

src/main/java/com/github/tonivade/resp/RespServer.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ public static class Builder implements Recoverable {
228228
private String host = DEFAULT_HOST;
229229
private int port = DEFAULT_PORT;
230230
private CommandSuite commands = new CommandSuite();
231+
private boolean serialExecution = true;
231232

232233
public Builder host(String host) {
233234
this.host = host;
@@ -254,8 +255,20 @@ public Builder commands(CommandSuite commands) {
254255
return this;
255256
}
256257

258+
/**
259+
* When enabled (default), all commands are executed on a single-thread
260+
* executor, providing global serialization. When disabled, commands
261+
* execute directly on Netty I/O threads, giving per-connection
262+
* serialization with higher throughput for stateless or thread-safe
263+
* command implementations.
264+
*/
265+
public Builder parallelExecution() {
266+
this.serialExecution = false;
267+
return this;
268+
}
269+
257270
public RespServer build() {
258-
return new RespServer(new RespServerContext(host, port, commands));
271+
return new RespServer(new RespServerContext(host, port, commands, SessionListener.nullListener(), serialExecution));
259272
}
260273
}
261274
}

src/main/java/com/github/tonivade/resp/RespServerContext.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,23 @@ public class RespServerContext implements ServerContext {
3838
private final int port;
3939
private final CommandSuite commands;
4040
private final SessionListener sessionListener;
41+
private final boolean serialExecution;
4142

4243
public RespServerContext(String host, int port, CommandSuite commands) {
4344
this(host, port, commands, nullListener());
4445
}
4546

4647
public RespServerContext(String host, int port, CommandSuite commands, SessionListener sessionListener) {
48+
this(host, port, commands, sessionListener, true);
49+
}
50+
51+
public RespServerContext(String host, int port, CommandSuite commands,
52+
SessionListener sessionListener, boolean serialExecution) {
4753
this.host = checkNonEmpty(host);
4854
this.port = checkRange(port, 1024, 65535);
4955
this.commands = checkNonNull(commands);
5056
this.sessionListener = checkNonNull(sessionListener);
57+
this.serialExecution = serialExecution;
5158
}
5259

5360
public void start() {
@@ -106,12 +113,21 @@ void processCommand(Request request) {
106113
LOGGER.debug("received command: {}", request);
107114

108115
var command = getCommand(request.getCommand());
109-
try {
110-
enqueue(Observable.fromCallable(() -> executeCommand(command, request)))
111-
.subscribe(response -> processResponse(request, response),
112-
ex -> LOGGER.error("error executing command: " + request, ex));
113-
} catch (RuntimeException ex) {
114-
LOGGER.error("error executing command: " + request, ex);
116+
if (serialExecution) {
117+
try {
118+
enqueue(Observable.fromCallable(() -> executeCommand(command, request)))
119+
.subscribe(response -> processResponse(request, response),
120+
ex -> LOGGER.error("error executing command: " + request, ex));
121+
} catch (RuntimeException ex) {
122+
LOGGER.error("error executing command: " + request, ex);
123+
}
124+
} else {
125+
try {
126+
RedisToken response = executeCommand(command, request);
127+
processResponse(request, response);
128+
} catch (RuntimeException ex) {
129+
LOGGER.error("error executing command: " + request, ex);
130+
}
115131
}
116132
}
117133

src/main/java/com/github/tonivade/resp/StateHolder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
*/
55
package com.github.tonivade.resp;
66

7-
import java.util.HashMap;
87
import java.util.Map;
98
import java.util.Optional;
9+
import java.util.concurrent.ConcurrentHashMap;
1010

1111
public class StateHolder {
1212

13-
private final Map<String, Object> state = new HashMap<>();
13+
private final Map<String, Object> state = new ConcurrentHashMap<>();
1414

1515
@SuppressWarnings("unchecked")
1616
public <T> Optional<T> getValue(String key) {

0 commit comments

Comments
 (0)