Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,13 @@ private void beginRequest(Stream stream, HttpRequestHead request, boolean chunke
synchronized (this) {
responses.add(stream);
this.isConnect = connect;
if (this.metrics != null) {
stream.metric = this.metrics.requestBegin(request.uri, request);
if (metrics != null) {
Object metric = stream.metric;
if (metric != null) {
metrics.requestBegin(metric, request.uri, request);
} else {
stream.metric = metrics.requestBegin(request.uri, request);
}
}
VertxTracer tracer = context.tracer();
if (tracer != null) {
Expand Down Expand Up @@ -364,10 +369,11 @@ private abstract static class Stream {
private long bytesWritten;
private long readWindow;

Stream(Http1xClientConnection conn, ContextInternal context, int id) {
Stream(Http1xClientConnection conn, ContextInternal context, Object metric, int id) {
this.conn = conn;
this.context = context;
this.id = id;
this.metric = metric;
this.promise = context.promise();
}

Expand Down Expand Up @@ -430,8 +436,8 @@ private static class StreamImpl extends Stream implements HttpClientStream {
private Handler<Throwable> exceptionHandler;
private Handler<Void> closeHandler;

StreamImpl(ContextInternal context, Http1xClientConnection conn, int id) {
super(conn, context, id);
StreamImpl(ContextInternal context, Http1xClientConnection conn, Object metric, int id) {
super(conn, context, metric, id);

this.queue = new InboundBuffer<>(context, 5)
.handler(item -> {
Expand Down Expand Up @@ -1301,7 +1307,13 @@ public void createStream(ContextInternal context, Handler<AsyncResult<HttpClient
if (closed) {
stream = null;
} else {
stream = new StreamImpl(context, this, seq++);
Object metric;
if (metrics != null) {
metric = metrics.init();
} else {
metric = null;
}
stream = new StreamImpl(context, this, metric, seq++);
requests.add(stream);
if (requests.size() == 1) {
stream.promise.complete(stream);
Expand Down
24 changes: 18 additions & 6 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,13 @@ public Future<HttpClientRequest> createRequest(ContextInternal context) {
}

private StreamImpl createStream(ContextInternal context) {
return new StreamImpl(this, context, false);
Object metric;
if (metrics != null) {
metric = metrics.init();
} else {
metric = null;
}
return new StreamImpl(this, context, metric, false);
}

private void recycle() {
Expand Down Expand Up @@ -214,7 +220,7 @@ public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int stream
Handler<HttpClientPush> pushHandler = stream.pushHandler;
if (pushHandler != null) {
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
StreamImpl pushStream = new StreamImpl(this, context, true);
StreamImpl pushStream = new StreamImpl(this, context, null, true);
pushStream.init(promisedStream);
HttpClientPush push = new HttpClientPush(headers, pushStream);
if (metrics != null) {
Expand Down Expand Up @@ -254,10 +260,11 @@ static abstract class Stream extends VertxHttp2Stream<Http2ClientConnection> {
protected long writeWindow;
protected final long windowSize;

Stream(Http2ClientConnection conn, ContextInternal context, boolean push) {
Stream(Http2ClientConnection conn, ContextInternal context, Object metric, boolean push) {
super(conn, context);

this.push = push;
this.metric = metric;
this.windowSize = conn.getWindowSize();
}

Expand Down Expand Up @@ -411,8 +418,8 @@ void onClose() {

static class StreamImpl extends Stream implements HttpClientStream {

StreamImpl(Http2ClientConnection conn, ContextInternal context, boolean push) {
super(conn, context, push);
StreamImpl(Http2ClientConnection conn, ContextInternal context, Object metric, boolean push) {
super(conn, context, metric, push);
}

@Override
Expand Down Expand Up @@ -636,7 +643,12 @@ private Http2Exception createStream(HttpRequestHead head, Http2Headers headers)
}
init(stream);
if (conn.metrics != null) {
metric = conn.metrics.requestBegin(headers.path().toString(), head);
Object m = metric;
if (m != null) {
conn.metrics.requestBegin(m, headers.path().toString(), head);
} else {
metric = conn.metrics.requestBegin(headers.path().toString(), head);
}
}
VertxTracer tracer = context.tracer();
if (tracer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public abstract class HttpClientRequestBase implements HttpClientRequest {
public abstract class HttpClientRequestBase implements HttpClientRequestInternal {

protected final ContextInternal context;
protected final HttpClientStream stream;
Expand Down Expand Up @@ -76,6 +76,10 @@ protected String authority() {
}
}

public Object metric() {
return stream.metric();
}

@Override
public int streamId() {
return stream.id();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http.impl;

import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.spi.metrics.ClientMetrics;

/**
* Extends to expose internal methods that are necessary for integration.
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public interface HttpClientRequestInternal extends HttpClientRequest {

/**
* @return the request metric obtained from {@link ClientMetrics} for this request
*/
Object metric();

}
25 changes: 25 additions & 0 deletions src/main/java/io/vertx/core/spi/metrics/ClientMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,31 @@ default T enqueueRequest() {
default void dequeueRequest(T taskMetric) {
}

/**
* Create a request metric instance, when the implementation
*
* <ul>
* <li>returns {@code null}, {@link #requestBegin(String, Object)} is called (backward compatibility mode)</li>
* <li>returns a non-null value, {@link #requestBegin(Object, String, Object)} is called with the returned request
* metric instance</li>
* </ul>
*
* @return a newly created request metric
*/
default M init() {
return null;
}

/**
* Called when a client request begins and {@link #init()} has returned a non-null value.
*
* @param requestMetric the request metric
* @param uri an arbitrary uri
* @param request the request object
*/
default void requestBegin(M requestMetric, String uri, Req request) {
}

/**
* Called when a client request begins. Vert.x will invoke {@link #requestEnd} when the request
* has ended or {@link #requestReset} if the request/response has failed before.
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/io/vertx/core/http/HttpMetricsTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.HttpClientImpl;
import io.vertx.core.http.impl.HttpClientRequestInternal;
import io.vertx.core.http.impl.HttpServerRequestInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.metrics.MetricsOptions;
Expand Down Expand Up @@ -191,6 +192,15 @@ public void testHttpMetricsLifecycle() throws Exception {

@Test
public void testHttpClientLifecycle() throws Exception {
testHttpClientLifecycle(false);
}

@Test
public void testHttpClientLifecycleWithInit() throws Exception {
testHttpClientLifecycle(true);
}

public void testHttpClientLifecycle(boolean implementInit) throws Exception {

// The test cannot pass for HTTP/2 upgrade for now
HttpClientOptions opts = createBaseClientOptions();
Expand Down Expand Up @@ -228,13 +238,17 @@ public void testHttpClientLifecycle() throws Exception {
});
startServer(testAddress);
FakeHttpClientMetrics clientMetrics = FakeMetricsBase.getMetrics(client);
clientMetrics.setImplementInit(implementInit);
CountDownLatch responseBeginLatch = new CountDownLatch(1);
CountDownLatch responseEndLatch = new CountDownLatch(1);
Future<HttpClientRequest> request = client.request(new RequestOptions()
.setMethod(HttpMethod.POST)
.setPort(HttpTestBase.DEFAULT_HTTP_PORT)
.setHost("localhost")
.setURI("/somepath")).onComplete(onSuccess(req -> {
if (implementInit) {
assertNotNull(((HttpClientRequestInternal)req).metric());
}
req
.response(onSuccess(resp -> {
responseBeginLatch.countDown();
Expand Down
40 changes: 33 additions & 7 deletions src/test/java/io/vertx/test/fakemetrics/EndpointMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ public class EndpointMetric implements ClientMetrics<HttpClientMetric, Void, Htt
public final AtomicInteger connectionCount = new AtomicInteger();
public final AtomicInteger requestCount = new AtomicInteger();
public final ConcurrentMap<HttpRequest, HttpClientMetric> requests = new ConcurrentHashMap<>();
private final boolean implementInit;

public EndpointMetric() {
public EndpointMetric(boolean implementInit) {
this.implementInit = implementInit;
}

@Override
Expand All @@ -45,12 +47,36 @@ public void dequeueRequest(Void taskMetric) {
queueSize.decrementAndGet();
}

@Override
public HttpClientMetric init() {
if (implementInit) {
return new HttpClientMetric(this);
} else {
return ClientMetrics.super.init();
}
}

@Override
public void requestBegin(HttpClientMetric requestMetric, String uri, HttpRequest request) {
if (implementInit) {
requestCount.incrementAndGet();
requestMetric.request.set(request);
requests.put(request, requestMetric);
} else {
ClientMetrics.super.requestBegin(requestMetric, uri, request);
}
}

@Override
public HttpClientMetric requestBegin(String uri, HttpRequest request) {
requestCount.incrementAndGet();
HttpClientMetric metric = new HttpClientMetric(this, request);
requests.put(request, metric);
return metric;
if (implementInit) {
return null;
} else {
requestCount.incrementAndGet();
HttpClientMetric metric = new HttpClientMetric(this, request);
requests.put(request, metric);
return metric;
}
}

@Override
Expand Down Expand Up @@ -84,13 +110,13 @@ public void requestReset(HttpClientMetric requestMetric) {
}
requestCount.decrementAndGet();
requestMetric.failed.set(true);
requests.remove(requestMetric.request);
requests.remove(requestMetric.request.get());
}

@Override
public void responseEnd(HttpClientMetric requestMetric, long bytesRead) {
requestMetric.bytesRead.set(bytesRead);
requestCount.decrementAndGet();
requests.remove(requestMetric.request);
requests.remove(requestMetric.request.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ public class FakeHttpClientMetrics extends FakeTCPMetrics implements HttpClientM
private final String name;
private final ConcurrentMap<WebSocketBase, WebSocketMetric> webSockets = new ConcurrentHashMap<>();
private final ConcurrentMap<SocketAddress, EndpointMetric> endpoints = new ConcurrentHashMap<>();
private volatile boolean implementInit = false;

public FakeHttpClientMetrics(String name) {
this.name = name;
}

public void setImplementInit(boolean implementInit) {
this.implementInit = implementInit;
}

public WebSocketMetric getMetric(WebSocket ws) {
return webSockets.get(ws);
}
Expand Down Expand Up @@ -83,7 +88,7 @@ public Integer connectionCount(String name) {

@Override
public ClientMetrics<HttpClientMetric, Void, HttpRequest, HttpResponse> createEndpointMetrics(SocketAddress remoteAddress, int maxPoolSize) {
EndpointMetric metric = new EndpointMetric() {
EndpointMetric metric = new EndpointMetric(implementInit) {
@Override
public void close() {
endpoints.remove(remoteAddress);
Expand Down
10 changes: 8 additions & 2 deletions src/test/java/io/vertx/test/fakemetrics/HttpClientMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class HttpClientMetric {

public final EndpointMetric endpoint;
public final HttpRequest request;
public final AtomicReference<HttpRequest> request;
public final AtomicInteger requestEnded = new AtomicInteger();
public final AtomicInteger responseBegin = new AtomicInteger();
public final AtomicLong bytesRead = new AtomicLong();
Expand All @@ -32,6 +33,11 @@ public class HttpClientMetric {

public HttpClientMetric(EndpointMetric endpoint, HttpRequest request) {
this.endpoint = endpoint;
this.request = request;
this.request = new AtomicReference<>(request);
}

public HttpClientMetric(EndpointMetric endpoint) {
this.endpoint = endpoint;
this.request = new AtomicReference<>();
}
}
Loading