Skip to content

Commit 02f2d1d

Browse files
authored
Quic improvements
- Test Quic connection max idle timeout - Only keep track in the stream group of streams we can shutdown - Add Quic stream channel initializer in internal connection - Correctly name the connection channel handler so it can be referenced when updating the quic connection pipeline - Implement fine grained Quic connection/stream contexts.
2 parents 4a8cdd7 + 53bccea commit 02f2d1d

File tree

13 files changed

+363
-32
lines changed

13 files changed

+363
-32
lines changed

vertx-core/src/main/java/io/vertx/core/internal/quic/QuicConnectionInternal.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,33 @@
1111
package io.vertx.core.internal.quic;
1212

1313
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.channel.ChannelInitializer;
15+
import io.netty.handler.codec.quic.QuicStreamChannel;
16+
import io.vertx.core.Future;
17+
import io.vertx.core.internal.ContextInternal;
1418
import io.vertx.core.net.QuicConnection;
19+
import io.vertx.core.net.QuicStream;
20+
21+
import java.util.function.Consumer;
22+
import java.util.function.Function;
1523

1624
/**
1725
* @author <a href="mailto:[email protected]">Julien Viet</a>
1826
*/
1927
public interface QuicConnectionInternal extends QuicConnection {
2028

29+
ContextInternal context();
30+
31+
QuicConnectionInternal streamContextProvider(Function<ContextInternal, ContextInternal> provider);
32+
33+
Future<QuicStream> createStream(ContextInternal context);
34+
35+
Future<QuicStream> createStream(ContextInternal context, boolean bidirectional);
36+
37+
Future<QuicStream> createStream(ContextInternal context, boolean bidirectional, Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> initializerProvider);
38+
2139
ChannelHandlerContext channelHandlerContext();
2240

41+
Future<Void> closeFuture();
42+
2343
}

vertx-core/src/main/java/io/vertx/core/internal/quic/QuicStreamInternal.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.vertx.codegen.annotations.Nullable;
1414
import io.vertx.core.Handler;
1515
import io.vertx.core.buffer.Buffer;
16+
import io.vertx.core.internal.ContextInternal;
1617
import io.vertx.core.internal.net.SocketInternal;
1718
import io.vertx.core.net.QuicStream;
1819

@@ -21,6 +22,8 @@
2122
*/
2223
public interface QuicStreamInternal extends QuicStream, SocketInternal {
2324

25+
ContextInternal context();
26+
2427
@Override
2528
QuicStreamInternal messageHandler(Handler<Object> handler);
2629

vertx-core/src/main/java/io/vertx/core/net/QuicStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
@VertxGen
2525
public interface QuicStream extends Socket {
2626

27+
long id();
28+
2729
/**
2830
* @return whether the stream is unidirectional or bidirectional
2931
*/

vertx-core/src/main/java/io/vertx/core/net/impl/SocketBase.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
import io.netty.channel.ChannelFuture;
1717
import io.netty.channel.ChannelHandlerContext;
1818
import io.netty.channel.ChannelPromise;
19+
import io.netty.channel.EventLoop;
1920
import io.netty.util.CharsetUtil;
2021
import io.netty.util.ReferenceCounted;
2122
import io.vertx.codegen.annotations.Nullable;
2223
import io.vertx.core.Future;
2324
import io.vertx.core.Handler;
2425
import io.vertx.core.Promise;
26+
import io.vertx.core.ThreadingModel;
2527
import io.vertx.core.buffer.Buffer;
28+
import io.vertx.core.impl.EventLoopExecutor;
2629
import io.vertx.core.internal.ContextInternal;
2730
import io.vertx.core.internal.PromiseInternal;
2831
import io.vertx.core.internal.buffer.BufferInternal;
@@ -51,8 +54,16 @@ public abstract class SocketBase<S extends SocketBase<S>> extends VertxConnectio
5154

5255
public SocketBase(ContextInternal context, ChannelHandlerContext channel) {
5356
super(context, channel);
57+
58+
EventLoopExecutor executor;
59+
if (context.threadingModel() == ThreadingModel.EVENT_LOOP && context.nettyEventLoop() == chctx.executor()) {
60+
executor = (EventLoopExecutor) context.executor();
61+
} else {
62+
executor = new EventLoopExecutor((EventLoop)chctx.executor());
63+
}
64+
5465
this.messageHandler = new DataMessageHandler();
55-
this.pending = new InboundMessageQueue<>(context.eventLoop(), context.executor()) {
66+
this.pending = new InboundMessageQueue<>(executor, context.executor()) {
5667
@Override
5768
protected void handleResume() {
5869
SocketBase.this.doResume();

vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ public VertxConnection(ContextInternal context, ChannelHandlerContext chctx, boo
8383
super(context, chctx);
8484

8585
EventLoopExecutor executor;
86-
if (context.threadingModel() == ThreadingModel.EVENT_LOOP) {
86+
if (context.threadingModel() == ThreadingModel.EVENT_LOOP && context.nettyEventLoop() == chctx.executor()) {
8787
executor = (EventLoopExecutor) context.executor();
8888
} else {
89-
executor = new EventLoopExecutor(context.nettyEventLoop());
89+
executor = new EventLoopExecutor((EventLoop)chctx.executor());
9090
}
9191

9292
this.channelWritable = chctx.channel().isWritable();

vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,23 @@ private VertxHandler(Function<ChannelHandlerContext, C> connectionFactory) {
3939
this.connectionFactory = connectionFactory;
4040
}
4141

42+
public static ByteBuf copyBuffer(ByteBuf byteBuf) {
43+
Class<?> allocClass;
44+
if (byteBuf != Unpooled.EMPTY_BUFFER &&
45+
((allocClass = byteBuf.alloc().getClass()) == AdaptiveByteBufAllocator.class
46+
|| allocClass == PooledByteBufAllocator.class
47+
|| byteBuf instanceof CompositeByteBuf)) {
48+
if (byteBuf.isReadable()) {
49+
ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(byteBuf.readableBytes());
50+
buffer.writeBytes(byteBuf, byteBuf.readerIndex(), byteBuf.readableBytes());
51+
return buffer;
52+
} else {
53+
return Unpooled.EMPTY_BUFFER;
54+
}
55+
}
56+
return byteBuf;
57+
}
58+
4259
/**
4360
* Pooled {@code byteBuf} are copied and released, otherwise it is returned as is.
4461
*

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public Future<QuicConnection> connect(SocketAddress address, QLogConfig qLogConf
9494
protected void initChannel(Channel ch) {
9595
connectionGroup.add(ch);
9696
QuicConnectionHandler handler = new QuicConnectionHandler(context, metrics, promise::tryComplete);
97-
ch.pipeline().addLast(handler);
97+
ch.pipeline().addLast("handler", handler);
9898
}
9999
})
100100
.remoteAddress(new InetSocketAddress(address.host(), address.port()));

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicConnectionImpl.java

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@
1212

1313
import io.netty.buffer.ByteBuf;
1414
import io.netty.buffer.Unpooled;
15-
import io.netty.channel.Channel;
16-
import io.netty.channel.ChannelFuture;
17-
import io.netty.channel.ChannelHandlerContext;
18-
import io.netty.channel.ChannelInitializer;
15+
import io.netty.channel.*;
1916
import io.netty.handler.codec.quic.QuicChannel;
2017
import io.netty.handler.codec.quic.QuicStreamChannel;
2118
import io.netty.handler.codec.quic.QuicStreamType;
@@ -42,6 +39,9 @@
4239
import javax.net.ssl.SSLEngine;
4340
import java.time.Duration;
4441
import java.util.concurrent.TimeUnit;
42+
import java.util.function.Consumer;
43+
import java.util.function.Function;
44+
import java.util.function.Supplier;
4545

4646
/**
4747
* @author <a href="mailto:[email protected]">Julien Viet</a>
@@ -52,6 +52,7 @@ public class QuicConnectionImpl extends ConnectionBase implements QuicConnection
5252
private final QuicChannel channel;
5353
private final TransportMetrics<?> metrics;
5454
private final ConnectionGroup streamGroup;
55+
private Function<ContextInternal, ContextInternal> streamContextProvider;
5556
private Handler<QuicStream> handler;
5657
private Handler<Buffer> datagramHandler;
5758
private QuicConnectionClose closePayload;
@@ -93,8 +94,18 @@ public NetworkMetrics<?> metrics() {
9394
}
9495

9596
void handleStream(QuicStreamChannel streamChannel) {
96-
streamGroup.add(streamChannel);
97-
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, context, streamChannel, streamMetrics, chctx));
97+
if (streamChannel.type() == QuicStreamType.BIDIRECTIONAL || streamChannel.isLocalCreated()) {
98+
// Only consider stream we can end for shutdown, e.g. this excludes remote opened HTTP/3 control stream
99+
streamGroup.add(streamChannel);
100+
}
101+
Function<ContextInternal, ContextInternal> provider = streamContextProvider;
102+
ContextInternal streamContext;
103+
if (provider != null) {
104+
streamContext = provider.apply(context);
105+
} else {
106+
streamContext = context;
107+
}
108+
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, streamContext, streamChannel, streamMetrics, chctx));
98109
handler.addHandler(stream -> {
99110
Handler<QuicStream> h = QuicConnectionImpl.this.handler;
100111
if (h != null) {
@@ -161,30 +172,66 @@ public QuicConnection streamHandler(Handler<QuicStream> handler) {
161172
return this;
162173
}
163174

175+
@Override
176+
public Future<QuicStream> createStream(ContextInternal context) {
177+
return createStream(context, true);
178+
}
179+
164180
@Override
165181
public Future<QuicStream> createStream(boolean bidirectional) {
166-
// TODO : should use get or create context and test it ....
182+
return createStream(vertx.getOrCreateContext(), bidirectional);
183+
}
184+
185+
@Override
186+
public Future<QuicStream> createStream(ContextInternal context, boolean bidirectional) {
187+
Function<Supplier<ChannelHandler>, ChannelInitializer<QuicStreamChannel>> blah = new Function<Supplier<ChannelHandler>, ChannelInitializer<QuicStreamChannel>>() {
188+
@Override
189+
public ChannelInitializer<QuicStreamChannel> apply(Supplier<ChannelHandler> channelHandlerSupplier) {
190+
return new ChannelInitializer<>() {
191+
@Override
192+
protected void initChannel(QuicStreamChannel ch) throws Exception {
193+
ChannelHandler abc = channelHandlerSupplier.get();
194+
ch.pipeline().addLast("handler", abc);
195+
}
196+
};
197+
}
198+
};
199+
Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> initializerProvider =
200+
quicStreamChannelConsumer -> new ChannelInitializer<>() {
201+
@Override
202+
protected void initChannel(QuicStreamChannel ch) throws Exception {
203+
quicStreamChannelConsumer.accept(ch);
204+
}
205+
};
206+
return createStream(context, bidirectional, initializerProvider);
207+
}
208+
209+
@Override
210+
public Future<QuicStream> createStream(ContextInternal context, boolean bidirectional, Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> initializerProvider) {
167211
Promise<QuicStream> promise = context.promise();
168212
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, context, (QuicStreamChannel) chctx.channel(), streamMetrics, chctx));
169213
handler.addHandler(stream -> {
170214
promise.tryComplete(stream);
171215
});
172216
QuicStreamType type = bidirectional ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
173-
io.netty.util.concurrent.Future<QuicStreamChannel> future = channel.createStream(type, new ChannelInitializer<>() {
174-
@Override
175-
protected void initChannel(Channel ch) {
176-
ch.pipeline().addLast("handler", handler);
177-
}
217+
ChannelInitializer<QuicStreamChannel> initializer = initializerProvider.apply(ch -> {
218+
ch.pipeline().addLast("handler", handler);
178219
});
220+
io.netty.util.concurrent.Future<QuicStreamChannel> future = channel.createStream(type, initializer);
179221
future.addListener(future1 -> {
180222
if (!future1.isSuccess()) {
181-
future1.cause().printStackTrace(System.out);
182223
promise.tryFail(future1.cause());
183224
}
184225
});
185226
return promise.future();
186227
}
187228

229+
@Override
230+
public QuicConnectionInternal streamContextProvider(Function<ContextInternal, ContextInternal> provider) {
231+
streamContextProvider = provider;
232+
return this;
233+
}
234+
188235
@Override
189236
public QuicConnection datagramHandler(Handler<Buffer> handler) {
190237
datagramHandler = handler;

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicEndpointImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,12 @@ private Future<Channel> bind(ContextInternal context, SocketAddress bindAddr, Ss
128128
Bootstrap bootstrap = new Bootstrap()
129129
.group(context.nettyEventLoop())
130130
.channelFactory(vertx.transport().datagramChannelFactory());
131-
InetSocketAddress addr = new InetSocketAddress(bindAddr.hostName(), bindAddr.port());
131+
InetSocketAddress addr;
132+
if (bindAddr.hostAddress() != null) {
133+
addr = new InetSocketAddress(bindAddr.hostAddress(), bindAddr.port());
134+
} else {
135+
addr = new InetSocketAddress(bindAddr.hostName(), bindAddr.port());
136+
}
132137
ChannelHandler handler;
133138
try {
134139
handler = channelHandler(context, bindAddr, sslContextProvider, metrics);
@@ -153,7 +158,7 @@ protected QuicCodecBuilder<?> initQuicCodecBuilder(ContextInternal context, SslC
153158
codecBuilder.initialMaxStreamDataUnidirectional(transportOptions.getInitialMaxStreamDataUnidirectional());
154159
codecBuilder.activeMigration(transportOptions.getActiveMigration());
155160
if (transportOptions.getMaxIdleTimeout() != null) {
156-
codecBuilder.maxIdleTimeout(transportOptions.getMaxIdleTimeout().get(ChronoUnit.MILLIS), TimeUnit.MILLISECONDS);
161+
codecBuilder.maxIdleTimeout(transportOptions.getMaxIdleTimeout().toMillis(), TimeUnit.MILLISECONDS);
157162
}
158163
if (transportOptions.isEnableDatagrams()) {
159164
codecBuilder.datagram(transportOptions.getDatagramReceiveQueueLength(), transportOptions.getDatagramSendQueueLength());

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicServerImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@
1010
*/
1111
package io.vertx.core.net.impl.quic;
1212

13-
import io.netty.channel.Channel;
14-
import io.netty.channel.ChannelHandler;
15-
import io.netty.channel.ChannelHandlerContext;
16-
import io.netty.channel.ChannelInitializer;
17-
import io.netty.channel.ChannelPipeline;
13+
import io.netty.channel.*;
1814
import io.netty.channel.nio.AbstractNioChannel;
15+
import io.netty.channel.socket.DatagramPacket;
1916
import io.netty.channel.socket.nio.NioDatagramChannel;
2017
import io.netty.channel.unix.UnixChannelOption;
2118
import io.netty.handler.codec.quic.InsecureQuicTokenHandler;
@@ -146,7 +143,7 @@ protected void initChannel(Channel ch) {
146143
QuicChannel channel = (QuicChannel) ch;
147144
QuicConnectionHandler handler = new QuicConnectionHandler(context, metrics, QuicServerImpl.this.handler);
148145
ChannelPipeline pipeline = channel.pipeline();
149-
pipeline.addLast(handler);
146+
pipeline.addLast("handler", handler);
150147
}
151148

152149
/*

0 commit comments

Comments
 (0)