feat: add parallelExecution mode for direct I/O thread command execution #191
Conversation
I've implemented a high-performance IP query service using the RESP project. The query service is read-only and stateless, so I propose this 'parallel execution mode'. After some performance benchmarking, I found that parallel execution mode improves query performance a lot.
I like the idea, but I have some concerns. I will go into details later; right now I'm busy.
Sure, take your time.
Hi @fanson, thanks for your interest in contributing. My main concern is that after this change, when parallel execution is selected we are going to have an unused scheduler (line 38). And when serial execution is selected we are going to access the state through a ConcurrentHashMap, which adds unnecessary synchronization. So maybe it would be better to instead have a parallelExecution boolean property in config, add the ability to define the number of threads of the thread pool used in the scheduler, and on the other hand pass the Map implementation in the constructor of StateHolder. wdyt?
Address maintainer feedback on PR tonivade#191:

- Replace `boolean serialExecution` with `int numThreads` parameter in RespServerContext and RespServer.Builder
- StateHolder now accepts a Map implementation via constructor: HashMap for single-thread (numThreads=1), ConcurrentHashMap for multi-thread (numThreads>1)
- Scheduler always used (no bypass path); thread pool size matches numThreads: newSingleThreadExecutor for 1, newFixedThreadPool for >1
- Preserve upstream daemon thread naming ("resp-server")
- Remove processCommand() if/else branching: unified scheduler path

This eliminates the two concerns raised:
1. No unused scheduler in any mode
2. No unnecessary ConcurrentHashMap synchronization in single-thread mode

Made-with: Cursor
Force-pushed a6fa0a2 to fdb93bb.
Hi @tonivade, thanks for the great feedback! I've updated the PR to address your concerns; the changes are listed in the commit message above. All tests pass, including new tests for the multi-threaded mode. Let me know if you'd like any further adjustments!
Performance observation with numThreads(N)
| Clients | Serial (ops/s) | Parallel (ops/s) | Speedup |
|---|---|---|---|
| 1 | 75,891 | 71,897 | 0.95x |
| 2 | 64,661 | 59,885 | 0.93x |
| 4 | 126,233 | 137,764 | 1.09x |
| 8 | 135,131 | 162,182 | 1.20x |
| 16 | 111,313 | 41,108 | 0.37x |
Analysis:
The numThreads > 1 path always routes through Observable.fromCallable(...).subscribeOn(scheduler), which introduces per-command overhead:
- RxJava Observable creation + subscribe/dispose lifecycle
- Thread pool task queue submission (BlockingQueue lock contention)
- Context switch from Netty I/O thread → scheduler thread → back
For ultra-fast commands (~25ns compute), this scheduling overhead (~500-2000ns) dominates — it's 20-80x the actual work.
At 16 clients with numThreads = availableProcessors() (~10), the fixed thread pool becomes saturated. 16 Netty I/O threads compete for 10 scheduler threads through the shared BlockingQueue, causing severe lock contention. This explains the 0.37x regression.
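The overhead gap described above can be reproduced in miniature with plain `java.util.concurrent`. This is a standalone sketch, not the project's benchmark: the task count, pool size, and toy command are arbitrary. The same trivial work runs once inline and once through a fixed thread pool, so the pooled path pays queue submission plus park/unpark per task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative micro-comparison of inline vs thread-pool execution for a
// command whose real work takes only a few nanoseconds.
public class OverheadSketch {

  // Stand-in for an ultra-fast, stateless command.
  static long fastCommand(long key) {
    return key * 31;
  }

  static long runInline(int tasks) {
    long sum = 0;
    for (int i = 0; i < tasks; i++) {
      sum += fastCommand(i);           // no queue, no context switch
    }
    return sum;
  }

  static long runPooled(int tasks) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<Long>> futures = new ArrayList<>(tasks);
    for (int i = 0; i < tasks; i++) {
      final long key = i;
      futures.add(pool.submit(() -> fastCommand(key))); // queue offer + wakeup
    }
    long sum = 0;
    for (Future<Long> f : futures) {
      sum += f.get();                  // park/unpark per result
    }
    pool.shutdown();
    return sum;
  }

  public static void main(String[] args) throws Exception {
    int tasks = 100_000;
    long t0 = System.nanoTime();
    long a = runInline(tasks);
    long inlineNs = System.nanoTime() - t0;
    t0 = System.nanoTime();
    long b = runPooled(tasks);
    long pooledNs = System.nanoTime() - t0;
    System.out.println("inline ns/op: " + inlineNs / tasks);
    System.out.println("pooled ns/op: " + pooledNs / tasks);
    System.out.println("sums equal: " + (a == b));
  }
}
```

On a typical JVM the pooled path is typically one to two orders of magnitude slower per operation for work this small, which is consistent with the 20-80x ratio estimated above.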
Controlled benchmark: numThreads(N) vs parallelExecution (direct I/O)
I ran a more rigorous comparison using redis-benchmark (2M requests, pipeline=16, loopback) with a full production dataset (88MB IPv4 with 10.5M IP ranges + 36MB IPv6 with 2.8M ranges). All three configurations share the same I/O-layer optimizations (batch flush, session caching, zero-alloc parsing) to isolate the effect of the command execution strategy.
1. Serial (single-thread scheduler, HashMap state) — upstream default
| Clients | ops/s | p50 latency |
|---|---|---|
| 1 | 85,903 | 0.055ms |
| 2 | 148,943 | 0.063ms |
| 4 | 147,308 | 0.135ms |
| 8 | 154,955 | 0.575ms |
| 16 | 146,231 | 1.439ms |
| 32 | 139,266 | 3.335ms |
| 50 | 143,978 | 5.239ms |
2. numThreads(14) (RxJava FixedThreadPool, ConcurrentHashMap state) — current PR
| Clients | ops/s | vs Serial | p50 |
|---|---|---|---|
| 1 | 81,826 | -5.0% | 0.055ms |
| 2 | 138,927 | -8.1% | 0.071ms |
| 4 | 139,626 | -4.4% | 0.135ms |
| 8 | 144,540 | -6.7% | 0.623ms |
| 16 | 137,052 | -5.7% | 1.527ms |
| 32 | 136,407 | -4.5% | 3.415ms |
| 50 | 136,454 | -2.8% | 5.519ms |
numThreads(14) is consistently slower than serial at every concurrency level. The BlockingQueue contention in the RxJava scheduler adds pure overhead for fast commands.
3. parallelExecution (direct Netty I/O thread, ConcurrentHashMap, no scheduler)
This mode bypasses the RxJava scheduler entirely — commands execute directly on the Netty I/O thread that decoded them.
| Clients | ops/s | vs Serial | p50 | p50 vs Serial |
|---|---|---|---|---|
| 1 | 84,588 | -1.5% | 0.039ms | -29% |
| 2 | 155,678 | +4.5% | 0.055ms | -13% |
| 4 | 156,789 | +6.4% | 0.111ms | -18% |
| 8 | 164,826 | +6.4% | 0.543ms | -6% |
| 16 | 153,586 | +5.0% | 1.367ms | -5% |
| 32 | 145,296 | +4.3% | 3.183ms | -5% |
| 50 | 150,421 | +4.5% | 4.967ms | -5% |
Summary
For commands with sub-microsecond execution time, the RxJava subscribeOn path introduces measurable per-request overhead (Observable.fromCallable() allocation, BlockingQueue.offer()/poll() contention, thread context switch). With numThreads(N), multiple I/O threads compete for the shared queue, making it worse than serial.
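The two dispatch strategies compared above can be sketched in a minimal runnable form. A plain `ExecutorService` stands in for the RxJava single-thread scheduler (the real code path uses `Observable.fromCallable(...).subscribeOn(scheduler)`), and the class and field names are illustrative, not the PR's actual diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Sketch of the two dispatch modes: serial (single scheduler thread) vs
// parallel (inline on the calling thread, no scheduler created at all).
public class DispatchSketch {
  private final ExecutorService scheduler;         // null in parallel mode: never created
  private final Function<String, String> command;  // stand-in for command execution

  DispatchSketch(boolean parallelExecution, Function<String, String> command) {
    this.scheduler = parallelExecution ? null : Executors.newSingleThreadExecutor();
    this.command = command;
  }

  String process(String request) throws Exception {
    if (scheduler == null) {
      // parallel mode: run inline on the calling (Netty I/O) thread
      return command.apply(request);
    }
    // serial mode: all commands funnel through the single scheduler thread
    return scheduler.submit(() -> command.apply(request)).get();
  }

  void shutdown() {
    if (scheduler != null) {
      scheduler.shutdown();
    }
  }
}
```

Keying the branch on `scheduler == null` rather than a separate boolean means the flag and the executor can never disagree, and no executor object exists at all in parallel mode.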
Proposal
I'd like to update this PR to offer two clean modes that address your original concerns:
- Default (serial) — single-thread scheduler with `HashMap` (unchanged from upstream)
- `parallelExecution()` — bypass scheduler, execute on I/O threads, use `ConcurrentHashMap`

This cleanly avoids both issues you raised:

- No unused scheduler in parallel mode (it's never created)
- No unnecessary `ConcurrentHashMap` in single-thread mode
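Both modes hinge on the same Map-injection pattern in StateHolder: the caller chooses the map implementation, so serial mode pays no synchronization cost. A sketch of that pattern follows; the factory method names are my illustration, not the PR's exact API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the injected-map design: the constructor decides thread safety.
public class StateHolder {
  private final Map<String, Object> state;

  public StateHolder(Map<String, Object> state) {
    this.state = state;
  }

  // default mode: a single scheduler thread writes, plain HashMap is enough
  public static StateHolder serial() {
    return new StateHolder(new HashMap<>());
  }

  // parallelExecution(): many Netty I/O threads touch the state concurrently
  public static StateHolder parallel() {
    return new StateHolder(new ConcurrentHashMap<>());
  }

  public Object getValue(String key)             { return state.get(key); }
  public void putValue(String key, Object value) { state.put(key, value); }
  public Object removeValue(String key)          { return state.remove(key); }
}
```

In serial mode only the single scheduler thread touches the map, so `HashMap` is safe; in parallel mode every Netty I/O thread may read and write it concurrently, which requires `ConcurrentHashMap`.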
The numThreads(N > 1) option would be removed since it's a net negative for the workloads that motivated this PR. If there's a future need for scheduler-based parallelism (e.g., commands with blocking I/O), it could be revisited as a separate feature.
What do you think?
Force-pushed 3b19532 to 45e360d.
Update: PR code now matches the proposed design

I've updated the PR implementation to match the proposal above. The diff is minimal: 4 files changed, focused solely on the execution mode switch. Ready for review when you have a chance.
Add a `parallelExecution()` builder option that bypasses the RxJava single-thread scheduler and executes commands directly on Netty I/O threads.

Changes:
- RespServerContext: accept boolean parallelExecution flag; when true, skip scheduler and execute commands inline on I/O thread, using ConcurrentHashMap for thread-safe state; when false, use existing single-thread scheduler with HashMap (unchanged default behavior)
- RespServer.Builder: add parallelExecution() method
- StateHolder: accept Map implementation via constructor, allowing HashMap (serial) or ConcurrentHashMap (parallel)

Benchmark results (redis-benchmark, 2M requests, pipeline=16, full dataset):
- Serial baseline: ~155K ops/s peak
- parallelExecution: ~165K ops/s peak (+6.4%), p50 latency -5% to -29%

Made-with: Cursor
StateHolder's Map-based constructor is already exercised indirectly through RespServerContextTest.processCommandParallelExecution. Testing the JDK's ConcurrentHashMap put/get/remove semantics adds no value.

Made-with: Cursor
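For context, the JDK guarantee the parallel mode depends on (and which the commit above deliberately does not re-test) can be shown in isolation: concurrent puts of distinct keys into a `ConcurrentHashMap` never lose entries, while a plain `HashMap` gives no such guarantee under concurrent writes and may even corrupt its internal structure. The thread and key counts in this standalone sketch are arbitrary.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MapSafetySketch {
  // Eight writer threads each insert 10_000 distinct keys, then we report
  // the resulting size; a thread-safe map always ends up with 80_000 entries.
  static int concurrentPuts(Map<Integer, Integer> map) throws InterruptedException {
    int threads = 8;
    int perThread = 10_000;
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch done = new CountDownLatch(threads);
    for (int t = 0; t < threads; t++) {
      final int base = t * perThread;
      pool.submit(() -> {
        for (int i = 0; i < perThread; i++) {
          map.put(base + i, i);       // distinct keys across all threads
        }
        done.countDown();
      });
    }
    done.await();
    pool.shutdown();
    return map.size();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(concurrentPuts(new ConcurrentHashMap<>()));
  }
}
```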
Force-pushed 6be074c to a934c11.
Thanks a lot @fanson. I see that the scheduler adds overhead, and if we are not going to serialize requests it doesn't make sense to keep using the scheduler. I have one minor concern; I will add a comment directly in the code.
These 4 PRs aim to improve the performance of resp.
tonivade left a comment
Please take a look at these minor changes.
```java
      ex -> LOGGER.error("error executing command: " + request, ex));
} catch (RuntimeException ex) {
  LOGGER.error("error executing command: " + request, ex);
```
```java
if (parallelExecution) {
```

use the condition `scheduler == null`
Summary
Add a `parallelExecution()` builder option to `RespServer` that bypasses the RxJava single-thread scheduler and executes commands directly on Netty I/O threads, improving throughput for fast, stateless commands.

Motivation
The default single-thread executor serializes all commands globally, which is safe but limits throughput for read-heavy, stateless workloads. For such commands, executing directly on the I/O thread that decoded the request eliminates scheduling overhead (Observable allocation, BlockingQueue contention, context switch).
Design
- Default (serial): single-thread scheduler with `HashMap` in `StateHolder` — global serialization, backward compatible, zero synchronization overhead. Unchanged from upstream.
- `parallelExecution()`: bypass scheduler entirely, execute commands on Netty I/O threads with `ConcurrentHashMap` in `StateHolder` — parallel execution with thread-safe state access, no unused scheduler created.

This cleanly addresses both concerns from review:
- No unused scheduler when parallel mode is selected
- No unnecessary `ConcurrentHashMap` when serial mode is selected

Changes
- `RespServer.java`: add `parallelExecution()` builder method (boolean flag)
- `RespServerContext.java`: accept `boolean parallelExecution`; when true, skip RxJava scheduler, execute commands directly on I/O thread, use `ConcurrentHashMap` for state; when false, use existing single-thread scheduler with `HashMap` (default behavior unchanged)
- `StateHolder.java`: accept `Map` implementation via constructor (`HashMap` for serial, `ConcurrentHashMap` for parallel)

Benchmark
redis-benchmark (2M requests, pipeline=16, full production dataset, loopback):
p50 latency improvement: -5% to -29% across concurrency levels.
Test plan