Skip to content

Commit 5402370

Browse files
committed
Extend ClientMetrics SPI to introduce a new init method that is called by the HTTP client before the request is actually sent.
Motivation: The ClientMetrics SPI design creates a request metric when the request headers are written, we should actually relax this constraint and let client obtain a metric from the SPI before headers are written, in order to give the oppportunity to customize the request metric object before headers are written. Changes: Extend ClientMetrics SPI to add a new init method that produces a metrics instance. When the implementation returns null, the client behaves with a compatibility mode and calls ClientMetrics#requestBegin to signal the beginning of the request and obtain the request metric. When the implementation returns non-null, the client instead calls the ClientMetrics#requestBegin overload that accepts the request metric instance that should be handed back to the SPI.
1 parent 8e64ac9 commit 5402370

File tree

7 files changed

+118
-22
lines changed

7 files changed

+118
-22
lines changed

src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,13 @@ private void beginRequest(Stream stream, HttpRequestHead request, boolean chunke
238238
synchronized (this) {
239239
responses.add(stream);
240240
this.isConnect = connect;
241-
if (this.metrics != null) {
242-
stream.metric = this.metrics.requestBegin(request.uri, request);
241+
if (metrics != null) {
242+
Object metric = stream.metric;
243+
if (metric != null) {
244+
metrics.requestBegin(metric, request.uri, request);
245+
} else {
246+
stream.metric = metrics.requestBegin(request.uri, request);
247+
}
243248
}
244249
VertxTracer tracer = context.tracer();
245250
if (tracer != null) {
@@ -364,10 +369,11 @@ private abstract static class Stream {
364369
private long bytesWritten;
365370
private long readWindow;
366371

367-
Stream(Http1xClientConnection conn, ContextInternal context, int id) {
372+
Stream(Http1xClientConnection conn, ContextInternal context, Object metric, int id) {
368373
this.conn = conn;
369374
this.context = context;
370375
this.id = id;
376+
this.metric = metric;
371377
this.promise = context.promise();
372378
}
373379

@@ -430,8 +436,8 @@ private static class StreamImpl extends Stream implements HttpClientStream {
430436
private Handler<Throwable> exceptionHandler;
431437
private Handler<Void> closeHandler;
432438

433-
StreamImpl(ContextInternal context, Http1xClientConnection conn, int id) {
434-
super(conn, context, id);
439+
StreamImpl(ContextInternal context, Http1xClientConnection conn, Object metric, int id) {
440+
super(conn, context, metric, id);
435441

436442
this.queue = new InboundBuffer<>(context, 5)
437443
.handler(item -> {
@@ -1301,7 +1307,13 @@ public void createStream(ContextInternal context, Handler<AsyncResult<HttpClient
13011307
if (closed) {
13021308
stream = null;
13031309
} else {
1304-
stream = new StreamImpl(context, this, seq++);
1310+
Object metric;
1311+
if (metrics != null) {
1312+
metric = metrics.init();
1313+
} else {
1314+
metric = null;
1315+
}
1316+
stream = new StreamImpl(context, this, metric, seq++);
13051317
requests.add(stream);
13061318
if (requests.size() == 1) {
13071319
stream.promise.complete(stream);

src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,13 @@ public Future<HttpClientRequest> createRequest(ContextInternal context) {
172172
}
173173

174174
private StreamImpl createStream(ContextInternal context) {
175-
return new StreamImpl(this, context, false);
175+
Object metric;
176+
if (metrics != null) {
177+
metric = metrics.init();
178+
} else {
179+
metric = null;
180+
}
181+
return new StreamImpl(this, context, metric, false);
176182
}
177183

178184
private void recycle() {
@@ -214,7 +220,7 @@ public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int stream
214220
Handler<HttpClientPush> pushHandler = stream.pushHandler;
215221
if (pushHandler != null) {
216222
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
217-
StreamImpl pushStream = new StreamImpl(this, context, true);
223+
StreamImpl pushStream = new StreamImpl(this, context, null, true);
218224
pushStream.init(promisedStream);
219225
HttpClientPush push = new HttpClientPush(headers, pushStream);
220226
if (metrics != null) {
@@ -254,10 +260,11 @@ static abstract class Stream extends VertxHttp2Stream<Http2ClientConnection> {
254260
protected long writeWindow;
255261
protected final long windowSize;
256262

257-
Stream(Http2ClientConnection conn, ContextInternal context, boolean push) {
263+
Stream(Http2ClientConnection conn, ContextInternal context, Object metric, boolean push) {
258264
super(conn, context);
259265

260266
this.push = push;
267+
this.metric = metric;
261268
this.windowSize = conn.getWindowSize();
262269
}
263270

@@ -411,8 +418,8 @@ void onClose() {
411418

412419
static class StreamImpl extends Stream implements HttpClientStream {
413420

414-
StreamImpl(Http2ClientConnection conn, ContextInternal context, boolean push) {
415-
super(conn, context, push);
421+
StreamImpl(Http2ClientConnection conn, ContextInternal context, Object metric, boolean push) {
422+
super(conn, context, metric, push);
416423
}
417424

418425
@Override
@@ -636,7 +643,12 @@ private Http2Exception createStream(HttpRequestHead head, Http2Headers headers)
636643
}
637644
init(stream);
638645
if (conn.metrics != null) {
639-
metric = conn.metrics.requestBegin(headers.path().toString(), head);
646+
Object m = metric;
647+
if (m != null) {
648+
conn.metrics.requestBegin(m, headers.path().toString(), head);
649+
} else {
650+
metric = conn.metrics.requestBegin(headers.path().toString(), head);
651+
}
640652
}
641653
VertxTracer tracer = context.tracer();
642654
if (tracer != null) {

src/main/java/io/vertx/core/spi/metrics/ClientMetrics.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,31 @@ default T enqueueRequest() {
3131
default void dequeueRequest(T taskMetric) {
3232
}
3333

34+
/**
35+
* Create a request metric instance, when the implementation
36+
*
37+
* <ul>
38+
* <li>returns {@code null}, {@link #requestBegin(String, Object)} is called (backward compatibility mode)</li>
39+
* <li>returns a non-null value, {@link #requestBegin(Object, String, Object)} is called with the returned request
40+
* metric instance</li>
41+
* </ul>
42+
*
43+
* @return a newly created request metric
44+
*/
45+
default M init() {
46+
return null;
47+
}
48+
49+
/**
50+
* Called when a client request begins and {@link #init()} has returned a non-null value.
51+
*
52+
* @param requestMetric the request metric
53+
* @param uri an arbitrary uri
54+
* @param request the request object
55+
*/
56+
default void requestBegin(M requestMetric, String uri, Req request) {
57+
}
58+
3459
/**
3560
* Called when a client request begins. Vert.x will invoke {@link #requestEnd} when the request
3661
* has ended or {@link #requestReset} if the request/response has failed before.

src/test/java/io/vertx/core/http/HttpMetricsTestBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,15 @@ public void testHttpMetricsLifecycle() throws Exception {
191191

192192
@Test
193193
public void testHttpClientLifecycle() throws Exception {
194+
testHttpClientLifecycle(false);
195+
}
196+
197+
@Test
198+
public void testHttpClientLifecycleWithInit() throws Exception {
199+
testHttpClientLifecycle(true);
200+
}
201+
202+
public void testHttpClientLifecycle(boolean implementInit) throws Exception {
194203

195204
// The test cannot pass for HTTP/2 upgrade for now
196205
HttpClientOptions opts = createBaseClientOptions();
@@ -228,6 +237,7 @@ public void testHttpClientLifecycle() throws Exception {
228237
});
229238
startServer(testAddress);
230239
FakeHttpClientMetrics clientMetrics = FakeMetricsBase.getMetrics(client);
240+
clientMetrics.setImplementInit(implementInit);
231241
CountDownLatch responseBeginLatch = new CountDownLatch(1);
232242
CountDownLatch responseEndLatch = new CountDownLatch(1);
233243
Future<HttpClientRequest> request = client.request(new RequestOptions()

src/test/java/io/vertx/test/fakemetrics/EndpointMetric.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ public class EndpointMetric implements ClientMetrics<HttpClientMetric, Void, Htt
3030
public final AtomicInteger connectionCount = new AtomicInteger();
3131
public final AtomicInteger requestCount = new AtomicInteger();
3232
public final ConcurrentMap<HttpRequest, HttpClientMetric> requests = new ConcurrentHashMap<>();
33+
private final boolean implementInit;
3334

34-
public EndpointMetric() {
35+
public EndpointMetric(boolean implementInit) {
36+
this.implementInit = implementInit;
3537
}
3638

3739
@Override
@@ -45,12 +47,36 @@ public void dequeueRequest(Void taskMetric) {
4547
queueSize.decrementAndGet();
4648
}
4749

50+
@Override
51+
public HttpClientMetric init() {
52+
if (implementInit) {
53+
return new HttpClientMetric(this);
54+
} else {
55+
return ClientMetrics.super.init();
56+
}
57+
}
58+
59+
@Override
60+
public void requestBegin(HttpClientMetric requestMetric, String uri, HttpRequest request) {
61+
if (implementInit) {
62+
requestCount.incrementAndGet();
63+
requestMetric.request.set(request);
64+
requests.put(request, requestMetric);
65+
} else {
66+
ClientMetrics.super.requestBegin(requestMetric, uri, request);
67+
}
68+
}
69+
4870
@Override
4971
public HttpClientMetric requestBegin(String uri, HttpRequest request) {
50-
requestCount.incrementAndGet();
51-
HttpClientMetric metric = new HttpClientMetric(this, request);
52-
requests.put(request, metric);
53-
return metric;
72+
if (implementInit) {
73+
return null;
74+
} else {
75+
requestCount.incrementAndGet();
76+
HttpClientMetric metric = new HttpClientMetric(this, request);
77+
requests.put(request, metric);
78+
return metric;
79+
}
5480
}
5581

5682
@Override
@@ -84,13 +110,13 @@ public void requestReset(HttpClientMetric requestMetric) {
84110
}
85111
requestCount.decrementAndGet();
86112
requestMetric.failed.set(true);
87-
requests.remove(requestMetric.request);
113+
requests.remove(requestMetric.request.get());
88114
}
89115

90116
@Override
91117
public void responseEnd(HttpClientMetric requestMetric, long bytesRead) {
92118
requestMetric.bytesRead.set(bytesRead);
93119
requestCount.decrementAndGet();
94-
requests.remove(requestMetric.request);
120+
requests.remove(requestMetric.request.get());
95121
}
96122
}

src/test/java/io/vertx/test/fakemetrics/FakeHttpClientMetrics.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,16 @@ public class FakeHttpClientMetrics extends FakeTCPMetrics implements HttpClientM
3434
private final String name;
3535
private final ConcurrentMap<WebSocketBase, WebSocketMetric> webSockets = new ConcurrentHashMap<>();
3636
private final ConcurrentMap<SocketAddress, EndpointMetric> endpoints = new ConcurrentHashMap<>();
37+
private volatile boolean implementInit = false;
3738

3839
public FakeHttpClientMetrics(String name) {
3940
this.name = name;
4041
}
4142

43+
public void setImplementInit(boolean implementInit) {
44+
this.implementInit = implementInit;
45+
}
46+
4247
public WebSocketMetric getMetric(WebSocket ws) {
4348
return webSockets.get(ws);
4449
}
@@ -83,7 +88,7 @@ public Integer connectionCount(String name) {
8388

8489
@Override
8590
public ClientMetrics<HttpClientMetric, Void, HttpRequest, HttpResponse> createEndpointMetrics(SocketAddress remoteAddress, int maxPoolSize) {
86-
EndpointMetric metric = new EndpointMetric() {
91+
EndpointMetric metric = new EndpointMetric(implementInit) {
8792
@Override
8893
public void close() {
8994
endpoints.remove(remoteAddress);

src/test/java/io/vertx/test/fakemetrics/HttpClientMetric.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
import java.util.concurrent.atomic.AtomicBoolean;
1717
import java.util.concurrent.atomic.AtomicInteger;
1818
import java.util.concurrent.atomic.AtomicLong;
19+
import java.util.concurrent.atomic.AtomicReference;
1920

2021
/**
2122
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
2223
*/
2324
public class HttpClientMetric {
2425

2526
public final EndpointMetric endpoint;
26-
public final HttpRequest request;
27+
public final AtomicReference<HttpRequest> request;
2728
public final AtomicInteger requestEnded = new AtomicInteger();
2829
public final AtomicInteger responseBegin = new AtomicInteger();
2930
public final AtomicLong bytesRead = new AtomicLong();
@@ -32,6 +33,11 @@ public class HttpClientMetric {
3233

3334
public HttpClientMetric(EndpointMetric endpoint, HttpRequest request) {
3435
this.endpoint = endpoint;
35-
this.request = request;
36+
this.request = new AtomicReference<>(request);
37+
}
38+
39+
public HttpClientMetric(EndpointMetric endpoint) {
40+
this.endpoint = endpoint;
41+
this.request = new AtomicReference<>();
3642
}
3743
}

0 commit comments

Comments
 (0)