Add WebSocket client metrics#4118
Conversation
d16260f to
49cdeda
Compare
|
@LivingLikeKrillin On most classes you have |
Add WEBSOCKET_CLIENT_PREFIX, HANDSHAKE_TIME, and CONNECTION_DURATION constants, meter/observation documentation enums, and recorder interfaces with Micrometer-based implementation. Related to reactor#3820 Signed-off-by: LivingLikeKrillin <143606756+LivingLikeKrillin@users.noreply.github.com>
Replace the HTTP metrics handler with a WebSocket-aware handler on upgrade. Connection duration is recorded when the handler is removed from the pipeline. Control frames (Close, Ping, Pong) are excluded from data metrics. Related to reactor#3820 Signed-off-by: LivingLikeKrillin <143606756+LivingLikeKrillin@users.noreply.github.com>
Test handshake time, data received/sent, connection duration, handshake failure, and protocol combinations (HTTP/1.1 and H2). Related to reactor#3820 Signed-off-by: LivingLikeKrillin <143606756+LivingLikeKrillin@users.noreply.github.com>
49cdeda to
dcbecaa
Compare
Signed-off-by: LivingLikeKrillin <143606756+LivingLikeKrillin@users.noreply.github.com>
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
- Compute path and contextView eagerly in swapMetricsHandler() instead of lazy initialization via initMetrics() - Change extractProcessedDataFromBuffer parameter type to WebSocketFrame - Use GET for HTTP/1.1 and CONNECT for HTTP/2 via wsHttpMethod() - Restore missing @author raccoonback in WebsocketClientOperations
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
...etty-http/src/main/java/reactor/netty/http/client/AbstractWebSocketClientMetricsHandler.java
Outdated
Show resolved
Hide resolved
reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java
Outdated
Show resolved
Hide resolved
reactor-netty-http/src/main/java/reactor/netty/http/client/WebsocketClientOperations.java
Outdated
Show resolved
Hide resolved
- Remove @nullable from path and contextView fields and parameters, since they are eagerly initialized in swapMetricsHandler() - Remove dead code: null guards, ternary fallbacks, and else branches in ContextAwareWebSocketClientMetricsHandler that are unreachable after the @nullable removal - Remove unused copy constructors from all handler classes - Move resolvePath() from AbstractWebSocketClientMetricsHandler to WebsocketClientOperations where it is actually called - Remove unused URI parameter from swapMetricsHandler() - Remove unused status parameter from recordHandshakeFailure() and hardcode "ERROR" since no other value is used - Rename metric namespace from reactor.netty.http.client.websocket to reactor.netty.websocket.client
Accumulate data across fragmented WebSocket frames and record metrics only when isFinalFragment() is true, instead of recording per individual frame. - Set dataSentTime/dataReceivedTime only on the first fragment - Accumulate dataSent/dataReceived across continuation frames - Record and reset counters on final fragment - Use synchronous recording in write() since reactor-netty's SendManyInner does not support ChannelPromise.addListener()
- testWebSocketFragmentedDataSentMetrics: verify that sending 3 fragments records DATA_SENT once with accumulated total - testWebSocketFragmentedDataReceivedMetrics: verify that receiving 2 fragments records DATA_RECEIVED once with accumulated total - Add /ws-fragment server route that sends fragmented frames
7161609 to
10db602
Compare
|
|
||
| protected abstract WebSocketClientMetricsRecorder recorder(); | ||
|
|
||
| protected void recordConnectionClosed(ChannelHandlerContext ctx) { |
Check notice
Code scanning / CodeQL
Useless parameter Note
| } | ||
| } | ||
|
|
||
| protected void recordException(ChannelHandlerContext ctx) { |
Check notice
Code scanning / CodeQL
Useless parameter Note
| } | ||
| } | ||
|
|
||
| protected void recordRead(io.netty.channel.Channel channel, SocketAddress address) { |
Check notice
Code scanning / CodeQL
Useless parameter Note
...http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsRecorder.java
Dismissed
Show dismissed
Hide dismissed
...http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsRecorder.java
Dismissed
Show dismissed
Hide dismissed
...http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsRecorder.java
Dismissed
Show dismissed
Hide dismissed
...http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsRecorder.java
Dismissed
Show dismissed
Hide dismissed
...http/src/main/java/reactor/netty/http/client/ContextAwareWebSocketClientMetricsRecorder.java
Dismissed
Show dismissed
Hide dismissed
reactor-netty-http/src/test/java/reactor/netty/http/WebSocketClientMetricsHandlerTests.java
Dismissed
Show dismissed
Hide dismissed
reactor-netty-http/src/test/java/reactor/netty/http/WebSocketClientMetricsHandlerTests.java
Dismissed
Show dismissed
Hide dismissed
violetagg
left a comment
There was a problem hiding this comment.
I have proposal for moving handshake timer to the abstract class so that it is available for all handlers. What do you think?
| String path; | ||
|
|
||
| ContextView contextView; |
There was a problem hiding this comment.
| String path; | |
| ContextView contextView; | |
| final String path; | |
| final ContextView contextView; |
| long dataSentTime; | ||
|
|
||
| long connectionStartTime; | ||
|
|
There was a problem hiding this comment.
What do you think about moving the handshake timer so that it is available for all variants?
| long handshakeStartTime; | |
| super.handlerAdded(ctx); | ||
| connectionStartTime = System.nanoTime(); | ||
| } | ||
|
|
There was a problem hiding this comment.
What do you think about moving the handshake timer so that it is available for all variants?
| void startHandshake(Channel channel) { | |
| handshakeStartTime = System.nanoTime(); | |
| } | |
| void recordHandshakeComplete(Channel channel, String status) { | |
| Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime); | |
| if (proxyAddress == null) { | |
| recorder().recordWebSocketHandshakeTime(remoteAddress, path, status, time); | |
| } | |
| else { | |
| recorder().recordWebSocketHandshakeTime(remoteAddress, proxyAddress, path, status, time); | |
| } | |
| } | |
| void recordHandshakeFailure(Channel channel) { | |
| Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime); | |
| if (proxyAddress == null) { | |
| recorder().recordWebSocketHandshakeTime(remoteAddress, path, "ERROR", time); | |
| } | |
| else { | |
| recorder().recordWebSocketHandshakeTime(remoteAddress, proxyAddress, path, "ERROR", time); | |
| } | |
| } | |
| try { | ||
| if (msg instanceof WebSocketFrame) { | ||
| WebSocketFrame frame = (WebSocketFrame) msg; | ||
| if (isDataFrame(frame)) { | ||
| if (dataSentTime == 0) { | ||
| dataSentTime = System.nanoTime(); | ||
| } | ||
| dataSent += extractProcessedDataFromBuffer(frame); | ||
|
|
||
| if (frame.isFinalFragment()) { | ||
| recordWrite(remoteAddress); | ||
| dataSentTime = 0; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| catch (RuntimeException e) { | ||
| if (log.isWarnEnabled()) { | ||
| log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e); | ||
| } | ||
| } |
There was a problem hiding this comment.
What do you think if we do the recording once we know that the write was successful?
| try { | |
| if (msg instanceof WebSocketFrame) { | |
| WebSocketFrame frame = (WebSocketFrame) msg; | |
| if (isDataFrame(frame)) { | |
| if (dataSentTime == 0) { | |
| dataSentTime = System.nanoTime(); | |
| } | |
| dataSent += extractProcessedDataFromBuffer(frame); | |
| if (frame.isFinalFragment()) { | |
| recordWrite(remoteAddress); | |
| dataSentTime = 0; | |
| } | |
| } | |
| } | |
| } | |
| catch (RuntimeException e) { | |
| if (log.isWarnEnabled()) { | |
| log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e); | |
| } | |
| } | |
| if (msg instanceof WebSocketFrame) { | |
| WebSocketFrame frame = (WebSocketFrame) msg; | |
| if (isDataFrame(frame)) { | |
| if (dataSentTime == 0) { | |
| dataSentTime = System.nanoTime(); | |
| } | |
| dataSent += extractProcessedDataFromBuffer(frame); | |
| if (frame.isFinalFragment()) { | |
| promise.addListener(f -> { | |
| try { | |
| recordWrite(remoteAddress); | |
| dataSentTime = 0; | |
| } | |
| catch (RuntimeException e) { | |
| if (log.isWarnEnabled()) { | |
| log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e); | |
| } | |
| } | |
| }); | |
| } | |
| } | |
| } |
| protected ContextAwareWebSocketClientMetricsRecorder recorder() { | ||
| return recorder; | ||
| } | ||
|
|
There was a problem hiding this comment.
Bring here an implementation for handshake
| @Override | |
| void recordHandshakeComplete(io.netty.channel.Channel channel, String status) { | |
| Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime); | |
| if (proxyAddress == null) { | |
| recorder.recordWebSocketHandshakeTime(contextView, remoteAddress, path, status, time); | |
| } | |
| else { | |
| recorder.recordWebSocketHandshakeTime(contextView, remoteAddress, proxyAddress, path, status, time); | |
| } | |
| } | |
| @Override | |
| void recordHandshakeFailure(io.netty.channel.Channel channel) { | |
| Duration time = Duration.ofNanos(System.nanoTime() - handshakeStartTime); | |
| if (proxyAddress == null) { | |
| recorder.recordWebSocketHandshakeTime(contextView, remoteAddress, path, "ERROR", time); | |
| } | |
| else { | |
| recorder.recordWebSocketHandshakeTime(contextView, remoteAddress, proxyAddress, path, "ERROR", time); | |
| } | |
| } | |
| final Sinks.One<WebSocketCloseStatus> onCloseState; | ||
| final boolean proxyPing; | ||
|
|
||
| @Nullable MicrometerWebSocketClientMetricsHandler micrometerWsHandler; |
There was a problem hiding this comment.
We have handshake timer for all
| @Nullable MicrometerWebSocketClientMetricsHandler micrometerWsHandler; | |
| @Nullable AbstractWebSocketClientMetricsHandler micrometerWsHandler; |
| MicrometerWebSocketClientMetricsHandler micrometerHandler = new MicrometerWebSocketClientMetricsHandler( | ||
| MicrometerWebSocketClientMetricsRecorder.INSTANCE, | ||
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); | ||
| micrometerHandler.startHandshake(channel); | ||
| this.micrometerWsHandler = micrometerHandler; | ||
| wsHandler = micrometerHandler; |
There was a problem hiding this comment.
Let's here only construct the new handler
| MicrometerWebSocketClientMetricsHandler micrometerHandler = new MicrometerWebSocketClientMetricsHandler( | |
| MicrometerWebSocketClientMetricsRecorder.INSTANCE, | |
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); | |
| micrometerHandler.startHandshake(channel); | |
| this.micrometerWsHandler = micrometerHandler; | |
| wsHandler = micrometerHandler; | |
| wsHandler = new MicrometerWebSocketClientMetricsHandler( | |
| MicrometerWebSocketClientMetricsRecorder.INSTANCE, | |
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); |
| wsHandler = new ContextAwareWebSocketClientMetricsHandler( | ||
| new DefaultContextAwareWebSocketClientMetricsRecorder(ctxHandler.recorder), | ||
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); |
There was a problem hiding this comment.
Wrap the recorder only if it is not ContextAwareWebSocketClientMetricsRecorder
| wsHandler = new ContextAwareWebSocketClientMetricsHandler( | |
| new DefaultContextAwareWebSocketClientMetricsRecorder(ctxHandler.recorder), | |
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); | |
| ContextAwareWebSocketClientMetricsRecorder wsRecorder; | |
| if (ctxHandler.recorder instanceof ContextAwareWebSocketClientMetricsRecorder) { | |
| wsRecorder = (ContextAwareWebSocketClientMetricsRecorder) ctxHandler.recorder; | |
| } | |
| else { | |
| wsRecorder = new DefaultContextAwareWebSocketClientMetricsRecorder(ctxHandler.recorder); | |
| } | |
| wsHandler = new ContextAwareWebSocketClientMetricsHandler( | |
| wsRecorder, | |
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); |
| else { | ||
| wsHandler = new WebSocketClientMetricsHandler( | ||
| MicrometerWebSocketClientMetricsRecorder.INSTANCE, | ||
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); | ||
| } |
There was a problem hiding this comment.
We will need a wrapper DefaultWebSocketClientMetricsRecorder similar to what we have to context aware. What do you think?
| else { | |
| wsHandler = new WebSocketClientMetricsHandler( | |
| MicrometerWebSocketClientMetricsRecorder.INSTANCE, | |
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); | |
| } | |
| else if (httpHandler instanceof HttpClientMetricsHandler) { | |
| HttpClientMetricsHandler plainHandler = (HttpClientMetricsHandler) httpHandler; | |
| WebSocketClientMetricsRecorder wsRecorder; | |
| if (plainHandler.recorder instanceof WebSocketClientMetricsRecorder) { | |
| wsRecorder = (WebSocketClientMetricsRecorder) plainHandler.recorder; | |
| } | |
| else { | |
| wsRecorder = new DefaultWebSocketClientMetricsRecorder(plainHandler.recorder); | |
| } | |
| wsHandler = new WebSocketClientMetricsHandler( | |
| wsRecorder, | |
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); | |
| } | |
| else { | |
| return; | |
| } |
| httpHandler.remoteAddress, httpHandler.proxyAddress, resolvedPath, ctxView, httpMethod); | ||
| } | ||
|
|
||
| channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.WsMetricsHandler, wsHandler); |
There was a problem hiding this comment.
| channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.WsMetricsHandler, wsHandler); | |
| wsHandler.startHandshake(channel); | |
| this.micrometerWsHandler = wsHandler; | |
| channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.WsMetricsHandler, wsHandler); |
|
|
||
| protected abstract WebSocketClientMetricsRecorder recorder(); | ||
|
|
||
| protected void recordConnectionClosed(ChannelHandlerContext ctx) { |
| } | ||
| } | ||
|
|
||
| protected void recordException(ChannelHandlerContext ctx) { |
| } | ||
| } | ||
|
|
||
| protected void recordRead(io.netty.channel.Channel channel, SocketAddress address) { |
There was a problem hiding this comment.
Do we need the method param channel?
|
Please also ensure you have |
Summary
Add metrics support for WebSocket clients, including handshake time,
data received/sent, connection duration, and error counting.
with Micrometer-based implementation
upgrade and track connection duration via handler lifecycle
Test plan
Related to #3820