diff --git a/vertx-core/src/main/java/io/vertx/core/Vertx.java b/vertx-core/src/main/java/io/vertx/core/Vertx.java index 534357dcd59..f794e7cdbe4 100644 --- a/vertx-core/src/main/java/io/vertx/core/Vertx.java +++ b/vertx-core/src/main/java/io/vertx/core/Vertx.java @@ -229,6 +229,22 @@ default NetClient createNetClient() { */ HttpServer createHttpServer(HttpServerOptions options); + /** + * Create an HTTP3 client using the specified options + * + * @param options the options to use + * @return the server + */ + HttpClientAgent createHttpClient(Http3ClientOptions options); + + /** + * Create an HTTP3 server using the specified options + * + * @param options the options to use + * @return the server + */ + HttpServer createHttpServer(Http3ServerOptions options); + /** * Create an HTTP/HTTPS server using default options * diff --git a/vertx-core/src/main/java/io/vertx/core/http/Http3ClientOptions.java b/vertx-core/src/main/java/io/vertx/core/http/Http3ClientOptions.java new file mode 100644 index 00000000000..304e68a4097 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/Http3ClientOptions.java @@ -0,0 +1,21 @@ +package io.vertx.core.http; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.core.net.QLogConfig; +import io.vertx.core.net.QuicClientOptions; + +@DataObject +public class Http3ClientOptions extends QuicClientOptions { + + public Http3ClientOptions() { + } + + public Http3ClientOptions(Http3ClientOptions other) { + super(other); + } + + @Override + public Http3ClientOptions setQLogConfig(QLogConfig qLogConfig) { + return (Http3ClientOptions)super.setQLogConfig(qLogConfig); + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/Http3ServerOptions.java b/vertx-core/src/main/java/io/vertx/core/http/Http3ServerOptions.java new file mode 100644 index 00000000000..9ed5ce72658 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/Http3ServerOptions.java @@ -0,0 +1,45 @@ +package io.vertx.core.http; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.core.net.KeyCertOptions; +import io.vertx.core.net.QLogConfig; +import io.vertx.core.net.QuicClientAddressValidation; +import io.vertx.core.net.QuicServerOptions; + +import java.time.Duration; + +@DataObject +public class Http3ServerOptions extends QuicServerOptions { + + public Http3ServerOptions() { + } + + public Http3ServerOptions(Http3ServerOptions other) { + super(other); + } + + @Override + public Http3ServerOptions setQLogConfig(QLogConfig qLogConfig) { + return (Http3ServerOptions)super.setQLogConfig(qLogConfig); + } + + @Override + public Http3ServerOptions setLoadBalanced(boolean loadBalanced) { + return (Http3ServerOptions)super.setLoadBalanced(loadBalanced); + } + + @Override + public Http3ServerOptions setClientAddressValidation(QuicClientAddressValidation clientAddressValidation) { + return (Http3ServerOptions)super.setClientAddressValidation(clientAddressValidation); + } + + @Override + public Http3ServerOptions setClientAddressValidationTimeWindow(Duration clientAddressValidationTimeWindow) { + return (Http3ServerOptions)super.setClientAddressValidationTimeWindow(clientAddressValidationTimeWindow); + } + + @Override + public Http3ServerOptions setClientAddressValidationKey(KeyCertOptions validationKey) { + return (Http3ServerOptions)super.setClientAddressValidationKey(validationKey); + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http3ChannelConnector.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http3ChannelConnector.java new file mode 100644 index 00000000000..39c959181e9 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http3ChannelConnector.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2011-2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http.impl; + +import io.netty.handler.codec.http3.Http3; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.Http3ClientOptions; +import io.vertx.core.http.impl.http3.Http3ClientConnection; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.VertxInternal; +import io.vertx.core.internal.quic.QuicConnectionInternal; +import io.vertx.core.net.*; +import io.vertx.core.spi.metrics.ClientMetrics; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class Http3ChannelConnector implements HttpChannelConnector { + + private final VertxInternal vertx; + private final Lock lock; + private Future clientFuture; + private final Http3ClientOptions options; + + public Http3ChannelConnector(VertxInternal vertxInternal, Http3ClientOptions options) { + + options = new Http3ClientOptions(options); + options.getSslOptions().setApplicationLayerProtocols(Arrays.asList(Http3.supportedApplicationProtocols())); + options.getTransportOptions().setInitialMaxData(10000000L); + options.getTransportOptions().setInitialMaxStreamDataBidirectionalLocal(1000000L); + options.getTransportOptions().setInitialMaxStreamDataBidirectionalRemote(1000000L); + options.getTransportOptions().setInitialMaxStreamDataUnidirectional(1000000L); + options.getTransportOptions().setInitialMaxStreamsBidirectional(100L); + options.getTransportOptions().setInitialMaxStreamsUnidirectional(100L); + + this.vertx = vertxInternal; + this.lock = new ReentrantLock(); + this.options = options; + } + + @Override + public Future httpConnect(ContextInternal context, SocketAddress server, HostAndPort authority, HttpConnectParams params, long maxLifetimeMillis, ClientMetrics metrics) { + + lock.lock(); + Future fut = clientFuture; + if (fut == null) { + QuicClient client = QuicClient.create(vertx, this.options); + fut = client.bind(SocketAddress.inetSocketAddress(0, "localhost")).map(client); + clientFuture = fut; + lock.unlock(); + } else { + lock.unlock(); + } + Promise promise = context.promise(); + + fut.onComplete((res, err) -> { + if (err == null) { + Future f = res.connect(server); + f.onComplete((res2, err2) -> { + if (err2 == null) { + Http3ClientConnection c = new Http3ClientConnection((QuicConnectionInternal) res2); + c.init(); + promise.complete(c); + } else { + promise.fail(err2); + } + }); + } else { + promise.fail(err); + } + }); + + return promise.future(); + } + + @Override + public Future shutdown(Duration timeout) { + if (clientFuture == null) { + return vertx.getOrCreateContext().succeededFuture(); + } else { + return clientFuture.compose(client -> client.shutdown(timeout)); + } + } + + @Override + public Future close() { + if (clientFuture == null) { + return vertx.getOrCreateContext().succeededFuture(); + } else { + return clientFuture.compose(QuicEndpoint::close); + } + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java index 327fcfab087..15698842620 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java @@ -45,14 +45,14 @@ */ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal, MetricsProvider { - static class Config { - List nonProxyHosts; - boolean verifyHost; - boolean defaultSsl; - String defaultHost; - int defaultPort; - int maxRedirects; - int initialPoolKind; + public static class Config { + public List nonProxyHosts; + public boolean verifyHost; + public boolean defaultSsl; + public String defaultHost; + public int defaultPort; + public int maxRedirects; + public int initialPoolKind; } // Pattern to check we are not dealing with an absoluate URI diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ClientConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ClientConnection.java new file mode 100644 index 00000000000..65d1ad54055 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ClientConnection.java @@ -0,0 +1,131 @@ +package io.vertx.core.http.impl.http3; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http3.DefaultHttp3Headers; +import io.netty.handler.codec.http3.Http3ClientConnectionHandler; +import io.netty.handler.codec.http3.Http3RequestStreamInitializer; +import io.netty.handler.codec.quic.QuicStreamChannel; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.http.*; +import io.vertx.core.http.impl.HttpClientConnection; +import io.vertx.core.http.impl.HttpClientStream; +import io.vertx.core.http.impl.headers.HttpRequestHeaders; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.quic.QuicConnectionInternal; +import io.vertx.core.internal.quic.QuicStreamInternal; +import io.vertx.core.net.HostAndPort; + +import java.util.function.Consumer; +import java.util.function.Function; + +public class Http3ClientConnection extends Http3Connection implements HttpClientConnection { + + public Http3ClientConnection(QuicConnectionInternal connection) { + super(connection); + } + + public void init() { + + super.init(); + + Http3ClientConnectionHandler http3Handler = new Http3ClientConnectionHandler(); + + ChannelPipeline pipeline = connection.channelHandlerContext().pipeline(); + + pipeline.addBefore("handler", "http3", http3Handler); + + + + } + + @Override + public MultiMap newHttpRequestHeaders() { + return new HttpRequestHeaders(new DefaultHttp3Headers()); + } + + @Override + public long activeStreams() { + return 0; + } + + @Override + public long concurrency() { + // For now hardcode + return 10; + } + + @Override + public HostAndPort authority() { + return HostAndPort.authority("localhost", 8443); + } + + @Override + public HttpClientConnection evictionHandler(Handler handler) { + return null; + } + + @Override + public HttpClientConnection invalidMessageHandler(Handler handler) { + return null; + } + + @Override + public HttpClientConnection concurrencyChangeHandler(Handler handler) { + return null; + } + + @Override + public ChannelHandlerContext channelHandlerContext() { + return null; + } + + @Override + public Future createStream(ContextInternal context) { + return connection.createStream(context, true, new Function, ChannelInitializer>() { + @Override + public ChannelInitializer apply(Consumer quicStreamChannelConsumer) { + return new Http3RequestStreamInitializer() { + @Override + protected void initRequestStream(QuicStreamChannel ch) { + quicStreamChannelConsumer.accept(ch); + } + }; + } + }).map(stream -> { + QuicStreamInternal streamInternal = (QuicStreamInternal) stream; + Http3ClientStream http3Stream = new Http3ClientStream(this, streamInternal, context); + http3Stream.init(); + return http3Stream; + }); + } + + @Override + public ContextInternal context() { + return context; + } + + @Override + public boolean isValid() { + return false; + } + + @Override + public Object metric() { + return null; + } + + @Override + public long lastResponseReceivedTimestamp() { + return 0; + } + + @Override + public String indicatedServerName() { + return ""; + } + +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ClientStream.java b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ClientStream.java new file mode 100644 index 00000000000..fe17a8a0d51 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ClientStream.java @@ -0,0 +1,159 @@ +package io.vertx.core.http.impl.http3; + +import io.netty.handler.codec.http3.*; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpVersion; +import io.vertx.core.http.StreamPriority; +import io.vertx.core.http.impl.*; +import io.vertx.core.http.impl.headers.HttpRequestHeaders; +import io.vertx.core.http.impl.headers.HttpResponseHeaders; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.buffer.BufferInternal; +import io.vertx.core.internal.quic.QuicStreamInternal; + +public class Http3ClientStream extends Http3Stream implements HttpClientStream { + + private Handler headHandler; + private boolean endReceived; + + public Http3ClientStream(Http3ClientConnection connection, QuicStreamInternal stream, ContextInternal context) { + super(connection, stream, context); + } + + @Override + protected boolean handleHead(Http3Headers headers) { + HttpResponseHeaders responseHeaders = new HttpResponseHeaders(headers); + if (responseHeaders.validate()) { + HttpResponseHead head = new HttpResponseHead( + responseHeaders.status(), + null, + responseHeaders); + Handler handler = headHandler; + if (handler != null) { + context.emit(head, handler); + } + return true; + } else { + // Not yet implemented + return false; + } + } + + @Override + protected void handleEnd() { + endReceived = true; + super.handleEnd(); + } + + @Override + protected void handleReset(int code) { + if (!endReceived) { + stream.reset(Http3ErrorCode.H3_REQUEST_CANCELLED.code()); + } + super.handleReset(code); + } + + @Override + public Object trace() { + return null; + } + + @Override + public HttpClientConnection connection() { + return connection; + } + + @Override + public Future writeHead(HttpRequestHead request, boolean chunked, Buffer buf, boolean end, StreamPriority priority, boolean connect) { + + HttpRequestHeaders headers = ((HttpRequestHeaders)request.headers()); + + headers.authority(request.authority); + headers.method(request.method); + if (request.method != HttpMethod.CONNECT) { + headers.path(request.uri); + headers.scheme("https"); + } + + headers.prepare(); + + Http3HeadersFrame frame = new DefaultHttp3HeadersFrame((Http3Headers) headers.unwrap()); + + Future fut = stream.writeMessage(frame); + + if (buf != null) { + fut = stream.writeMessage(new DefaultHttp3DataFrame(((BufferInternal)buf).getByteBuf())); + } + + if (end) { + fut = stream.end(); + } + + return fut; + } + + @Override + public HttpClientStream headHandler(Handler handler) { + this.headHandler = handler; + return this; + } + + @Override + public HttpClientStream exceptionHandler(Handler handler) { + return null; + } + + @Override + public HttpClientStream continueHandler(Handler handler) { + return null; + } + + @Override + public HttpClientStream earlyHintsHandler(Handler handler) { + return null; + } + + @Override + public HttpClientStream pushHandler(Handler handler) { + return null; + } + + @Override + public HttpClientStream priorityChangeHandler(Handler handler) { + return null; + } + + @Override + public HttpClientStream closeHandler(Handler handler) { + return null; + } + + @Override + public HttpClientStream updatePriority(StreamPriority streamPriority) { + return null; + } + + @Override + public HttpVersion version() { + return null; + } + + @Override + public Object metric() { + return null; + } + + @Override + public Future writeFrame(int type, int flags, Buffer payload) { + return null; + } + + @Override + public StreamPriority priority() { + return null; + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Connection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Connection.java new file mode 100644 index 00000000000..1f42abcbc3e --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Connection.java @@ -0,0 +1,257 @@ +package io.vertx.core.http.impl.http3; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.http3.DefaultHttp3GoAwayFrame; +import io.netty.handler.codec.http3.Http3; +import io.netty.handler.codec.http3.Http3ControlStreamFrame; +import io.netty.handler.codec.http3.Http3GoAwayFrame; +import io.netty.handler.codec.quic.QuicStreamChannel; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.GoAway; +import io.vertx.core.http.Http2Settings; +import io.vertx.core.http.HttpConnection; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.PromiseInternal; +import io.vertx.core.internal.quic.QuicConnectionInternal; +import io.vertx.core.internal.quic.QuicStreamInternal; +import io.vertx.core.net.QuicConnectionClose; +import io.vertx.core.net.SocketAddress; + +import javax.net.ssl.SSLSession; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class Http3Connection implements HttpConnection { + + final ContextInternal context; + final QuicConnectionInternal connection; + private QuicStreamInternal controlStream; + private Future shutdownFuture; + private long mostRecentRemoteStreamId; + private long remoteGoAway; + private long localGoAway; + private Handler shutdownHandler; + + + public Http3Connection(QuicConnectionInternal connection) { + this.context = connection.context(); + this.connection = connection; + this.remoteGoAway = -1L; + this.localGoAway = -1L; + } + + void handleHttpStream(QuicStreamInternal quicStream) { + } + + private void handleControlStream(QuicStreamInternal quicStream) { + quicStream.messageHandler(msg -> { + if (msg instanceof Http3ControlStreamFrame) { + if (msg instanceof Http3GoAwayFrame) { + Http3GoAwayFrame goAwayFrame = (Http3GoAwayFrame)msg; + handleGoAwayFrame(goAwayFrame.id()); + } + } else { + System.out.println("Unhandled message " + msg); + } + }); + } + + public void init() { + connection.streamHandler(stream -> { + QuicStreamInternal quicStream = (QuicStreamInternal) stream; + + boolean isStream = false; + for (Map.Entry e : quicStream.channelHandlerContext().pipeline()) { + if (e.getValue().getClass().getSimpleName().equals("Http3FrameCodec")) { + isStream = true; + break; + } + } + if (isStream) { + mostRecentRemoteStreamId = stream.id(); + handleHttpStream(quicStream); + } else { + controlStream = quicStream; + handleControlStream(quicStream); + } + }); + + } + + @Override + public final SocketAddress remoteAddress() { + return connection.remoteAddress(); + } + + @Override + public final SocketAddress remoteAddress(boolean real) { + return connection.remoteAddress(); + } + + @Override + public final SocketAddress localAddress() { + return connection.localAddress(); + } + + @Override + public final SocketAddress localAddress(boolean real) { + return connection.localAddress(); + } + + @Override + public final boolean isSsl() { + return true; + } + + @Override + public final SSLSession sslSession() { + return connection.sslSession(); + } + + @Override + public HttpConnection goAwayHandler(@Nullable Handler handler) { + return null; + } + + @Override + public HttpConnection shutdownHandler(@Nullable Handler handler) { + shutdownHandler = handler; + return this; + } + + @Override + public HttpConnection goAway(long errorCode) { + return goAway(errorCode, -1); + } + + @Override + public HttpConnection goAway(long errorCode, int lastStreamId) { + return goAway(errorCode, lastStreamId, (Buffer)null); + } + + @Override + public HttpConnection goAway(long errorCode, int lastStreamId, Buffer debugData) { + EventLoop eventLoop = context.nettyEventLoop(); + if (eventLoop.inEventLoop()) { + goAway(lastStreamId, context.promise()); + } else { + eventLoop.execute(() -> goAway(errorCode, lastStreamId, (Buffer)null)); + } + return this; + } + + private void handleGoAwayFrame(long id) { + remoteGoAway = id; + // Should cancel streams... + Handler handler = shutdownHandler; + if (handler != null) { + context.dispatch(handler); + } + } + + @Override + public Future shutdown(long timeout, TimeUnit unit) { + if (timeout < 0) { + throw new IllegalArgumentException("Timeout must be >= 0"); + } + Promise promise = context.promise(); + EventLoop eventLoop = context.nettyEventLoop(); + if (eventLoop.inEventLoop()) { + shutdown(Duration.ofMillis(unit.toMillis(timeout)), promise); + } else { + eventLoop.execute(() -> shutdown(Duration.ofMillis(unit.toMillis(timeout)), promise)); + } + return promise.future(); + } + + private void shutdown(Duration timeout, Promise promise) { + if (shutdownFuture != null) { + shutdownFuture.onComplete(promise); + return; + } + QuicStreamChannel localControlStream = Http3.getLocalControlStream(connection.channelHandlerContext().channel()); + if (localControlStream != null) { + shutdownFuture = promise.future(); + if (localGoAway != -1L) { + // Go away already sent + connection + .shutdown(timeout) + .onComplete(promise); + } else { + PromiseInternal p = context.promise(); + goAway(mostRecentRemoteStreamId, p); + p.onComplete(ar -> { + connection + .shutdown(timeout) + .onComplete(promise); + }); + } + } else { + promise.fail("No control stream"); + } + } + + public void goAway(long lastStreamId, PromiseInternal promise) { + QuicStreamChannel localControlStream = Http3.getLocalControlStream(connection.channelHandlerContext().channel()); + if (localControlStream != null) { + if (localGoAway == -1 || lastStreamId < localGoAway) { + localGoAway = lastStreamId; + Http3GoAwayFrame frame = new DefaultHttp3GoAwayFrame(lastStreamId); + ChannelFuture fut = localControlStream.writeAndFlush(frame); + fut.addListener(promise); + } else { + promise.fail("Last stream id " + lastStreamId + " must be < " + localGoAway); + } + } else { + promise.fail("No control stream"); + } + } + + + @Override + public HttpConnection closeHandler(Handler handler) { + return this; + } + + @Override + public Http2Settings settings() { + throw new UnsupportedOperationException("Implement me"); + } + + @Override + public Future updateSettings(Http2Settings settings) { + return context.failedFuture("HTTP/3 settings cannot be updated"); + } + + @Override + public Http2Settings remoteSettings() { + throw new UnsupportedOperationException("Implement me"); + } + + @Override + public HttpConnection remoteSettingsHandler(Handler handler) { + return null; + } + + @Override + public Future ping(Buffer data) { + return context.failedFuture("Ping not supported"); + } + + @Override + public HttpConnection pingHandler(@Nullable Handler handler) { + return this; + } + + @Override + public HttpConnection exceptionHandler(Handler handler) { + return this; + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Server.java b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Server.java new file mode 100644 index 00000000000..240ec476bf0 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Server.java @@ -0,0 +1,147 @@ +package io.vertx.core.http.impl.http3; + +import io.netty.handler.codec.http3.*; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.http.*; +import io.vertx.core.http.impl.HttpServerRequestImpl; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.VertxInternal; +import io.vertx.core.internal.quic.QuicConnectionInternal; +import io.vertx.core.net.*; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +public class Http3Server implements HttpServer { + + private final VertxInternal vertx; + private final Http3ServerOptions options; + private volatile Handler requestHandler; + private QuicServer quicServer; + + public Http3Server(VertxInternal vertx, Http3ServerOptions options) { + + options = new Http3ServerOptions(options); + options.getSslOptions().setApplicationLayerProtocols(Arrays.asList(Http3.supportedApplicationProtocols())); + options.getTransportOptions().setInitialMaxData(10000000L); + options.getTransportOptions().setInitialMaxStreamDataBidirectionalLocal(1000000L); + options.getTransportOptions().setInitialMaxStreamDataBidirectionalRemote(1000000L); + options.getTransportOptions().setInitialMaxStreamDataUnidirectional(1000000L); + options.getTransportOptions().setInitialMaxStreamsBidirectional(100L); + options.getTransportOptions().setInitialMaxStreamsUnidirectional(100L); + + this.vertx = vertx; + this.options = options; + } + + @Override + public HttpServer requestHandler(Handler handler) { + this.requestHandler = handler; + return this; + } + + @Override + public Handler requestHandler() { + return requestHandler; + } + + @Override + public HttpServer invalidRequestHandler(Handler handler) { + throw new UnsupportedOperationException(); + } + + @Override + public HttpServer connectionHandler(Handler handler) { + throw new UnsupportedOperationException(); + } + + @Override + public HttpServer webSocketHandshakeHandler(Handler handler) { + throw new UnsupportedOperationException(); + } + + @Override + public HttpServer exceptionHandler(Handler handler) { + throw new UnsupportedOperationException(); + } + + @Override + public HttpServer webSocketHandler(Handler handler) { + throw new UnsupportedOperationException(); + } + + @Override + public Handler webSocketHandler() { + throw new UnsupportedOperationException(); + } + + @Override + public Future updateSSLOptions(ServerSSLOptions options, boolean force) { + throw new UnsupportedOperationException(); + } + + @Override + public Future updateTrafficShapingOptions(TrafficShapingOptions options) { + throw new UnsupportedOperationException(); + } + + @Override + public Future listen() { + return listen(SocketAddress.inetSocketAddress(443, "0.0.0.0")); + } + + private void handleConnection(QuicConnection connection) { + String host = connection.localAddress().host(); + int port = connection.localAddress().port(); + String serverOrigin = "https://" + host + ":" + port; + + QuicConnectionInternal connectionInternal = (QuicConnectionInternal) connection; + + Http3ServerConnection http3Connection = new Http3ServerConnection(connectionInternal); + + http3Connection.init(); + + http3Connection.streamHandler(stream -> { + HttpServerRequestImpl request = new HttpServerRequestImpl(requestHandler, stream, stream.context(), + false, HttpServerOptions.DEFAULT_MAX_FORM_ATTRIBUTE_SIZE, + HttpServerOptions.DEFAULT_MAX_FORM_FIELDS, HttpServerOptions.DEFAULT_MAX_FORM_BUFFERED_SIZE, serverOrigin); + request.init(); + }); + } + + @Override + public Future listen(SocketAddress address) { + synchronized (this) { + if (quicServer != null) { + return vertx.getOrCreateContext().failedFuture("Already listening on port " + address.port()); + } + quicServer = QuicServer.create(vertx, options); + } + quicServer.handler(this::handleConnection); + return quicServer + .bind(address) + .map(this); + } + + @Override + public Future shutdown(long timeout, TimeUnit unit) { + QuicServer s; + synchronized (this) { + s = quicServer; + if (s == null) { + return vertx.getOrCreateContext().succeededFuture(); + } + quicServer = null; + } + return s.shutdown(Duration.ofMillis(unit.toMillis(timeout))); + } + + @Override + public int actualPort() { + throw new UnsupportedOperationException(); + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ServerConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ServerConnection.java new file mode 100644 index 00000000000..9fcc555729b --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ServerConnection.java @@ -0,0 +1,92 @@ +package io.vertx.core.http.impl.http3; + +import io.netty.channel.*; +import io.netty.handler.codec.Headers; +import io.netty.handler.codec.http3.*; +import io.netty.handler.codec.quic.QuicStreamChannel; +import io.vertx.core.Handler; +import io.vertx.core.http.impl.HttpServerConnection; +import io.vertx.core.http.impl.HttpServerStream; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.quic.QuicConnectionInternal; +import io.vertx.core.internal.quic.QuicStreamInternal; + +import java.util.function.Supplier; + +public class Http3ServerConnection extends Http3Connection implements HttpServerConnection { + + private final Supplier streamContextProvider; + private Handler streamHandler; + private QuicStreamChannel outboundControlStream; + + public Http3ServerConnection(QuicConnectionInternal connection) { + super(connection); + + this.streamContextProvider = connection.context()::duplicate; + } + + void handleHttpStream(QuicStreamInternal quicStream) { + ContextInternal streamContext = streamContextProvider.get(); + Http3ServerStream http3Stream = new Http3ServerStream(this, quicStream, streamContext); + http3Stream.init(); + Handler handler = streamHandler; + streamContext.emit(http3Stream, handler); + } + + public void init() { + + super.init(); + + Http3ServerConnectionHandler http3Handler = new Http3ServerConnectionHandler( + new ChannelInitializer() { + @Override + protected void initChannel(QuicStreamChannel ch) { + // Nothing to do + } + }, + new ChannelInitializer() { + @Override + protected void initChannel(QuicStreamChannel ch) { + outboundControlStream = ch; + } + }, + null, + null, + true + ); + + ChannelPipeline pipeline = connection.channelHandlerContext().pipeline(); + pipeline.addBefore("handler", "http3", http3Handler); + } + + @Override + public HttpServerConnection streamHandler(Handler handler) { + streamHandler = handler; + return this; + } + + @Override + public Headers newHeaders() { + return new DefaultHttp3Headers(); + } + + @Override + public boolean supportsSendFile() { + return false; + } + + @Override + public ContextInternal context() { + return context; + } + + @Override + public ChannelHandlerContext channelHandlerContext() { + return connection.channelHandlerContext(); + } + + @Override + public String indicatedServerName() { + return ""; + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ServerStream.java b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ServerStream.java new file mode 100644 index 00000000000..870642b25a8 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3ServerStream.java @@ -0,0 +1,169 @@ +package io.vertx.core.http.impl.http3; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http3.*; +import io.netty.handler.stream.ChunkedInput; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpFrame; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpVersion; +import io.vertx.core.http.StreamPriority; +import io.vertx.core.http.impl.HttpRequestHead; +import io.vertx.core.http.impl.HttpResponseHead; +import io.vertx.core.http.impl.HttpServerConnection; +import io.vertx.core.http.impl.HttpServerStream; +import io.vertx.core.http.impl.headers.HttpHeaders; +import io.vertx.core.http.impl.headers.HttpRequestHeaders; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.buffer.BufferInternal; +import io.vertx.core.internal.quic.QuicStreamInternal; +import io.vertx.core.net.HostAndPort; + +public class Http3ServerStream extends Http3Stream implements HttpServerStream { + + private Handler headHandler; + private boolean endReceived; + + public Http3ServerStream(Http3ServerConnection connection, QuicStreamInternal stream, ContextInternal context) { + super(connection, stream, context); + } + + @Override + protected boolean handleHead(Http3Headers headers) { + HttpRequestHeaders requestHeaders = new HttpRequestHeaders(headers); + if (requestHeaders.validate()) { + HttpRequestHead head = new HttpRequestHead( + requestHeaders.method(), + requestHeaders.path(), + requestHeaders, + requestHeaders.authority(), + null, + null); + Handler handler = headHandler; + if (handler != null) { + context.emit(head, handler); + } + return true; + } else { + return false; + } + } + + @Override + protected void handleEnd() { + endReceived = true; + super.handleEnd(); + } + + @Override + protected void handleReset(int code) { + super.handleReset(code); + if (!endReceived) { + stream.reset(Http3ErrorCode.H3_REQUEST_INCOMPLETE.code()); + } + } + + @Override + public void routed(String route) { + } + + @Override + public long bytesWritten() { + return 0; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + public HttpServerConnection connection() { + return connection; + } + + @Override + public Future writeHead(HttpResponseHead head, Buffer chunk, boolean end) { + Http3Headers headers = (Http3Headers) ((HttpHeaders) head.headers).unwrap(); + headers.status("" + head.statusCode); + Future fut = stream.writeMessage(new DefaultHttp3HeadersFrame(headers)); + if (chunk != null) { + fut = stream.writeMessage(new DefaultHttp3DataFrame(((BufferInternal)chunk).getByteBuf())); + } + if (end) { + fut = stream.end(); + } + return fut; + } + + @Override + public Future writeHeaders(MultiMap headers, boolean end) { + Http3Headers http3Headers = (Http3Headers) ((HttpHeaders) headers).unwrap(); + Future fut = stream.writeMessage(new DefaultHttp3HeadersFrame(http3Headers)); + if (end) { + fut = stream.end(); + } + return fut; + } + + @Override + public Future sendPush(HostAndPort authority, HttpMethod method, MultiMap headers, String path, StreamPriority priority) { + return null; + } + + @Override + public HttpServerStream headHandler(Handler handler) { + this.headHandler = handler; + return this; + } + + @Override + public HttpServerStream exceptionHandler(Handler handler) { + return null; + } + + @Override + public HttpServerStream priorityChangeHandler(Handler handler) { + return this; + } + + @Override + public HttpServerStream closeHandler(Handler handler) { + stream.closeHandler(handler); + return this; + } + + @Override + public void sendFile(ChunkedInput file, Promise promise) { + + } + + @Override + public HttpServerStream updatePriority(StreamPriority streamPriority) { + return null; + } + + @Override + public HttpVersion version() { + throw new UnsupportedOperationException(); + } + + @Override + public Object metric() { + return null; + } + + @Override + public Future writeFrame(int type, int flags, Buffer payload) { + return null; + } + + @Override + public StreamPriority priority() { + return null; + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Stream.java b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Stream.java new file mode 100644 index 00000000000..ba397d2843c --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/http3/Http3Stream.java @@ -0,0 +1,209 @@ +package io.vertx.core.http.impl.http3; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http2.EmptyHttp2Headers; +import io.netty.handler.codec.http3.*; +import io.netty.util.ReferenceCountUtil; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpFrame; +import io.vertx.core.http.impl.HttpClientStream; +import io.vertx.core.http.impl.HttpFrameImpl; +import io.vertx.core.http.impl.HttpServerStream; +import io.vertx.core.http.impl.headers.HttpHeaders; +import io.vertx.core.http.impl.headers.HttpResponseHeaders; +import io.vertx.core.impl.buffer.VertxByteBufAllocator; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.buffer.BufferInternal; +import io.vertx.core.internal.quic.QuicStreamInternal; +import io.vertx.core.net.impl.VertxHandler; + +public abstract class Http3Stream, C extends Http3Connection> { + + // Uses HTTP/2 STUFF FOR NOW + static final HttpResponseHeaders EMPTY = new HttpResponseHeaders(EmptyHttp2Headers.INSTANCE); + + final ContextInternal context; + final C connection; + final QuicStreamInternal stream; + + private boolean headReceived; + private Http3Headers headers; + private Handler trailersHandler; + private Handler dataHandler; + private Handler unknownFrameHandler; + private Handler resetHandler; + + public Http3Stream(C connection, QuicStreamInternal stream, ContextInternal context) { + this.stream = stream; + this.context = context; + this.connection = connection; + } + + protected boolean handleHead(Http3Headers headers) { + return true; + } + + protected void handleHeaders(Http3Headers headers) { + } + + protected void handleUnknownFrame(long type, Buffer buffer) { + Handler handler = unknownFrameHandler; + if (handler != null) { + handler.handle(new HttpFrameImpl((int)type, 0, buffer)); + } + } + + protected void handleData(Buffer buffer) { + Handler handler = dataHandler; + if (handler != null) { + handler.handle(buffer); + } + } + + protected void handleReset(int code) { + Handler handler = resetHandler; + if (handler != null) { + handler.handle((long)code); + } + } + + protected void handleEnd() { + Handler handler = trailersHandler; + if (handler != null) { + MultiMap trailers; + if (headers != null) { + trailers = new HttpHeaders(headers); + } else { + trailers = EMPTY; + } + handler.handle(trailers); + } + } + + void init() { + stream.messageHandler(msg -> { + ByteBuf buffer; + ByteBuf content; + if (msg instanceof Http3Frame) { + Http3Frame http3Frame = (Http3Frame) msg; + try { + switch ((int)http3Frame.type()) { + case 0x01: + // Headers frame + Http3HeadersFrame http3HeadersFrame = (Http3HeadersFrame) http3Frame; + if (!headReceived) { + headReceived = true; + if (!handleHead(http3HeadersFrame.headers())) { + // Not yet implemented + } + } else { + headers = http3HeadersFrame.headers(); + handleHeaders(http3HeadersFrame.headers()); + } + break; + case 0x00: + // Data frame + Http3DataFrame http3DataFrame = (Http3DataFrame) http3Frame; + content = http3DataFrame.content(); + buffer = VertxHandler.copyBuffer(content); + handleData(BufferInternal.buffer(buffer)); + break; + default: + if (http3Frame instanceof Http3UnknownFrame) { + Http3UnknownFrame unknownFrame = (Http3UnknownFrame)http3Frame; + content = unknownFrame.content(); + buffer = VertxHandler.copyBuffer(content); + handleUnknownFrame(unknownFrame.type(), BufferInternal.buffer(buffer)); + } else { + System.out.println("Frame type " + http3Frame.type() + " not implemented"); + } + break; + } + } finally { + ReferenceCountUtil.release(http3Frame); + } + } + }); + stream.resetHandler(code -> { + handleReset(code); + }); + stream.endHandler(v -> { + handleEnd(); + }); + stream.shutdownHandler(v -> { + // Not used at the moment + }); + } + + public final S customFrameHandler(Handler handler) { + unknownFrameHandler = handler; + return (S)this; + } + + public final S trailersHandler(Handler handler) { + this.trailersHandler = handler; + return (S)this; + } + + public final S dataHandler(Handler handler) { + this.dataHandler = handler; + return (S)this; + } + + public S resetHandler(Handler handler) { + this.resetHandler = handler; + return (S)this; + } + + public final boolean isWritable() { + return !stream.writeQueueFull(); + } + + public final S drainHandler(Handler handler) { + stream.drainHandler(handler); + return (S)this; + } + + public final S setWriteQueueMaxSize(int maxSize) { + stream.setWriteQueueMaxSize(maxSize); + return (S)this; + } + + public final S pause() { + stream.pause(); + return (S)this; + } + + public final S fetch(long amount) { + stream.fetch(amount); + return (S)this; + } + + public Future writeChunk(Buffer chunk, boolean end) { + Future fut; + if (chunk != null) { + fut = stream.writeMessage(new DefaultHttp3DataFrame(((BufferInternal)chunk).getByteBuf())); + } else { + fut = null; + } + if (end) { + fut = stream.end(); + } + return fut; + } + + public Future writeReset(long code) { + return stream.reset((int)code); + } + + public final int id() { + return (int)stream.id(); + } + + public final ContextInternal context() { + return context; + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index fab8d506866..6aa05454387 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -33,6 +33,7 @@ import io.vertx.core.http.*; import io.vertx.core.http.impl.*; import io.vertx.core.http.impl.Http1xOrH2ChannelConnector; +import io.vertx.core.http.impl.http3.Http3Server; import io.vertx.core.impl.deployment.DefaultDeploymentManager; import io.vertx.core.impl.deployment.DefaultDeployment; import io.vertx.core.internal.deployment.Deployment; @@ -399,6 +400,32 @@ public HttpServer createHttpServer(HttpServerOptions serverOptions) { return new HttpServerImpl(this, serverOptions); } + @Override + public HttpClientAgent createHttpClient(Http3ClientOptions options) { + + Http3ChannelConnector connector = new Http3ChannelConnector(this, options); + + HttpClientImpl.Config config = new HttpClientImpl.Config(); + + config.nonProxyHosts = null; + config.verifyHost = false; + config.defaultSsl = true; + config.defaultHost = "localhost"; + config.defaultPort = 8443; + config.maxRedirects = HttpClientOptions.DEFAULT_MAX_REDIRECTS; + config.initialPoolKind = 1; // Multiplexed + + HttpClientImpl client = new HttpClientImpl(this, null, null, connector, null, null, + new PoolOptions(), null, options.getSslOptions(), config); + + return client; + } + + @Override + public HttpServer createHttpServer(Http3ServerOptions options) { + return new Http3Server(this, options); + } + @Override public WebSocketClient createWebSocketClient(WebSocketClientOptions options) { HttpClientOptions o = new HttpClientOptions(options); diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java index 82771460706..d91dea97a94 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java @@ -85,6 +85,16 @@ public HttpServer createHttpServer(HttpServerOptions options) { return delegate.createHttpServer(options); } + @Override + public HttpClientAgent createHttpClient(Http3ClientOptions options) { + return delegate.createHttpClient(options); + } + + @Override + public HttpServer createHttpServer(Http3ServerOptions options) { + return delegate.createHttpServer(options); + } + @Override public HttpClientBuilder httpClientBuilder() { return delegate.httpClientBuilder(); diff --git a/vertx-core/src/main/java/module-info.java b/vertx-core/src/main/java/module-info.java index c9fa1ca3dc2..9473b9e88eb 100644 --- a/vertx-core/src/main/java/module-info.java +++ b/vertx-core/src/main/java/module-info.java @@ -8,6 +8,7 @@ requires io.netty.codec.dns; requires io.netty.codec.http; requires io.netty.codec.http2; + requires io.netty.codec.http3; requires io.netty.common; requires io.netty.handler; requires io.netty.handler.proxy; diff --git a/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ClientTest.java b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ClientTest.java new file mode 100644 index 00000000000..fa6a1e4c660 --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ClientTest.java @@ -0,0 +1,274 @@ +package io.vertx.tests.http.http3; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.http3.DefaultHttp3Headers; +import io.netty.handler.codec.quic.QuicException; +import io.netty.util.NetUtil; +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.*; +import io.vertx.core.net.QuicClient; +import io.vertx.core.net.SocketAddress; +import io.vertx.test.core.LinuxOrOsx; +import io.vertx.test.core.VertxTestBase; +import io.vertx.test.tls.Cert; +import io.vertx.test.tls.Trust; +import io.vertx.tests.net.quic.QuicClientTest; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@RunWith(LinuxOrOsx.class) +public class Http3ClientTest extends VertxTestBase { + + private HttpServer server; + private Http3ClientOptions clientOptions; + private HttpClientAgent client; + + @Override + public void setUp() throws Exception { + super.setUp(); + Http3ServerOptions serverOptions = new Http3ServerOptions(); + serverOptions.getSslOptions().setKeyCertOptions(Cert.SERVER_JKS.get()); + clientOptions = new Http3ClientOptions(); + clientOptions.getSslOptions().setTrustOptions(Trust.SERVER_JKS.get()); + clientOptions.getSslOptions().setHostnameVerificationAlgorithm(""); + server = vertx.createHttpServer(serverOptions); + client = vertx.createHttpClient(clientOptions); + } + + @Override + protected void tearDown() throws Exception { + server.close().await(); + client.close().await(); + super.tearDown(); + } + + @Test + public void testGet() { + server.requestHandler(req -> { + req.response().end("Hello World"); + }); + server.listen(8443, "localhost").await(); + + HttpClientConnection connection = client.connect(new HttpConnectOptions() + .setHost("localhost") + .setPort(8443)).await(); + + Buffer response = connection.request(HttpMethod.GET, 8443, "localhost", "/") + .compose(request -> request + .send() + .expecting(HttpResponseExpectation.SC_OK) + .compose(HttpClientResponse::body)) + .await(); + + assertEquals("Hello World", response.toString()); + } + + @Test + public void testPost() { + server.requestHandler(req -> { + req.bodyHandler(buff -> { + req.response().end(buff); + }); + }); + server.listen(8443, "localhost").await(); + + HttpClientConnection connection = client.connect(new HttpConnectOptions() + .setHost("localhost") + .setPort(8443)).await(); + + Buffer response = connection.request(HttpMethod.POST, 8443, "localhost", "/") + .compose(request -> request + .setChunked(true) + .send("Hello World")) + .expecting(HttpResponseExpectation.SC_OK) + .compose(HttpClientResponse::body) + .await(); + + assertEquals("Hello World", response.toString()); + } + + @Test + public void testResponseTrailers() throws Exception { + server.requestHandler(req -> { + req.response() + .putTrailer("trailer_key", "trailer_value") + .end(); + }); + server.listen(8443, "localhost").await(); + + MultiMap trailers = client.request(HttpMethod.GET, 8443, "localhost", "/") + .compose(request -> request + .send() + .expecting(HttpResponseExpectation.SC_OK) + .compose(resp -> resp.end().map(v -> resp.trailers()))) + .await(); + + assertEquals(1, trailers.size()); + assertEquals("trailer_value", trailers.get("trailer_key")); + } + + @Test + public void testServerConnectionGoAway() { + + CompletableFuture shutdown = new CompletableFuture<>(); + + server.requestHandler(req -> { + Future fut = req.connection().shutdown(); + shutdown.whenComplete((s, err) -> { + long now = System.currentTimeMillis(); + fut.onComplete(onSuccess2(v -> { + assertTrue(System.currentTimeMillis() - now >= 1000); + testComplete(); + })); + vertx.setTimer(1000, id -> { + req.response().end("done"); + }); + }); + }); + + server.listen(8443, "localhost").await(); + + HttpClientConnection connection = client.connect(new HttpConnectOptions() + .setHost("localhost") + .setPort(8443)).await(); + + connection.shutdownHandler(v -> { + shutdown.complete(null); + }); + + Buffer response = connection.request(HttpMethod.GET, 8443, "localhost", "/") + .compose(request -> request + .send() + .expecting(HttpResponseExpectation.SC_OK) + .compose(HttpClientResponse::body)) + .await(); + + assertEquals("done", response.toString()); + + await(); + } + + @Test + public void testClientConnectinoGoAway() throws Exception { + + CompletableFuture shutdown = new CompletableFuture<>(); + + server.requestHandler(req -> { + shutdown.complete(null); + req.connection().shutdownHandler(v -> { + vertx.setTimer(1000, id -> { + req.response().end(); + }); + }); +// Future fut = req.connection().shutdown(); +// shutdown.whenComplete((s, err) -> { +// long now = System.currentTimeMillis(); +// fut.onComplete(onSuccess2(v -> { +// assertTrue(System.currentTimeMillis() - now >= 1000); +// testComplete(); +// })); +// vertx.setTimer(1000, id -> { +// req.response().end("done"); +// }); +// }); + }); + + server.listen(8443, "localhost").await(); + + HttpClientConnection connection = client.connect(new HttpConnectOptions() + .setHost("localhost") + .setPort(8443)).await(); + + HttpClientRequest request = connection.request(HttpMethod.GET, 8443, "localhost", "/") + .await(); + + request.end().await(); + + shutdown.get(10, TimeUnit.SECONDS.SECONDS); + + connection.shutdown(10, TimeUnit.SECONDS).await(); + +// await(); + } + + @Test + public void testClientRequestResetUponClientPartialRequestResetByServer() throws Exception { + + server.requestHandler(req -> { + req.handler(buff -> { + req.response().reset(); + }); + req.exceptionHandler(err -> { + assertEquals(StreamResetException.class, err.getClass()); + complete(); + }); + }); + + server.listen(8443, "localhost").await(); + + HttpClientConnection connection = client.connect(new HttpConnectOptions() + .setHost("localhost") + .setPort(8443)).await(); + + HttpClientRequest request = connection + .request(HttpMethod.GET, 8443, "localhost", "/") + .await(); + + request.setChunked(true).write("chunk").await(); + + try { + request.response().await(); + fail(); + } catch (StreamResetException expected) { + } + + await(); + } + + @Test + public void testServerResponseReset() throws Exception { + + CompletableFuture continuation = new CompletableFuture<>(); + + server.requestHandler(req -> { + req.endHandler(buff -> { + HttpServerResponse response = req.response(); + continuation.whenComplete((v, err) -> { + response.reset(); + }); + response + .setChunked(true) + .write("chunk"); + }); + }); + + server.listen(8443, "localhost").await(); + + HttpClientConnection connection = client.connect(new HttpConnectOptions() + .setHost("localhost") + .setPort(8443)).await(); + + HttpClientRequest request = connection + .request(HttpMethod.GET, 8443, "localhost", "/") + .await(); + + request.end().await(); + + HttpClientResponse response = request.response().await(); + Future body = response.body(); + + continuation.complete(null); + try { + body.await(); + fail(); + } catch (StreamResetException expected) { + } + } +} diff --git a/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ContextTest.java b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ContextTest.java new file mode 100644 index 00000000000..201dc7ce86b --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ContextTest.java @@ -0,0 +1,130 @@ +package io.vertx.tests.http.http3; + +import io.vertx.core.*; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.*; +import io.vertx.core.http.impl.HttpServerConnection; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.VertxInternal; +import io.vertx.test.core.LinuxOrOsx; +import io.vertx.test.core.VertxTestBase; +import io.vertx.test.tls.Cert; +import io.vertx.test.tls.Trust; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(LinuxOrOsx.class) +public class Http3ContextTest extends VertxTestBase { + + private HttpServer server; + private Http3ClientOptions clientOptions; + private HttpClientAgent client; + + @Override + public void setUp() throws Exception { + super.setUp(); + Http3ServerOptions serverOptions = new Http3ServerOptions(); + serverOptions.getSslOptions().setKeyCertOptions(Cert.SERVER_JKS.get()); + clientOptions = new Http3ClientOptions(); + clientOptions.getSslOptions().setTrustOptions(Trust.SERVER_JKS.get()); + clientOptions.getSslOptions().setHostnameVerificationAlgorithm(""); + server = vertx.createHttpServer(serverOptions); + client = vertx.createHttpClient(clientOptions); + } + + @Override + protected void tearDown() throws Exception { + server.close().await(); + client.close().await(); + super.tearDown(); + } + + @Test + public void testServerRequestEventLoopContext() { + testServerRequestContext(ThreadingModel.EVENT_LOOP); + } + + @Test + public void testServerRequestWorkerContext() { + testServerRequestContext(ThreadingModel.WORKER); + } + + private void testServerRequestContext(ThreadingModel threadingModel) { + + server.requestHandler(req -> { + Context ctx = Vertx.currentContext(); + assertEquals(threadingModel, ctx.threadingModel()); + assertIsDuplicate(req, ctx); + Buffer body = Buffer.buffer(); + req.handler(chunk -> { + assertSame(ctx, Vertx.currentContext()); + body.appendBuffer(chunk); + }); + req.endHandler(v -> { + assertSame(ctx, Vertx.currentContext()); + req.response().end(body); + }); + }); + + ContextInternal serverCtx = ((VertxInternal) vertx).createContext(threadingModel); + Future.future(p -> serverCtx.runOnContext(vertx -> server.listen(8443, "localhost").onComplete(p))).await(); + + Buffer response = client.request(HttpMethod.POST, 8443, "localhost", "/") + .compose(request -> request + .send(Buffer.buffer("payload")) + .expecting(HttpResponseExpectation.SC_OK) + ) + .compose(HttpClientResponse::body).await(); + + assertEquals("payload", response.toString()); + } + + @Test + public void testClientRequestEventLoopContext() { + testClientRequestContext(ThreadingModel.EVENT_LOOP); + } + + @Test + public void testClientRequestWorkerContext() { + testClientRequestContext(ThreadingModel.WORKER); + } + + public void testClientRequestContext(ThreadingModel threadingModel) { + + server.requestHandler(req -> { + req.response().end("Hello World"); + }); + + server.listen(8443, "localhost").await(); + + ContextInternal connectionCtx = ((VertxInternal) vertx).createContext(threadingModel); + ContextInternal streamCtx = ((VertxInternal) vertx).createContext(threadingModel); + + HttpClientRequest request1 = Future.future(p -> connectionCtx.runOnContext(v -> client.request(HttpMethod.POST, 8443, "localhost", "/").onComplete(p))).await(); + + Buffer body = request1.send().compose(response -> { + assertSame(connectionCtx, Vertx.currentContext()); + return response.body(); + }).await(); + assertEquals("Hello World", body.toString()); + + HttpClientRequest request2 = Future.future(p -> streamCtx.runOnContext(v -> client.request(HttpMethod.POST, 8443, "localhost", "/").onComplete(p))).await(); + body = request2.send().compose(response -> { + assertSame(streamCtx, Vertx.currentContext()); + return response.body(); + }).await(); + assertEquals("Hello World", body.toString()); + + assertSame(request1.connection(), request2.connection()); + } + + private void assertIsDuplicate(HttpServerRequest request, Context context) { + assertIsDuplicate(request, (ContextInternal)context); + } + + private void assertIsDuplicate(HttpServerRequest request, ContextInternal context) { + assertTrue(context.isDuplicate()); + HttpServerConnection connection = (HttpServerConnection) request.connection(); + assertSame(connection.context(), context.unwrap()); + } +} diff --git a/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3FlowControlTest.java b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3FlowControlTest.java new file mode 100644 index 00000000000..7e58b6dc9df --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3FlowControlTest.java @@ -0,0 +1,91 @@ +package io.vertx.tests.http.http3; + +import io.vertx.core.Completable; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.*; +import io.vertx.core.streams.WriteStream; +import io.vertx.test.core.LinuxOrOsx; +import io.vertx.test.core.TestUtils; +import io.vertx.test.core.VertxTestBase; +import io.vertx.test.tls.Cert; +import io.vertx.test.tls.Trust; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; + +@RunWith(LinuxOrOsx.class) +public class Http3FlowControlTest extends VertxTestBase { + + private HttpServer server; + private Http3ClientOptions clientOptions; + private HttpClientAgent client; + + @Override + public void setUp() throws Exception { + super.setUp(); + Http3ServerOptions serverOptions = new Http3ServerOptions(); + serverOptions.getSslOptions().setKeyCertOptions(Cert.SERVER_JKS.get()); + clientOptions = new Http3ClientOptions(); + clientOptions.getSslOptions().setTrustOptions(Trust.SERVER_JKS.get()); + clientOptions.getSslOptions().setHostnameVerificationAlgorithm(""); + server = vertx.createHttpServer(serverOptions); + client = vertx.createHttpClient(clientOptions); + } + + @Override + protected void tearDown() throws Exception { + server.close().await(); + client.close().await(); + super.tearDown(); + } + + private void pump(int times, Buffer chunk, WriteStream writeStream, Completable cont) { + if (writeStream.writeQueueFull()) { + cont.succeed(times); + } else { + writeStream.write(chunk); + vertx.runOnContext(v -> pump(times + 1, chunk, writeStream, cont)); + } + } + + @Test + public void testHttpServerResponseFlowControl() { + + Buffer chunk = Buffer.buffer(TestUtils.randomAlphaString(128)); + CompletableFuture latch = new CompletableFuture<>(); + + server.requestHandler(req -> { + pump(0, chunk, req.response(), onSuccess2(times -> { + req.response().end(); + latch.complete(times); + })); + }); + server.listen(8443, "localhost").await(); + + client.request(HttpMethod.GET, 8443, "localhost", "/") + .compose(request -> request + .send() + .expecting(HttpResponseExpectation.SC_OK)) + .onComplete(onSuccess2(resp -> { + resp.pause(); + Buffer expected = Buffer.buffer(); + latch.whenComplete((times, err) -> { + for (int i = 0; i < times; i++) { + expected.appendBuffer(chunk); + } + resp.resume(); + }); + Buffer cumulation = Buffer.buffer(); + resp.handler(cumulation::appendBuffer); + resp.endHandler(v -> { + assertEquals(expected, cumulation); + testComplete(); + }); + })); + + await(); + } +} diff --git a/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3NettyTest.java b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3NettyTest.java index fc196c5927c..f1299b7cfa3 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3NettyTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3NettyTest.java @@ -1,23 +1,13 @@ package io.vertx.tests.http.http3; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.*; import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.handler.codec.http3.DefaultHttp3DataFrame; -import io.netty.handler.codec.http3.DefaultHttp3HeadersFrame; -import io.netty.handler.codec.http3.Http3; -import io.netty.handler.codec.http3.Http3ClientConnectionHandler; -import io.netty.handler.codec.http3.Http3DataFrame; -import io.netty.handler.codec.http3.Http3HeadersFrame; -import io.netty.handler.codec.http3.Http3RequestStreamInboundHandler; -import io.netty.handler.codec.http3.Http3ServerConnectionHandler; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http3.*; import io.netty.handler.codec.quic.InsecureQuicTokenHandler; import io.netty.handler.codec.quic.QuicChannel; import io.netty.handler.codec.quic.QuicSslContext; @@ -26,12 +16,19 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.CharsetUtil; -import io.netty.util.NetUtil; import io.netty.util.ReferenceCountUtil; -import org.junit.Test; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.junit.Assert.assertTrue; public class Http3NettyTest { @@ -42,12 +39,12 @@ public static void main(String[] args) throws Exception { EventLoopGroup group = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory()); - try { - Channel channel = server(group, PORT); - client(group, PORT); - } finally { - group.shutdownGracefully(); - } +// try { +// Channel channel = server(group, PORT); +// client(group, PORT); +// } finally { +// group.shutdownGracefully(); +// } } @@ -113,64 +110,332 @@ protected void channelInputClosed(ChannelHandlerContext ctx) { return channel; } - public static void client(EventLoopGroup group, int port) throws Exception { + public static Client client(EventLoopGroup group) throws Exception { + Client client = new Client(group); + client.bind(0); + return client; + } - QuicSslContext context = QuicSslContextBuilder.forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .applicationProtocols(Http3.supportedApplicationProtocols()).build(); - ChannelHandler codec = Http3.newQuicClientCodecBuilder() - .sslContext(context) - .maxIdleTimeout(5000, TimeUnit.MILLISECONDS) - .initialMaxData(10000000) - .initialMaxStreamDataBidirectionalLocal(1000000) - .build(); + public static class Client { - Bootstrap bs = new Bootstrap(); - Channel channel = bs.group(group) - .channel(NioDatagramChannel.class) - .handler(codec) - .bind(0).sync().channel(); - QuicChannel quicChannel = QuicChannel.newBootstrap(channel) - .handler(new Http3ClientConnectionHandler()) - .remoteAddress(new InetSocketAddress(NetUtil.LOCALHOST4, port)) - .connect() - .get(); + private final EventLoopGroup group; + private Channel channel; - QuicStreamChannel streamChannel = Http3.newRequestStream(quicChannel, - new Http3RequestStreamInboundHandler() { - @Override - protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) { - ReferenceCountUtil.release(frame); + public Client(EventLoopGroup group) { + this.group = group; + } + + public Client bind(int port) throws Exception { + QuicSslContext context = QuicSslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .applicationProtocols(Http3.supportedApplicationProtocols()).build(); + ChannelHandler codec = Http3.newQuicClientCodecBuilder() + .sslContext(context) + .maxIdleTimeout(5000, TimeUnit.MILLISECONDS) + .initialMaxData(10000000) + .initialMaxStreamDataBidirectionalLocal(1000000) + .build(); + + Bootstrap bs = new Bootstrap(); + channel = bs.group(group) + .channel(NioDatagramChannel.class) + .handler(codec) + .bind(0).sync().channel(); + + return this; + } + + public void close() throws Exception { + if (channel != null) { + channel.close().sync(); + } + } + + public Connection connect(InetSocketAddress server) throws Exception { + + Channel ch = channel; + + if (ch == null) { + throw new IllegalStateException("Not bound"); + } + + AtomicReference goAwayHandlerRef = new AtomicReference<>(); + + QuicChannel quicChannel = QuicChannel.newBootstrap(channel) + .handler(new Http3ClientConnectionHandler(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http3ControlStreamFrame) { + Http3ControlStreamFrame controlStreamFrame = (Http3ControlStreamFrame)msg; + switch ((int)controlStreamFrame.type()) { + case 7: + // Go away + Runnable runnable = goAwayHandlerRef.get(); + if (runnable != null) { + runnable.run(); + } + break; + } + } else { + super.channelRead(ctx, msg); + } + } + }, null, null, null, true)) + .remoteAddress(server) + .connect() + .get(); + + return new Connection(quicChannel, goAwayHandlerRef, server); + } + + public class Connection { + + private final QuicChannel channel; + private final InetSocketAddress address; + private final AtomicReference goAwayHandlerRef; + + private Connection(QuicChannel quicChannel, AtomicReference goAwayHandlerRef, InetSocketAddress address) { + this.channel = quicChannel; + this.address = address; + this.goAwayHandlerRef = goAwayHandlerRef; + } + + public Connection goAwayHandler(Runnable handler) { + goAwayHandlerRef.set(handler); + return this; + } + + public Stream stream() throws Exception { + Stream stream = new Stream(this); + stream.streamChannel = Http3.newRequestStream(channel, stream).sync().getNow(); + return stream; + } + + public void close() throws Exception { + channel.close().sync(); + } + } + + public class Stream extends Http3RequestStreamInboundHandler { + + private final Connection connection; + private QuicStreamChannel streamChannel; + private Consumer headersHandler; + private Consumer chunkHandler; + private Consumer endHandler; + private boolean headersSent; + private Http3Headers responseHeaders; + private ByteArrayOutputStream responseCumulation = new ByteArrayOutputStream(); + private CompletableFuture responseBody = new CompletableFuture<>(); + + public Stream(Connection connection) { + this.connection = connection; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + responseBody.completeExceptionally(cause); + super.exceptionCaught(ctx, cause); + } + + @Override + protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) { + ReferenceCountUtil.release(frame); + Consumer handler = headersHandler; + Http3Headers headers = frame.headers(); + responseHeaders = headers; + if (handler != null) { + handler.accept(headers); } + } - @Override - protected void channelRead(ChannelHandlerContext ctx, Http3DataFrame frame) { - System.out.print(frame.content().toString(CharsetUtil.US_ASCII)); - ReferenceCountUtil.release(frame); + @Override + protected void channelRead(ChannelHandlerContext ctx, Http3DataFrame frame) { + byte[] chunk = ByteBufUtil.getBytes(frame.content()); + ReferenceCountUtil.release(frame); + responseCumulation.writeBytes(chunk); + Consumer handler = chunkHandler; + if (handler != null) { + handler.accept(chunk); } + } - @Override - protected void channelInputClosed(ChannelHandlerContext ctx) { - ctx.close(); + @Override + protected void channelInputClosed(ChannelHandlerContext ctx) { + try { + responseCumulation.close(); + } catch (IOException ignore) { + } + responseBody.complete(responseCumulation.toByteArray()); + Consumer handler = endHandler; + if (handler != null) { + handler.accept(null); + } + ctx.close(); // ??? + } + + public Stream headersHandler(Consumer headersHandler) { + this.headersHandler = headersHandler; + return this; + } + + public Stream endHandler(Consumer endHandler) { + this.endHandler = endHandler; + return this; + } + + public Stream chunkHandler(Consumer chunkHandler) { + this.chunkHandler = chunkHandler; + return this; + } + + public void GET(String path) throws Exception{ + Http3Headers headers = new DefaultHttp3Headers(); + headers.method("GET"); + headers.path(path); + end(headers); + } + + public void POST(String path, byte[] body) throws Exception{ + Http3Headers headers = new DefaultHttp3Headers(); + headers.method("POST"); + headers.path(path); + headers.set(HttpHeaderNames.CONTENT_LENGTH, "" + body.length); + write(headers, false); + writeBody(body); + } + + public void writeUnknownFrame(int type, byte[] payload) throws Exception { + DefaultHttp3UnknownFrame frame = new DefaultHttp3UnknownFrame(type, Unpooled.wrappedBuffer(payload)); + ChannelFuture fut = streamChannel.writeAndFlush(frame); + fut.sync(); + } + + public void write(Http3Headers headers) throws Exception { + write(headers, false); + } + + public void end(Http3Headers headers) throws Exception { + write(headers, true); + } + + public void write(Http3Headers headers, boolean end) throws Exception { + if (!headersSent) { + headersSent = true; + if (headers.authority() == null) { + headers.authority( connection.address.getHostName()+ ":" + connection.address.getPort()); + } + if (headers.scheme() == null) { + headers.scheme("https"); + } + if (headers.method() == null) { + headers.method("GET"); + } + if (headers.path() == null) { + headers.path("/"); + } + } + Http3HeadersFrame frame = new DefaultHttp3HeadersFrame(headers); + ChannelFuture fut = streamChannel.writeAndFlush(frame); + if (end) { + fut.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT); + } + fut.sync(); + } + + public void write(byte[] chunk) throws Exception { + write(chunk, false); + } + + public void end() throws Exception { + write(new byte[0], true); + } + + public void end(byte[] chunk) throws Exception { + write(chunk, true); + } + + public void write(byte[] chunk, boolean end) throws Exception { + if (!headersSent) { + throw new IllegalStateException(); + } + Http3DataFrame frame = new DefaultHttp3DataFrame(Unpooled.wrappedBuffer(chunk)); + ChannelFuture fut = streamChannel.writeAndFlush(frame); + if (end) { + fut.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT); } - }).sync().getNow(); - - // Write the Header frame and send the FIN to mark the end of the request. - // After this its not possible anymore to write any more data. - Http3HeadersFrame frame = new DefaultHttp3HeadersFrame(); - frame.headers().method("GET").path("/") - .authority(NetUtil.LOCALHOST4.getHostAddress() + ":" + port) - .scheme("https"); - streamChannel.writeAndFlush(frame) - .addListener(QuicStreamChannel.SHUTDOWN_OUTPUT).sync(); - - // Wait for the stream channel and quic channel to be closed (this will happen after we received the FIN). - // After this is done we will close the underlying datagram channel. - streamChannel.closeFuture().sync(); - - // After we received the response lets also close the underlying QUIC channel and datagram channel. - quicChannel.close().sync(); - channel.close().sync(); + fut.sync(); + } + + public void writeBody(byte[] chunk) throws Exception { + write(chunk, true); + } + + public Http3Headers responseHeaders() { + return responseHeaders; + } + + public byte[] responseBody() throws Exception { + try { + return responseBody.get(10, TimeUnit.SECONDS); + } catch (ExecutionException e) { + throw (Exception)e.getCause(); + } + } + + public void reset(int errorCode) throws Exception { + if (streamChannel == null) { + throw new IllegalStateException(); + } + streamChannel.shutdownOutput(errorCode).sync(); + } + } } + +// public static void client(EventLoopGroup group, int port) throws Exception { +// +// +// QuicChannel quicChannel = QuicChannel.newBootstrap(channel) +// .handler(new Http3ClientConnectionHandler()) +// .remoteAddress(new InetSocketAddress(NetUtil.LOCALHOST4, port)) +// .connect() +// .get(); +// +// QuicStreamChannel streamChannel = Http3.newRequestStream(quicChannel, +// new Http3RequestStreamInboundHandler() { +// @Override +// protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) { +// ReferenceCountUtil.release(frame); +// } +// +// @Override +// protected void channelRead(ChannelHandlerContext ctx, Http3DataFrame frame) { +// System.out.print(frame.content().toString(CharsetUtil.US_ASCII)); +// ReferenceCountUtil.release(frame); +// } +// +// @Override +// protected void channelInputClosed(ChannelHandlerContext ctx) { +// ctx.close(); +// } +// }).sync().getNow(); +// +// // Write the Header frame and send the FIN to mark the end of the request. +// // After this its not possible anymore to write any more data. +// Http3HeadersFrame frame = new DefaultHttp3HeadersFrame(); +// frame.headers().method("GET").path("/") +// .authority(NetUtil.LOCALHOST4.getHostAddress() + ":" + port) +// .scheme("https"); +// streamChannel.writeAndFlush(frame) +// .addListener(QuicStreamChannel.SHUTDOWN_OUTPUT).sync(); +// +// // Wait for the stream channel and quic channel to be closed (this will happen after we received the FIN). +// // After this is done we will close the underlying datagram channel. +// streamChannel.closeFuture().sync(); +// +// // After we received the response lets also close the underlying QUIC channel and datagram channel. +// quicChannel.close().sync(); +// channel.close().sync(); +// } } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ServerTest.java b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ServerTest.java new file mode 100644 index 00000000000..fab67550b39 --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/http/http3/Http3ServerTest.java @@ -0,0 +1,242 @@ +package io.vertx.tests.http.http3; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.http3.DefaultHttp3Headers; +import io.netty.handler.codec.quic.QuicException; +import io.netty.util.NetUtil; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.*; +import io.vertx.test.core.LinuxOrOsx; +import io.vertx.test.core.VertxTestBase; +import io.vertx.test.tls.Cert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@RunWith(LinuxOrOsx.class) +public class Http3ServerTest extends VertxTestBase { + + private HttpServer server; + private Http3NettyTest.Client client; + + @Override + public void setUp() throws Exception { + super.setUp(); + client = Http3NettyTest.client(new NioEventLoopGroup(1)); + Http3ServerOptions options = new Http3ServerOptions(); + options.getSslOptions().setKeyCertOptions(Cert.SERVER_JKS.get()); +// options.setClientAddressValidation(QuicClientAddressValidation.NONE); +// options.setKeyLogFile("/Users/julien/keylogfile.txt"); + server = vertx.createHttpServer(options); + } + + @Override + protected void tearDown() throws Exception { + server.close().await(); + client.close(); + super.tearDown(); + } + + @Test + public void testGet() throws Exception{ + + server.requestHandler(req -> { + req.endHandler(v -> { + req.response().end("Hello World"); + }); + }); + + server.listen(8443, "localhost").await(); + + Http3NettyTest.Client.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 8443)); + Http3NettyTest.Client.Stream stream = connection.stream(); + stream.GET("/"); + assertEquals("Hello World", new String(stream.responseBody())); + } + + @Test + public void testPost() throws Exception{ + + server.requestHandler(req -> { + assertEquals(HttpMethod.POST, req.method()); + req.bodyHandler(body -> { + req.response().end(body); + }); + }); + + server.listen(8443, "localhost").await(); + + Http3NettyTest.Client.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 8443)); + Http3NettyTest.Client.Stream stream = connection.stream(); + stream.POST("/", "Hello World".getBytes(StandardCharsets.UTF_8)); + assertEquals("Hello World", new String(stream.responseBody())); + } + + @Test + public void testTrailers() throws Exception{ + + server.requestHandler(req -> { + req.bodyHandler(body -> { + // No API to get client trailers + req.response().end(body); + }); + }); + + server.listen(8443, "localhost").await(); + + Http3NettyTest.Client.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 8443)); + Http3NettyTest.Client.Stream stream = connection.stream(); + + stream.write(new DefaultHttp3Headers().method("GET").path("/")); + stream.write("chunk".getBytes(StandardCharsets.UTF_8)); + stream.end(new DefaultHttp3Headers().set("key", "value")); + + + assertEquals("chunk", new String(stream.responseBody())); + } + + @Test + public void testUnknownFrame() throws Exception{ + + server.requestHandler(req -> { + Buffer content = Buffer.buffer(); + req.customFrameHandler(frame -> { + assertEquals(64, frame.type()); + assertEquals(0, frame.flags()); + content.appendBuffer(frame.payload()); + }); + req.endHandler(v -> { + req.response().end(content); + }); + }); + + server.listen(8443, "localhost").await(); + + Http3NettyTest.Client.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 8443)); + Http3NettyTest.Client.Stream stream = connection.stream(); + + stream.write(new DefaultHttp3Headers().method("GET").path("/")); + stream.writeUnknownFrame(64, "ping".getBytes(StandardCharsets.UTF_8)); + stream.end(); + + assertEquals("ping", new String(stream.responseBody())); + } + + @Test + public void testServerConnectionShutdown() throws Exception{ + + disableThreadChecks(); + waitFor(2); + + server.requestHandler(req -> { + long now = System.currentTimeMillis(); + req.connection().shutdown(10, TimeUnit.SECONDS) + .onComplete(onSuccess2(v -> { + assertTrue(System.currentTimeMillis() - now >= 1000); + complete(); + })); + vertx.setTimer(1000, id -> { + req.response().end("Hello World"); + }); + }); + + server.listen(8443, "localhost").await(); + + Http3NettyTest.Client.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 8443)); + connection.goAwayHandler(this::complete); + Http3NettyTest.Client.Stream stream = connection.stream(); + stream.GET("/"); + assertEquals("Hello World", new String(stream.responseBody())); + + await(); + } + + @Test + public void testServerConnectionShutdownTimeout() throws Exception{ + + disableThreadChecks(); + waitFor(2); + + server.requestHandler(req -> { + long now = System.currentTimeMillis(); + req.connection().shutdown(1, TimeUnit.SECONDS) + .onComplete(onSuccess2(v -> { + assertTrue(System.currentTimeMillis() - now <= 2000); + complete(); + })); + }); + + server.listen(8443, "localhost").await(); + + Http3NettyTest.Client.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 8443)); + connection.goAwayHandler(this::complete); + Http3NettyTest.Client.Stream stream = connection.stream(); + stream.GET("/"); + assertEquals("", new String(stream.responseBody())); + + await(); + } + + @Test + public void testServerResetPartialResponse() throws Exception { + + server.requestHandler(req -> { + HttpServerResponse response = req.response(); + response.setChunked(true).write("chunk") + .onComplete(onSuccess2(v1 -> { + response.reset().onComplete(onSuccess(v2 -> testComplete())); + })); + }); + + server.listen(8443, "localhost").await(); + + Http3NettyTest.Client.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 8443)); + Http3NettyTest.Client.Stream stream = connection.stream(); + stream.GET("/"); + try { + stream.responseBody(); + fail(); + } catch (QuicException expected) { + assertEquals("STREAM_RESET", expected.getMessage()); + } + + await(); + } + + @Test + public void testServerResponseResetUponClientPartialRequestResetByClient() throws Exception { + + CountDownLatch latch = new CountDownLatch(1); + + server.requestHandler(req -> { + req.exceptionHandler(err -> { + if (err instanceof StreamResetException) { + testComplete(); + } + }); + latch.countDown(); + }); + + server.listen(8443, "localhost").await(); + + Http3NettyTest.Client.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 8443)); + Http3NettyTest.Client.Stream stream = connection.stream(); + stream.write(new DefaultHttp3Headers()); + + awaitLatch(latch); + stream.reset(4); + + try { + stream.responseBody(); + fail(); + } catch (QuicException e) { + assertEquals("STREAM_RESET", e.getMessage()); + } + + await(); + } +}