Skip to content

Commit b091d3c

Browse files
committed
Fixes two issues related to QuicStream data transmission.
Motivation: Ending a QuicStream prematurely terminates the stream, the stream should wait until all pending messages have been written before shutting down the stream output. QuicStream message delivery does not correctly uses the pending inbound queue of messages to apply flow QuicFlowControlTest Changes: Fixes both.
1 parent 1dcc793 commit b091d3c

File tree

4 files changed

+180
-84
lines changed

4 files changed

+180
-84
lines changed

vertx-core/src/main/java/io/vertx/core/net/impl/SocketBase.java

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,8 @@ protected void handleMessage(Object msg) {
6969
context.dispatch(handler);
7070
}
7171
} else {
72-
Handler<Buffer> handler = handler();
73-
if (handler != null) {
74-
context.dispatch((Buffer) msg, handler);
75-
}
72+
MessageHandler handler = messageHandler();
73+
handler.handle(msg);
7674
}
7775
}
7876
};
@@ -110,7 +108,7 @@ public synchronized S handler(Handler<Buffer> dataHandler) {
110108
return (S) this;
111109
}
112110

113-
private synchronized Handler<Object> messageHandler() {
111+
private synchronized MessageHandler messageHandler() {
114112
return messageHandler;
115113
}
116114

@@ -121,15 +119,12 @@ public synchronized S messageHandler(Handler<Object> handler) {
121119
} else {
122120
messageHandler = new MessageHandler() {
123121
@Override
124-
public void pause() {
125-
doPause();
122+
public boolean accept(Object msg) {
123+
return true;
126124
}
127125
@Override
128-
public void fetch(long amount) {
129-
if (amount != Long.MAX_VALUE) {
130-
throw new IllegalArgumentException("Only accepts resume");
131-
}
132-
doResume();
126+
public Object transform(Object msg) {
127+
return msg;
133128
}
134129
@Override
135130
public void handle(Object msg) {
@@ -158,13 +153,13 @@ public synchronized S eventHandler(Handler<Object> handler) {
158153

159154
@Override
160155
public synchronized S pause() {
161-
messageHandler.pause();
156+
pending.pause();
162157
return (S) this;
163158
}
164159

165160
@Override
166161
public S fetch(long amount) {
167-
messageHandler.fetch(amount);
162+
pending.fetch(amount);
168163
return (S) this;
169164
}
170165

@@ -268,8 +263,15 @@ protected void handleEnd() {
268263

269264
@Override
270265
protected void handleMessage(Object msg) {
271-
Handler<Object> handler = messageHandler();
272-
handler.handle(msg);
266+
MessageHandler handler = messageHandler();
267+
if (handler.accept(msg)) {
268+
pending.write(handler.transform(msg));
269+
} else {
270+
if (msg instanceof ReferenceCounted) {
271+
ReferenceCounted refCounter = (ReferenceCounted) msg;
272+
refCounter.release();
273+
}
274+
}
273275
}
274276

275277
@Override
@@ -293,38 +295,32 @@ protected void handleEvent(Object event) {
293295
}
294296
}
295297

296-
interface MessageHandler extends Handler<Object> {
297-
void pause();
298-
void fetch(long amount);
298+
interface MessageHandler {
299+
300+
boolean accept(Object msg);
301+
302+
Object transform(Object msg);
303+
304+
void handle(Object msg);
299305
}
300306

301307
private class DataMessageHandler implements MessageHandler {
302308

303309
@Override
304-
public void handle(Object msg) {
305-
if (msg instanceof ByteBuf) {
306-
Buffer buffer = BufferInternal.safeBuffer((ByteBuf) msg);
307-
pending.write(buffer);
308-
} else {
309-
handleInvalid(msg);
310-
}
310+
public boolean accept(Object msg) {
311+
return msg instanceof ByteBuf;
311312
}
312313

313314
@Override
314-
public void pause() {
315-
pending.pause();
315+
public Object transform(Object msg) {
316+
return BufferInternal.safeBuffer((ByteBuf) msg);
316317
}
317318

318319
@Override
319-
public void fetch(long amount) {
320-
pending.fetch(amount);
321-
}
322-
323-
private void handleInvalid(Object msg) {
324-
// ByteBuf are eagerly released when the message is processed
325-
if (msg instanceof ReferenceCounted && (!(msg instanceof ByteBuf))) {
326-
ReferenceCounted refCounter = (ReferenceCounted) msg;
327-
refCounter.release();
320+
public void handle(Object msg) {
321+
Handler<Buffer> handler = handler();
322+
if (handler != null) {
323+
context.dispatch((Buffer)msg, handler);
328324
}
329325
}
330326
}

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicStreamImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
*/
1111
package io.vertx.core.net.impl.quic;
1212

13+
import io.netty.buffer.Unpooled;
1314
import io.netty.channel.ChannelFuture;
1415
import io.netty.channel.ChannelHandlerContext;
1516
import io.netty.channel.socket.ChannelInputShutdownEvent;
@@ -23,6 +24,7 @@
2324
import io.vertx.core.internal.ContextInternal;
2425
import io.vertx.core.internal.PromiseInternal;
2526
import io.vertx.core.internal.quic.QuicStreamInternal;
27+
import io.vertx.core.net.impl.MessageWrite;
2628
import io.vertx.core.net.impl.SocketBase;
2729
import io.vertx.core.net.QuicConnection;
2830
import io.vertx.core.net.QuicStream;
@@ -91,8 +93,10 @@ protected long sizeof(Object msg) {
9193
@Override
9294
public Future<Void> end() {
9395
PromiseInternal<Void> promise = context.promise();
94-
ChannelFuture shutdownPromise = channel.shutdownOutput();
95-
shutdownPromise.addListener(promise);
96+
writeToChannel(() -> {
97+
ChannelFuture shutdownPromise = channel.shutdownOutput();
98+
shutdownPromise.addListener(promise);
99+
});
96100
return promise.future();
97101
}
98102

vertx-core/src/test/java/io/vertx/tests/net/VertxConnectionTest.java

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -373,53 +373,45 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
373373

374374
@Test
375375
public void testConsolidateFlushInDrainWhenResume() throws Exception {
376-
connectHandler = conn -> {
377-
ChannelHandlerContext ctx = conn.channelHandlerContext();
378-
ChannelPipeline pipeline = ctx.pipeline();
379-
pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
380-
@Override
381-
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
382-
switch (((ByteBuf)msg).toString(StandardCharsets.UTF_8)) {
383-
case "outbound-1":
384-
conn.resume();
385-
break;
386-
}
387-
ctx.write(msg);
388-
}
389-
@Override
390-
public void flush(ChannelHandlerContext ctx) {
391-
ctx.write(Unpooled.copiedBuffer("flush", StandardCharsets.UTF_8))
392-
.addListener((ChannelFutureListener) future -> conn.channelHandlerContext().close());
393-
ctx.flush();
394-
}
395-
});
396-
conn.messageHandler(msg -> {
397-
switch (((ByteBuf)msg).toString(StandardCharsets.UTF_8)) {
398-
case "inbound-1":
399-
conn.pause();
400-
vertx.runOnContext(v -> {
401-
pipeline.fireChannelRead(Unpooled.copiedBuffer("inbound-2", StandardCharsets.UTF_8));
402-
pipeline.fireChannelReadComplete();
403-
new Thread(() -> {
404-
conn.writeMessage(Unpooled.copiedBuffer("outbound-1", StandardCharsets.UTF_8));
405-
}).start();
406-
});
407-
break;
408-
case "inbound-2":
409-
conn.writeMessage(Unpooled.copiedBuffer("outbound-2", StandardCharsets.UTF_8));
410-
break;
376+
MessageFactory factory = new MessageFactory();
377+
EmbeddedChannel ch = new EmbeddedChannel();
378+
ChannelPipeline pipeline = ch.pipeline();
379+
pipeline.addLast("handler", VertxHandler.create(chctx -> new TestConnection(chctx)));
380+
TestConnection connection = (TestConnection) pipeline.get(VertxHandler.class).getConnection();
381+
Message inbound1 = factory.next();
382+
Message inbound2 = factory.next();
383+
Message outbound1 = factory.next();
384+
Message outbound2 = factory.next();
385+
Message flush = factory.next();
386+
pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
387+
@Override
388+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
389+
if (msg == outbound1) {
390+
connection.doResume();
411391
}
412-
});
413-
};
414-
NetSocket so = awaitFuture(client.connect(1234, "localhost"));
415-
Buffer received = Buffer.buffer();
416-
so.handler(received::appendBuffer);
417-
so.closeHandler(v -> {
418-
assertEquals("outbound-1outbound-2flush", received.toString());
419-
testComplete();
392+
ctx.write(msg);
393+
}
394+
@Override
395+
public void flush(ChannelHandlerContext ctx) {
396+
ctx.write(flush);
397+
ctx.flush();
398+
}
420399
});
421-
so.write("inbound-1").await();
422-
await();
400+
connection.handler = event -> {
401+
if (event == inbound1) {
402+
connection.pause();
403+
pipeline.fireChannelRead(inbound2);
404+
connection.write(outbound1, false);
405+
pipeline.fireChannelReadComplete();
406+
} else if (event == inbound2) {
407+
connection.write(outbound2, false);
408+
}
409+
};
410+
pipeline.fireChannelRead(inbound1);
411+
assertSame(outbound1, ch.readOutbound());
412+
assertSame(outbound2, ch.readOutbound());
413+
assertSame(flush, ch.readOutbound());
414+
assertNull(ch.readOutbound());
423415
}
424416

425417
@Test
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.tests.net.quic;
12+
13+
import io.netty.buffer.ByteBuf;
14+
import io.netty.handler.codec.http.*;
15+
import io.netty.handler.codec.http3.*;
16+
import io.vertx.core.Completable;
17+
import io.vertx.core.buffer.Buffer;
18+
import io.vertx.core.internal.buffer.BufferInternal;
19+
import io.vertx.core.internal.quic.QuicStreamInternal;
20+
import io.vertx.core.net.*;
21+
import io.vertx.core.streams.WriteStream;
22+
import io.vertx.test.core.LinuxOrOsx;
23+
import io.vertx.test.core.TestUtils;
24+
import io.vertx.test.core.VertxTestBase;
25+
import org.junit.Test;
26+
import org.junit.runner.RunWith;
27+
28+
import java.util.concurrent.CompletableFuture;
29+
30+
import static io.vertx.tests.net.quic.QuicClientTest.clientOptions;
31+
import static io.vertx.tests.net.quic.QuicServerTest.serverOptions;
32+
33+
@RunWith(LinuxOrOsx.class)
34+
public class QuicFlowControlTest extends VertxTestBase {
35+
36+
private QuicServer server;
37+
private QuicClient client;
38+
39+
@Override
40+
public void setUp() throws Exception {
41+
super.setUp();
42+
server = QuicServer.create(vertx, serverOptions());
43+
client = QuicClient.create(vertx, clientOptions());
44+
}
45+
46+
@Override
47+
protected void tearDown() throws Exception {
48+
client.close().await();
49+
server.close().await();
50+
super.tearDown();
51+
}
52+
53+
private void pump(int times, Buffer chunk, WriteStream<Buffer> writeStream, Completable<Integer> cont) {
54+
if (writeStream.writeQueueFull()) {
55+
cont.succeed(times);
56+
} else {
57+
writeStream.write(chunk);
58+
vertx.runOnContext(v -> pump(times + 1, chunk, writeStream, cont));
59+
}
60+
}
61+
62+
@Test
63+
public void testFlowControl() {
64+
CompletableFuture<Integer> latch = new CompletableFuture<>();
65+
Buffer chunk = Buffer.buffer(TestUtils.randomAlphaString(128));
66+
server.handler(conn -> {
67+
conn.streamHandler(stream -> {
68+
pump(0, chunk, stream, onSuccess2(times -> {
69+
stream.end();
70+
latch.complete(times);
71+
}));
72+
});
73+
});
74+
server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).await();
75+
client.bind(SocketAddress.inetSocketAddress(0, "localhost")).await();
76+
QuicConnection connection = client.connect(SocketAddress.inetSocketAddress(9999, "localhost")).await();
77+
connection
78+
.createStream()
79+
.onComplete(onSuccess2(stream -> {
80+
stream.pause();
81+
QuicStreamInternal streamInternal = (QuicStreamInternal) stream;
82+
Buffer expected = Buffer.buffer();
83+
latch.whenComplete((times, err) -> {
84+
for (int i = 0; i < times; i++) {
85+
expected.appendBuffer(chunk);
86+
}
87+
stream.resume();
88+
});
89+
Buffer cumulation = Buffer.buffer();
90+
streamInternal.messageHandler(msg -> {
91+
ByteBuf buff = (ByteBuf) msg;
92+
Buffer buffer = BufferInternal.safeBuffer(buff);
93+
cumulation.appendBuffer(buffer);
94+
});
95+
streamInternal.endHandler(v -> {
96+
assertEquals(expected, cumulation);
97+
testComplete();
98+
});
99+
stream.write("ping");
100+
}));
101+
102+
await();
103+
}
104+
}

0 commit comments

Comments
 (0)