diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 9250a3b87bf..84f6de6e32c 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -238,8 +238,13 @@ private void beginRequest(Stream stream, HttpRequestHead request, boolean chunke synchronized (this) { responses.add(stream); this.isConnect = connect; - if (this.metrics != null) { - stream.metric = this.metrics.requestBegin(request.uri, request); + if (metrics != null) { + Object metric = stream.metric; + if (metric != null) { + metrics.requestBegin(metric, request.uri, request); + } else { + stream.metric = metrics.requestBegin(request.uri, request); + } } VertxTracer tracer = context.tracer(); if (tracer != null) { @@ -364,10 +369,11 @@ private abstract static class Stream { private long bytesWritten; private long readWindow; - Stream(Http1xClientConnection conn, ContextInternal context, int id) { + Stream(Http1xClientConnection conn, ContextInternal context, Object metric, int id) { this.conn = conn; this.context = context; this.id = id; + this.metric = metric; this.promise = context.promise(); } @@ -430,8 +436,8 @@ private static class StreamImpl extends Stream implements HttpClientStream { private Handler exceptionHandler; private Handler closeHandler; - StreamImpl(ContextInternal context, Http1xClientConnection conn, int id) { - super(conn, context, id); + StreamImpl(ContextInternal context, Http1xClientConnection conn, Object metric, int id) { + super(conn, context, metric, id); this.queue = new InboundBuffer<>(context, 5) .handler(item -> { @@ -1301,7 +1307,13 @@ public void createStream(ContextInternal context, Handler createRequest(ContextInternal context) { } private StreamImpl createStream(ContextInternal context) { - return new StreamImpl(this, context, false); + Object metric; + if (metrics != null) { + metric = metrics.init(); + } else { + metric = null; + } + return new StreamImpl(this, context, metric, false); } private void recycle() { @@ -214,7 +220,7 @@ public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int stream Handler pushHandler = stream.pushHandler; if (pushHandler != null) { Http2Stream promisedStream = handler.connection().stream(promisedStreamId); - StreamImpl pushStream = new StreamImpl(this, context, true); + StreamImpl pushStream = new StreamImpl(this, context, null, true); pushStream.init(promisedStream); HttpClientPush push = new HttpClientPush(headers, pushStream); if (metrics != null) { @@ -254,10 +260,11 @@ static abstract class Stream extends VertxHttp2Stream { protected long writeWindow; protected final long windowSize; - Stream(Http2ClientConnection conn, ContextInternal context, boolean push) { + Stream(Http2ClientConnection conn, ContextInternal context, Object metric, boolean push) { super(conn, context); this.push = push; + this.metric = metric; this.windowSize = conn.getWindowSize(); } @@ -411,8 +418,8 @@ void onClose() { static class StreamImpl extends Stream implements HttpClientStream { - StreamImpl(Http2ClientConnection conn, ContextInternal context, boolean push) { - super(conn, context, push); + StreamImpl(Http2ClientConnection conn, ContextInternal context, Object metric, boolean push) { + super(conn, context, metric, push); } @Override @@ -636,7 +643,12 @@ private Http2Exception createStream(HttpRequestHead head, Http2Headers headers) } init(stream); if (conn.metrics != null) { - metric = conn.metrics.requestBegin(headers.path().toString(), head); + Object m = metric; + if (m != null) { + conn.metrics.requestBegin(m, headers.path().toString(), head); + } else { + metric = conn.metrics.requestBegin(headers.path().toString(), head); + } } VertxTracer tracer = context.tracer(); if (tracer != null) { diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java index 93814449c72..33a6d2fe3a8 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestBase.java @@ -28,7 +28,7 @@ /** * @author Julien Viet */ -public abstract class HttpClientRequestBase implements HttpClientRequest { +public abstract class HttpClientRequestBase implements HttpClientRequestInternal { protected final ContextInternal context; protected final HttpClientStream stream; @@ -76,6 +76,10 @@ protected String authority() { } } + public Object metric() { + return stream.metric(); + } + @Override public int streamId() { return stream.id(); diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestInternal.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestInternal.java new file mode 100644 index 00000000000..f0de77f4017 --- /dev/null +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestInternal.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2011-2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http.impl; + +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.spi.metrics.ClientMetrics; + +/** + * Extends to expose internal methods that are necessary for integration. + * + * @author Julien Viet + */ +public interface HttpClientRequestInternal extends HttpClientRequest { + + /** + * @return the request metric obtained from {@link ClientMetrics} for this request + */ + Object metric(); + +} diff --git a/src/main/java/io/vertx/core/spi/metrics/ClientMetrics.java b/src/main/java/io/vertx/core/spi/metrics/ClientMetrics.java index ced4c133018..5f07cafc45b 100644 --- a/src/main/java/io/vertx/core/spi/metrics/ClientMetrics.java +++ b/src/main/java/io/vertx/core/spi/metrics/ClientMetrics.java @@ -31,6 +31,31 @@ default T enqueueRequest() { default void dequeueRequest(T taskMetric) { } + /** + * Create a request metric instance, when the implementation + * + *
    + *
  • returns {@code null}, {@link #requestBegin(String, Object)} is called (backward compatibility mode)
  • + *
  • returns a non-null value, {@link #requestBegin(Object, String, Object)} is called with the returned request + * metric instance
  • + *
+ * + * @return a newly created request metric + */ + default M init() { + return null; + } + + /** + * Called when a client request begins and {@link #init()} has returned a non-null value. + * + * @param requestMetric the request metric + * @param uri an arbitrary uri + * @param request the request object + */ + default void requestBegin(M requestMetric, String uri, Req request) { + } + /** * Called when a client request begins. Vert.x will invoke {@link #requestEnd} when the request * has ended or {@link #requestReset} if the request/response has failed before. diff --git a/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java b/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java index 37adad1c09b..37deac62dd8 100644 --- a/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java +++ b/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java @@ -17,6 +17,7 @@ import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.impl.HttpClientImpl; +import io.vertx.core.http.impl.HttpClientRequestInternal; import io.vertx.core.http.impl.HttpServerRequestInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.core.metrics.MetricsOptions; @@ -191,6 +192,15 @@ public void testHttpMetricsLifecycle() throws Exception { @Test public void testHttpClientLifecycle() throws Exception { + testHttpClientLifecycle(false); + } + + @Test + public void testHttpClientLifecycleWithInit() throws Exception { + testHttpClientLifecycle(true); + } + + public void testHttpClientLifecycle(boolean implementInit) throws Exception { // The test cannot pass for HTTP/2 upgrade for now HttpClientOptions opts = createBaseClientOptions(); @@ -228,6 +238,7 @@ public void testHttpClientLifecycle() throws Exception { }); startServer(testAddress); FakeHttpClientMetrics clientMetrics = FakeMetricsBase.getMetrics(client); + clientMetrics.setImplementInit(implementInit); CountDownLatch responseBeginLatch = new CountDownLatch(1); CountDownLatch responseEndLatch = new CountDownLatch(1); Future request = client.request(new RequestOptions() @@ -235,6 +246,9 @@ public void testHttpClientLifecycle() throws Exception { .setPort(HttpTestBase.DEFAULT_HTTP_PORT) .setHost("localhost") .setURI("/somepath")).onComplete(onSuccess(req -> { + if (implementInit) { + assertNotNull(((HttpClientRequestInternal)req).metric()); + } req .response(onSuccess(resp -> { responseBeginLatch.countDown(); diff --git a/src/test/java/io/vertx/test/fakemetrics/EndpointMetric.java b/src/test/java/io/vertx/test/fakemetrics/EndpointMetric.java index a92ca427529..b7f9449d151 100644 --- a/src/test/java/io/vertx/test/fakemetrics/EndpointMetric.java +++ b/src/test/java/io/vertx/test/fakemetrics/EndpointMetric.java @@ -30,8 +30,10 @@ public class EndpointMetric implements ClientMetrics requests = new ConcurrentHashMap<>(); + private final boolean implementInit; - public EndpointMetric() { + public EndpointMetric(boolean implementInit) { + this.implementInit = implementInit; } @Override @@ -45,12 +47,36 @@ public void dequeueRequest(Void taskMetric) { queueSize.decrementAndGet(); } + @Override + public HttpClientMetric init() { + if (implementInit) { + return new HttpClientMetric(this); + } else { + return ClientMetrics.super.init(); + } + } + + @Override + public void requestBegin(HttpClientMetric requestMetric, String uri, HttpRequest request) { + if (implementInit) { + requestCount.incrementAndGet(); + requestMetric.request.set(request); + requests.put(request, requestMetric); + } else { + ClientMetrics.super.requestBegin(requestMetric, uri, request); + } + } + @Override public HttpClientMetric requestBegin(String uri, HttpRequest request) { - requestCount.incrementAndGet(); - HttpClientMetric metric = new HttpClientMetric(this, request); - requests.put(request, metric); - return metric; + if (implementInit) { + return null; + } else { + requestCount.incrementAndGet(); + HttpClientMetric metric = new HttpClientMetric(this, request); + requests.put(request, metric); + return metric; + } } @Override @@ -84,13 +110,13 @@ public void requestReset(HttpClientMetric requestMetric) { } requestCount.decrementAndGet(); requestMetric.failed.set(true); - requests.remove(requestMetric.request); + requests.remove(requestMetric.request.get()); } @Override public void responseEnd(HttpClientMetric requestMetric, long bytesRead) { requestMetric.bytesRead.set(bytesRead); requestCount.decrementAndGet(); - requests.remove(requestMetric.request); + requests.remove(requestMetric.request.get()); } } diff --git a/src/test/java/io/vertx/test/fakemetrics/FakeHttpClientMetrics.java b/src/test/java/io/vertx/test/fakemetrics/FakeHttpClientMetrics.java index ae9fbbb9c9b..0245d184053 100644 --- a/src/test/java/io/vertx/test/fakemetrics/FakeHttpClientMetrics.java +++ b/src/test/java/io/vertx/test/fakemetrics/FakeHttpClientMetrics.java @@ -34,11 +34,16 @@ public class FakeHttpClientMetrics extends FakeTCPMetrics implements HttpClientM private final String name; private final ConcurrentMap webSockets = new ConcurrentHashMap<>(); private final ConcurrentMap endpoints = new ConcurrentHashMap<>(); + private volatile boolean implementInit = false; public FakeHttpClientMetrics(String name) { this.name = name; } + public void setImplementInit(boolean implementInit) { + this.implementInit = implementInit; + } + public WebSocketMetric getMetric(WebSocket ws) { return webSockets.get(ws); } @@ -83,7 +88,7 @@ public Integer connectionCount(String name) { @Override public ClientMetrics createEndpointMetrics(SocketAddress remoteAddress, int maxPoolSize) { - EndpointMetric metric = new EndpointMetric() { + EndpointMetric metric = new EndpointMetric(implementInit) { @Override public void close() { endpoints.remove(remoteAddress); diff --git a/src/test/java/io/vertx/test/fakemetrics/HttpClientMetric.java b/src/test/java/io/vertx/test/fakemetrics/HttpClientMetric.java index 75ff5187b90..23efce44f98 100644 --- a/src/test/java/io/vertx/test/fakemetrics/HttpClientMetric.java +++ b/src/test/java/io/vertx/test/fakemetrics/HttpClientMetric.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * @author Julien Viet @@ -23,7 +24,7 @@ public class HttpClientMetric { public final EndpointMetric endpoint; - public final HttpRequest request; + public final AtomicReference request; public final AtomicInteger requestEnded = new AtomicInteger(); public final AtomicInteger responseBegin = new AtomicInteger(); public final AtomicLong bytesRead = new AtomicLong(); @@ -32,6 +33,11 @@ public class HttpClientMetric { public HttpClientMetric(EndpointMetric endpoint, HttpRequest request) { this.endpoint = endpoint; - this.request = request; + this.request = new AtomicReference<>(request); + } + + public HttpClientMetric(EndpointMetric endpoint) { + this.endpoint = endpoint; + this.request = new AtomicReference<>(); } }