Skip to content

Commit 53bccea

Browse files
committed
Implement fine grained Quic connection/stream contexts.
1 parent c0430d8 commit 53bccea

File tree

6 files changed

+281
-22
lines changed

6 files changed

+281
-22
lines changed

vertx-core/src/main/java/io/vertx/core/internal/quic/QuicConnectionInternal.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@ public interface QuicConnectionInternal extends QuicConnection {
2828

2929
ContextInternal context();
3030

31-
Future<QuicStream> createStream(boolean bidirectional, Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> blah);
31+
QuicConnectionInternal streamContextProvider(Function<ContextInternal, ContextInternal> provider);
32+
33+
Future<QuicStream> createStream(ContextInternal context);
34+
35+
Future<QuicStream> createStream(ContextInternal context, boolean bidirectional);
36+
37+
Future<QuicStream> createStream(ContextInternal context, boolean bidirectional, Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> initializerProvider);
3238

3339
ChannelHandlerContext channelHandlerContext();
3440

vertx-core/src/main/java/io/vertx/core/net/impl/SocketBase.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
import io.netty.channel.ChannelFuture;
1717
import io.netty.channel.ChannelHandlerContext;
1818
import io.netty.channel.ChannelPromise;
19+
import io.netty.channel.EventLoop;
1920
import io.netty.util.CharsetUtil;
2021
import io.netty.util.ReferenceCounted;
2122
import io.vertx.codegen.annotations.Nullable;
2223
import io.vertx.core.Future;
2324
import io.vertx.core.Handler;
2425
import io.vertx.core.Promise;
26+
import io.vertx.core.ThreadingModel;
2527
import io.vertx.core.buffer.Buffer;
28+
import io.vertx.core.impl.EventLoopExecutor;
2629
import io.vertx.core.internal.ContextInternal;
2730
import io.vertx.core.internal.PromiseInternal;
2831
import io.vertx.core.internal.buffer.BufferInternal;
@@ -51,8 +54,16 @@ public abstract class SocketBase<S extends SocketBase<S>> extends VertxConnectio
5154

5255
public SocketBase(ContextInternal context, ChannelHandlerContext channel) {
5356
super(context, channel);
57+
58+
EventLoopExecutor executor;
59+
if (context.threadingModel() == ThreadingModel.EVENT_LOOP && context.nettyEventLoop() == chctx.executor()) {
60+
executor = (EventLoopExecutor) context.executor();
61+
} else {
62+
executor = new EventLoopExecutor((EventLoop)chctx.executor());
63+
}
64+
5465
this.messageHandler = new DataMessageHandler();
55-
this.pending = new InboundMessageQueue<>(context.eventLoop(), context.executor()) {
66+
this.pending = new InboundMessageQueue<>(executor, context.executor()) {
5667
@Override
5768
protected void handleResume() {
5869
SocketBase.this.doResume();

vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ public VertxConnection(ContextInternal context, ChannelHandlerContext chctx, boo
8383
super(context, chctx);
8484

8585
EventLoopExecutor executor;
86-
if (context.threadingModel() == ThreadingModel.EVENT_LOOP) {
86+
if (context.threadingModel() == ThreadingModel.EVENT_LOOP && context.nettyEventLoop() == chctx.executor()) {
8787
executor = (EventLoopExecutor) context.executor();
8888
} else {
89-
executor = new EventLoopExecutor(context.nettyEventLoop());
89+
executor = new EventLoopExecutor((EventLoop)chctx.executor());
9090
}
9191

9292
this.channelWritable = chctx.channel().isWritable();

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicConnectionImpl.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class QuicConnectionImpl extends ConnectionBase implements QuicConnection
5252
private final QuicChannel channel;
5353
private final TransportMetrics<?> metrics;
5454
private final ConnectionGroup streamGroup;
55+
private Function<ContextInternal, ContextInternal> streamContextProvider;
5556
private Handler<QuicStream> handler;
5657
private Handler<Buffer> datagramHandler;
5758
private QuicConnectionClose closePayload;
@@ -97,7 +98,14 @@ void handleStream(QuicStreamChannel streamChannel) {
9798
// Only consider stream we can end for shutdown, e.g. this excludes remote opened HTTP/3 control stream
9899
streamGroup.add(streamChannel);
99100
}
100-
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, context, streamChannel, streamMetrics, chctx));
101+
Function<ContextInternal, ContextInternal> provider = streamContextProvider;
102+
ContextInternal streamContext;
103+
if (provider != null) {
104+
streamContext = provider.apply(context);
105+
} else {
106+
streamContext = context;
107+
}
108+
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, streamContext, streamChannel, streamMetrics, chctx));
101109
handler.addHandler(stream -> {
102110
Handler<QuicStream> h = QuicConnectionImpl.this.handler;
103111
if (h != null) {
@@ -164,8 +172,18 @@ public QuicConnection streamHandler(Handler<QuicStream> handler) {
164172
return this;
165173
}
166174

175+
@Override
176+
public Future<QuicStream> createStream(ContextInternal context) {
177+
return createStream(context, true);
178+
}
179+
167180
@Override
168181
public Future<QuicStream> createStream(boolean bidirectional) {
182+
return createStream(vertx.getOrCreateContext(), bidirectional);
183+
}
184+
185+
@Override
186+
public Future<QuicStream> createStream(ContextInternal context, boolean bidirectional) {
169187
Function<Supplier<ChannelHandler>, ChannelInitializer<QuicStreamChannel>> blah = new Function<Supplier<ChannelHandler>, ChannelInitializer<QuicStreamChannel>>() {
170188
@Override
171189
public ChannelInitializer<QuicStreamChannel> apply(Supplier<ChannelHandler> channelHandlerSupplier) {
@@ -178,30 +196,25 @@ protected void initChannel(QuicStreamChannel ch) throws Exception {
178196
};
179197
}
180198
};
181-
Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> blah2 = new Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>>() {
199+
Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> initializerProvider =
200+
quicStreamChannelConsumer -> new ChannelInitializer<>() {
182201
@Override
183-
public ChannelInitializer<QuicStreamChannel> apply(Consumer<QuicStreamChannel> quicStreamChannelConsumer) {
184-
return new ChannelInitializer<>() {
185-
@Override
186-
protected void initChannel(QuicStreamChannel ch) throws Exception {
187-
quicStreamChannelConsumer.accept(ch);
188-
}
189-
};
202+
protected void initChannel(QuicStreamChannel ch) throws Exception {
203+
quicStreamChannelConsumer.accept(ch);
190204
}
191205
};
192-
return createStream(bidirectional, blah2);
206+
return createStream(context, bidirectional, initializerProvider);
193207
}
194208

195209
@Override
196-
public Future<QuicStream> createStream(boolean bidirectional, Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> blah) {
197-
// TODO : should use get or create context and test it ....
210+
public Future<QuicStream> createStream(ContextInternal context, boolean bidirectional, Function<Consumer<QuicStreamChannel>, ChannelInitializer<QuicStreamChannel>> initializerProvider) {
198211
Promise<QuicStream> promise = context.promise();
199212
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, context, (QuicStreamChannel) chctx.channel(), streamMetrics, chctx));
200213
handler.addHandler(stream -> {
201214
promise.tryComplete(stream);
202215
});
203216
QuicStreamType type = bidirectional ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
204-
ChannelInitializer<QuicStreamChannel> initializer = blah.apply(ch -> {
217+
ChannelInitializer<QuicStreamChannel> initializer = initializerProvider.apply(ch -> {
205218
ch.pipeline().addLast("handler", handler);
206219
});
207220
io.netty.util.concurrent.Future<QuicStreamChannel> future = channel.createStream(type, initializer);
@@ -213,6 +226,12 @@ public Future<QuicStream> createStream(boolean bidirectional, Function<Consumer<
213226
return promise.future();
214227
}
215228

229+
@Override
230+
public QuicConnectionInternal streamContextProvider(Function<ContextInternal, ContextInternal> provider) {
231+
streamContextProvider = provider;
232+
return this;
233+
}
234+
216235
@Override
217236
public QuicConnection datagramHandler(Handler<Buffer> handler) {
218237
datagramHandler = handler;

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicServerImpl.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@
1010
*/
1111
package io.vertx.core.net.impl.quic;
1212

13-
import io.netty.channel.Channel;
14-
import io.netty.channel.ChannelHandler;
15-
import io.netty.channel.ChannelHandlerContext;
16-
import io.netty.channel.ChannelInitializer;
17-
import io.netty.channel.ChannelPipeline;
13+
import io.netty.channel.*;
1814
import io.netty.channel.nio.AbstractNioChannel;
15+
import io.netty.channel.socket.DatagramPacket;
1916
import io.netty.channel.socket.nio.NioDatagramChannel;
2017
import io.netty.channel.unix.UnixChannelOption;
2118
import io.netty.handler.codec.quic.InsecureQuicTokenHandler;
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.tests.net.quic;
12+
13+
import io.vertx.core.Context;
14+
import io.vertx.core.Future;
15+
import io.vertx.core.Vertx;
16+
import io.vertx.core.buffer.Buffer;
17+
import io.vertx.core.internal.ContextInternal;
18+
import io.vertx.core.internal.VertxInternal;
19+
import io.vertx.core.internal.quic.QuicConnectionInternal;
20+
import io.vertx.core.net.*;
21+
import io.vertx.test.core.LinuxOrOsx;
22+
import io.vertx.test.core.VertxTestBase;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
28+
import static io.vertx.tests.net.quic.QuicClientTest.clientOptions;
29+
import static io.vertx.tests.net.quic.QuicServerTest.serverOptions;
30+
31+
@RunWith(LinuxOrOsx.class)
32+
public class QuicContextTest extends VertxTestBase {
33+
34+
private ContextInternal workerContext;
35+
private QuicServer server;
36+
private QuicClient client;
37+
38+
@Override
39+
public void setUp() throws Exception {
40+
super.setUp();
41+
QuicServerOptions serverOptions = serverOptions();
42+
QuicClientOptions clientOptions = clientOptions();
43+
serverOptions.getTransportOptions().setEnableDatagrams(true);
44+
clientOptions.getTransportOptions().setEnableDatagrams(true);
45+
server = QuicServer.create(vertx, serverOptions);
46+
client = QuicClient.create(vertx, clientOptions);
47+
workerContext = ((VertxInternal) vertx).createWorkerContext();
48+
}
49+
50+
@Override
51+
protected void tearDown() throws Exception {
52+
client.close().await();
53+
server.close().await();
54+
super.tearDown();
55+
}
56+
57+
@Test
58+
public void testServerConnectionScoped() {
59+
60+
server.handler(conn -> {
61+
assertSame(Vertx.currentContext(), workerContext);
62+
conn.streamHandler(stream -> {
63+
assertSame(Vertx.currentContext(), workerContext);
64+
stream.handler(buff -> {
65+
assertSame(Vertx.currentContext(), workerContext);
66+
stream.write(buff);
67+
});
68+
stream.endHandler(v -> {
69+
assertSame(Vertx.currentContext(), workerContext);
70+
stream.end();
71+
testComplete();
72+
});
73+
});
74+
});
75+
76+
Future.future(p -> workerContext.runOnContext(v -> server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).onComplete(p))).await();
77+
client.bind(SocketAddress.inetSocketAddress(0, "localhost")).await();
78+
QuicConnection connection = client.connect(SocketAddress.inetSocketAddress(9999, "localhost")).await();
79+
QuicStream stream = connection
80+
.createStream().await();
81+
stream.end(Buffer.buffer("ping")).await();
82+
await();
83+
}
84+
85+
@Test
86+
public void testServerStreamScoped() {
87+
88+
server.handler(conn -> {
89+
assertSame(Vertx.currentContext(), workerContext);
90+
conn.streamHandler(stream -> {
91+
assertSame(Vertx.currentContext(), workerContext);
92+
stream.handler(buff -> {
93+
assertSame(Vertx.currentContext(), workerContext);
94+
stream.write(buff);
95+
});
96+
stream.endHandler(v -> {
97+
assertSame(Vertx.currentContext(), workerContext);
98+
stream.end();
99+
testComplete();
100+
});
101+
});
102+
});
103+
104+
Future.future(p -> workerContext.runOnContext(v -> server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).onComplete(p))).await();
105+
client.bind(SocketAddress.inetSocketAddress(0, "localhost")).await();
106+
QuicConnection connection = client.connect(SocketAddress.inetSocketAddress(9999, "localhost")).await();
107+
QuicStream stream = connection.createStream().await();
108+
stream.end(Buffer.buffer("ping")).await();
109+
await();
110+
}
111+
112+
@Test
113+
public void testClientConnectionScoped() {
114+
115+
server.handler(conn -> {
116+
conn.datagramHandler(conn::writeDatagram);
117+
});
118+
119+
server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).await();
120+
client.bind(SocketAddress.inetSocketAddress(0, "localhost")).await();
121+
122+
QuicConnection connection = Future.<QuicConnection>future(p -> workerContext.runOnContext(v -> client.connect(SocketAddress.inetSocketAddress(9999, "localhost")).onComplete(p))).await();
123+
124+
connection.datagramHandler(buff -> {
125+
assertSame(workerContext, Vertx.currentContext());
126+
testComplete();
127+
});
128+
connection.writeDatagram(Buffer.buffer("ping")).await();
129+
130+
await();
131+
}
132+
133+
@Test
134+
public void testClientStreamScoped() {
135+
136+
server.handler(conn -> {
137+
conn.streamHandler(stream -> {
138+
stream.handler(buff -> stream.write(buff));
139+
stream.endHandler(v -> stream.end());
140+
});
141+
});
142+
143+
server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).await();
144+
client.bind(SocketAddress.inetSocketAddress(0, "localhost")).await();
145+
146+
QuicConnection connection = client.connect(SocketAddress.inetSocketAddress(9999, "localhost")).await();
147+
148+
QuicStream stream = Future.<QuicStream>future(p -> workerContext.runOnContext(v -> connection.createStream().onComplete(p))).await();
149+
150+
AtomicInteger cnt = new AtomicInteger();
151+
stream.handler(buff -> {
152+
assertSame(workerContext, Vertx.currentContext());
153+
cnt.incrementAndGet();
154+
});
155+
stream.endHandler(v -> {
156+
assertSame(workerContext, Vertx.currentContext());
157+
testComplete();
158+
});
159+
stream.write(Buffer.buffer("ping")).await();
160+
assertWaitUntil(() -> cnt.get() == 1);
161+
stream.end().await();
162+
163+
await();
164+
}
165+
166+
@Test
167+
public void testStreamContextProvider() {
168+
169+
server.handler(conn -> {
170+
assertNotSame(Vertx.currentContext(), workerContext);
171+
Context connectionCtx = vertx.getOrCreateContext();
172+
((QuicConnectionInternal)conn).streamContextProvider(ctx -> workerContext);
173+
conn.streamHandler(stream -> {
174+
assertSame(Vertx.currentContext(), connectionCtx);
175+
stream.handler(buff -> {
176+
assertSame(Vertx.currentContext(), workerContext);
177+
stream.write(buff);
178+
});
179+
stream.endHandler(v -> {
180+
assertSame(Vertx.currentContext(), workerContext);
181+
stream.end();
182+
testComplete();
183+
});
184+
});
185+
});
186+
187+
server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).await();
188+
client.bind(SocketAddress.inetSocketAddress(0, "localhost")).await();
189+
QuicConnection connection = client.connect(SocketAddress.inetSocketAddress(9999, "localhost")).await();
190+
QuicStream stream = connection.createStream().await();
191+
stream.end(Buffer.buffer("ping")).await();
192+
await();
193+
}
194+
195+
@Test
196+
public void testStreamContextProvided() {
197+
198+
server.handler(conn -> {
199+
conn.streamHandler(stream -> {
200+
stream.handler(buff -> stream.write(buff));
201+
stream.endHandler(v -> stream.end());
202+
});
203+
});
204+
205+
server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).await();
206+
client.bind(SocketAddress.inetSocketAddress(0, "localhost")).await();
207+
208+
QuicConnection connection = client.connect(SocketAddress.inetSocketAddress(9999, "localhost")).await();
209+
210+
QuicStream stream = ((QuicConnectionInternal)connection).createStream(workerContext).await();
211+
212+
AtomicInteger cnt = new AtomicInteger();
213+
stream.handler(buff -> {
214+
assertSame(workerContext, Vertx.currentContext());
215+
cnt.incrementAndGet();
216+
});
217+
stream.endHandler(v -> {
218+
assertSame(workerContext, Vertx.currentContext());
219+
testComplete();
220+
});
221+
stream.write(Buffer.buffer("ping")).await();
222+
assertWaitUntil(() -> cnt.get() == 1);
223+
stream.end().await();
224+
225+
await(); }
226+
}

0 commit comments

Comments
 (0)