Skip to content

Commit c0430d8

Browse files
committed
- Only keep track in the stream group of streams we can shutdown
- Add Quic stream channel initializer in internal connection - Correctly name the connection channel handler so it can be referenced when updating the quic connection pipeline
1 parent 86755ae commit c0430d8

File tree

9 files changed

+89
-15
lines changed

9 files changed

+89
-15
lines changed

vertx-core/src/main/java/io/vertx/core/internal/quic/QuicConnectionInternal.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,27 @@
1111
package io.vertx.core.internal.quic;
1212

1313
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.channel.ChannelInitializer;
15+
import io.netty.handler.codec.quic.QuicStreamChannel;
16+
import io.vertx.core.Future;
17+
import io.vertx.core.internal.ContextInternal;
1418
import io.vertx.core.net.QuicConnection;
19+
import io.vertx.core.net.QuicStream;
20+
21+
import java.util.function.Consumer;
22+
import java.util.function.Function;
1523

1624
/**
1725
* @author <a href="mailto:[email protected]">Julien Viet</a>
1826
*/
1927
public interface QuicConnectionInternal extends QuicConnection {
2028

29+
ContextInternal context();
30+
31+
Future<QuicStream> createStream(boolean bidirectional, Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> blah);
32+
2133
ChannelHandlerContext channelHandlerContext();
2234

35+
Future<Void> closeFuture();
36+
2337
}

vertx-core/src/main/java/io/vertx/core/internal/quic/QuicStreamInternal.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.vertx.codegen.annotations.Nullable;
1414
import io.vertx.core.Handler;
1515
import io.vertx.core.buffer.Buffer;
16+
import io.vertx.core.internal.ContextInternal;
1617
import io.vertx.core.internal.net.SocketInternal;
1718
import io.vertx.core.net.QuicStream;
1819

@@ -21,6 +22,8 @@
2122
*/
2223
public interface QuicStreamInternal extends QuicStream, SocketInternal {
2324

25+
ContextInternal context();
26+
2427
@Override
2528
QuicStreamInternal messageHandler(Handler<Object> handler);
2629

vertx-core/src/main/java/io/vertx/core/net/QuicStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
@VertxGen
2525
public interface QuicStream extends Socket {
2626

27+
long id();
28+
2729
/**
2830
* @return whether the stream is unidirectional or bidirectional
2931
*/

vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,23 @@ private VertxHandler(Function<ChannelHandlerContext, C> connectionFactory) {
3939
this.connectionFactory = connectionFactory;
4040
}
4141

42+
public static ByteBuf copyBuffer(ByteBuf byteBuf) {
43+
Class<?> allocClass;
44+
if (byteBuf != Unpooled.EMPTY_BUFFER &&
45+
((allocClass = byteBuf.alloc().getClass()) == AdaptiveByteBufAllocator.class
46+
|| allocClass == PooledByteBufAllocator.class
47+
|| byteBuf instanceof CompositeByteBuf)) {
48+
if (byteBuf.isReadable()) {
49+
ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(byteBuf.readableBytes());
50+
buffer.writeBytes(byteBuf, byteBuf.readerIndex(), byteBuf.readableBytes());
51+
return buffer;
52+
} else {
53+
return Unpooled.EMPTY_BUFFER;
54+
}
55+
}
56+
return byteBuf;
57+
}
58+
4259
/**
4360
* Pooled {@code byteBuf} are copied and released, otherwise it is returned as is.
4461
*

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public Future<QuicConnection> connect(SocketAddress address, QLogConfig qLogConf
9494
protected void initChannel(Channel ch) {
9595
connectionGroup.add(ch);
9696
QuicConnectionHandler handler = new QuicConnectionHandler(context, metrics, promise::tryComplete);
97-
ch.pipeline().addLast(handler);
97+
ch.pipeline().addLast("handler", handler);
9898
}
9999
})
100100
.remoteAddress(new InetSocketAddress(address.host(), address.port()));

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicConnectionImpl.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@
1212

1313
import io.netty.buffer.ByteBuf;
1414
import io.netty.buffer.Unpooled;
15-
import io.netty.channel.Channel;
16-
import io.netty.channel.ChannelFuture;
17-
import io.netty.channel.ChannelHandlerContext;
18-
import io.netty.channel.ChannelInitializer;
15+
import io.netty.channel.*;
1916
import io.netty.handler.codec.quic.QuicChannel;
2017
import io.netty.handler.codec.quic.QuicStreamChannel;
2118
import io.netty.handler.codec.quic.QuicStreamType;
@@ -42,6 +39,9 @@
4239
import javax.net.ssl.SSLEngine;
4340
import java.time.Duration;
4441
import java.util.concurrent.TimeUnit;
42+
import java.util.function.Consumer;
43+
import java.util.function.Function;
44+
import java.util.function.Supplier;
4545

4646
/**
4747
* @author <a href="mailto:[email protected]">Julien Viet</a>
@@ -93,7 +93,10 @@ public NetworkMetrics<?> metrics() {
9393
}
9494

9595
void handleStream(QuicStreamChannel streamChannel) {
96-
streamGroup.add(streamChannel);
96+
if (streamChannel.type() == QuicStreamType.BIDIRECTIONAL || streamChannel.isLocalCreated()) {
97+
// Only consider stream we can end for shutdown, e.g. this excludes remote opened HTTP/3 control stream
98+
streamGroup.add(streamChannel);
99+
}
97100
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, context, streamChannel, streamMetrics, chctx));
98101
handler.addHandler(stream -> {
99102
Handler<QuicStream> h = QuicConnectionImpl.this.handler;
@@ -163,22 +166,47 @@ public QuicConnection streamHandler(Handler<QuicStream> handler) {
163166

164167
@Override
165168
public Future<QuicStream> createStream(boolean bidirectional) {
169+
Function<Supplier<ChannelHandler>, ChannelInitializer<QuicStreamChannel>> blah = new Function<Supplier<ChannelHandler>, ChannelInitializer<QuicStreamChannel>>() {
170+
@Override
171+
public ChannelInitializer<QuicStreamChannel> apply(Supplier<ChannelHandler> channelHandlerSupplier) {
172+
return new ChannelInitializer<>() {
173+
@Override
174+
protected void initChannel(QuicStreamChannel ch) throws Exception {
175+
ChannelHandler abc = channelHandlerSupplier.get();
176+
ch.pipeline().addLast("handler", abc);
177+
}
178+
};
179+
}
180+
};
181+
Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> blah2 = new Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>>() {
182+
@Override
183+
public ChannelInitializer<QuicStreamChannel> apply(Consumer<QuicStreamChannel> quicStreamChannelConsumer) {
184+
return new ChannelInitializer<>() {
185+
@Override
186+
protected void initChannel(QuicStreamChannel ch) throws Exception {
187+
quicStreamChannelConsumer.accept(ch);
188+
}
189+
};
190+
}
191+
};
192+
return createStream(bidirectional, blah2);
193+
}
194+
195+
@Override
196+
public Future<QuicStream> createStream(boolean bidirectional, Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> blah) {
166197
// TODO : should use get or create context and test it ....
167198
Promise<QuicStream> promise = context.promise();
168199
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, context, (QuicStreamChannel) chctx.channel(), streamMetrics, chctx));
169200
handler.addHandler(stream -> {
170201
promise.tryComplete(stream);
171202
});
172203
QuicStreamType type = bidirectional ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
173-
io.netty.util.concurrent.Future<QuicStreamChannel> future = channel.createStream(type, new ChannelInitializer<>() {
174-
@Override
175-
protected void initChannel(Channel ch) {
176-
ch.pipeline().addLast("handler", handler);
177-
}
204+
ChannelInitializer<QuicStreamChannel> initializer = blah.apply(ch -> {
205+
ch.pipeline().addLast("handler", handler);
178206
});
207+
io.netty.util.concurrent.Future<QuicStreamChannel> future = channel.createStream(type, initializer);
179208
future.addListener(future1 -> {
180209
if (!future1.isSuccess()) {
181-
future1.cause().printStackTrace(System.out);
182210
promise.tryFail(future1.cause());
183211
}
184212
});

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicEndpointImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,12 @@ private Future<Channel> bind(ContextInternal context, SocketAddress bindAddr, Ss
128128
Bootstrap bootstrap = new Bootstrap()
129129
.group(context.nettyEventLoop())
130130
.channelFactory(vertx.transport().datagramChannelFactory());
131-
InetSocketAddress addr = new InetSocketAddress(bindAddr.hostName(), bindAddr.port());
131+
InetSocketAddress addr;
132+
if (bindAddr.hostAddress() != null) {
133+
addr = new InetSocketAddress(bindAddr.hostAddress(), bindAddr.port());
134+
} else {
135+
addr = new InetSocketAddress(bindAddr.hostName(), bindAddr.port());
136+
}
132137
ChannelHandler handler;
133138
try {
134139
handler = channelHandler(context, bindAddr, sslContextProvider, metrics);

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicServerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ protected void initChannel(Channel ch) {
146146
QuicChannel channel = (QuicChannel) ch;
147147
QuicConnectionHandler handler = new QuicConnectionHandler(context, metrics, QuicServerImpl.this.handler);
148148
ChannelPipeline pipeline = channel.pipeline();
149-
pipeline.addLast(handler);
149+
pipeline.addLast("handler", handler);
150150
}
151151

152152
/*

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicStreamImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class QuicStreamImpl extends SocketBase<QuicStreamImpl> implements QuicSt
4343
private final boolean localCreated;
4444
private Handler<Integer> resetHandler;
4545

46-
QuicStreamImpl(QuicConnection connection, ContextInternal context, QuicStreamChannel channel, NetworkMetrics<?> streamMetrics, ChannelHandlerContext chctx) {
46+
public QuicStreamImpl(QuicConnection connection, ContextInternal context, QuicStreamChannel channel, NetworkMetrics<?> streamMetrics, ChannelHandlerContext chctx) {
4747
super(context, chctx);
4848
this.connection = connection;
4949
this.context = context;
@@ -53,6 +53,11 @@ public class QuicStreamImpl extends SocketBase<QuicStreamImpl> implements QuicSt
5353
this.localCreated = channel.isLocalCreated();
5454
}
5555

56+
@Override
57+
public long id() {
58+
return channel.streamId();
59+
}
60+
5661
@Override
5762
public QuicStream resetHandler(@Nullable Handler<Integer> handler) {
5863
this.resetHandler = handler;

0 commit comments

Comments
 (0)