Skip to content

Commit 40bdf1e

Browse files
committed
VertxConnection message handling fast path: avoid creating a pending queue when handling a message in VertxConnection when it is not necessary.
1 parent c5f7743 commit 40bdf1e

File tree

2 files changed

+75
-15
lines changed

2 files changed

+75
-15
lines changed

vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public class VertxConnection extends ConnectionBase {
6161
private Handler<Void> shutdownHandler;
6262

6363
// State accessed exclusively from the event loop thread
64-
private final Deque<Object> pending;
64+
private Deque<Object> pending;
65+
private boolean reentrant;
6566
private boolean read;
6667
private boolean needsFlush;
6768
private boolean draining;
@@ -75,7 +76,6 @@ public VertxConnection(ContextInternal context, ChannelHandlerContext chctx) {
7576
this.channelWritable = chctx.channel().isWritable();
7677
this.messageQueue = new InternalMessageChannel(chctx.channel().eventLoop());
7778
this.voidPromise = new VoidChannelPromise(chctx.channel(), false);
78-
this.pending = new ArrayDeque<>();
7979
this.autoRead = true;
8080
}
8181

@@ -218,26 +218,59 @@ void channelWritabilityChanged() {
218218
* This method is exclusively called by {@code VertxHandler} to read a message on the event-loop thread.
219219
*/
220220
final void read(Object msg) {
221-
read = true;
222221
if (METRICS_ENABLED) {
223222
reportBytesRead(msg);
224223
}
224+
read = true;
225+
if (!reentrant && !paused && pending == null) {
226+
// Fast path
227+
reentrant = true;
228+
try {
229+
handleMessage(msg);
230+
} finally {
231+
reentrant = false;
232+
}
233+
} else {
234+
addPending(msg);
235+
}
236+
}
237+
238+
private void addPending(Object msg) {
239+
if (pending == null) {
240+
pending = new ArrayDeque<>();
241+
}
225242
pending.add(msg);
226-
checkPendingMessages();
243+
if (!reentrant) {
244+
checkPendingMessages();
245+
}
227246
}
228247

229248
/**
230249
* This method is exclusively called by {@code VertxHandler} to signal read completion on the event-loop thread.
231250
*/
232251
final void readComplete() {
233252
if (read) {
234-
checkPendingMessages();
253+
if (pending != null) {
254+
checkPendingMessages();
255+
}
235256
read = false;
236257
checkFlush();
237258
checkAutoRead();
238259
}
239260
}
240261

262+
private void checkPendingMessages() {
263+
Object msg;
264+
reentrant = true;
265+
try {
266+
while (!paused && (msg = pending.poll()) != null) {
267+
handleMessage(msg);
268+
}
269+
} finally {
270+
reentrant = false;
271+
}
272+
}
273+
241274
public final void doPause() {
242275
assert chctx.executor().inEventLoop();
243276
paused = true;
@@ -272,25 +305,18 @@ private void checkFlush() {
272305

273306
private void checkAutoRead() {
274307
if (autoRead) {
275-
if (pending.size() >= 8) {
308+
if (pending != null && pending.size() >= 8) {
276309
autoRead = false;
277310
chctx.channel().config().setAutoRead(false);
278311
}
279312
} else {
280-
if (pending.isEmpty()) {
313+
if (pending == null || pending.isEmpty()) {
281314
autoRead = true;
282315
chctx.channel().config().setAutoRead(true);
283316
}
284317
}
285318
}
286319

287-
private void checkPendingMessages() {
288-
Object msg;
289-
while (!paused && (msg = pending.poll()) != null) {
290-
handleMessage(msg);
291-
}
292-
}
293-
294320
/**
295321
* Like {@link #write(Object, boolean, ChannelPromise)}.
296322
*/

vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,37 @@ public void flush(ChannelHandlerContext ctx) {
420420
await();
421421
}
422422

423+
@Test
424+
public void testReentrantRead() throws Exception {
425+
connectHandler = conn -> {
426+
VertxConnection vertxConn = (VertxConnection) conn;
427+
ChannelHandlerContext ctx = conn.channelHandlerContext();
428+
ChannelPipeline pipeline = ctx.pipeline();
429+
AtomicInteger reentrant = new AtomicInteger();
430+
conn.messageHandler(msg -> {
431+
assertEquals(0, reentrant.getAndIncrement());
432+
switch (((ByteBuf)msg).toString(StandardCharsets.UTF_8)) {
433+
case "inbound-1":
434+
pipeline.fireChannelRead(Unpooled.copiedBuffer("inbound-2", StandardCharsets.UTF_8));
435+
break;
436+
case "inbound-2":
437+
conn.end(Buffer.buffer("outbound-1"));
438+
break;
439+
}
440+
reentrant.decrementAndGet();
441+
});
442+
};
443+
NetSocket so = awaitFuture(client.connect(1234, "localhost"));
444+
Buffer received = Buffer.buffer();
445+
so.handler(received::appendBuffer);
446+
so.closeHandler(v -> {
447+
assertEquals("outbound-1", received.toString());
448+
testComplete();
449+
});
450+
so.write("inbound-1").await();
451+
await();
452+
}
453+
423454
@Test
424455
public void testResumeWhenRead() throws Exception {
425456
connectHandler = conn -> {
@@ -434,13 +465,16 @@ public void testResumeWhenRead() throws Exception {
434465
vertxConn.doResume();
435466
break;
436467
case "inbound-2":
437-
conn.close();
468+
conn.end(Buffer.buffer("outbound-1"));
438469
break;
439470
}
440471
});
441472
};
442473
NetSocket so = awaitFuture(client.connect(1234, "localhost"));
474+
Buffer received = Buffer.buffer();
475+
so.handler(received::appendBuffer);
443476
so.closeHandler(v -> {
477+
assertEquals("outbound-1", received.toString());
444478
testComplete();
445479
});
446480
so.write("inbound-1").await();

0 commit comments

Comments
 (0)