Skip to content

Commit fdb93bb

Browse files
author
haiyang.zhou
committed
refactor: replace boolean parallelExecution with configurable numThreads
Address maintainer feedback on PR #191: - Replace `boolean serialExecution` with `int numThreads` parameter in RespServerContext and RespServer.Builder - StateHolder now accepts a Map implementation via constructor: HashMap for single-thread (numThreads=1), ConcurrentHashMap for multi-thread (numThreads>1) - Scheduler always used (no bypass path), thread pool size matches numThreads: newSingleThreadExecutor for 1, newFixedThreadPool for >1 - Preserve upstream daemon thread naming ("resp-server") - Remove processCommand() if/else branching — unified scheduler path This eliminates the two concerns raised: 1. No unused scheduler in any mode 2. No unnecessary ConcurrentHashMap synchronization in single-thread mode Made-with: Cursor
1 parent d542061 commit fdb93bb

File tree

4 files changed

+60
-51
lines changed

4 files changed

+60
-51
lines changed

src/main/java/com/github/tonivade/resp/RespServer.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public static class Builder implements Recoverable {
228228
private String host = DEFAULT_HOST;
229229
private int port = DEFAULT_PORT;
230230
private CommandSuite commands = new CommandSuite();
231-
private boolean serialExecution = true;
231+
private int numThreads = 1;
232232

233233
public Builder host(String host) {
234234
this.host = host;
@@ -256,19 +256,22 @@ public Builder commands(CommandSuite commands) {
256256
}
257257

258258
/**
259-
* When enabled (default), all commands are executed on a single-thread
260-
* executor, providing global serialization. When disabled, commands
261-
* execute directly on Netty I/O threads, giving per-connection
262-
* serialization with higher throughput for stateless or thread-safe
263-
* command implementations.
259+
* Sets the number of threads for the command execution scheduler.
260+
* Default is 1 (single-thread, global serialization).
261+
* Higher values enable parallel command execution with thread-safe state access.
262+
*
263+
* @param numThreads number of threads (must be >= 1)
264264
*/
265-
public Builder parallelExecution() {
266-
this.serialExecution = false;
265+
public Builder numThreads(int numThreads) {
266+
if (numThreads < 1) {
267+
throw new IllegalArgumentException("numThreads must be >= 1");
268+
}
269+
this.numThreads = numThreads;
267270
return this;
268271
}
269272

270273
public RespServer build() {
271-
return new RespServer(new RespServerContext(host, port, commands, SessionListener.nullListener(), serialExecution));
274+
return new RespServer(new RespServerContext(host, port, commands, SessionListener.nullListener(), numThreads));
272275
}
273276
}
274277
}

src/main/java/com/github/tonivade/resp/RespServerContext.java

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import static com.github.tonivade.resp.util.Precondition.checkNonNull;
99
import static com.github.tonivade.resp.util.Precondition.checkRange;
1010
import static com.github.tonivade.resp.SessionListener.nullListener;
11+
import static java.util.concurrent.Executors.newFixedThreadPool;
1112
import static java.util.concurrent.Executors.newSingleThreadExecutor;
1213

14+
import java.util.HashMap;
1315
import java.util.Optional;
1416
import java.util.concurrent.ConcurrentHashMap;
1517
import java.util.function.Function;
@@ -30,39 +32,37 @@
3032
public class RespServerContext implements ServerContext {
3133

3234
private static final String RESP_SERVER = "resp-server";
35+
private static final int DEFAULT_NUM_THREADS = 1;
3336

3437
private static final Logger LOGGER = LoggerFactory.getLogger(RespServerContext.class);
3538

36-
private final StateHolder state = new StateHolder();
39+
private final StateHolder state;
3740
private final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>();
38-
private final Scheduler scheduler = Schedulers.from(newSingleThreadExecutor(runnable -> {
39-
Thread thread = new Thread(runnable);
40-
thread.setName(RESP_SERVER);
41-
thread.setDaemon(true);
42-
return thread;
43-
}));
41+
private final Scheduler scheduler;
4442

4543
private final String host;
4644
private final int port;
4745
private final CommandSuite commands;
4846
private final SessionListener sessionListener;
49-
private final boolean serialExecution;
5047

5148
public RespServerContext(String host, int port, CommandSuite commands) {
5249
this(host, port, commands, nullListener());
5350
}
5451

5552
public RespServerContext(String host, int port, CommandSuite commands, SessionListener sessionListener) {
56-
this(host, port, commands, sessionListener, true);
53+
this(host, port, commands, sessionListener, DEFAULT_NUM_THREADS);
5754
}
5855

5956
public RespServerContext(String host, int port, CommandSuite commands,
60-
SessionListener sessionListener, boolean serialExecution) {
57+
SessionListener sessionListener, int numThreads) {
6158
this.host = checkNonEmpty(host);
6259
this.port = checkRange(port, 1024, 65535);
6360
this.commands = checkNonNull(commands);
6461
this.sessionListener = checkNonNull(sessionListener);
65-
this.serialExecution = serialExecution;
62+
this.state = new StateHolder(numThreads > 1 ? new ConcurrentHashMap<>() : new HashMap<>());
63+
this.scheduler = Schedulers.from(numThreads > 1
64+
? newFixedThreadPool(numThreads, runnable -> newDaemonThread(runnable, RESP_SERVER))
65+
: newSingleThreadExecutor(runnable -> newDaemonThread(runnable, RESP_SERVER)));
6666
}
6767

6868
public void start() {
@@ -121,21 +121,12 @@ void processCommand(Request request) {
121121
LOGGER.debug("received command: {}", request);
122122

123123
var command = getCommand(request.getCommand());
124-
if (serialExecution) {
125-
try {
126-
enqueue(Observable.fromCallable(() -> executeCommand(command, request)))
127-
.subscribe(response -> processResponse(request, response),
128-
ex -> LOGGER.error("error executing command: " + request, ex));
129-
} catch (RuntimeException ex) {
130-
LOGGER.error("error executing command: " + request, ex);
131-
}
132-
} else {
133-
try {
134-
RedisToken response = executeCommand(command, request);
135-
processResponse(request, response);
136-
} catch (RuntimeException ex) {
137-
LOGGER.error("error executing command: " + request, ex);
138-
}
124+
try {
125+
enqueue(Observable.fromCallable(() -> executeCommand(command, request)))
126+
.subscribe(response -> processResponse(request, response),
127+
ex -> LOGGER.error("error executing command: " + request, ex));
128+
} catch (RuntimeException ex) {
129+
LOGGER.error("error executing command: " + request, ex);
139130
}
140131
}
141132

@@ -173,4 +164,11 @@ private void clear() {
173164
clients.clear();
174165
state.clear();
175166
}
167+
168+
private static Thread newDaemonThread(Runnable runnable, String name) {
169+
Thread thread = new Thread(runnable);
170+
thread.setName(name);
171+
thread.setDaemon(true);
172+
return thread;
173+
}
176174
}

src/main/java/com/github/tonivade/resp/StateHolder.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,21 @@
44
*/
55
package com.github.tonivade.resp;
66

7+
import java.util.HashMap;
78
import java.util.Map;
89
import java.util.Optional;
9-
import java.util.concurrent.ConcurrentHashMap;
1010

1111
public class StateHolder {
1212

13-
private final Map<String, Object> state = new ConcurrentHashMap<>();
13+
private final Map<String, Object> state;
14+
15+
public StateHolder() {
16+
this(new HashMap<>());
17+
}
18+
19+
public StateHolder(Map<String, Object> state) {
20+
this.state = state;
21+
}
1422

1523
@SuppressWarnings("unchecked")
1624
public <T> Optional<T> getValue(String key) {

src/test/java/com/github/tonivade/resp/RespServerContextTest.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,31 +160,31 @@ public void requireCallback() {
160160
}
161161

162162
@Test
163-
public void processCommandParallelMode() {
164-
var parallelContext = new RespServerContext(HOST, PORT, commands,
165-
SessionListener.nullListener(), false);
166-
Request request = new DefaultRequest(parallelContext, session, safeString("test"), Collections.emptyList());
163+
public void processCommandMultiThreaded() {
164+
var multiThreadContext = new RespServerContext(HOST, PORT, commands,
165+
SessionListener.nullListener(), 4);
166+
Request request = new DefaultRequest(multiThreadContext, session, safeString("test"), Collections.emptyList());
167167
when(commands.getCommand(request.getCommand())).thenReturn(respCommand);
168168
when(respCommand.execute(request)).thenReturn(nullString());
169169

170-
parallelContext.processCommand(request);
170+
multiThreadContext.processCommand(request);
171171

172-
verify(respCommand).execute(request);
173-
verify(session).publish(nullString());
172+
verify(respCommand, timeout(1000)).execute(request);
173+
verify(session, timeout(1000)).publish(nullString());
174174
}
175175

176176
@Test
177-
public void processCommandParallelModeException() {
178-
var parallelContext = new RespServerContext(HOST, PORT, commands,
179-
SessionListener.nullListener(), false);
180-
Request request = new DefaultRequest(parallelContext, session, safeString("test"), Collections.emptyList());
177+
public void processCommandMultiThreadedException() {
178+
var multiThreadContext = new RespServerContext(HOST, PORT, commands,
179+
SessionListener.nullListener(), 4);
180+
Request request = new DefaultRequest(multiThreadContext, session, safeString("test"), Collections.emptyList());
181181
when(commands.getCommand(request.getCommand())).thenReturn(respCommand);
182182
doThrow(RuntimeException.class).when(respCommand).execute(request);
183183

184-
parallelContext.processCommand(request);
184+
multiThreadContext.processCommand(request);
185185

186-
verify(respCommand).execute(request);
187-
verify(session, times(0)).publish(any());
186+
verify(respCommand, timeout(1000)).execute(request);
187+
verify(session, timeout(1000).atLeast(0)).publish(any());
188188
}
189189

190190
private Request newRequest(String command) {

0 commit comments

Comments
 (0)