diff --git a/pom.xml b/pom.xml
index 59ae5d63483..b50edc08658 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,11 @@
io.netty
netty-codec-http2
+
+ io.netty.incubator
+ netty-incubator-codec-http3
+ 0.0.28.Final
+
io.netty
netty-resolver
diff --git a/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java b/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
index f5a5efe0ef3..96450e45370 100644
--- a/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
+++ b/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
@@ -165,11 +165,6 @@ static void fromJson(Iterable> json, HttpCli
obj.setPoolEventLoopSize(((Number)member.getValue()).intValue());
}
break;
- case "protocolVersion":
- if (member.getValue() instanceof String) {
- obj.setProtocolVersion(io.vertx.core.http.HttpVersion.valueOf((String)member.getValue()));
- }
- break;
case "sendUnmaskedFrames":
if (member.getValue() instanceof Boolean) {
obj.setSendUnmaskedFrames((Boolean)member.getValue());
@@ -274,9 +269,6 @@ static void toJson(HttpClientOptions obj, java.util.Map json) {
json.put("pipeliningLimit", obj.getPipeliningLimit());
json.put("poolCleanerPeriod", obj.getPoolCleanerPeriod());
json.put("poolEventLoopSize", obj.getPoolEventLoopSize());
- if (obj.getProtocolVersion() != null) {
- json.put("protocolVersion", obj.getProtocolVersion().name());
- }
json.put("sendUnmaskedFrames", obj.isSendUnmaskedFrames());
json.put("shared", obj.isShared());
if (obj.getTracingPolicy() != null) {
diff --git a/src/main/generated/io/vertx/core/net/ClientOptionsBaseConverter.java b/src/main/generated/io/vertx/core/net/ClientOptionsBaseConverter.java
index a9fffd93a90..55ff5617b17 100644
--- a/src/main/generated/io/vertx/core/net/ClientOptionsBaseConverter.java
+++ b/src/main/generated/io/vertx/core/net/ClientOptionsBaseConverter.java
@@ -45,6 +45,11 @@ static void fromJson(Iterable> json, ClientO
obj.setNonProxyHosts(list);
}
break;
+ case "protocolVersion":
+ if (member.getValue() instanceof String) {
+ obj.setProtocolVersion(io.vertx.core.http.HttpVersion.valueOf((String)member.getValue()));
+ }
+ break;
case "proxyOptions":
if (member.getValue() instanceof JsonObject) {
obj.setProxyOptions(new io.vertx.core.net.ProxyOptions((io.vertx.core.json.JsonObject)member.getValue()));
@@ -76,6 +81,9 @@ static void toJson(ClientOptionsBase obj, java.util.Map json) {
obj.getNonProxyHosts().forEach(item -> array.add(item));
json.put("nonProxyHosts", array);
}
+ if (obj.getProtocolVersion() != null) {
+ json.put("protocolVersion", obj.getProtocolVersion().name());
+ }
if (obj.getProxyOptions() != null) {
json.put("proxyOptions", obj.getProxyOptions().toJson());
}
diff --git a/src/main/generated/io/vertx/core/net/SSLOptionsConverter.java b/src/main/generated/io/vertx/core/net/SSLOptionsConverter.java
index ed5e57e40c9..a502068d3ee 100644
--- a/src/main/generated/io/vertx/core/net/SSLOptionsConverter.java
+++ b/src/main/generated/io/vertx/core/net/SSLOptionsConverter.java
@@ -54,6 +54,11 @@ static void fromJson(Iterable> json, SSLOpti
obj.setEnabledSecureTransportProtocols(list);
}
break;
+ case "http3":
+ if (member.getValue() instanceof Boolean) {
+ obj.setHttp3((Boolean)member.getValue());
+ }
+ break;
case "sslHandshakeTimeout":
if (member.getValue() instanceof Number) {
obj.setSslHandshakeTimeout(((Number)member.getValue()).longValue());
@@ -98,6 +103,7 @@ static void toJson(SSLOptions obj, java.util.Map json) {
obj.getEnabledSecureTransportProtocols().forEach(item -> array.add(item));
json.put("enabledSecureTransportProtocols", array);
}
+ json.put("http3", obj.isHttp3());
json.put("sslHandshakeTimeout", obj.getSslHandshakeTimeout());
if (obj.getSslHandshakeTimeoutUnit() != null) {
json.put("sslHandshakeTimeoutUnit", obj.getSslHandshakeTimeoutUnit().name());
diff --git a/src/main/java/examples/HTTP2Examples.java b/src/main/java/examples/HTTP2Examples.java
index fc10ec876e0..6e62bb02603 100644
--- a/src/main/java/examples/HTTP2Examples.java
+++ b/src/main/java/examples/HTTP2Examples.java
@@ -113,17 +113,18 @@ public void example6(HttpServerRequest request) {
public void example7(Vertx vertx) {
HttpClientOptions options = new HttpClientOptions().
- setProtocolVersion(HttpVersion.HTTP_2).
setSsl(true).
setUseAlpn(true).
setTrustAll(true);
+ options.setProtocolVersion(HttpVersion.HTTP_2);
HttpClient client = vertx.createHttpClient(options);
}
public void example8(Vertx vertx) {
- HttpClientOptions options = new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2);
+ HttpClientOptions options = new HttpClientOptions();
+ options.setProtocolVersion(HttpVersion.HTTP_2);
HttpClient client = vertx.createHttpClient(options);
}
diff --git a/src/main/java/examples/HTTP3Examples.java b/src/main/java/examples/HTTP3Examples.java
new file mode 100644
index 00000000000..8b4aeea04ee
--- /dev/null
+++ b/src/main/java/examples/HTTP3Examples.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+
+package examples;
+
+import io.netty.incubator.codec.http3.DefaultHttp3SettingsFrame;
+import io.netty.incubator.codec.http3.Http3SettingsFrame;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpVersion;
+
+/**
+ * @author Iman Zolfaghari
+ */
+public class HTTP3Examples {
+
+ public void example01(Vertx vertx) {
+
+ DefaultHttp3SettingsFrame settings = new DefaultHttp3SettingsFrame();
+ settings.put(Http3SettingsFrame.HTTP3_SETTINGS_MAX_FIELD_SECTION_SIZE,
+ 100000000000L);
+
+ HttpClientOptions options = new HttpClientOptions().
+ setSsl(true).
+ setUseAlpn(true).
+ setHttp3InitialSettings(settings).
+ setTrustAll(true);
+ options.setProtocolVersion(HttpVersion.HTTP_3);
+
+ HttpClient client = vertx.createHttpClient(options);
+
+ client.request(HttpMethod.GET, 443, "www.google.com", "/")
+// client.request(HttpMethod.GET, 9999, NetUtil.LOCALHOST4.getHostAddress(), "/")
+// client.request(HttpMethod.GET, 443, "www.mozilla.org", "/")
+// client.request(HttpMethod.GET, 443, "www.bing.com", "/")
+// client.request(HttpMethod.GET, 443, "www.yahoo.com", "/")
+// client.request(HttpMethod.GET, 443, "http3.is", "/")
+ .compose(req -> {
+
+ req.connection().goAwayHandler(goAway -> {
+ System.out.println(" Received goAway from server! ");
+ });
+
+ req.connection().shutdownHandler(v -> {
+ System.out.println(" Received shutdown signal! ");
+ req.connection().close();
+ vertx.close();
+ });
+
+ return req
+ .end()
+ .compose(res -> req
+ .response()
+ .onSuccess(resp -> {
+ System.out.println("The returned headers are: " + resp.headers());
+ System.out.println("The returned Alt-Svc is: " + resp.headers().get(
+ "Alt-Svc"));
+ }).compose(HttpClientResponse::body).onSuccess(body ->
+ System.out.println("The response body is: " + body.toString()))
+ );
+ })
+ .onFailure(Throwable::printStackTrace)
+ .onComplete(event -> vertx.close())
+ ;
+ }
+
+ public static void main(String[] args) {
+ new HTTP3Examples().example01(Vertx.vertx());
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/Http2StreamPriority.java b/src/main/java/io/vertx/core/http/Http2StreamPriority.java
new file mode 100644
index 00000000000..0e0578fe84f
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/Http2StreamPriority.java
@@ -0,0 +1,65 @@
+package io.vertx.core.http;
+
+public class Http2StreamPriority implements StreamPriorityBase {
+ private final StreamPriority streamPriority;
+
+ public Http2StreamPriority(StreamPriority streamPriority) {
+ this.streamPriority = streamPriority;
+ }
+
+ public Http2StreamPriority() {
+ this(new StreamPriority());
+ }
+
+ public short getWeight() {
+ return this.streamPriority.getWeight();
+ }
+
+ public Http2StreamPriority setWeight(short weight) {
+ this.streamPriority.setWeight(weight);
+ return this;
+ }
+
+ public int getDependency() {
+ return this.streamPriority.getDependency();
+ }
+
+ public Http2StreamPriority setDependency(int dependency) {
+ this.streamPriority.setDependency(dependency);
+ return this;
+ }
+
+ public boolean isExclusive() {
+ return this.streamPriority.isExclusive();
+ }
+
+ public Http2StreamPriority setExclusive(boolean exclusive) {
+ this.streamPriority.setExclusive(exclusive);
+ return this;
+ }
+
+ @Override
+ public int urgency() {
+ throw new RuntimeException("Not Http3 Priority!");
+ }
+
+ @Override
+ public boolean isIncremental() {
+ throw new RuntimeException("Not Http3 Priority!");
+ }
+
+ @Override
+ public int hashCode() {
+ return this.streamPriority.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof Http2StreamPriority && this.streamPriority.equals(((Http2StreamPriority) obj).streamPriority);
+ }
+
+ @Override
+ public String toString() {
+ return this.streamPriority.toString();
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/Http3StreamPriority.java b/src/main/java/io/vertx/core/http/Http3StreamPriority.java
new file mode 100644
index 00000000000..bfe1643dfd1
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/Http3StreamPriority.java
@@ -0,0 +1,64 @@
+package io.vertx.core.http;
+
+import io.netty.incubator.codec.quic.QuicStreamPriority;
+
+public class Http3StreamPriority implements StreamPriorityBase {
+ private final QuicStreamPriority quicStreamPriority;
+
+ public Http3StreamPriority(QuicStreamPriority quicStreamPriority) {
+ this.quicStreamPriority = quicStreamPriority;
+ }
+
+ public int urgency() {
+ return this.quicStreamPriority.urgency();
+ }
+
+ public boolean isIncremental() {
+ return this.quicStreamPriority.isIncremental();
+ }
+
+ @Override
+ public short getWeight() {
+ throw new RuntimeException("Not Http2 Priority!");
+ }
+
+ @Override
+ public StreamPriorityBase setWeight(short weight) {
+ throw new RuntimeException("Not Http2 Priority!");
+ }
+
+ @Override
+ public int getDependency() {
+ throw new RuntimeException("Not Http2 Priority!");
+ }
+
+ @Override
+ public StreamPriorityBase setDependency(int dependency) {
+ throw new RuntimeException("Not Http2 Priority!");
+ }
+
+ @Override
+ public boolean isExclusive() {
+ throw new RuntimeException("Not Http2 Priority!");
+ }
+
+ @Override
+ public StreamPriorityBase setExclusive(boolean exclusive) {
+ throw new RuntimeException("Not Http2 Priority!");
+ }
+
+ @Override
+ public int hashCode() {
+ return this.quicStreamPriority.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof Http3StreamPriority && this.quicStreamPriority.equals(((Http3StreamPriority) obj).quicStreamPriority);
+ }
+
+ @Override
+ public String toString() {
+ return this.quicStreamPriority.toString();
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/HttpClientOptions.java b/src/main/java/io/vertx/core/http/HttpClientOptions.java
index d50cb3af581..7a001c11eb4 100755
--- a/src/main/java/io/vertx/core/http/HttpClientOptions.java
+++ b/src/main/java/io/vertx/core/http/HttpClientOptions.java
@@ -11,6 +11,8 @@
package io.vertx.core.http;
+import io.netty.incubator.codec.http3.DefaultHttp3SettingsFrame;
+import io.netty.incubator.codec.http3.Http3SettingsFrame;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.Arguments;
@@ -115,11 +117,6 @@ public class HttpClientOptions extends ClientOptionsBase {
*/
public static final int DEFAULT_DEFAULT_PORT = 80;
- /**
- * The default protocol version = HTTP/1.1
- */
- public static final HttpVersion DEFAULT_PROTOCOL_VERSION = HttpVersion.HTTP_1_1;
-
/**
* Default max HTTP chunk size = 8192
*/
@@ -249,12 +246,12 @@ public class HttpClientOptions extends ClientOptionsBase {
private int maxWebSockets;
private String defaultHost;
private int defaultPort;
- private HttpVersion protocolVersion;
private int maxChunkSize;
private int maxInitialLineLength;
private int maxHeaderSize;
private int maxWaitQueueSize;
private Http2Settings initialSettings;
+ private Http3SettingsFrame http3InitialSettings;
private List alpnVersions;
private boolean http2ClearTextUpgrade;
private boolean http2ClearTextUpgradeWithPreflightRequest;
@@ -306,12 +303,12 @@ public HttpClientOptions(HttpClientOptions other) {
this.maxWebSockets = other.maxWebSockets;
this.defaultHost = other.defaultHost;
this.defaultPort = other.defaultPort;
- this.protocolVersion = other.protocolVersion;
this.maxChunkSize = other.maxChunkSize;
this.maxInitialLineLength = other.getMaxInitialLineLength();
this.maxHeaderSize = other.getMaxHeaderSize();
this.maxWaitQueueSize = other.maxWaitQueueSize;
this.initialSettings = other.initialSettings != null ? new Http2Settings(other.initialSettings) : null;
+ this.http3InitialSettings = other.http3InitialSettings != null ? DefaultHttp3SettingsFrame.copyOf(other.http3InitialSettings) : null;
this.alpnVersions = other.alpnVersions != null ? new ArrayList<>(other.alpnVersions) : null;
this.http2ClearTextUpgrade = other.http2ClearTextUpgrade;
this.http2ClearTextUpgradeWithPreflightRequest = other.http2ClearTextUpgradeWithPreflightRequest;
@@ -371,12 +368,12 @@ private void init() {
maxWebSockets = DEFAULT_MAX_WEBSOCKETS;
defaultHost = DEFAULT_DEFAULT_HOST;
defaultPort = DEFAULT_DEFAULT_PORT;
- protocolVersion = DEFAULT_PROTOCOL_VERSION;
maxChunkSize = DEFAULT_MAX_CHUNK_SIZE;
maxInitialLineLength = DEFAULT_MAX_INITIAL_LINE_LENGTH;
maxHeaderSize = DEFAULT_MAX_HEADER_SIZE;
maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
initialSettings = new Http2Settings();
+ http3InitialSettings = new DefaultHttp3SettingsFrame();
alpnVersions = new ArrayList<>(DEFAULT_ALPN_VERSIONS);
http2ClearTextUpgrade = DEFAULT_HTTP2_CLEAR_TEXT_UPGRADE;
http2ClearTextUpgradeWithPreflightRequest = DEFAULT_HTTP2_CLEAR_TEXT_UPGRADE_WITH_PREFLIGHT_REQUEST;
@@ -973,29 +970,6 @@ public HttpClientOptions setDefaultPort(int defaultPort) {
return this;
}
- /**
- * Get the protocol version.
- *
- * @return the protocol version
- */
- public HttpVersion getProtocolVersion() {
- return protocolVersion;
- }
-
- /**
- * Set the protocol version.
- *
- * @param protocolVersion the protocol version
- * @return a reference to this, so the API can be used fluently
- */
- public HttpClientOptions setProtocolVersion(HttpVersion protocolVersion) {
- if (protocolVersion == null) {
- throw new IllegalArgumentException("protocolVersion must not be null");
- }
- this.protocolVersion = protocolVersion;
- return this;
- }
-
/**
* Set the maximum HTTP chunk size
* @param maxChunkSize the maximum chunk size
@@ -1087,6 +1061,24 @@ public HttpClientOptions setInitialSettings(Http2Settings settings) {
return this;
}
+ /**
+ * @return the initial HTTP/3 connection settings
+ */
+ public Http3SettingsFrame getHttp3InitialSettings() {
+ return http3InitialSettings;
+ }
+
+ /**
+ * Set the HTTP/3 connection settings immediately sent by to the server when the client connects.
+ *
+ * @param settings the settings value
+ * @return a reference to this, so the API can be used fluently
+ */
+ public HttpClientOptions setHttp3InitialSettings(Http3SettingsFrame settings) {
+ this.http3InitialSettings = settings;
+ return this;
+ }
+
@Override
public HttpClientOptions setUseAlpn(boolean useAlpn) {
return (HttpClientOptions) super.setUseAlpn(useAlpn);
@@ -1479,4 +1471,10 @@ public HttpClientOptions setName(String name) {
this.name = name;
return this;
}
+
+ @Override
+ public HttpClientOptions setProtocolVersion(HttpVersion protocolVersion) {
+ super.setProtocolVersion(protocolVersion);
+ return this;
+ }
}
diff --git a/src/main/java/io/vertx/core/http/HttpClientRequest.java b/src/main/java/io/vertx/core/http/HttpClientRequest.java
index 59e8b7942de..379ea41cebc 100644
--- a/src/main/java/io/vertx/core/http/HttpClientRequest.java
+++ b/src/main/java/io/vertx/core/http/HttpClientRequest.java
@@ -623,13 +623,13 @@ default HttpClientRequest writeCustomFrame(HttpFrame frame) {
* @param streamPriority the priority of this request's stream
*/
@Fluent
- default HttpClientRequest setStreamPriority(StreamPriority streamPriority) {
+ default HttpClientRequest setStreamPriority(StreamPriorityBase streamPriority) {
return this;
}
/**
* @return the priority of the associated HTTP/2 stream for HTTP/2 otherwise {@code null}
*/
- StreamPriority getStreamPriority();
+ StreamPriorityBase getStreamPriority();
}
diff --git a/src/main/java/io/vertx/core/http/HttpClientResponse.java b/src/main/java/io/vertx/core/http/HttpClientResponse.java
index 7df3cc99b84..b9bc260f7ed 100644
--- a/src/main/java/io/vertx/core/http/HttpClientResponse.java
+++ b/src/main/java/io/vertx/core/http/HttpClientResponse.java
@@ -11,7 +11,6 @@
package io.vertx.core.http;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.vertx.codegen.annotations.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
@@ -190,5 +189,5 @@ default void end(Handler> handler) {
* @param handler the handler to be called when the stream priority changes
*/
@Fluent
- HttpClientResponse streamPriorityHandler(Handler handler);
+ HttpClientResponse streamPriorityHandler(Handler handler);
}
diff --git a/src/main/java/io/vertx/core/http/HttpServerRequest.java b/src/main/java/io/vertx/core/http/HttpServerRequest.java
index 362c63e0888..7ee3925e7a8 100644
--- a/src/main/java/io/vertx/core/http/HttpServerRequest.java
+++ b/src/main/java/io/vertx/core/http/HttpServerRequest.java
@@ -473,7 +473,7 @@ default void toWebSocket(Handler> handler) {
/**
* @return the priority of the associated HTTP/2 stream for HTTP/2 otherwise {@code null}
*/
- default StreamPriority streamPriority() {
+ default StreamPriorityBase streamPriority() {
return null;
}
@@ -485,7 +485,7 @@ default StreamPriority streamPriority() {
* @param handler the handler to be called when stream priority changes
*/
@Fluent
- HttpServerRequest streamPriorityHandler(Handler handler);
+ HttpServerRequest streamPriorityHandler(Handler handler);
/**
* @return Netty's decoder result useful for handling invalid requests with {@link HttpServer#invalidRequestHandler}
diff --git a/src/main/java/io/vertx/core/http/HttpServerResponse.java b/src/main/java/io/vertx/core/http/HttpServerResponse.java
index 75f9e6a51aa..8b50141f6b7 100644
--- a/src/main/java/io/vertx/core/http/HttpServerResponse.java
+++ b/src/main/java/io/vertx/core/http/HttpServerResponse.java
@@ -668,7 +668,7 @@ default HttpServerResponse writeCustomFrame(HttpFrame frame) {
* @param streamPriority the priority for this request's stream
*/
@Fluent
- default HttpServerResponse setStreamPriority(StreamPriority streamPriority) {
+ default HttpServerResponse setStreamPriority(StreamPriorityBase streamPriority) {
return this;
}
diff --git a/src/main/java/io/vertx/core/http/HttpVersion.java b/src/main/java/io/vertx/core/http/HttpVersion.java
index 216a822d44a..4dfbab3e9f0 100644
--- a/src/main/java/io/vertx/core/http/HttpVersion.java
+++ b/src/main/java/io/vertx/core/http/HttpVersion.java
@@ -20,7 +20,7 @@
*/
@VertxGen
public enum HttpVersion {
- HTTP_1_0("http/1.0"), HTTP_1_1("http/1.1"), HTTP_2("h2");
+ HTTP_1_0("http/1.0"), HTTP_1_1("http/1.1"), HTTP_2("h2"), HTTP_3("h3");
private final String alpnName;
diff --git a/src/main/java/io/vertx/core/http/StreamPriorityBase.java b/src/main/java/io/vertx/core/http/StreamPriorityBase.java
new file mode 100644
index 00000000000..f6d61349c30
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/StreamPriorityBase.java
@@ -0,0 +1,20 @@
+package io.vertx.core.http;
+
+public interface StreamPriorityBase {
+ short getWeight();
+
+ StreamPriorityBase setWeight(short weight);
+
+ int getDependency();
+
+ StreamPriorityBase setDependency(int dependency);
+
+ boolean isExclusive();
+
+ StreamPriorityBase setExclusive(boolean exclusive);
+
+ int urgency();
+
+ boolean isIncremental();
+
+}
diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
index eb58e570326..5e965d770a7 100644
--- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
@@ -64,7 +64,7 @@
import io.vertx.core.http.HttpFrame;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
-import io.vertx.core.http.StreamPriority;
+import io.vertx.core.http.StreamPriorityBase;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebsocketVersion;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
@@ -502,7 +502,7 @@ public void closeHandler(Handler handler) {
}
@Override
- public void priorityHandler(Handler handler) {
+ public void priorityHandler(Handler handler) {
// No op
}
@@ -547,7 +547,7 @@ public ContextInternal getContext() {
}
@Override
- public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler> handler) {
+ public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriorityBase priority, boolean connect, Handler> handler) {
writeHead(request, chunked, buf, end, connect, handler == null ? null : context.promise(handler));
}
@@ -660,12 +660,12 @@ private void _reset(Throwable cause) {
}
@Override
- public StreamPriority priority() {
+ public StreamPriorityBase priority() {
return null;
}
@Override
- public void updatePriority(StreamPriority streamPriority) {
+ public void updatePriority(StreamPriorityBase streamPriority) {
}
@Override
@@ -734,6 +734,11 @@ private void tryClose() {
}
}
}
+
+ @Override
+ public StreamPriorityBase createDefaultStreamPriority() {
+ return HttpUtils.DEFAULT_STREAM_PRIORITY;
+ }
}
private void checkLifecycle() {
diff --git a/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java
index 11a5648ab6f..6bc273f0856 100644
--- a/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java
@@ -47,6 +47,7 @@
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
@@ -312,7 +313,7 @@ String getServerOrigin() {
return serverOrigin;
}
- Vertx vertx() {
+ public VertxInternal vertx() {
return vertx;
}
diff --git a/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java b/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java
index d895ecd9f09..be873246147 100644
--- a/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java
+++ b/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java
@@ -705,7 +705,7 @@ private MultiMap attributes() {
}
@Override
- public HttpServerRequest streamPriorityHandler(Handler handler) {
+ public HttpServerRequest streamPriorityHandler(Handler handler) {
return this;
}
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
index dce54c69b49..252bdc97b43 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
@@ -11,11 +11,7 @@
package io.vertx.core.http.impl;
-import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
@@ -24,29 +20,22 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
-import io.vertx.core.MultiMap;
-import io.vertx.core.VertxException;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.*;
-import io.vertx.core.http.impl.headers.HeadersMultiMap;
+import io.vertx.core.http.GoAway;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.http.StreamPriorityBase;
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
+import io.vertx.core.http.impl.headers.VertxHttp2Headers;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
-import io.vertx.core.spi.tracing.SpanKind;
-import io.vertx.core.spi.tracing.VertxTracer;
-import io.vertx.core.streams.WriteStream;
-
-import java.util.Map;
-import java.util.function.BiConsumer;
/**
* @author Julien Viet
*/
class Http2ClientConnection extends Http2ConnectionBase implements HttpClientConnection {
- private final HttpClientImpl client;
+ public final HttpClientImpl client;
private final ClientMetrics metrics;
private Handler evictionHandler = DEFAULT_EVICTION_HANDLER;
private Handler concurrencyChangeHandler = DEFAULT_CONCURRENCY_CHANGE_HANDLER;
@@ -133,7 +122,7 @@ void upgradeStream(Object metric, Object trace, ContextInternal context, Handler
Future fut;
synchronized (this) {
try {
- Stream stream = createStream(context);
+ HttpStream stream = createStream(context);
stream.init(handler.connection().stream(1));
stream.metric = metric;
stream.trace = trace;
@@ -151,7 +140,7 @@ public void createStream(ContextInternal context, Handler fut;
synchronized (this) {
try {
- StreamImpl stream = createStream(context);
+ HttpStreamImpl stream = createStream(context);
fut = Future.succeededFuture(stream);
} catch (Exception e) {
fut = Future.failedFuture(e);
@@ -160,11 +149,11 @@ public void createStream(ContextInternal context, Handler createStream(ContextInternal context) {
+ return new Http2ClientStream(this, context, false, metrics);
}
- private void recycle() {
+ public void recycle() {
int timeout = client.options().getHttp2KeepAliveTimeout();
expirationTimestamp = timeout > 0 ? System.currentTimeMillis() + timeout * 1000L : 0L;
}
@@ -179,10 +168,10 @@ public long lastResponseReceivedTimestamp() {
return 0L;
}
- protected synchronized void onHeadersRead(int streamId, Http2Headers headers, StreamPriority streamPriority, boolean endOfStream) {
- Stream stream = (Stream) stream(streamId);
- if (!stream.stream.isTrailersReceived()) {
- stream.onHeaders(headers, streamPriority);
+ protected synchronized void onHeadersRead(int streamId, Http2Headers headers, StreamPriorityBase streamPriority, boolean endOfStream) {
+ HttpStream stream = (HttpStream) stream(streamId);
+ if (!stream.isTrailersReceived()) {
+ stream.onHeaders(new VertxHttp2Headers(headers), streamPriority);
if (endOfStream) {
stream.onEnd();
}
@@ -191,7 +180,7 @@ protected synchronized void onHeadersRead(int streamId, Http2Headers headers, St
}
}
- private void metricsEnd(Stream stream) {
+ public void metricsEnd(HttpStream stream) {
if (metrics != null) {
metrics.responseEnd(stream.metric, stream.bytesRead());
}
@@ -199,12 +188,13 @@ private void metricsEnd(Stream stream) {
@Override
public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
- StreamImpl stream = (StreamImpl) stream(streamId);
+ HttpStreamImpl stream = (HttpStreamImpl) stream(streamId);
if (stream != null) {
Handler pushHandler = stream.pushHandler;
if (pushHandler != null) {
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
- StreamImpl pushStream = new StreamImpl(this, context, true);
+ HttpStreamImpl pushStream = new Http2ClientStream(this, context,
+ true, metrics);
pushStream.init(promisedStream);
HttpClientPush push = new HttpClientPush(headers, pushStream);
if (metrics != null) {
@@ -220,457 +210,6 @@ public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int stream
Http2ClientConnection.this.handler.writeReset(promisedStreamId, Http2Error.CANCEL.code());
}
- //
- static abstract class Stream extends VertxHttp2Stream {
-
- private final boolean push;
- private HttpResponseHead response;
- protected Object metric;
- protected Object trace;
- private boolean requestEnded;
- private boolean responseEnded;
- protected Handler headHandler;
- protected Handler chunkHandler;
- protected Handler endHandler;
- protected Handler priorityHandler;
- protected Handler drainHandler;
- protected Handler continueHandler;
- protected Handler earlyHintsHandler;
- protected Handler unknownFrameHandler;
- protected Handler exceptionHandler;
- protected Handler pushHandler;
- protected Handler closeHandler;
- protected long writeWindow;
- protected final long windowSize;
-
- Stream(Http2ClientConnection conn, ContextInternal context, boolean push) {
- super(conn, context);
-
- this.push = push;
- this.windowSize = conn.getWindowSize();
- }
-
- void onContinue() {
- context.emit(null, v -> handleContinue());
- }
-
- void onEarlyHints(MultiMap headers) {
- context.emit(null, v -> handleEarlyHints(headers));
- }
-
- abstract void handleContinue();
-
- abstract void handleEarlyHints(MultiMap headers);
-
- public Object metric() {
- return metric;
- }
-
- public Object trace() {
- return trace;
- }
-
- @Override
- void doWriteData(ByteBuf chunk, boolean end, Handler> handler) {
- super.doWriteData(chunk, end, handler);
- }
-
- @Override
- void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Handler> handler) {
- isConnect = "CONNECT".contentEquals(headers.method());
- super.doWriteHeaders(headers, end, checkFlush, handler);
- }
-
- @Override
- protected void doWriteReset(long code) {
- if (!requestEnded || !responseEnded) {
- super.doWriteReset(code);
- }
- }
-
- protected void endWritten() {
- requestEnded = true;
- if (conn.metrics != null) {
- conn.metrics.requestEnd(metric, bytesWritten());
- }
- }
-
- @Override
- void onEnd(MultiMap trailers) {
- conn.metricsEnd(this);
- responseEnded = true;
- super.onEnd(trailers);
- }
-
- @Override
- void onReset(long code) {
- if (conn.metrics != null) {
- conn.metrics.requestReset(metric);
- }
- super.onReset(code);
- }
-
- @Override
- void onHeaders(Http2Headers headers, StreamPriority streamPriority) {
- if (streamPriority != null) {
- priority(streamPriority);
- }
- if (response == null) {
- int status;
- String statusMessage;
- try {
- status = Integer.parseInt(headers.status().toString());
- statusMessage = HttpResponseStatus.valueOf(status).reasonPhrase();
- } catch (Exception e) {
- handleException(e);
- writeReset(0x01 /* PROTOCOL_ERROR */);
- return;
- }
- if (status == 100) {
- onContinue();
- return;
- } else if (status == 103) {
- MultiMap headersMultiMap = HeadersMultiMap.httpHeaders();
- removeStatusHeaders(headers);
- for (Map.Entry header : headers) {
- headersMultiMap.add(header.getKey(), header.getValue());
- }
- onEarlyHints(headersMultiMap);
- return;
- }
- response = new HttpResponseHead(
- HttpVersion.HTTP_2,
- status,
- statusMessage,
- new Http2HeadersAdaptor(headers));
- removeStatusHeaders(headers);
-
- if (conn.metrics != null) {
- conn.metrics.responseBegin(metric, response);
- }
-
- if (headHandler != null) {
- context.emit(response, headHandler);
- }
- }
- }
-
- private void removeStatusHeaders(Http2Headers headers) {
- headers.remove(":status");
- }
-
- @Override
- void onClose() {
- if (conn.metrics != null) {
- if (!requestEnded || !responseEnded) {
- conn.metrics.requestReset(metric);
- }
- }
- VertxTracer tracer = context.tracer();
- if (tracer != null && trace != null) {
- VertxException err;
- if (responseEnded && requestEnded) {
- err = null;
- } else {
- err = HttpUtils.STREAM_CLOSED_EXCEPTION;
- }
- tracer.receiveResponse(context, response, trace, err, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
- }
- if (!responseEnded) {
- // NOT SURE OF THAT
- onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
- }
- super.onClose();
- // commented to be used later when we properly define the HTTP/2 connection expiration from the pool
- // boolean disposable = conn.streams.isEmpty();
- if (!push) {
- conn.recycle();
- } /* else {
- conn.listener.onRecycle(0, disposable);
- } */
- if (closeHandler != null) {
- closeHandler.handle(null);
- }
- }
- }
-
- static class StreamImpl extends Stream implements HttpClientStream {
-
- StreamImpl(Http2ClientConnection conn, ContextInternal context, boolean push) {
- super(conn, context, push);
- }
-
- @Override
- public void closeHandler(Handler handler) {
- closeHandler = handler;
- }
-
- @Override
- public void continueHandler(Handler handler) {
- continueHandler = handler;
- }
-
- @Override
- public void earlyHintsHandler(Handler handler) {
- earlyHintsHandler = handler;
- }
-
- @Override
- public void unknownFrameHandler(Handler handler) {
- unknownFrameHandler = handler;
- }
-
- @Override
- public void pushHandler(Handler handler) {
- pushHandler = handler;
- }
-
- @Override
- public StreamImpl drainHandler(Handler handler) {
- drainHandler = handler;
- return this;
- }
-
- @Override
- public StreamImpl exceptionHandler(Handler handler) {
- exceptionHandler = handler;
- return this;
- }
-
- @Override
- public WriteStream setWriteQueueMaxSize(int maxSize) {
- return this;
- }
-
- @Override
- public boolean writeQueueFull() {
- return !isNotWritable();
- }
-
- @Override
- public synchronized boolean isNotWritable() {
- return writeWindow > windowSize;
- }
-
- @Override
- public void headHandler(Handler handler) {
- headHandler = handler;
- }
-
- @Override
- public void chunkHandler(Handler handler) {
- chunkHandler = handler;
- }
-
- @Override
- public void priorityHandler(Handler handler) {
- priorityHandler = handler;
- }
-
- @Override
- public void endHandler(Handler handler) {
- endHandler = handler;
- }
-
- @Override
- public StreamPriority priority() {
- return super.priority();
- }
-
- @Override
- public void updatePriority(StreamPriority streamPriority) {
- super.updatePriority(streamPriority);
- }
-
- @Override
- public HttpVersion version() {
- return HttpVersion.HTTP_2;
- }
-
- @Override
- void handleEnd(MultiMap trailers) {
- if (endHandler != null) {
- endHandler.handle(trailers);
- }
- }
-
- @Override
- void handleData(Buffer buf) {
- if (chunkHandler != null) {
- chunkHandler.handle(buf);
- }
- }
-
- @Override
- void handleReset(long errorCode) {
- handleException(new StreamResetException(errorCode));
- }
-
- @Override
- void handleWritabilityChanged(boolean writable) {
- }
-
- @Override
- void handleCustomFrame(HttpFrame frame) {
- if (unknownFrameHandler != null) {
- unknownFrameHandler.handle(frame);
- }
- }
-
-
- @Override
- void handlePriorityChange(StreamPriority streamPriority) {
- if (priorityHandler != null) {
- priorityHandler.handle(streamPriority);
- }
- }
-
- void handleContinue() {
- if (continueHandler != null) {
- continueHandler.handle(null);
- }
- }
-
- void handleEarlyHints(MultiMap headers) {
- if (earlyHintsHandler != null) {
- earlyHintsHandler.handle(headers);
- }
- }
-
- void handleException(Throwable exception) {
- if (exceptionHandler != null) {
- exceptionHandler.handle(exception);
- }
- }
-
- @Override
- public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler> handler) {
- priority(priority);
- conn.context.emit(null, v -> {
- writeHeaders(request, buf, end, priority, connect, handler);
- });
- }
-
- private void writeHeaders(HttpRequestHead request, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler> handler) {
- Http2Headers headers = new DefaultHttp2Headers();
- headers.method(request.method.name());
- boolean e;
- if (request.method == HttpMethod.CONNECT) {
- if (request.authority == null) {
- throw new IllegalArgumentException("Missing :authority / host header");
- }
- headers.authority(request.authority);
- // don't end stream for CONNECT
- e = false;
- } else {
- headers.path(request.uri);
- headers.scheme(conn.isSsl() ? "https" : "http");
- if (request.authority != null) {
- headers.authority(request.authority);
- }
- e= end;
- }
- if (request.headers != null && request.headers.size() > 0) {
- for (Map.Entry header : request.headers) {
- headers.add(HttpUtils.toLowerCase(header.getKey()), header.getValue());
- }
- }
- if (conn.client.options().isTryUseCompression() && headers.get(HttpHeaderNames.ACCEPT_ENCODING) == null) {
- headers.set(HttpHeaderNames.ACCEPT_ENCODING, Http1xClientConnection.determineCompressionAcceptEncoding());
- }
- try {
- createStream(request, headers, handler);
- } catch (Http2Exception ex) {
- if (handler != null) {
- handler.handle(context.failedFuture(ex));
- }
- handleException(ex);
- return;
- }
- if (buf != null) {
- doWriteHeaders(headers, false, false, null);
- doWriteData(buf, e, handler);
- } else {
- doWriteHeaders(headers, e, true, handler);
- }
- }
-
- private void createStream(HttpRequestHead head, Http2Headers headers, Handler> handler) throws Http2Exception {
- int id = this.conn.handler.encoder().connection().local().lastStreamCreated();
- if (id == 0) {
- id = 1;
- } else {
- id += 2;
- }
- head.id = id;
- head.remoteAddress = conn.remoteAddress();
- Http2Stream stream = this.conn.handler.encoder().connection().local().createStream(id, false);
- init(stream);
- if (conn.metrics != null) {
- metric = conn.metrics.requestBegin(headers.path().toString(), head);
- }
- VertxTracer tracer = context.tracer();
- if (tracer != null) {
- BiConsumer headers_ = (key, val) -> new Http2HeadersAdaptor(headers).add(key, val);
- String operation = head.traceOperation;
- if (operation == null) {
- operation = headers.method().toString();
- }
- trace = tracer.sendRequest(context, SpanKind.RPC, conn.client.options().getTracingPolicy(), head, operation, headers_, HttpUtils.CLIENT_HTTP_REQUEST_TAG_EXTRACTOR);
- }
- }
-
- @Override
- public void writeBuffer(ByteBuf buf, boolean end, Handler> listener) {
- if (buf != null) {
- int size = buf.readableBytes();
- synchronized (this) {
- writeWindow += size;
- }
- if (listener != null) {
- Handler> prev = listener;
- listener = ar -> {
- Handler drainHandler;
- synchronized (this) {
- boolean full = writeWindow > windowSize;
- writeWindow -= size;
- if (full && writeWindow <= windowSize) {
- drainHandler = this.drainHandler;
- } else {
- drainHandler = null;
- }
- }
- if (drainHandler != null) {
- drainHandler.handle(null);
- }
- prev.handle(ar);
- };
- }
- }
- writeData(buf, end, listener);
- }
-
- @Override
- public ContextInternal getContext() {
- return context;
- }
-
- @Override
- public void doSetWriteQueueMaxSize(int size) {
- }
-
- @Override
- public void reset(Throwable cause) {
- long code = cause instanceof StreamResetException ? ((StreamResetException)cause).getCode() : 0;
- conn.context.emit(code, this::writeReset);
- }
-
- @Override
- public HttpClientConnection connection() {
- return conn;
- }
- }
-
@Override
protected void handleIdle(IdleStateEvent event) {
if (handler.connection().local().numActiveStreams() > 0) {
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientStream.java b/src/main/java/io/vertx/core/http/impl/Http2ClientStream.java
new file mode 100644
index 00000000000..0720667196f
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/Http2ClientStream.java
@@ -0,0 +1,157 @@
+package io.vertx.core.http.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http2.EmptyHttp2Headers;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.concurrent.FutureListener;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.MultiMap;
+import io.vertx.core.http.HttpVersion;
+import io.vertx.core.http.StreamPriorityBase;
+import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
+import io.vertx.core.http.impl.headers.VertxHttp2Headers;
+import io.vertx.core.http.impl.headers.VertxHttpHeaders;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.spi.metrics.ClientMetrics;
+import io.vertx.core.tracing.TracingPolicy;
+
+class Http2ClientStream extends HttpStreamImpl {
+ private static final MultiMap EMPTY = new Http2HeadersAdaptor(EmptyHttp2Headers.INSTANCE);
+
+ Http2ClientStream(Http2ClientConnection conn, ContextInternal context, boolean push,
+ ClientMetrics metrics) {
+ super(conn, context, push, metrics);
+ }
+
+ @Override
+ public HttpClientConnection connection() {
+ return conn;
+ }
+
+ @Override
+ public HttpVersion version() {
+ return HttpVersion.HTTP_2;
+ }
+
+ @Override
+ protected void metricsEnd(HttpStream, ?> stream) {
+ conn.metricsEnd(stream);
+ }
+
+ @Override
+ protected void recycle() {
+ conn.recycle();
+ }
+
+ @Override
+ int lastStreamCreated() {
+ return this.conn.handler.encoder().connection().local().lastStreamCreated();
+ }
+
+ @Override
+ protected void createStreamInternal(int id, boolean b, Handler> onComplete) throws HttpException {
+ try {
+ Http2Stream stream = this.conn.handler.encoder().connection().local().createStream(id, false);
+ onComplete.handle(Future.succeededFuture(stream));
+ } catch (Http2Exception e) {
+ throw new HttpException(e);
+ }
+ }
+
+ @Override
+ protected TracingPolicy getTracingPolicy() {
+ return conn.client.options().getTracingPolicy();
+ }
+
+ @Override
+ protected boolean isTryUseCompression() {
+ return this.conn.client.options().isTryUseCompression();
+ }
+
+ @Override
+ VertxHttpHeaders createHttpHeadersWrapper() {
+ return new VertxHttp2Headers();
+ }
+
+ @Override
+ protected void consumeCredits(int len) {
+ conn.consumeCredits(this.stream, len);
+ }
+
+ @Override
+ public void writeFrame(byte type, short flags, ByteBuf payload) {
+ conn.handler.writeFrame(stream, type, flags, payload);
+ }
+
+ @Override
+ public long getWindowSize() {
+ return conn.getWindowSize();
+ }
+
+ @Override
+ public void writeHeaders(VertxHttpHeaders headers, boolean end, StreamPriorityBase priority,
+ boolean checkFlush, FutureListener promise) {
+ conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(),
+ checkFlush, promise);
+ }
+
+ @Override
+ public void writePriorityFrame(StreamPriorityBase priority) {
+ conn.handler.writePriority(stream, priority.getDependency(), priority.getWeight(), priority.isExclusive());
+ }
+
+ @Override
+ public void writeData_(ByteBuf chunk, boolean end, FutureListener promise) {
+ conn.handler.writeData(stream, chunk, end, promise);
+ }
+
+ @Override
+ public void writeReset_(int streamId, long code) {
+ conn.handler.writeReset(streamId, code);
+ }
+
+ @Override
+ public void init_(VertxHttpStreamBase vertxHttpStream, Http2Stream stream) {
+ this.stream = stream;
+ this.writable = this.conn.handler.encoder().flowController().isWritable(this.stream);
+ stream.setProperty(conn.streamKey, vertxHttpStream);
+ }
+
+ @Override
+ public synchronized int getStreamId() {
+ return stream != null ? stream.id() : -1;
+ }
+
+ @Override
+ public boolean remoteSideOpen() {
+ return stream.state().remoteSideOpen();
+ }
+
+ @Override
+ public boolean hasStream() {
+ return stream != null;
+ }
+
+ @Override
+ public MultiMap getEmptyHeaders() {
+ return EMPTY;
+ }
+
+ @Override
+ public boolean isWritable_() {
+ return writable;
+ }
+
+ @Override
+ public boolean isTrailersReceived_() {
+ return stream.isTrailersReceived();
+ }
+
+ @Override
+ public StreamPriorityBase createDefaultStreamPriority() {
+ return HttpUtils.DEFAULT_STREAM_PRIORITY;
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
index 190ea91f7d2..a67679617b9 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
@@ -33,9 +33,11 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.VertxByteBufAllocator;
import io.vertx.core.http.GoAway;
+import io.vertx.core.http.Http2StreamPriority;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.StreamPriority;
+import io.vertx.core.http.StreamPriorityBase;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.VertxInternal;
@@ -61,6 +63,8 @@ private static ByteBuf safeBuffer(ByteBuf buf) {
return buffer;
}
+ protected abstract void onHeadersRead(int streamId, Http2Headers headers, StreamPriorityBase streamPriority, boolean endOfStream);
+
protected final ChannelHandlerContext handlerContext;
protected final VertxHttp2ConnectionHandler handler;
protected final Http2Connection.PropertyKey streamKey;
@@ -87,7 +91,7 @@ public Http2ConnectionBase(EventLoopContext context, VertxHttp2ConnectionHandler
this.localSettings = handler.initialSettings();
}
- VertxInternal vertx() {
+ public VertxInternal vertx() {
return vertx;
}
@@ -107,7 +111,7 @@ protected void handleIdle(IdleStateEvent event) {
}
synchronized void onConnectionError(Throwable cause) {
- ArrayList streams = new ArrayList<>();
+ ArrayList streams = new ArrayList<>();
try {
handler.connection().forEachActiveStream(stream -> {
streams.add(stream.getProperty(streamKey));
@@ -116,13 +120,13 @@ synchronized void onConnectionError(Throwable cause) {
} catch (Http2Exception e) {
log.error("Could not get the list of active streams", e);
}
- for (VertxHttp2Stream stream : streams) {
+ for (VertxHttpStreamBase stream : streams) {
stream.context.dispatch(v -> stream.handleException(cause));
}
handleException(cause);
}
- VertxHttp2Stream> stream(int id) {
+ VertxHttpStreamBase, ?> stream(int id) {
Http2Stream s = handler.connection().stream(id);
if (s == null) {
return null;
@@ -131,21 +135,21 @@ VertxHttp2Stream> stream(int id) {
}
void onStreamError(int streamId, Throwable cause) {
- VertxHttp2Stream stream = stream(streamId);
+ VertxHttpStreamBase, ?> stream = stream(streamId);
if (stream != null) {
stream.onException(cause);
}
}
void onStreamWritabilityChanged(Http2Stream s) {
- VertxHttp2Stream stream = s.getProperty(streamKey);
+ VertxHttpStreamBase, ?> stream = s.getProperty(streamKey);
if (stream != null) {
stream.onWritabilityChanged();
}
}
void onStreamClosed(Http2Stream s) {
- VertxHttp2Stream stream = s.getProperty(streamKey);
+ VertxHttpStreamBase, ?> stream = s.getProperty(streamKey);
if (stream != null) {
boolean active = chctx.channel().isActive();
if (goAwayStatus != null) {
@@ -189,9 +193,9 @@ boolean onGoAwayReceived(GoAway goAway) {
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
- VertxHttp2Stream stream = stream(streamId);
+ VertxHttpStreamBase, ?> stream = stream(streamId);
if (stream != null) {
- StreamPriority streamPriority = new StreamPriority()
+ StreamPriorityBase streamPriority = new Http2StreamPriority()
.setDependency(streamDependency)
.setWeight(weight)
.setExclusive(exclusive);
@@ -201,7 +205,7 @@ public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDe
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
- StreamPriority streamPriority = new StreamPriority()
+ StreamPriorityBase streamPriority = new Http2StreamPriority()
.setDependency(streamDependency)
.setWeight(weight)
.setExclusive(exclusive);
@@ -213,8 +217,6 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
onHeadersRead(streamId, headers, null, endOfStream);
}
- protected abstract void onHeadersRead(int streamId, Http2Headers headers, StreamPriority streamPriority, boolean endOfStream);
-
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) {
Handler handler;
@@ -291,7 +293,7 @@ public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int wind
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload) {
- VertxHttp2Stream stream = stream(streamId);
+ VertxHttpStreamBase, ?> stream = stream(streamId);
if (stream != null) {
Buffer buff = Buffer.buffer(safeBuffer(payload));
stream.onCustomFrame(new HttpFrameImpl(frameType, flags.value(), buff));
@@ -300,7 +302,7 @@ public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int stream
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
- VertxHttp2Stream stream = stream(streamId);
+ VertxHttpStreamBase, ?> stream = stream(streamId);
if (stream != null) {
stream.onReset(errorCode);
}
@@ -308,7 +310,7 @@ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorC
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) {
- VertxHttp2Stream stream = stream(streamId);
+ VertxHttpStreamBase, ?> stream = stream(streamId);
if (stream != null) {
data = safeBuffer(data);
Buffer buff = Buffer.buffer(data);
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java
index 8d3684b10a0..9f042670582 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java
@@ -28,6 +28,7 @@
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.http.*;
+import io.vertx.core.http.impl.headers.VertxHttp2Headers;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.net.HostAndPort;
@@ -53,7 +54,7 @@ public class Http2ServerConnection extends Http2ConnectionBase implements HttpSe
Handler requestHandler;
private int concurrentStreams;
private final ArrayDeque pendingPushes = new ArrayDeque<>(8);
- private VertxHttp2Stream upgraded;
+ private VertxHttpStreamBase upgraded;
Http2ServerConnection(
EventLoopContext context,
@@ -159,8 +160,8 @@ private Http2ServerStream createStream(int streamId, Http2Headers headers, boole
return vertxStream;
}
- VertxHttp2Stream> stream(int id) {
- VertxHttp2Stream> stream = super.stream(id);
+ VertxHttpStreamBase, ?> stream(int id) {
+ VertxHttpStreamBase, ?> stream = super.stream(id);
if (stream == null && id == 1 && handler.upgraded) {
return upgraded;
}
@@ -168,8 +169,8 @@ VertxHttp2Stream> stream(int id) {
}
@Override
- protected synchronized void onHeadersRead(int streamId, Http2Headers headers, StreamPriority streamPriority, boolean endOfStream) {
- VertxHttp2Stream stream = stream(streamId);
+ protected synchronized void onHeadersRead(int streamId, Http2Headers headers, StreamPriorityBase streamPriority, boolean endOfStream) {
+ VertxHttpStreamBase, ?> stream = stream(streamId);
if (stream == null) {
if (isMalformedRequest(headers)) {
handler.writeReset(streamId, Http2Error.PROTOCOL_ERROR.code());
@@ -181,7 +182,7 @@ protected synchronized void onHeadersRead(int streamId, Http2Headers headers, St
} else {
stream = createStream(streamId, headers, endOfStream);
}
- stream.onHeaders(headers, streamPriority);
+ stream.onHeaders(new VertxHttp2Headers(headers), streamPriority);
} else {
// Http server request trailer - not implemented yet (in api)
}
@@ -190,7 +191,7 @@ protected synchronized void onHeadersRead(int streamId, Http2Headers headers, St
}
}
- void sendPush(int streamId, HostAndPort authority, HttpMethod method, MultiMap headers, String path, StreamPriority streamPriority, Promise promise) {
+ void sendPush(int streamId, HostAndPort authority, HttpMethod method, MultiMap headers, String path, StreamPriorityBase streamPriority, Promise promise) {
EventLoop eventLoop = context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
doSendPush(streamId, authority, method, headers, path, streamPriority, promise);
@@ -199,7 +200,7 @@ void sendPush(int streamId, HostAndPort authority, HttpMethod method, MultiMap h
}
}
- private synchronized void doSendPush(int streamId, HostAndPort authority, HttpMethod method, MultiMap headers, String path, StreamPriority streamPriority, Promise promise) {
+ private synchronized void doSendPush(int streamId, HostAndPort authority, HttpMethod method, MultiMap headers, String path, StreamPriorityBase streamPriority, Promise promise) {
boolean ssl = isSsl();
Http2Headers headers_ = new DefaultHttp2Headers();
headers_.method(method.name());
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java b/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java
index 227dce9b7c7..2cc5e0343c1 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java
@@ -68,7 +68,7 @@ public class Http2ServerRequest extends HttpServerRequestInternal implements Htt
private boolean expectMultipart;
private HttpPostRequestDecoder postRequestDecoder;
private Handler customFrameHandler;
- private Handler streamPriorityHandler;
+ private Handler streamPriorityHandler;
Http2ServerRequest(Http2ServerStream stream,
String serverOrigin,
@@ -523,12 +523,12 @@ public synchronized Future end() {
return eventHandler(true).end();
}
- public StreamPriority streamPriority() {
+ public StreamPriorityBase streamPriority() {
return stream.priority();
}
@Override
- public HttpServerRequest streamPriorityHandler(Handler handler) {
+ public HttpServerRequest streamPriorityHandler(Handler handler) {
synchronized (stream.conn) {
streamPriorityHandler = handler;
}
@@ -541,8 +541,8 @@ public DecoderResult decoderResult() {
}
@Override
- public void handlePriorityChange(StreamPriority streamPriority) {
- Handler handler;
+ public void handlePriorityChange(StreamPriorityBase streamPriority) {
+ Handler handler;
synchronized (stream.conn) {
handler = streamPriorityHandler;
}
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java b/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java
index 9713c5fd38e..932d38cee90 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java
@@ -18,7 +18,6 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpStatusClass;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.Http2Headers;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
@@ -27,9 +26,10 @@
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
-import io.vertx.core.http.StreamPriority;
+import io.vertx.core.http.StreamPriorityBase;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
+import io.vertx.core.http.impl.headers.VertxHttp2Headers;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.NetSocket;
@@ -52,9 +52,9 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse {
private final Http2ServerConnection conn;
private final boolean push;
private final String contentEncoding;
- private final Http2Headers headers = new DefaultHttp2Headers();
+ private final DefaultHttp2Headers headers = new DefaultHttp2Headers();
private Http2HeadersAdaptor headersMap;
- private Http2Headers trailers;
+ private DefaultHttp2Headers trailers;
private Http2HeadersAdaptor trailedMap;
private boolean chunked;
private boolean headWritten;
@@ -247,7 +247,8 @@ public HttpServerResponse putHeader(CharSequence name, Iterable va
public MultiMap trailers() {
synchronized (conn) {
if (trailedMap == null) {
- trailedMap = new Http2HeadersAdaptor(trailers = new DefaultHttp2Headers());
+ trailers = new DefaultHttp2Headers();
+ trailedMap = new Http2HeadersAdaptor(trailers);
}
return trailedMap;
}
@@ -315,7 +316,10 @@ public HttpServerResponse endHandler(@Nullable Handler handler) {
public HttpServerResponse writeContinue() {
synchronized (conn) {
checkHeadWritten();
- stream.writeHeaders(new DefaultHttp2Headers().status(HttpResponseStatus.CONTINUE.codeAsText()), false, true, null);
+ DefaultHttp2Headers defaultHttp2Headers = new DefaultHttp2Headers();
+ defaultHttp2Headers.status(HttpResponseStatus.CONTINUE.codeAsText());
+ stream.writeHeaders(new VertxHttp2Headers(defaultHttp2Headers), false, true,
+ null);
return this;
}
}
@@ -337,7 +341,7 @@ public void writeEarlyHints(MultiMap headers, Handler> handler
synchronized (conn) {
checkHeadWritten();
}
- stream.writeHeaders(http2Headers, false, true, handler);
+ stream.writeHeaders(new VertxHttp2Headers(http2Headers), false, true, handler);
}
@Override
@@ -471,7 +475,7 @@ void write(ByteBuf chunk, boolean end, Handler> handler) {
invokeHandler = true;
}
if (end && trailers != null) {
- stream.writeHeaders(trailers, true, true, null);
+ stream.writeHeaders(new VertxHttp2Headers(trailers), true, true, null);
}
bodyEndHandler = this.bodyEndHandler;
endHandler = this.endHandler;
@@ -507,7 +511,7 @@ private boolean checkSendHeaders(boolean end, boolean checkFlush) {
}
prepareHeaders();
headWritten = true;
- stream.writeHeaders(headers, end, checkFlush, null);
+ stream.writeHeaders(new VertxHttp2Headers(headers), end, checkFlush, null);
return true;
} else {
return false;
@@ -735,7 +739,7 @@ public Future push(HttpMethod method, String authority, Stri
}
@Override
- public HttpServerResponse setStreamPriority(StreamPriority priority) {
+ public HttpServerResponse setStreamPriority(StreamPriorityBase priority) {
stream.updatePriority(priority);
return this;
}
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java b/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java
index 610f508d8e2..9e31c2f7955 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java
@@ -10,9 +10,13 @@
*/
package io.vertx.core.http.impl;
+import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http2.EmptyHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.concurrent.FutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
@@ -20,8 +24,9 @@
import io.vertx.core.http.HttpFrame;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
-import io.vertx.core.http.StreamPriority;
+import io.vertx.core.http.StreamPriorityBase;
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
+import io.vertx.core.http.impl.headers.VertxHttpHeaders;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.impl.HostAndPortImpl;
@@ -34,7 +39,8 @@
import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED;
-class Http2ServerStream extends VertxHttp2Stream {
+class Http2ServerStream extends VertxHttpStreamBase {
+ private static final MultiMap EMPTY = new Http2HeadersAdaptor(EmptyHttp2Headers.INSTANCE);
protected final Http2Headers headers;
protected final HttpMethod method;
@@ -98,7 +104,7 @@ void registerMetrics() {
}
@Override
- void onHeaders(Http2Headers headers, StreamPriority streamPriority) {
+ void onHeaders(VertxHttpHeaders headers, StreamPriorityBase streamPriority) {
if (streamPriority != null) {
priority(streamPriority);
}
@@ -106,12 +112,13 @@ void onHeaders(Http2Headers headers, StreamPriority streamPriority) {
CharSequence value = headers.get(HttpHeaderNames.EXPECT);
if (conn.options.isHandle100ContinueAutomatically() &&
((value != null && HttpHeaderValues.CONTINUE.equals(value)) ||
- headers.contains(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE))) {
+ headers.contains(String.valueOf(HttpHeaderNames.EXPECT), String.valueOf(HttpHeaderValues.CONTINUE)))) {
request.response().writeContinue();
}
VertxTracer tracer = context.tracer();
if (tracer != null) {
- trace = tracer.receiveRequest(context, SpanKind.RPC, tracingPolicy, request, method().name(), new Http2HeadersAdaptor(headers), HttpUtils.SERVER_REQUEST_TAG_EXTRACTOR);
+ trace = tracer.receiveRequest(context, SpanKind.RPC, tracingPolicy, request, method().name(),
+ headers.toHeaderAdapter(), HttpUtils.SERVER_REQUEST_TAG_EXTRACTOR);
}
request.dispatch(conn.requestHandler);
}
@@ -129,7 +136,7 @@ void onEnd(MultiMap trailers) {
}
@Override
- void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Handler> handler) {
+ void doWriteHeaders(VertxHttpHeaders headers, boolean end, boolean checkFlush, Handler> handler) {
if (Metrics.METRICS_ENABLED && !end) {
HttpServerMetrics metrics = conn.metrics();
if (metrics != null) {
@@ -188,7 +195,7 @@ void handleCustomFrame(HttpFrame frame) {
}
@Override
- void handlePriorityChange(StreamPriority newPriority) {
+ void handlePriorityChange(StreamPriorityBase newPriority) {
request.handlePriorityChange(newPriority);
}
@@ -242,4 +249,77 @@ public HttpServerRequest routed(String route) {
}
return null;
}
+
+ @Override
+ protected void consumeCredits(int len) {
+ conn.consumeCredits(this.stream, len);
+ }
+
+ @Override
+ public void writeFrame(byte type, short flags, ByteBuf payload) {
+ conn.handler.writeFrame(stream, type, flags, payload);
+ }
+ @Override
+ public void writeHeaders(VertxHttpHeaders headers, boolean end, StreamPriorityBase priority,
+ boolean checkFlush, FutureListener promise) {
+ conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(),
+ checkFlush, promise);
+ }
+
+ @Override
+ public void writePriorityFrame(StreamPriorityBase priority) {
+ conn.handler.writePriority(stream, priority.getDependency(), priority.getWeight(), priority.isExclusive());
+ }
+
+ @Override
+ public void writeData_(ByteBuf chunk, boolean end, FutureListener promise) {
+ conn.handler.writeData(stream, chunk, end, promise);
+ }
+
+ @Override
+ public void writeReset_(int streamId, long code) {
+ conn.handler.writeReset(streamId, code);
+ }
+
+ @Override
+ public void init_(VertxHttpStreamBase vertxHttpStream, Http2Stream stream) {
+ this.stream = stream;
+ this.writable = this.conn.handler.encoder().flowController().isWritable(this.stream);
+ stream.setProperty(conn.streamKey, vertxHttpStream);
+ }
+
+ @Override
+ public synchronized int getStreamId() {
+ return stream != null ? stream.id() : -1;
+ }
+
+ @Override
+ public boolean remoteSideOpen() {
+ return stream.state().remoteSideOpen();
+ }
+
+ @Override
+ public boolean hasStream() {
+ return stream != null;
+ }
+
+ @Override
+ public MultiMap getEmptyHeaders() {
+ return EMPTY;
+ }
+
+ @Override
+ public boolean isWritable_() {
+ return writable;
+ }
+
+ @Override
+ public boolean isTrailersReceived_() {
+ return stream.isTrailersReceived();
+ }
+
+ @Override
+ protected StreamPriorityBase createDefaultStreamPriority() {
+ return HttpUtils.DEFAULT_STREAM_PRIORITY;
+ }
}
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerStreamHandler.java b/src/main/java/io/vertx/core/http/impl/Http2ServerStreamHandler.java
index fe3959a32b4..5dff8ea0e58 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ServerStreamHandler.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ServerStreamHandler.java
@@ -15,7 +15,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpFrame;
import io.vertx.core.http.HttpServerRequest;
-import io.vertx.core.http.StreamPriority;
+import io.vertx.core.http.StreamPriorityBase;
interface Http2ServerStreamHandler {
@@ -38,7 +38,7 @@ default void handleEnd(MultiMap trailers) {
default void handleCustomFrame(HttpFrame frame) {
}
- default void handlePriorityChange(StreamPriority streamPriority) {
+ default void handlePriorityChange(StreamPriorityBase streamPriority) {
}
default void onException(Throwable t) {
diff --git a/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java
index 61fa54654b7..7e4b2b7791c 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java
@@ -143,7 +143,7 @@ public ContextInternal getContext() {
}
@Override
- public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler> handler) {
+ public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriorityBase priority, boolean connect, Handler> handler) {
delegate.writeHead(request, chunked, buf, end, priority, connect, handler);
}
@@ -193,7 +193,7 @@ public void endHandler(Handler handler) {
}
@Override
- public void priorityHandler(Handler handler) {
+ public void priorityHandler(Handler handler) {
delegate.priorityHandler(handler);
}
@@ -228,12 +228,12 @@ public void reset(Throwable cause) {
}
@Override
- public StreamPriority priority() {
+ public StreamPriorityBase priority() {
return delegate.priority();
}
@Override
- public void updatePriority(StreamPriority streamPriority) {
+ public void updatePriority(StreamPriorityBase streamPriority) {
delegate.updatePriority(streamPriority);
}
@@ -259,6 +259,11 @@ public WriteStream drainHandler(@Nullable Handler handler) {
delegate.drainHandler(handler);
return this;
}
+
+ @Override
+ public StreamPriorityBase createDefaultStreamPriority() {
+ return delegate.createDefaultStreamPriority();
+ }
}
/**
@@ -274,7 +279,7 @@ private static class UpgradingStream implements HttpClientStream {
private Handler headHandler;
private Handler chunkHandler;
private Handler endHandler;
- private Handler priorityHandler;
+ private Handler priorityHandler;
private Handler exceptionHandler;
private Handler drainHandler;
private Handler continueHandler;
@@ -302,7 +307,7 @@ public void writeHead(HttpRequestHead request,
boolean chunked,
ByteBuf buf,
boolean end,
- StreamPriority priority,
+ StreamPriorityBase priority,
boolean connect,
Handler> handler) {
ChannelPipeline pipeline = upgradingConnection.channel().pipeline();
@@ -504,7 +509,7 @@ private void doWriteHead(HttpRequestHead head,
boolean chunked,
ByteBuf buf,
boolean end,
- StreamPriority priority,
+ StreamPriorityBase priority,
boolean connect,
Handler> handler) {
EventExecutor exec = upgradingConnection.channelHandlerContext().executor();
@@ -651,7 +656,7 @@ public void unknownFrameHandler(Handler handler) {
}
@Override
- public void priorityHandler(Handler handler) {
+ public void priorityHandler(Handler handler) {
if (upgradedStream != null) {
upgradedStream.priorityHandler(handler);
} else {
@@ -748,7 +753,7 @@ public void reset(Throwable cause) {
}
@Override
- public StreamPriority priority() {
+ public StreamPriorityBase priority() {
if (upgradedStream != null) {
return upgradedStream.priority();
} else {
@@ -757,13 +762,22 @@ public StreamPriority priority() {
}
@Override
- public void updatePriority(StreamPriority streamPriority) {
+ public void updatePriority(StreamPriorityBase streamPriority) {
if (upgradedStream != null) {
upgradedStream.updatePriority(streamPriority);
} else {
upgradingStream.updatePriority(streamPriority);
}
}
+
+ @Override
+ public StreamPriorityBase createDefaultStreamPriority() {
+ if (upgradedStream != null) {
+ return upgradedStream.createDefaultStreamPriority();
+ } else {
+ return upgradingStream.createDefaultStreamPriority();
+ }
+ }
}
@Override
diff --git a/src/main/java/io/vertx/core/http/impl/Http3ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http3ClientConnection.java
new file mode 100644
index 00000000000..6baa9933909
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/Http3ClientConnection.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+
+package io.vertx.core.http.impl;
+
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.incubator.codec.http3.Http3Headers;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.http.StreamPriorityBase;
+import io.vertx.core.http.impl.headers.Http3HeadersAdaptor;
+import io.vertx.core.http.impl.headers.VertxHttp3Headers;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.EventLoopContext;
+import io.vertx.core.spi.metrics.ClientMetrics;
+import io.vertx.core.spi.metrics.HttpClientMetrics;
+
+/**
+ * @author Iman Zolfaghari
+ */
+class Http3ClientConnection extends Http3ConnectionBase implements HttpClientConnection {
+
+ public final HttpClientImpl client;
+ private final ClientMetrics metrics;
+ private Handler evictionHandler = DEFAULT_EVICTION_HANDLER;
+ private Handler concurrencyChangeHandler = DEFAULT_CONCURRENCY_CHANGE_HANDLER;
+ private long expirationTimestamp;
+ private boolean evicted;
+
+ public Http3ClientConnection(HttpClientImpl client,
+ EventLoopContext context,
+ VertxHttp3ConnectionHandler extends Http3ConnectionBase> connHandler,
+ ClientMetrics metrics) {
+ super(context, connHandler);
+ this.metrics = metrics;
+ this.client = client;
+ }
+
+ @Override
+ public Http3ClientConnection evictionHandler(Handler handler) {
+ evictionHandler = handler;
+ return this;
+ }
+
+ @Override
+ public Http3ClientConnection concurrencyChangeHandler(Handler handler) {
+ concurrencyChangeHandler = handler;
+ return this;
+ }
+
+ public long concurrency() {
+// long concurrency = remoteSettings().getMaxConcurrentStreams();
+// long http2MaxConcurrency = client.options().getHttp2MultiplexingLimit() <= 0 ? Long.MAX_VALUE : client.options().getHttp2MultiplexingLimit();
+// if (http2MaxConcurrency > 0) {
+// concurrency = Math.min(concurrency, http2MaxConcurrency);
+// }
+// return concurrency;
+ return 5;
+ }
+
+// @Override
+// boolean onGoAwaySent(GoAway goAway) {
+// boolean goneAway = super.onGoAwaySent(goAway);
+// if (goneAway) {
+// // Eagerly evict from the pool
+// tryEvict();
+// }
+// return goneAway;
+// }
+//
+// @Override
+// boolean onGoAwayReceived(GoAway goAway) {
+// boolean goneAway = super.onGoAwayReceived(goAway);
+// if (goneAway) {
+// // Eagerly evict from the pool
+// tryEvict();
+// }
+// return goneAway;
+// }
+
+ /**
+ * Try to evict the connection from the pool. This can be called multiple times since
+ * the connection can be eagerly removed from the pool on emission or reception of a {@code GOAWAY}
+ * frame.
+ */
+ private void tryEvict() {
+ if (!evicted) {
+ evicted = true;
+ evictionHandler.handle(null);
+ }
+ }
+
+// @Override
+// protected void concurrencyChanged(long concurrency) {
+// int limit = client.options().getHttp2MultiplexingLimit();
+// if (limit > 0) {
+// concurrency = Math.min(concurrency, limit);
+// }
+// concurrencyChangeHandler.handle(concurrency);
+// }
+
+ @Override
+ public HttpClientMetrics metrics() {
+ return client.metrics();
+ }
+
+ @Override
+ public void createStream(ContextInternal context, Handler> handler) {
+ Future fut;
+ synchronized (this) {
+ try {
+ HttpStreamImpl stream = createStream(context);
+ fut = Future.succeededFuture(stream);
+ } catch (Exception e) {
+ fut = Future.failedFuture(e);
+ }
+ }
+ context.emit(fut, handler);
+ }
+
+ private HttpStreamImpl createStream(ContextInternal context) {
+ return new Http3ClientStream(this, context, false, metrics);
+ }
+
+ public void recycle() {
+ int timeout = client.options().getHttp2KeepAliveTimeout();
+ expirationTimestamp = timeout > 0 ? System.currentTimeMillis() + timeout * 1000L : 0L;
+ }
+
+ @Override
+ public boolean isValid() {
+ return expirationTimestamp == 0 || System.currentTimeMillis() <= expirationTimestamp;
+ }
+
+ @Override
+ public long lastResponseReceivedTimestamp() {
+ return 0L;
+ }
+
+ @Override
+ protected synchronized void onHeadersRead(VertxHttpStreamBase, ?> stream, Http3Headers headers, StreamPriorityBase streamPriority, boolean endOfStream) {
+ if (!stream.isTrailersReceived()) {
+ stream.onHeaders(new VertxHttp3Headers(headers), streamPriority);
+ if (endOfStream) {
+ stream.onEnd();
+ }
+ } else {
+ stream.onEnd(new Http3HeadersAdaptor(headers));
+ }
+ }
+
+ public void metricsEnd(HttpStream stream) {
+ if (metrics != null) {
+ metrics.responseEnd(stream.metric, stream.bytesRead());
+ }
+ }
+
+ @Override
+ protected void handleIdle(IdleStateEvent event) {
+// if (handler.connection().local().numActiveStreams() > 0) {
+ super.handleIdle(event);
+// }
+ }
+
+ public static VertxHttp3ConnectionHandler createVertxHttp3ConnectionHandler(
+ HttpClientImpl client,
+ ClientMetrics metrics,
+ EventLoopContext context,
+ boolean upgrade,
+ Object socketMetric) {
+ HttpClientOptions options = client.options();
+ HttpClientMetrics met = client.metrics();
+ VertxHttp3ConnectionHandler handler =
+ new VertxHttp3ConnectionHandlerBuilder()
+ .server(false)
+ .http3InitialSettings(client.options().getHttp3InitialSettings())
+ .connectionFactory(connHandler -> {
+ Http3ClientConnection conn = new Http3ClientConnection(client, context, connHandler, metrics);
+ if (metrics != null) {
+ Object m = socketMetric;
+ conn.metric(m);
+ }
+ return conn;
+ })
+ .build(context);
+ handler.addHandler(conn -> {
+ if (options.getHttp2ConnectionWindowSize() > 0) {
+ conn.setWindowSize(options.getHttp2ConnectionWindowSize());
+ }
+ if (metrics != null) {
+ if (!upgrade) {
+ met.endpointConnected(metrics);
+ }
+ }
+ });
+ handler.removeHandler(conn -> {
+ if (metrics != null) {
+ met.endpointDisconnected(metrics);
+ }
+ conn.tryEvict();
+ });
+ return handler;
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/impl/Http3ClientStream.java b/src/main/java/io/vertx/core/http/impl/Http3ClientStream.java
new file mode 100644
index 00000000000..9bfa8d55172
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/Http3ClientStream.java
@@ -0,0 +1,167 @@
+package io.vertx.core.http.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.incubator.codec.http3.DefaultHttp3Headers;
+import io.netty.incubator.codec.http3.Http3;
+import io.netty.incubator.codec.http3.Http3FrameToHttpObjectCodec;
+import io.netty.incubator.codec.http3.Http3RequestStreamInitializer;
+import io.netty.incubator.codec.quic.QuicChannel;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.MultiMap;
+import io.vertx.core.http.HttpVersion;
+import io.vertx.core.http.StreamPriorityBase;
+import io.vertx.core.http.impl.headers.Http3HeadersAdaptor;
+import io.vertx.core.http.impl.headers.VertxHttp3Headers;
+import io.vertx.core.http.impl.headers.VertxHttpHeaders;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.spi.metrics.ClientMetrics;
+import io.vertx.core.tracing.TracingPolicy;
+
+class Http3ClientStream extends HttpStreamImpl {
+ private static final MultiMap EMPTY = new Http3HeadersAdaptor(new DefaultHttp3Headers());
+
+ Http3ClientStream(Http3ClientConnection conn, ContextInternal context, boolean push,
+ ClientMetrics, ?, ?, ?> metrics) {
+ super(conn, context, push, metrics);
+ }
+
+ @Override
+ public HttpClientConnection connection() {
+ return conn;
+ }
+
+ @Override
+ public HttpVersion version() {
+ return HttpVersion.HTTP_3;
+ }
+
+ @Override
+ protected void metricsEnd(HttpStream, ?> stream) {
+ conn.metricsEnd(stream);
+ }
+
+ @Override
+ protected void recycle() {
+ conn.recycle();
+ }
+
+ @Override
+ int lastStreamCreated() {
+ return this.stream != null ? (int) this.stream.streamId() : 0;
+ }
+
+ @Override
+ protected void createStreamInternal(int id, boolean b, Handler> onComplete) {
+ Http3.newRequestStream((QuicChannel) conn.channelHandlerContext().channel().parent(),
+ new Http3RequestStreamInitializer() {
+ @Override
+ protected void initRequestStream(QuicStreamChannel ch) {
+ ch.pipeline()
+ .addLast(new Http3FrameToHttpObjectCodec(false))
+ .addLast(conn.handler);
+ }
+ })
+ .addListener((GenericFutureListener>) quicStreamChannelFuture -> {
+ QuicStreamChannel quicStreamChannel = quicStreamChannelFuture.get();
+ onComplete.handle(Future.succeededFuture(quicStreamChannel));
+ });
+ }
+
+ @Override
+ protected TracingPolicy getTracingPolicy() {
+ return conn.client.options().getTracingPolicy();
+ }
+
+ @Override
+ protected boolean isTryUseCompression() {
+ return this.conn.client.options().isTryUseCompression();
+ }
+
+ @Override
+ VertxHttpHeaders createHttpHeadersWrapper() {
+ return new VertxHttp3Headers();
+ }
+
+ @Override
+ protected void consumeCredits(int len) {
+ conn.consumeCredits(this.stream, len);
+ }
+
+ @Override
+ public void writeFrame(byte type, short flags, ByteBuf payload) {
+ stream.write(payload);
+ }
+
+ @Override
+ public long getWindowSize() {
+ return conn.getWindowSize();
+ }
+
+ @Override
+ public void writeHeaders(VertxHttpHeaders headers, boolean end, StreamPriorityBase priority,
+ boolean checkFlush, FutureListener promise) {
+ conn.handler.writeHeaders(stream, headers, end, priority, checkFlush, promise);
+ }
+
+ @Override
+ public void writePriorityFrame(StreamPriorityBase priority) {
+ conn.handler.writePriority(stream, priority.urgency(), priority.isIncremental());
+ }
+
+ @Override
+ public void writeData_(ByteBuf chunk, boolean end, FutureListener promise) {
+ conn.handler.writeData(stream, chunk, end, promise);
+ }
+
+ @Override
+ public void writeReset_(int streamId, long code) {
+ stream.write(code);
+ }
+
+ @Override
+ public void init_(VertxHttpStreamBase vertxHttpStream, QuicStreamChannel stream) {
+ this.stream = stream;
+ this.writable = stream.isWritable();
+ VertxHttp3ConnectionHandler.setHttp3ClientStream(stream, this);
+ }
+
+ @Override
+ public synchronized int getStreamId() {
+ return stream != null ? (int) stream.streamId() : -1;
+ }
+
+ @Override
+ public boolean remoteSideOpen() {
+ return stream.isOpen();
+ }
+
+ @Override
+ public boolean hasStream() {
+ return stream != null;
+ }
+
+ @Override
+ public MultiMap getEmptyHeaders() {
+ return EMPTY;
+ }
+
+ @Override
+ public boolean isWritable_() {
+ return writable;
+ }
+
+ @Override
+ public boolean isTrailersReceived_() {
+ return false; //TODO review
+ }
+
+ @Override
+ public StreamPriorityBase createDefaultStreamPriority() {
+ return HttpUtils.DEFAULT_QUIC_STREAM_PRIORITY;
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/impl/Http3ConnectionBase.java b/src/main/java/io/vertx/core/http/impl/Http3ConnectionBase.java
new file mode 100644
index 00000000000..73c4d0a528b
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/Http3ConnectionBase.java
@@ -0,0 +1,537 @@
+/*
+ * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+
+package io.vertx.core.http.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.incubator.codec.http3.Http3Headers;
+import io.netty.incubator.codec.http3.Http3SettingsFrame;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+import io.vertx.codegen.annotations.Nullable;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.buffer.impl.VertxByteBufAllocator;
+import io.vertx.core.http.GoAway;
+import io.vertx.core.http.Http2Settings;
+import io.vertx.core.http.Http2StreamPriority;
+import io.vertx.core.http.HttpClosedException;
+import io.vertx.core.http.HttpConnection;
+import io.vertx.core.http.StreamPriorityBase;
+import io.vertx.core.impl.EventLoopContext;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.future.PromiseInternal;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import io.vertx.core.net.impl.ConnectionBase;
+
+import java.util.ArrayDeque;
+
+/**
+ * @author Iman Zolfaghari
+ */
+public abstract class Http3ConnectionBase extends ConnectionBase implements HttpConnection {
+
+ private static final Logger log = LoggerFactory.getLogger(Http2ConnectionBase.class);
+
+ private static ByteBuf safeBuffer(ByteBuf buf) {
+ ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(buf.readableBytes());
+ buffer.writeBytes(buf);
+ return buffer;
+ }
+
+ protected abstract void onHeadersRead(VertxHttpStreamBase, ?> stream, Http3Headers headers,
+ StreamPriorityBase streamPriority, boolean endOfStream);
+
+ protected final ChannelHandlerContext handlerContext;
+ protected VertxHttp3ConnectionHandler extends Http3ConnectionBase> handler;
+// protected final Http2Connection.PropertyKey streamKey;
+ private boolean shutdown;
+ private Handler remoteSettingsHandler;
+ private final ArrayDeque> updateSettingsHandlers = new ArrayDeque<>();
+ private final ArrayDeque> pongHandlers = new ArrayDeque<>();
+ private Http3SettingsFrame localSettings;
+ private Http3SettingsFrame remoteSettings;
+ private Handler goAwayHandler;
+ private Handler shutdownHandler;
+ private Handler pingHandler;
+ private GoAway goAwayStatus;
+ private int windowSize;
+ private long maxConcurrentStreams;
+
+ public Http3ConnectionBase(EventLoopContext context, VertxHttp3ConnectionHandler extends Http3ConnectionBase> handler) {
+ super(context, handler.context());
+ this.handler = handler;
+ this.handlerContext = chctx;
+ this.windowSize = -1; //TODO: old code: handler.connection().local().flowController().windowSize(handler.connection().connectionStream());
+ this.maxConcurrentStreams = 0xFFFFFFFFL; //TODO: old code: io.vertx.core.http.Http2Settings.DEFAULT_MAX_CONCURRENT_STREAMS;
+// this.streamKey = handler.connection().newKey();
+ this.localSettings = handler.initialSettings();
+ }
+
+ public VertxInternal vertx() {
+ return vertx;
+ }
+
+ @Override
+ public void handleClosed() {
+ super.handleClosed();
+ }
+
+ @Override
+ protected void handleInterestedOpsChanged() {
+ // Handled by HTTP/3
+ }
+
+ @Override
+ protected void handleIdle(IdleStateEvent event) {
+ super.handleIdle(event);
+ }
+
+ synchronized void onConnectionError(Throwable cause) {
+// ArrayList streams = new ArrayList<>();
+// try {
+// handler.connection().forEachActiveStream(stream -> {
+// streams.add(stream.getProperty(streamKey));
+// return true;
+// });
+// } catch (Http2Exception e) {
+// log.error("Could not get the list of active streams", e);
+// }
+// for (VertxHttpStreamBase stream : streams) {
+// stream.context.dispatch(v -> stream.handleException(cause));
+// }
+ handleException(cause);
+ }
+
+// VertxHttpStreamBase, ?> stream(int id) {
+// VertxHttpStreamBase, ?> s = handler.connection().stream(id);
+// if (s == null) {
+// return null;
+// }
+// return s.getProperty(streamKey);
+// }
+
+ void onStreamError(VertxHttpStreamBase, ?> stream, Throwable cause) {
+ if (stream != null) {
+ stream.onException(cause);
+ }
+ }
+
+ void onStreamWritabilityChanged(VertxHttpStreamBase, ?> stream) {
+// this.handler.getHttp3ConnectionHandler().channelWritabilityChanged();
+ if (stream != null) {
+ stream.onWritabilityChanged();
+ }
+ }
+
+ void onStreamClosed(VertxHttpStreamBase, ?> stream) {
+ if (stream != null) {
+ boolean active = chctx.channel().isActive();
+ if (goAwayStatus != null) {
+ stream.onException(new HttpClosedException(goAwayStatus));
+ } else if (!active) {
+ stream.onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
+ }
+ stream.onClose();
+ }
+ checkShutdown();
+ }
+
+ boolean onGoAwaySent(GoAway goAway) {
+ synchronized (this) {
+ if (this.goAwayStatus != null) {
+ return false;
+ }
+ this.goAwayStatus = goAway;
+ }
+ checkShutdown();
+ return true;
+ }
+
+ boolean onGoAwayReceived(GoAway goAway) {
+ Handler handler;
+ synchronized (this) {
+ if (this.goAwayStatus != null) {
+ return false;
+ }
+ this.goAwayStatus = goAway;
+ handler = goAwayHandler;
+ }
+ if (handler != null) {
+ context.dispatch(new GoAway(goAway), handler);
+ }
+ checkShutdown();
+ return true;
+ }
+
+ // Http3FrameListener
+
+// @Override
+ public void onPriorityRead(ChannelHandlerContext ctx, VertxHttpStreamBase, ?> stream, int streamDependency,
+ short weight, boolean exclusive) {
+ if (stream != null) {
+ StreamPriorityBase streamPriority = new Http2StreamPriority()
+ .setDependency(streamDependency)
+ .setWeight(weight)
+ .setExclusive(exclusive);
+ stream.onPriorityChange(streamPriority);
+ }
+ }
+
+// @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, VertxHttpStreamBase, ?> stream, Http3Headers headers,
+ int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
+ StreamPriorityBase streamPriority = new Http2StreamPriority()
+ .setDependency(streamDependency)
+ .setWeight(weight)
+ .setExclusive(exclusive);
+ onHeadersRead(stream, headers, streamPriority, endOfStream);
+ }
+
+// @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, VertxHttpStreamBase, ?> stream, Http3Headers headers,
+ int padding, boolean endOfStream) throws Http2Exception {
+ onHeadersRead(stream, headers, null, endOfStream);
+ }
+
+// @Override
+ public void onSettingsAckRead(ChannelHandlerContext ctx) {
+ Handler handler;
+ synchronized (this) {
+ handler = updateSettingsHandlers.poll();
+ }
+ if (handler != null) {
+ // No need to run on a particular context it shall be done by the handler instead
+ context.emit(handler);
+ }
+ }
+
+ protected void concurrencyChanged(long concurrency) {
+ }
+
+// @Override
+ public void onSettingsRead(ChannelHandlerContext ctx, Http3SettingsFrame settings) {
+ boolean changed;
+ Handler handler;
+ synchronized (this) {
+// Long val = settings.maxConcurrentStreams(); //TODO:
+ Long val = 5L;
+ if (val != null) {
+ if (remoteSettings != null) {
+ changed = val != maxConcurrentStreams;
+ } else {
+ changed = false;
+ }
+ maxConcurrentStreams = val;
+ } else {
+ changed = false;
+ }
+ remoteSettings = settings;
+ handler = remoteSettingsHandler;
+ }
+ if (handler != null) {
+ context.dispatch(settings, handler);
+ }
+ if (changed) {
+ concurrencyChanged(maxConcurrentStreams);
+ }
+ }
+
+// @Override
+ public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
+ Handler handler = pingHandler;
+ if (handler != null) {
+ Buffer buff = Buffer.buffer().appendLong(data);
+ context.dispatch(v -> handler.handle(buff));
+ }
+ }
+
+// @Override
+ public void onPingAckRead(ChannelHandlerContext ctx, long data) {
+ Promise handler = pongHandlers.poll();
+ if (handler != null) {
+ Buffer buff = Buffer.buffer().appendLong(data);
+ handler.complete(buff);
+ }
+ }
+
+// @Override
+ public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
+ Http2Headers headers, int padding) throws Http2Exception {
+ }
+
+// @Override
+ public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
+ }
+
+// @Override
+ public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
+ }
+
+// @Override
+// public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, VertxHttpStreamBase, ?> stream,
+// Http2Flags flags, ByteBuf payload) {
+// VertxHttpStreamBase, ?, Http2Headers> stream = stream(streamId);
+// if (stream != null) {
+// Buffer buff = Buffer.buffer(safeBuffer(payload));
+// stream.onCustomFrame(new HttpFrameImpl(frameType, flags.value(), buff));
+// }
+// }
+
+// @Override
+ public void onRstStreamRead(ChannelHandlerContext ctx, VertxHttpStreamBase, ?> stream, long errorCode) {
+// VertxHttpStreamBase, ?, Http2Headers> stream = stream(streamId);
+ if (stream != null) {
+ stream.onReset(errorCode);
+ }
+ }
+
+// @Override
+ public int onDataRead(ChannelHandlerContext ctx, VertxHttpStreamBase, ?> stream,
+ ByteBuf data, int padding, boolean endOfStream) {
+ if (stream != null) {
+ data = safeBuffer(data);
+ Buffer buff = Buffer.buffer(data);
+ stream.onData(buff);
+ if (endOfStream) {
+ stream.onEnd();
+ }
+ }
+ return padding;
+ }
+
+ @Override
+ public int getWindowSize() {
+ return windowSize;
+ }
+
+ @Override
+ public HttpConnection setWindowSize(int windowSize) {
+// try {
+// Http2Stream stream = handler.encoder().connection().connectionStream();
+// int delta = windowSize - this.windowSize;
+// handler.decoder().flowController().incrementWindowSize(stream, delta);
+// this.windowSize = windowSize;
+// return this;
+// } catch (Http2Exception e) {
+// throw new VertxException(e);
+// }
+ return this;
+ }
+
+ @Override
+ public HttpConnection goAway(long errorCode, int lastStreamId, Buffer debugData) {
+// if (errorCode < 0) {
+// throw new IllegalArgumentException();
+// }
+// if (lastStreamId < 0) {
+// lastStreamId = handler.connection().remote().lastStreamCreated();
+// }
+// handler.writeGoAway(errorCode, lastStreamId, debugData != null ? debugData.getByteBuf() : Unpooled.EMPTY_BUFFER);
+ return this;
+ }
+
+ @Override
+ public synchronized HttpConnection goAwayHandler(Handler handler) {
+ goAwayHandler = handler;
+ return this;
+ }
+
+ @Override
+ public synchronized HttpConnection shutdownHandler(Handler handler) {
+ shutdownHandler = handler;
+ return this;
+ }
+
+ @Override
+ public void shutdown(long timeout, Handler> handler) {
+ shutdown(timeout, vertx.promise(handler));
+ }
+
+ @Override
+ public Future shutdown(long timeoutMs) {
+ PromiseInternal promise = vertx.promise();
+ shutdown(timeoutMs, promise);
+ return promise.future();
+ }
+
+ private void shutdown(long timeout, PromiseInternal promise) {
+ if (timeout < 0) {
+ promise.fail("Invalid timeout value " + timeout);
+ return;
+ }
+ handler.gracefulShutdownTimeoutMillis(timeout);
+ ChannelFuture fut = channel().close();
+ fut.addListener(promise);
+ }
+
+ @Override
+ public Http3ConnectionBase closeHandler(Handler handler) {
+ return (Http3ConnectionBase) super.closeHandler(handler);
+ }
+
+ @Override
+ public Future close() {
+ PromiseInternal promise = context.promise();
+ ChannelPromise pr = chctx.newPromise();
+ ChannelPromise channelPromise = pr.addListener(promise);
+ handlerContext.writeAndFlush(Unpooled.EMPTY_BUFFER, pr);
+ channelPromise.addListener((ChannelFutureListener) future -> shutdown(0L));
+ return promise.future();
+ }
+
+ @Override
+ public synchronized HttpConnection remoteSettingsHandler(Handler handler) {
+// remoteSettingsHandler = handler;
+ return this;
+ }
+
+ @Override
+ public synchronized io.vertx.core.http.Http2Settings remoteSettings() {
+// return HttpUtils.toVertxSettings(remoteSettings);
+ return null;
+ }
+
+ @Override
+ public synchronized io.vertx.core.http.Http2Settings settings() {
+// return HttpUtils.toVertxSettings(localSettings);
+ return null;
+ }
+
+ @Override
+ public Future updateSettings(io.vertx.core.http.Http2Settings settings) {
+ Promise promise = context.promise();
+ io.netty.handler.codec.http2.Http2Settings settingsUpdate = HttpUtils.fromVertxSettings(settings);
+ updateSettings(settingsUpdate, promise);
+ return promise.future();
+ }
+
+ @Override
+ public HttpConnection updateSettings(io.vertx.core.http.Http2Settings settings, @Nullable Handler> completionHandler) {
+ updateSettings(settings).onComplete(completionHandler);
+ return this;
+ }
+
+ protected void updateSettings(io.netty.handler.codec.http2.Http2Settings settingsUpdate, Handler> completionHandler) {
+// Http2Settings current = handler.decoder().localSettings();
+// for (Map.Entry entry : current.entrySet()) {
+// Character key = entry.getKey();
+// if (Objects.equals(settingsUpdate.get(key), entry.getValue())) {
+// settingsUpdate.remove(key);
+// }
+// }
+// Handler pending = v -> {
+// synchronized (Http2ConnectionBase.this) {
+// localSettings.putAll(settingsUpdate);
+// }
+// if (completionHandler != null) {
+// completionHandler.handle(Future.succeededFuture());
+// }
+// };
+// updateSettingsHandlers.add(pending);
+// handler.writeSettings(settingsUpdate).addListener(fut -> {
+// if (!fut.isSuccess()) {
+// synchronized (Http2ConnectionBase.this) {
+// updateSettingsHandlers.remove(pending);
+// }
+// if (completionHandler != null) {
+// completionHandler.handle(Future.failedFuture(fut.cause()));
+// }
+// }
+// });
+ }
+
+ @Override
+ public Future ping(Buffer data) {
+ if (data.length() != 8) {
+ throw new IllegalArgumentException("Ping data must be exactly 8 bytes");
+ }
+ Promise promise = context.promise();
+ handler.writePing(data.getLong(0)).addListener(fut -> {
+ if (fut.isSuccess()) {
+ synchronized (Http3ConnectionBase.this) {
+ pongHandlers.add(promise);
+ }
+ } else {
+ promise.fail(fut.cause());
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ public HttpConnection ping(Buffer data, Handler> pongHandler) {
+ Future fut = ping(data);
+ if (pongHandler != null) {
+ fut.onComplete(pongHandler);
+ }
+ return this;
+ }
+
+ @Override
+ public synchronized HttpConnection pingHandler(Handler handler) {
+ pingHandler = handler;
+ return this;
+ }
+
+ // Necessary to set the covariant return type
+ @Override
+ public Http3ConnectionBase exceptionHandler(Handler handler) {
+ return (Http3ConnectionBase) super.exceptionHandler(handler);
+ }
+
+ public void consumeCredits(QuicStreamChannel stream, int numBytes) {
+// throw new RuntimeException("Method not implemented");
+ }
+
+
+ // @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, VertxHttpStreamBase, ?> stream,
+ Http3Headers headers, boolean endOfStream) throws Http2Exception {
+ onHeadersRead(stream, headers, null, endOfStream);
+ }
+
+ // Private
+
+ private void checkShutdown() {
+ Handler shutdownHandler;
+ synchronized (this) {
+ if (shutdown) {
+ return;
+ }
+ Http3ConnectionBase conn = handler.connection();
+ if ((!handler.goAwayReceived() /*&& !conn.goAwaySent()*/) /*|| conn.numActiveStreams() > 0*/) {
+ // TODO: correct these
+ return;
+ }
+ shutdown = true;
+ shutdownHandler = this.shutdownHandler;
+ }
+ doShutdown(shutdownHandler);
+ }
+
+ protected void doShutdown(Handler shutdownHandler) {
+ if (shutdownHandler != null) {
+ context.dispatch(shutdownHandler);
+ }
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/impl/Http3SslHandshakeHandler.java b/src/main/java/io/vertx/core/http/impl/Http3SslHandshakeHandler.java
new file mode 100644
index 00000000000..31f547aa132
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/Http3SslHandshakeHandler.java
@@ -0,0 +1,26 @@
+package io.vertx.core.http.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.ssl.SslHandshakeCompletionEvent;
+import io.vertx.core.impl.future.PromiseInternal;
+
+class Http3SslHandshakeHandler extends ChannelInboundHandlerAdapter {
+ private final PromiseInternal promise;
+
+ public Http3SslHandshakeHandler(PromiseInternal promise) {
+ this.promise = promise;
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+ if (evt instanceof SslHandshakeCompletionEvent) {
+ SslHandshakeCompletionEvent completion = (SslHandshakeCompletionEvent) evt;
+ if (!completion.isSuccess()) {
+ promise.tryFail(completion.cause());
+ }
+ } else {
+ ctx.fireUserEventTriggered(evt);
+ }
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java
index eb2e01bbe75..2ec3d619558 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java
@@ -14,10 +14,13 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.incubator.codec.quic.QuicChannel;
+import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
@@ -25,8 +28,8 @@
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpVersion;
-import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.ProxyOptions;
@@ -87,7 +90,8 @@ public SocketAddress server() {
}
private void connect(EventLoopContext context, Promise promise) {
- netClient.connectInternal(proxyOptions, server, peerAddress, this.options.isForceSni() ? peerAddress.host() : null, ssl, useAlpn, false, promise, context, 0);
+ netClient.connectInternal(proxyOptions, server, peerAddress, this.options.isForceSni() ? peerAddress.host() :
+ null, ssl, useAlpn, false, promise, context, 0);
}
public Future wrap(EventLoopContext context, NetSocket so_) {
@@ -100,7 +104,7 @@ public Future wrap(EventLoopContext context, NetSocket so_
List removedHandlers = new ArrayList<>();
for (Map.Entry stringChannelHandlerEntry : pipeline) {
ChannelHandler handler = stringChannelHandlerEntry.getValue();
- if (!(handler instanceof SslHandler)) {
+ if (!(handler instanceof SslHandler) && !(handler.getClass().getSimpleName().equals("QuicheQuicClientCodec"))) {
removedHandlers.add(handler);
}
}
@@ -111,7 +115,10 @@ public Future wrap(EventLoopContext context, NetSocket so_
if (ssl) {
String protocol = so.applicationLayerProtocol();
if (useAlpn) {
- if ("h2".equals(protocol)) {
+ if ("h3".equals(protocol)) {
+ applyHttp3ConnectionOptions(ch.pipeline());
+ http3Connected(context, metric, ch, promise);
+ } else if ("h2".equals(protocol)) {
applyHttp2ConnectionOptions(ch.pipeline());
http2Connected(context, metric, ch, promise);
} else {
@@ -125,7 +132,10 @@ public Future wrap(EventLoopContext context, NetSocket so_
http1xConnected(version, server, true, context, metric, ch, promise);
}
} else {
- if (version == HttpVersion.HTTP_2) {
+ if (version == HttpVersion.HTTP_3) {
+ applyHttp3ConnectionOptions(pipeline);
+ http3Connected(context, metric, ch, promise);
+ } else if (version == HttpVersion.HTTP_2) {
if (this.options.isHttp2ClearTextUpgrade()) {
applyHttp1xConnectionOptions(pipeline);
http1xConnected(version, server, false, context, metric, ch, promise);
@@ -153,7 +163,18 @@ private void applyHttp2ConnectionOptions(ChannelPipeline pipeline) {
int readIdleTimeout = options.getReadIdleTimeout();
int writeIdleTimeout = options.getWriteIdleTimeout();
if (idleTimeout > 0 || readIdleTimeout > 0 || writeIdleTimeout > 0) {
- pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout, options.getIdleTimeoutUnit()));
+ pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout,
+ options.getIdleTimeoutUnit()));
+ }
+ }
+
+ private void applyHttp3ConnectionOptions(ChannelPipeline pipeline) {
+ int idleTimeout = options.getIdleTimeout();
+ int readIdleTimeout = options.getReadIdleTimeout();
+ int writeIdleTimeout = options.getWriteIdleTimeout();
+ if (idleTimeout > 0 || readIdleTimeout > 0 || writeIdleTimeout > 0) {
+ pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout,
+ options.getIdleTimeoutUnit()));
}
}
@@ -162,7 +183,8 @@ private void applyHttp1xConnectionOptions(ChannelPipeline pipeline) {
int readIdleTimeout = options.getReadIdleTimeout();
int writeIdleTimeout = options.getWriteIdleTimeout();
if (idleTimeout > 0 || readIdleTimeout > 0 || writeIdleTimeout > 0) {
- pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout, options.getIdleTimeoutUnit()));
+ pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout,
+ options.getIdleTimeoutUnit()));
}
if (options.getLogActivity()) {
pipeline.addLast("logging", new LoggingHandler(options.getActivityLogDataFormat()));
@@ -189,7 +211,8 @@ private void http1xConnected(HttpVersion version,
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
VertxHandler clientHandler = VertxHandler.create(chctx -> {
HttpClientMetrics met = client.metrics();
- Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, client, chctx, ssl, server, context, this.metrics);
+ Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, client,
+ chctx, ssl, server, context, this.metrics);
if (met != null) {
conn.metric(socketMetric);
met.endpointConnected(metrics);
@@ -246,6 +269,37 @@ private void http2Connected(EventLoopContext context,
clientHandler.connectFuture().addListener(promise);
}
+ private void http3Connected(EventLoopContext context,
+ Object metric,
+ Channel ch,
+ PromiseInternal promise) {
+ VertxHttp3ConnectionHandler clientHandler;
+ try {
+ clientHandler = Http3ClientConnection.createVertxHttp3ConnectionHandler(client, metrics, context, false, metric);
+
+ QuicChannel.newBootstrap(ch)
+ .handler(clientHandler.getHttp3ConnectionHandler())
+ .streamHandler(clientHandler.getStreamHandler())
+ .localAddress(ch.localAddress())
+ .remoteAddress(ch.remoteAddress())
+ .connect()
+ .addListener((io.netty.util.concurrent.Future future) -> {
+ if(!future.isSuccess()) {
+ connectFailed(ch, future.cause(), promise);
+ return;
+ }
+
+ QuicChannel quicChannel = future.get();
+ quicChannel.pipeline().addLast(clientHandler.getUserEventHandler());
+ });
+
+ } catch (Exception e) {
+ connectFailed(ch, e, promise);
+ return;
+ }
+ clientHandler.connectFuture().addListener(promise);
+ }
+
private void connectFailed(Channel ch, Throwable t, Promise future) {
if (ch != null) {
try {
diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
index 0fecb6dde39..d4ccf4addba 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
@@ -159,6 +159,9 @@ public HttpClientImpl(VertxInternal vertx, HttpClientOptions options, CloseFutur
List alpnVersions = options.getAlpnVersions();
if (alpnVersions == null || alpnVersions.isEmpty()) {
switch (options.getProtocolVersion()) {
+ case HTTP_3:
+ alpnVersions = Arrays.asList(HttpVersion.HTTP_3, HttpVersion.HTTP_2, HttpVersion.HTTP_1_1);
+ break;
case HTTP_2:
alpnVersions = Arrays.asList(HttpVersion.HTTP_2, HttpVersion.HTTP_1_1);
break;
diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
index c97dce40774..45f39557ffd 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
@@ -54,7 +54,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
private Throwable reset;
private int followRedirects;
private HeadersMultiMap headers;
- private StreamPriority priority;
+ private StreamPriorityBase priority;
private boolean headWritten;
private boolean isConnect;
private String traceOperation;
@@ -65,7 +65,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
this.chunked = false;
this.endPromise = context.promise();
this.endFuture = endPromise.future();
- this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY;
+ this.priority = stream.createDefaultStreamPriority();
this.traceOperation = traceOperation;
//
@@ -526,7 +526,7 @@ private void checkResponseHandler() {
}
@Override
- public synchronized HttpClientRequest setStreamPriority(StreamPriority priority) {
+ public synchronized HttpClientRequest setStreamPriority(StreamPriorityBase priority) {
if (headWritten) {
stream.updatePriority(priority);
} else {
@@ -536,7 +536,7 @@ public synchronized HttpClientRequest setStreamPriority(StreamPriority priority)
}
@Override
- public synchronized StreamPriority getStreamPriority() {
+ public synchronized StreamPriorityBase getStreamPriority() {
return stream.priority();
}
}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java
index 89f36ca50de..0184177c949 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java
@@ -230,7 +230,7 @@ public boolean writeQueueFull() {
}
@Override
- public StreamPriority getStreamPriority() {
+ public StreamPriorityBase getStreamPriority() {
return stream.priority();
}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java
index 06b40cc7a1f..be06d445232 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java
@@ -48,7 +48,7 @@ public class HttpClientResponseImpl implements HttpClientResponse {
private HttpEventHandler eventHandler;
private Handler customFrameHandler;
- private Handler priorityHandler;
+ private Handler priorityHandler;
// Cache these for performance
private MultiMap headers;
@@ -278,7 +278,7 @@ public synchronized Future end() {
}
@Override
- public HttpClientResponse streamPriorityHandler(Handler handler) {
+ public HttpClientResponse streamPriorityHandler(Handler handler) {
synchronized (conn) {
if (handler != null) {
checkEnded();
@@ -288,8 +288,8 @@ public HttpClientResponse streamPriorityHandler(Handler handler)
return this;
}
- void handlePriorityChange(StreamPriority streamPriority) {
- Handler handler;
+ void handlePriorityChange(StreamPriorityBase streamPriority) {
+ Handler handler;
synchronized (conn) {
handler = priorityHandler;
}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java
index e940fbc05ed..9384e4e7bef 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java
@@ -20,7 +20,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpFrame;
import io.vertx.core.http.HttpVersion;
-import io.vertx.core.http.StreamPriority;
+import io.vertx.core.http.StreamPriorityBase;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.WriteStream;
@@ -52,7 +52,7 @@ void writeHead(HttpRequestHead request,
boolean chunked,
ByteBuf buf,
boolean end,
- StreamPriority priority,
+ StreamPriorityBase priority,
boolean connect,
Handler> handler);
void writeBuffer(ByteBuf buf, boolean end, Handler> listener);
@@ -102,7 +102,7 @@ default void end(Handler> handler) {
void headHandler(Handler handler);
void chunkHandler(Handler handler);
void endHandler(Handler handler);
- void priorityHandler(Handler handler);
+ void priorityHandler(Handler handler);
void closeHandler(Handler handler);
void doSetWriteQueueMaxSize(int size);
@@ -112,7 +112,7 @@ default void end(Handler> handler) {
void reset(Throwable cause);
- StreamPriority priority();
- void updatePriority(StreamPriority streamPriority);
-
+ StreamPriorityBase priority();
+ void updatePriority(StreamPriorityBase streamPriority);
+ StreamPriorityBase createDefaultStreamPriority();
}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpException.java b/src/main/java/io/vertx/core/http/impl/HttpException.java
new file mode 100644
index 00000000000..0d70844f85a
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/HttpException.java
@@ -0,0 +1,7 @@
+package io.vertx.core.http.impl;
+
+public class HttpException extends Exception{
+ public HttpException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerRequestWrapper.java b/src/main/java/io/vertx/core/http/impl/HttpServerRequestWrapper.java
index 5d23e180ba7..7521fd65649 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpServerRequestWrapper.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpServerRequestWrapper.java
@@ -10,7 +10,6 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
-import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpConnection;
@@ -21,7 +20,7 @@
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.ServerWebSocket;
-import io.vertx.core.http.StreamPriority;
+import io.vertx.core.http.StreamPriorityBase;
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
@@ -316,13 +315,13 @@ public HttpConnection connection() {
}
@Override
- public StreamPriority streamPriority() {
+ public StreamPriorityBase streamPriority() {
return delegate.streamPriority();
}
@Override
@Fluent
- public HttpServerRequest streamPriorityHandler(Handler handler) {
+ public HttpServerRequest streamPriorityHandler(Handler handler) {
return delegate.streamPriorityHandler(handler);
}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpStream.java b/src/main/java/io/vertx/core/http/impl/HttpStream.java
new file mode 100644
index 00000000000..91da204c3a2
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/HttpStream.java
@@ -0,0 +1,203 @@
+package io.vertx.core.http.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.MultiMap;
+import io.vertx.core.VertxException;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpFrame;
+import io.vertx.core.http.HttpVersion;
+import io.vertx.core.http.StreamPriorityBase;
+import io.vertx.core.http.impl.headers.HeadersMultiMap;
+import io.vertx.core.http.impl.headers.VertxHttpHeaders;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.net.impl.ConnectionBase;
+import io.vertx.core.spi.metrics.ClientMetrics;
+import io.vertx.core.spi.tracing.VertxTracer;
+
+import java.util.Map;
+
+abstract class HttpStream extends VertxHttpStreamBase {
+
+ private final boolean push;
+ private HttpResponseHead response;
+ protected Object metric;
+ protected Object trace;
+ protected boolean requestEnded;
+ private boolean responseEnded;
+ protected Handler headHandler;
+ protected Handler chunkHandler;
+ protected Handler endHandler;
+ protected Handler priorityHandler;
+ protected Handler drainHandler;
+ protected Handler continueHandler;
+ protected Handler earlyHintsHandler;
+ protected Handler unknownFrameHandler;
+ protected Handler exceptionHandler;
+ protected Handler pushHandler;
+ protected Handler closeHandler;
+ protected long writeWindow;
+ protected final long windowSize;
+
+ protected final ClientMetrics metrics;
+
+ protected abstract long getWindowSize();
+ protected abstract HttpVersion version();
+ protected abstract void recycle();
+ protected abstract void metricsEnd(HttpStream, ?> stream);
+
+ HttpStream(C conn, ContextInternal context, boolean push, ClientMetrics ,?, ?, ?> metrics) {
+ super(conn, context);
+
+ this.push = push;
+ this.windowSize = getWindowSize();
+
+ this.metrics = metrics;
+ }
+
+ void onContinue() {
+ context.emit(null, v -> handleContinue());
+ }
+
+ void onEarlyHints(MultiMap headers) {
+ context.emit(null, v -> handleEarlyHints(headers));
+ }
+
+ abstract void handleContinue();
+
+ abstract void handleEarlyHints(MultiMap headers);
+
+ public Object metric() {
+ return metric;
+ }
+
+ public Object trace() {
+ return trace;
+ }
+
+ @Override
+ void doWriteData(ByteBuf chunk, boolean end, Handler> handler) {
+ super.doWriteData(chunk, end, handler);
+ }
+
+ @Override
+ void doWriteHeaders(VertxHttpHeaders headers, boolean end, boolean checkFlush, Handler> handler) {
+ isConnect = "CONNECT".contentEquals(headers.method());
+ super.doWriteHeaders(headers, end, checkFlush, handler);
+ }
+
+ @Override
+ protected void doWriteReset(long code) {
+ if (!requestEnded || !responseEnded) {
+ super.doWriteReset(code);
+ }
+ }
+
+ protected void endWritten() {
+ requestEnded = true;
+ if (metrics != null) {
+ metrics.requestEnd(metric, bytesWritten());
+ }
+ }
+
+ @Override
+ void onEnd(MultiMap trailers) {
+ metricsEnd(this);
+ responseEnded = true;
+ super.onEnd(trailers);
+ }
+
+ @Override
+ void onReset(long code) {
+ if (metrics != null) {
+ metrics.requestReset(metric);
+ }
+ super.onReset(code);
+ }
+
+ @Override
+ void onHeaders(VertxHttpHeaders headers, StreamPriorityBase streamPriority) {
+ if (streamPriority != null) {
+ priority(streamPriority);
+ }
+ if (response == null) {
+ int status;
+ String statusMessage;
+ try {
+ status = Integer.parseInt(String.valueOf(headers.status()));
+ statusMessage = HttpResponseStatus.valueOf(status).reasonPhrase();
+ } catch (Exception e) {
+ handleException(e);
+ writeReset(0x01 /* PROTOCOL_ERROR */);
+ return;
+ }
+ if (status == 100) {
+ onContinue();
+ return;
+ } else if (status == 103) {
+ MultiMap headersMultiMap = HeadersMultiMap.httpHeaders();
+ removeStatusHeaders(headers);
+ for (Map.Entry header : headers.getIterable()) {
+ headersMultiMap.add(header.getKey(), header.getValue());
+ }
+ onEarlyHints(headersMultiMap);
+ return;
+ }
+ response = new HttpResponseHead(
+ version(),
+ status,
+ statusMessage,
+ headers.toHeaderAdapter());
+ removeStatusHeaders(headers);
+
+ if (metrics != null) {
+ metrics.responseBegin(metric, response);
+ }
+
+ if (headHandler != null) {
+ context.emit(response, headHandler);
+ }
+ }
+ }
+
+ private void removeStatusHeaders(VertxHttpHeaders headers) {
+ headers.remove(":status");
+ }
+
+ @Override
+ void onClose() {
+ if (metrics != null) {
+ if (!requestEnded || !responseEnded) {
+ metrics.requestReset(metric);
+ }
+ }
+ VertxTracer tracer = context.tracer();
+ if (tracer != null && trace != null) {
+ VertxException err;
+ if (responseEnded && requestEnded) {
+ err = null;
+ } else {
+ err = HttpUtils.STREAM_CLOSED_EXCEPTION;
+ }
+ tracer.receiveResponse(context, response, trace, err, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
+ }
+ if (!responseEnded) {
+ // NOT SURE OF THAT
+ onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
+ }
+ super.onClose();
+ // commented to be used later when we properly define the HTTP/2 connection expiration from the pool
+ // boolean disposable = conn.streams.isEmpty();
+ if (!push) {
+ recycle();
+ } /* else {
+ conn.listener.onRecycle(0, disposable);
+ } */
+ if (closeHandler != null) {
+ closeHandler.handle(null);
+ }
+ }
+
+}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpStreamImpl.java b/src/main/java/io/vertx/core/http/impl/HttpStreamImpl.java
new file mode 100644
index 00000000000..aa61c69dc86
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/HttpStreamImpl.java
@@ -0,0 +1,309 @@
+package io.vertx.core.http.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.MultiMap;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpFrame;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.StreamPriorityBase;
+import io.vertx.core.http.StreamResetException;
+import io.vertx.core.http.impl.headers.VertxHttpHeaders;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.net.impl.ConnectionBase;
+import io.vertx.core.spi.metrics.ClientMetrics;
+import io.vertx.core.spi.tracing.SpanKind;
+import io.vertx.core.spi.tracing.VertxTracer;
+import io.vertx.core.streams.WriteStream;
+import io.vertx.core.tracing.TracingPolicy;
+
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+abstract class HttpStreamImpl extends HttpStream implements HttpClientStream {
+
+ protected abstract boolean isTryUseCompression();
+
+ abstract int lastStreamCreated();
+
+ protected abstract void createStreamInternal(int id, boolean b, Handler> onComplete) throws HttpException;
+
+ protected abstract TracingPolicy getTracingPolicy();
+
+ abstract VertxHttpHeaders createHttpHeadersWrapper();
+
+ HttpStreamImpl(C conn, ContextInternal context, boolean push, ClientMetrics, ?, ?, ?> metrics) {
+ super(conn, context, push, metrics);
+ }
+
+ @Override
+ public void closeHandler(Handler handler) {
+ closeHandler = handler;
+ }
+
+ @Override
+ public void continueHandler(Handler handler) {
+ continueHandler = handler;
+ }
+
+ @Override
+ public void earlyHintsHandler(Handler handler) {
+ earlyHintsHandler = handler;
+ }
+
+ @Override
+ public void unknownFrameHandler(Handler handler) {
+ unknownFrameHandler = handler;
+ }
+
+ @Override
+ public void pushHandler(Handler handler) {
+ pushHandler = handler;
+ }
+
+ @Override
+ public HttpStreamImpl, ?> drainHandler(Handler handler) {
+ drainHandler = handler;
+ return this;
+ }
+
+ @Override
+ public HttpStreamImpl, ?> exceptionHandler(Handler handler) {
+ exceptionHandler = handler;
+ return this;
+ }
+
+ @Override
+ public WriteStream setWriteQueueMaxSize(int maxSize) {
+ return this;
+ }
+
+ @Override
+ public boolean writeQueueFull() {
+ return !isNotWritable();
+ }
+
+ @Override
+ public synchronized boolean isNotWritable() {
+ return writeWindow > windowSize;
+ }
+
+ @Override
+ public void headHandler(Handler handler) {
+ headHandler = handler;
+ }
+
+ @Override
+ public void chunkHandler(Handler handler) {
+ chunkHandler = handler;
+ }
+
+ @Override
+ public void priorityHandler(Handler handler) {
+ priorityHandler = handler;
+ }
+
+ @Override
+ public void endHandler(Handler handler) {
+ endHandler = handler;
+ }
+
+ @Override
+ public StreamPriorityBase priority() {
+ return super.priority();
+ }
+
+ @Override
+ public void updatePriority(StreamPriorityBase streamPriority) {
+ super.updatePriority(streamPriority);
+ }
+
+ @Override
+ void handleEnd(MultiMap trailers) {
+ if (endHandler != null) {
+ endHandler.handle(trailers);
+ }
+ }
+
+ @Override
+ void handleData(Buffer buf) {
+ if (chunkHandler != null) {
+ chunkHandler.handle(buf);
+ }
+ }
+
+ @Override
+ void handleReset(long errorCode) {
+ handleException(new StreamResetException(errorCode));
+ }
+
+ @Override
+ void handleWritabilityChanged(boolean writable) {
+ }
+
+ @Override
+ void handleCustomFrame(HttpFrame frame) {
+ if (unknownFrameHandler != null) {
+ unknownFrameHandler.handle(frame);
+ }
+ }
+
+
+ @Override
+ void handlePriorityChange(StreamPriorityBase streamPriority) {
+ if (priorityHandler != null) {
+ priorityHandler.handle(streamPriority);
+ }
+ }
+
+ void handleContinue() {
+ if (continueHandler != null) {
+ continueHandler.handle(null);
+ }
+ }
+
+ void handleEarlyHints(MultiMap headers) {
+ if (earlyHintsHandler != null) {
+ earlyHintsHandler.handle(headers);
+ }
+ }
+
+ void handleException(Throwable exception) {
+ if (exceptionHandler != null) {
+ exceptionHandler.handle(exception);
+ }
+ }
+
+ @Override
+ public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriorityBase priority,
+ boolean connect, Handler> handler) {
+ priority(priority);
+ conn.context.emit(null, v -> {
+ writeHeaders(request, buf, end, priority, connect, handler);
+ });
+ }
+
+ private void writeHeaders(HttpRequestHead request, ByteBuf buf, boolean end, StreamPriorityBase priority,
+ boolean connect,
+ Handler> handler) {
+ VertxHttpHeaders headers = createHttpHeadersWrapper();
+ headers.method(request.method.name());
+ boolean e;
+ if (request.method == HttpMethod.CONNECT) {
+ if (request.authority == null) {
+ throw new IllegalArgumentException("Missing :authority / host header");
+ }
+ headers.authority(request.authority);
+ // don't end stream for CONNECT
+ e = false;
+ } else {
+ headers.path(request.uri);
+ headers.scheme(conn.isSsl() ? "https" : "http");
+ if (request.authority != null) {
+ headers.authority(request.authority);
+ }
+ e = end;
+ }
+ if (request.headers != null && request.headers.size() > 0) {
+ for (Map.Entry header : request.headers) {
+ headers.add(HttpUtils.toLowerCase(header.getKey()), header.getValue());
+ }
+ }
+ if (isTryUseCompression() && headers.get(HttpHeaderNames.ACCEPT_ENCODING) == null) {
+ headers.set(HttpHeaderNames.ACCEPT_ENCODING, Http1xClientConnection.determineCompressionAcceptEncoding());
+ }
+ try {
+ createStream(request, headers, handler);
+ } catch (HttpException ex) {
+ if (handler != null) {
+ handler.handle(context.failedFuture(ex));
+ }
+ handleException(ex);
+ return;
+ }
+ if (buf != null) {
+ doWriteHeaders(headers, false, false, null);
+ doWriteData(buf, e, handler);
+ } else {
+ doWriteHeaders(headers, e, true, handler);
+ }
+ }
+
+ private void createStream(HttpRequestHead head, VertxHttpHeaders headers,
+ Handler> handler) throws HttpException {
+ int id = lastStreamCreated();
+ if (id == 0) {
+ id = 1;
+ } else {
+ id += 2;
+ }
+ head.id = id;
+ head.remoteAddress = conn.remoteAddress();
+ createStreamInternal(id, false, streamX -> {
+ init(streamX.result());
+ if (metrics != null) {
+ metric = metrics.requestBegin(headers.path().toString(), head);
+ }
+ VertxTracer tracer = context.tracer();
+ if (tracer != null) {
+ BiConsumer headers_ =
+ (key, val) -> headers.add(key, val);
+ String operation = head.traceOperation;
+ if (operation == null) {
+ operation = headers.method().toString();
+ }
+ trace = tracer.sendRequest(context, SpanKind.RPC, getTracingPolicy(), head, operation,
+ headers_,
+ HttpUtils.CLIENT_HTTP_REQUEST_TAG_EXTRACTOR);
+ }
+ });
+ }
+
+ @Override
+ public void writeBuffer(ByteBuf buf, boolean end, Handler> listener) {
+ if (buf != null) {
+ int size = buf.readableBytes();
+ synchronized (this) {
+ writeWindow += size;
+ }
+ if (listener != null) {
+ Handler> prev = listener;
+ listener = ar -> {
+ Handler drainHandler;
+ synchronized (this) {
+ boolean full = writeWindow > windowSize;
+ writeWindow -= size;
+ if (full && writeWindow <= windowSize) {
+ drainHandler = this.drainHandler;
+ } else {
+ drainHandler = null;
+ }
+ }
+ if (drainHandler != null) {
+ drainHandler.handle(null);
+ }
+ prev.handle(ar);
+ };
+ }
+ }
+ writeData(buf, end, listener);
+ }
+
+ @Override
+ public ContextInternal getContext() {
+ return context;
+ }
+
+ @Override
+ public void doSetWriteQueueMaxSize(int size) {
+ }
+
+ @Override
+ public void reset(Throwable cause) {
+ long code = cause instanceof StreamResetException ? ((StreamResetException) cause).getCode() : 0;
+ conn.context.emit(code, this::writeReset);
+ }
+
+}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpUtils.java b/src/main/java/io/vertx/core/http/impl/HttpUtils.java
index 78e293c054b..48364cc4b95 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpUtils.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpUtils.java
@@ -17,6 +17,7 @@
import io.netty.channel.Channel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.incubator.codec.quic.QuicStreamPriority;
import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil;
import io.vertx.core.AsyncResult;
@@ -27,10 +28,13 @@
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.OpenOptions;
+import io.vertx.core.http.Http2StreamPriority;
+import io.vertx.core.http.Http3StreamPriority;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.StreamPriority;
+import io.vertx.core.http.StreamPriorityBase;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.HostAndPort;
import io.vertx.core.spi.tracing.TagExtractor;
@@ -163,7 +167,7 @@ public String value(HttpResponseHead resp, int index) {
}
};
- static final StreamPriority DEFAULT_STREAM_PRIORITY = new StreamPriority() {
+ static final StreamPriorityBase DEFAULT_STREAM_PRIORITY = new Http2StreamPriority(new StreamPriority() {
@Override
public StreamPriority setWeight(short weight) {
throw new UnsupportedOperationException("Unmodifiable stream priority");
@@ -178,8 +182,9 @@ public StreamPriority setDependency(int dependency) {
public StreamPriority setExclusive(boolean exclusive) {
throw new UnsupportedOperationException("Unmodifiable stream priority");
}
- };
+ });
+ static final StreamPriorityBase DEFAULT_QUIC_STREAM_PRIORITY = new Http3StreamPriority(new QuicStreamPriority(0, true));
private HttpUtils() {
}
diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java
index 2d566444847..e0465b0e2f9 100644
--- a/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java
+++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java
@@ -25,6 +25,7 @@
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.GoAway;
+import io.vertx.core.http.impl.headers.VertxHttpHeaders;
import io.vertx.core.net.impl.ConnectionBase;
import java.util.function.Function;
@@ -216,9 +217,9 @@ public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData
//
- void writeHeaders(Http2Stream stream, Http2Headers headers, boolean end, int streamDependency, short weight, boolean exclusive, boolean checkFlush, FutureListener listener) {
+ void writeHeaders(Http2Stream stream, VertxHttpHeaders headers, boolean end, int streamDependency, short weight, boolean exclusive, boolean checkFlush, FutureListener listener) {
ChannelPromise promise = listener == null ? chctx.voidPromise() : chctx.newPromise().addListener(listener);
- encoder().writeHeaders(chctx, stream.id(), headers, streamDependency, weight, exclusive, 0, end, promise);
+ encoder().writeHeaders(chctx, stream.id(), headers.getHeaders(), streamDependency, weight, exclusive, 0, end, promise);
if (checkFlush) {
checkFlush();
}
diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp3ConnectionHandler.java b/src/main/java/io/vertx/core/http/impl/VertxHttp3ConnectionHandler.java
new file mode 100644
index 00000000000..2f6b842e40f
--- /dev/null
+++ b/src/main/java/io/vertx/core/http/impl/VertxHttp3ConnectionHandler.java
@@ -0,0 +1,398 @@
+/*
+ * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+
+package io.vertx.core.http.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.ChannelInputShutdownEvent;
+import io.netty.channel.socket.ChannelInputShutdownReadComplete;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.ssl.SslHandshakeCompletionEvent;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.incubator.codec.http3.DefaultHttp3GoAwayFrame;
+import io.netty.incubator.codec.http3.DefaultHttp3Headers;
+import io.netty.incubator.codec.http3.DefaultHttp3SettingsFrame;
+import io.netty.incubator.codec.http3.DefaultHttp3UnknownFrame;
+import io.netty.incubator.codec.http3.Http3;
+import io.netty.incubator.codec.http3.Http3ClientConnectionHandler;
+import io.netty.incubator.codec.http3.Http3ConnectionHandler;
+import io.netty.incubator.codec.http3.Http3Headers;
+import io.netty.incubator.codec.http3.Http3ServerConnectionHandler;
+import io.netty.incubator.codec.http3.Http3SettingsFrame;
+import io.netty.incubator.codec.quic.QuicConnectionCloseEvent;
+import io.netty.incubator.codec.quic.QuicDatagramExtensionEvent;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+import io.netty.incubator.codec.quic.QuicStreamLimitChangedEvent;
+import io.netty.incubator.codec.quic.QuicStreamPriority;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.GoAway;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpVersion;
+import io.vertx.core.http.StreamPriorityBase;
+import io.vertx.core.http.impl.headers.VertxHttpHeaders;
+import io.vertx.core.impl.EventLoopContext;
+import io.vertx.core.net.impl.ConnectionBase;
+
+import java.util.function.Function;
+
+/**
+ * @author Iman Zolfaghari
+ */
+class VertxHttp3ConnectionHandler extends ChannelInboundHandlerAdapter {
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(VertxHttp3ConnectionHandler.class);
+
+ private final Function, C> connectionFactory;
+ private C connection;
+ private ChannelHandlerContext chctx;
+ private final Promise connectFuture;
+ private boolean settingsRead;
+ private Handler addHandler;
+ private Handler removeHandler;
+ private final Http3SettingsFrame http3InitialSettings;
+
+ private boolean read;
+ private Http3ConnectionHandler connectionHandlerInternal;
+ private ChannelHandler streamHandlerInternal;
+ private ChannelHandler userEventHandlerInternal;
+
+ public static final AttributeKey HTTP3_MY_STREAM_KEY = AttributeKey.valueOf(Http3ClientStream.class
+ , "HTTP3MyStream");
+
+ public VertxHttp3ConnectionHandler(
+ Function, C> connectionFactory,
+ EventLoopContext context,
+ Http3SettingsFrame http3InitialSettings, boolean isServer) {
+ this.connectionFactory = connectionFactory;
+ this.http3InitialSettings = http3InitialSettings;
+ connectFuture = new DefaultPromise<>(context.nettyEventLoop());
+ createHttp3ConnectionHandler(isServer);
+ createStreamHandler();
+ createUserEventHandler();
+ }
+
+ public Future connectFuture() {
+ if (connectFuture == null) {
+ throw new IllegalStateException();
+ }
+ return connectFuture;
+ }
+
+ public ChannelHandlerContext context() {
+ return chctx;
+ }
+
+ private void onSettingsRead(ChannelHandlerContext ctx, Http3SettingsFrame settings) {
+ this.chctx = ctx;
+ this.connection = connectionFactory.apply(this);
+ this.connection.onSettingsRead(ctx, settings);
+ this.settingsRead = true;
+ if (addHandler != null) {
+ addHandler.handle(connection);
+ }
+ this.connectFuture.trySuccess(connection);
+ }
+
+
+ /**
+ * Set a handler to be called when the connection is set on this handler.
+ *
+ * @param handler the handler to be notified
+ * @return this
+ */
+ public VertxHttp3ConnectionHandler addHandler(Handler handler) {
+ this.addHandler = handler;
+ return this;
+ }
+
+ /**
+ * Set a handler to be called when the connection is unset from this handler.
+ *
+ * @param handler the handler to be notified
+ * @return this
+ */
+ public VertxHttp3ConnectionHandler