Skip to content

Commit e451184

Browse files
author
haiyang.zhou
committed
perf: batch flush with write() and channelReadComplete()
Change Session.publish() from writeAndFlush() to write(), and add channelReadComplete() in RespConnectionHandler to flush once after all messages in a read batch are processed. This reduces the number of system calls when handling pipelined commands — instead of one flush per message, a single flush covers the entire batch. For the async serial execution path (default mode), responses are written from the scheduler thread after channelReadComplete has already fired, so processResponse() explicitly calls flush() after each write to ensure delivery. A default no-op flush() is added to the Session interface for backward compatibility with custom implementations. Made-with: Cursor
1 parent 160b490 commit e451184

File tree

4 files changed

+13
-1
lines changed

4 files changed

+13
-1
lines changed

src/main/java/com/github/tonivade/resp/RespConnectionHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
4141
}
4242
}
4343

44+
@Override
45+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
46+
ctx.flush();
47+
}
48+
4449
@Override
4550
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
4651
LOGGER.debug("channel inactive");

src/main/java/com/github/tonivade/resp/RespServerContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ protected <T> Observable<T> enqueue(Observable<T> observable) {
140140

141141
private void processResponse(Request request, RedisToken token) {
142142
request.getSession().publish(token);
143+
request.getSession().flush();
143144
if (request.isExit()) {
144145
request.getSession().close();
145146
}

src/main/java/com/github/tonivade/resp/command/DefaultSession.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,12 @@ public String getId() {
3636

3737
@Override
3838
public void publish(RedisToken msg) {
39-
ctx.writeAndFlush(msg);
39+
ctx.write(msg);
40+
}
41+
42+
@Override
43+
public void flush() {
44+
ctx.flush();
4045
}
4146

4247
@Override

src/main/java/com/github/tonivade/resp/command/Session.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
public interface Session {
1212
String getId();
1313
void publish(RedisToken msg);
14+
default void flush() {}
1415
void close();
1516
void destroy();
1617
<T> Optional<T> getValue(String key);

0 commit comments

Comments
 (0)