Skip to content

Commit caa3962

Browse files
committed
Account for stream in transport connections.
Motivation: With the advent of QUIC, we need to raffine further more the contract of TransportMetrics which has historically used the notion of socket and connection interchangeably. We should clarify these meanings and provide minimal support for QUIC metrics. Changes: In TransportMetrics: - Remove occurences of socket, replaced by connection which is the common concepts defined by TCP and QUIC - Propagate the change above in the hierarchy (which is a mere renaming of type argument / parameter name) - Support basic stream event for QUIC : streamOpended / streamClosed
1 parent b68afb3 commit caa3962

21 files changed

+172
-136
lines changed

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicConnectionHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,17 +160,17 @@ void handleClosed(QuicConnectionCloseEvent event) {
160160
}
161161

162162
@Override
163-
public void bytesRead(Object socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
164-
NetworkMetrics.super.bytesRead(socketMetric, remoteAddress, numberOfBytes);
163+
public void bytesRead(Object connectionMetric, SocketAddress remoteAddress, long numberOfBytes) {
164+
NetworkMetrics.super.bytesRead(connectionMetric, remoteAddress, numberOfBytes);
165165
}
166166

167167
@Override
168-
public void bytesWritten(Object socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
169-
NetworkMetrics.super.bytesWritten(socketMetric, remoteAddress, numberOfBytes);
168+
public void bytesWritten(Object connectionMetric, SocketAddress remoteAddress, long numberOfBytes) {
169+
NetworkMetrics.super.bytesWritten(connectionMetric, remoteAddress, numberOfBytes);
170170
}
171171

172172
@Override
173-
public void exceptionOccurred(Object socketMetric, SocketAddress remoteAddress, Throwable t) {
174-
NetworkMetrics.super.exceptionOccurred(socketMetric, remoteAddress, t);
173+
public void exceptionOccurred(Object connectionMetric, SocketAddress remoteAddress, Throwable err) {
174+
NetworkMetrics.super.exceptionOccurred(connectionMetric, remoteAddress, err);
175175
}
176176
}

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicConnectionImpl.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class QuicConnectionImpl extends ConnectionBase implements QuicConnection
5050

5151
private final ContextInternal context;
5252
private final QuicChannel channel;
53-
private final TransportMetrics<?> metrics;
53+
private final TransportMetrics metrics;
5454
private final long idleTimeout;
5555
private final long readIdleTimeout;
5656
private final long writeIdleTimeout;
@@ -104,15 +104,19 @@ protected void handleGrace(Completable<Void> completion) {
104104
}
105105
};
106106
if (metrics != null) {
107-
this.streamMetrics = new NetworkMetrics<>() {
107+
this.streamMetrics = new TransportMetrics<>() {
108108
@Override
109-
public void bytesRead(Object socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
109+
public void bytesRead(Object connectionMetric, SocketAddress remoteAddress, long numberOfBytes) {
110110
metrics.bytesRead(metric(), remoteAddress, numberOfBytes);
111111
}
112112
@Override
113-
public void bytesWritten(Object socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
113+
public void bytesWritten(Object connectionMetric, SocketAddress remoteAddress, long numberOfBytes) {
114114
metrics.bytesWritten(metric(), remoteAddress, numberOfBytes);
115115
}
116+
@Override
117+
public void disconnected(Object connectionMetric, SocketAddress remoteAddress) {
118+
metrics.streamClosed(metric());
119+
}
116120
};
117121
} else {
118122
this.streamMetrics = null;
@@ -140,6 +144,9 @@ void handleStream(QuicStreamChannel streamChannel) {
140144
}
141145
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, streamContext, streamChannel, streamMetrics, chctx));
142146
handler.addHandler(stream -> {
147+
if (metrics != null) {
148+
metrics.streamOpened(metric());
149+
}
143150
Handler<QuicStream> h = QuicConnectionImpl.this.handler;
144151
if (h != null) {
145152
context.dispatch(stream, h);
@@ -238,6 +245,9 @@ public Future<QuicStream> openStream(ContextInternal context, boolean bidirectio
238245
Promise<QuicStream> promise = context.promise();
239246
VertxHandler<QuicStreamImpl> handler = VertxHandler.create(chctx -> new QuicStreamImpl(this, context, (QuicStreamChannel) chctx.channel(), streamMetrics, chctx));
240247
handler.addHandler(stream -> {
248+
if (metrics != null) {
249+
metrics.streamOpened(metric());
250+
}
241251
promise.tryComplete(stream);
242252
});
243253
QuicStreamType type = bidirectional ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;

vertx-core/src/main/java/io/vertx/core/spi/metrics/HttpClientMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
*
3636
* @author <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>
3737
*/
38-
public interface HttpClientMetrics<R, W, S> extends TransportMetrics<S> {
38+
public interface HttpClientMetrics<R, W, C> extends TransportMetrics<C> {
3939

4040
/**
4141
* Provides metrics for a particular endpoint

vertx-core/src/main/java/io/vertx/core/spi/metrics/HttpServerMetrics.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,17 @@
3535
*
3636
* @author <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>
3737
*/
38-
public interface HttpServerMetrics<R, W, S> extends TransportMetrics<S> {
38+
public interface HttpServerMetrics<R, W, C> extends TransportMetrics<C> {
3939

4040
/**
4141
* Called when an http server request begins. Vert.x will invoke {@link #responseEnd} when the response has ended
4242
* or {@link #requestReset} if the request/response has failed before.
4343
*
44-
* @param socketMetric the socket metric
45-
* @param request the http server reuqest
44+
* @param connectionMetric the connection metric
45+
* @param request the http server request
4646
* @return the request metric
4747
*/
48-
default R requestBegin(S socketMetric, HttpRequest request) {
48+
default R requestBegin(C connectionMetric, HttpRequest request) {
4949
return null;
5050
}
5151

@@ -79,12 +79,12 @@ default void responseBegin(R requestMetric, HttpResponse response) {
7979
/**
8080
* Called when an http server response is pushed.
8181
*
82-
* @param socketMetric the socket metric
82+
* @param connectionMetric the connection metric
8383
* @param method the pushed response method
8484
* @param uri the pushed response uri
8585
* @param response the http server response @return the request metric
8686
*/
87-
default R responsePushed(S socketMetric, HttpMethod method, String uri, HttpResponse response) {
87+
default R responsePushed(C connectionMetric, HttpMethod method, String uri, HttpResponse response) {
8888
return null;
8989
}
9090

@@ -100,21 +100,21 @@ default void responseEnd(R requestMetric, HttpResponse response, long bytesWritt
100100
/**
101101
* Called when a server web socket connects.
102102
*
103-
* @param socketMetric the socket metric
103+
* @param connectionMetric the socket metric
104104
* @param requestMetric the request metric
105-
* @param serverWebSocket the server web socket
105+
* @param webSocket the server web socket
106106
* @return the server web socket metric
107107
*/
108-
default W connected(S socketMetric, R requestMetric, ServerWebSocket serverWebSocket) {
108+
default W connected(C connectionMetric, R requestMetric, ServerWebSocket webSocket) {
109109
return null;
110110
}
111111

112112
/**
113113
* Called when the server web socket has disconnected.
114114
*
115-
* @param serverWebSocketMetric the server web socket metric
115+
* @param webSocketMetric the server web socket metric
116116
*/
117-
default void disconnected(W serverWebSocketMetric) {
117+
default void disconnected(W webSocketMetric) {
118118
}
119119

120120
/**

vertx-core/src/main/java/io/vertx/core/spi/metrics/NetworkMetrics.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,39 @@
1515

1616
/**
1717
* An SPI used internally by Vert.x to gather metrics on a net socket which serves
18-
* as a base class for TCP or UDP.<p/>
18+
* as a base class for TCP, UDP or QUIC.<p/>
1919
*
2020
* @author <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>
2121
*/
22-
public interface NetworkMetrics<S> extends Metrics {
22+
public interface NetworkMetrics<C> extends Metrics {
2323

2424
/**
2525
* Called when bytes have been read
2626
*
27-
* @param socketMetric the socket metric, null for UDP
27+
* @param connectionMetric the connection metric, {@code null} for UDP
2828
* @param remoteAddress the remote address which this socket received bytes from
2929
* @param numberOfBytes the number of bytes read
3030
*/
31-
default void bytesRead(S socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
31+
default void bytesRead(C connectionMetric, SocketAddress remoteAddress, long numberOfBytes) {
3232
}
3333

3434
/**
3535
* Called when bytes have been written
3636
*
37-
* @param socketMetric the socket metric, null for UDP
37+
* @param connectionMetric the connection metric, {@code null} for UDP
3838
* @param remoteAddress the remote address which bytes are being written to
3939
* @param numberOfBytes the number of bytes written
4040
*/
41-
default void bytesWritten(S socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
41+
default void bytesWritten(C connectionMetric, SocketAddress remoteAddress, long numberOfBytes) {
4242
}
4343

4444
/**
4545
* Called when exceptions occur for a specific connection.
4646
*
47-
* @param socketMetric the socket metric, null for UDP
48-
* @param remoteAddress the remote address of the connection or null if it's datagram/udp
49-
* @param t the exception that occurred
47+
* @param connectionMetric the connection metric, {@code null} for UDP
48+
* @param remoteAddress the remote address of the connection or {@code null} if it's datagram/udp
49+
* @param err the exception that occurred
5050
*/
51-
default void exceptionOccurred(S socketMetric, SocketAddress remoteAddress, Throwable t) {
51+
default void exceptionOccurred(C connectionMetric, SocketAddress remoteAddress, Throwable err) {
5252
}
5353
}

vertx-core/src/main/java/io/vertx/core/spi/metrics/TCPMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@
1818
* @deprecated instead use {@link TransportMetrics}
1919
*/
2020
@Deprecated(forRemoval = true)
21-
public interface TCPMetrics<S> extends TransportMetrics<S> {
21+
public interface TCPMetrics<C> extends TransportMetrics<C> {
2222
}

vertx-core/src/main/java/io/vertx/core/spi/metrics/TransportMetrics.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
*
2222
* @author <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>
2323
*/
24-
public interface TransportMetrics<S> extends NetworkMetrics<S> {
24+
public interface TransportMetrics<C> extends NetworkMetrics<C> {
2525

2626
/**
2727
* Called when a client has connected, which is applicable for connections.<p/>
@@ -31,18 +31,32 @@ public interface TransportMetrics<S> extends NetworkMetrics<S> {
3131
*
3232
* @param remoteAddress the remote address of the client
3333
* @param remoteName the remote name of the client
34-
* @return the socket metric
34+
* @return the connection metric
3535
*/
36-
default S connected(SocketAddress remoteAddress, String remoteName) {
36+
default C connected(SocketAddress remoteAddress, String remoteName) {
3737
return null;
3838
}
3939

4040
/**
4141
* Called when a client has disconnected, which is applicable for connections.
4242
*
43-
* @param socketMetric the socket metric
43+
* @param connectionMetric the connection metric
4444
* @param remoteAddress the remote address of the client
4545
*/
46-
default void disconnected(S socketMetric, SocketAddress remoteAddress) {
46+
default void disconnected(C connectionMetric, SocketAddress remoteAddress) {
47+
}
48+
49+
/**
50+
* Called when a connection has opened a stream, only applicable for Quic connections
51+
* @param connectionMetric the connection metric
52+
*/
53+
default void streamOpened(C connectionMetric) {
54+
}
55+
56+
/**
57+
* Called when a connection has closed a stream, only applicable for Quic connections
58+
* @param connectionMetric the connection metric
59+
*/
60+
default void streamClosed(C connectionMetric) {
4761
}
4862
}

vertx-core/src/test/java/io/vertx/test/fakemetrics/SocketMetric.java renamed to vertx-core/src/test/java/io/vertx/test/fakemetrics/ConnectionMetric.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
/**
2323
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
2424
*/
25-
public class SocketMetric {
25+
public class ConnectionMetric {
2626

2727
public final SocketAddress remoteAddress;
2828
public final String remoteName;
@@ -31,8 +31,9 @@ public class SocketMetric {
3131
public final List<Long> bytesReadEvents = Collections.synchronizedList(new ArrayList<>());
3232
public final AtomicLong bytesWritten = new AtomicLong();
3333
public final List<Long> bytesWrittenEvents = Collections.synchronizedList(new ArrayList<>());
34+
public final AtomicLong openStreams = new AtomicLong();
3435

35-
public SocketMetric(SocketAddress remoteAddress, String remoteName) {
36+
public ConnectionMetric(SocketAddress remoteAddress, String remoteName) {
3637
this.remoteAddress = remoteAddress;
3738
this.remoteName = remoteName;
3839
}

vertx-core/src/test/java/io/vertx/test/fakemetrics/FakeDatagramSocketMetrics.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ public void listening(String localName, SocketAddress localAddress) {
5151
}
5252

5353
@Override
54-
public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
54+
public void bytesRead(Void connectionMetric, SocketAddress remoteAddress, long numberOfBytes) {
5555
reads.add(new PacketMetric(remoteAddress, numberOfBytes));
5656
}
5757

5858
@Override
59-
public void bytesWritten(Void socketMetric, SocketAddress remoteAddress,long numberOfBytes) {
59+
public void bytesWritten(Void connectionMetric, SocketAddress remoteAddress, long numberOfBytes) {
6060
writes.add(new PacketMetric(remoteAddress, numberOfBytes));
6161
}
6262
}

vertx-core/src/test/java/io/vertx/test/fakemetrics/FakeHttpClientMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
/**
3030
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
3131
*/
32-
public class FakeHttpClientMetrics extends FakeTCPMetrics implements HttpClientMetrics<HttpClientMetric, WebSocketMetric, SocketMetric> {
32+
public class FakeHttpClientMetrics extends FakeTCPMetrics implements HttpClientMetrics<HttpClientMetric, WebSocketMetric, ConnectionMetric> {
3333

3434
private final ConcurrentMap<WebSocketBase, WebSocketMetric> webSockets = new ConcurrentHashMap<>();
3535
private final ConcurrentMap<SocketAddress, EndpointMetric> endpoints = new ConcurrentHashMap<>();

0 commit comments

Comments
 (0)