Skip to content

Commit 61aea11

Browse files
authored
Merge pull request #240 from emeraldpay/feat/additional-ws-metrics
2 parents 9f9edf5 + 7fd81f9 commit 61aea11

File tree

10 files changed

+169
-6
lines changed

10 files changed

+169
-6
lines changed

src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ open class ConfiguredUpstreams(
363363
Tag.of("chain", (Global.chainById(config.blockchain).chainCode))
364364
)
365365
val metrics = RpcMetrics(
366+
metricsTags,
366367
timer = Timer.builder("upstream.rpc.conn")
367368
.description("Request time through a HTTP JSON RPC connection")
368369
.tags(metricsTags)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright (c) 2023 EmeraldPay, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.emeraldpay.dshackle.upstream.ethereum
17+
18+
import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
19+
import java.time.Duration
20+
import java.time.Instant
21+
import java.util.concurrent.atomic.AtomicReference
22+
import java.util.function.Consumer
23+
24+
/**
25+
* A consumer for a connection events that records the connection time metrics
26+
*/
27+
class ConnectionTimeMonitoring(
28+
val metrics: RpcMetrics,
29+
) : Consumer<WsConnection.ConnectionStatus> {
30+
31+
private val connectedAt = AtomicReference<Instant?>(null)
32+
33+
val connectionTime: Duration?
34+
get() = connectedAt.get()?.let {
35+
Duration.between(it, Instant.now()).coerceAtLeast(Duration.ofMillis(0))
36+
}
37+
38+
override fun accept(t: WsConnection.ConnectionStatus) {
39+
if (t == WsConnection.ConnectionStatus.CONNECTED) {
40+
connectedAt.set(Instant.now())
41+
} else if (t == WsConnection.ConnectionStatus.DISCONNECTED) {
42+
connectionTime?.let(metrics::recordConnectionTime)
43+
connectedAt.set(null)
44+
}
45+
}
46+
}

src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ open class EthereumWsFactory(
4646
)
4747

4848
RpcMetrics(
49+
metricsTags,
4950
timer = Timer.builder("upstream.ws.conn")
5051
.description("Request time through a WebSocket JSON RPC connection")
5152
.tags(metricsTags)

src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ open class WsConnectionImpl(
120120
private var keepConnection = true
121121
private var connection: Disposable? = null
122122
private val reconnecting = AtomicBoolean(false)
123-
private var onConnectionChange: Consumer<WsConnection.ConnectionStatus>? = null
123+
private var onConnectionChange: Consumer<WsConnection.ConnectionStatus> = defaultOnConnectionChange()
124124

125125
/**
126126
* true when the connection is actively receiving messages
@@ -130,8 +130,18 @@ open class WsConnectionImpl(
130130
override val isConnected: Boolean
131131
get() = connection != null && !reconnecting.get() && active
132132

133+
private fun defaultOnConnectionChange(): Consumer<WsConnection.ConnectionStatus> {
134+
return rpcMetrics?.let {
135+
ConnectionTimeMonitoring(it)
136+
} ?: Consumer { }
137+
}
138+
133139
override fun onConnectionChange(handler: Consumer<WsConnection.ConnectionStatus>?) {
134-
this.onConnectionChange = handler
140+
this.onConnectionChange = if (handler != null) {
141+
handler.andThen(defaultOnConnectionChange())
142+
} else {
143+
defaultOnConnectionChange()
144+
}
135145
}
136146

137147
fun setReconnectIntervalSeconds(value: Long) {
@@ -192,7 +202,7 @@ open class WsConnectionImpl(
192202
.doOnDisconnected {
193203
active = false
194204
disconnects.tryEmitNext(Instant.now())
195-
this.onConnectionChange?.accept(WsConnection.ConnectionStatus.DISCONNECTED)
205+
this.onConnectionChange.accept(WsConnection.ConnectionStatus.DISCONNECTED)
196206
log.info("Disconnected from $uri")
197207
if (keepConnection) {
198208
tryReconnectLater()
@@ -235,13 +245,13 @@ open class WsConnectionImpl(
235245
)
236246
.uri(uri)
237247
.handle { inbound, outbound ->
238-
this.onConnectionChange?.accept(WsConnection.ConnectionStatus.CONNECTED)
248+
this.onConnectionChange.accept(WsConnection.ConnectionStatus.CONNECTED)
239249
// mark as active once connected, because the actual message wouldn't come until a request is sent
240250
active = true
241251
handle(inbound, outbound)
242252
}
243253
.onErrorResume { t ->
244-
log.debug("Dropping WS connection to $uri. Error: ${t.message}")
254+
log.debug("Dropping WS connection to {}. Error: {}", uri, t.message)
245255
Mono.empty<Void>()
246256
}
247257
.subscribe()
@@ -374,6 +384,7 @@ open class WsConnectionImpl(
374384
val internalId = request.id.toLong()
375385
val onResponse = Sinks.one<JsonRpcResponse>()
376386
currentRequests[internalId.toInt()] = onResponse
387+
rpcMetrics?.onMessageEnqueued()
377388

378389
// a default response when nothing is received back from WS after a timeout
379390
val noResponse = JsonRpcException(
@@ -407,7 +418,10 @@ open class WsConnectionImpl(
407418
.switchIfEmpty(
408419
Mono.fromCallable { log.warn("No response for ${request.method} ${request.params}") }.then(Mono.error(noResponse))
409420
)
410-
.doFinally { currentRequests.remove(internalId.toInt()) }
421+
.doFinally {
422+
currentRequests.remove(internalId.toInt())
423+
rpcMetrics?.onMessageFinished()
424+
}
411425
}
412426

413427
override fun close() {

src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ class GrpcUpstreams(
186186
)
187187

188188
val metrics = RpcMetrics(
189+
metricsTags,
189190
timer = Timer.builder("upstream.grpc.conn")
190191
.description("Request time through a Dshackle/gRPC connection")
191192
.tags(metricsTags)

src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,15 @@ class JsonRpcHttpClient(
100100
bytes.aggregate().asByteArray().map {
101101
Tuples.of(statusCode, it)
102102
}
103+
}.doFinally {
104+
metrics.onMessageFinished()
103105
}.single()
104106
}
105107

106108
override fun read(key: JsonRpcRequest): Mono<JsonRpcResponse> {
107109
val startTime = StopWatch()
108110
return Mono.just(key)
111+
.doOnSubscribe { metrics.onMessageEnqueued() }
109112
.map(JsonRpcRequest::toJson)
110113
.doOnNext { startTime.start() }
111114
.flatMap(this@JsonRpcHttpClient::execute)

src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/RpcMetrics.kt

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@ package io.emeraldpay.dshackle.upstream.rpcclient
1717

1818
import io.micrometer.core.instrument.Counter
1919
import io.micrometer.core.instrument.DistributionSummary
20+
import io.micrometer.core.instrument.Gauge
21+
import io.micrometer.core.instrument.Metrics
22+
import io.micrometer.core.instrument.Tag
2023
import io.micrometer.core.instrument.Timer
2124
import reactor.core.publisher.Mono
2225
import reactor.netty.ChannelPipelineConfigurer
2326
import reactor.netty.channel.ChannelMetricsRecorder
2427
import reactor.netty.channel.ChannelOperations
28+
import java.time.Duration
29+
import java.util.concurrent.atomic.AtomicInteger
2530
import java.util.function.Function
2631

2732
class RpcMetrics(
33+
tags: Iterable<Tag>,
2834
val timer: Timer,
2935
val fails: Counter,
3036
val responseSize: DistributionSummary,
@@ -33,6 +39,12 @@ class RpcMetrics(
3339
val connectionMetrics: ChannelMetricsRecorder,
3440
) {
3541

42+
private val connectionTime = Timer.builder("netty.client.connection_time")
43+
.tags(tags)
44+
.register(Metrics.globalRegistry)
45+
46+
private val queueSize = AtomicInteger(0)
47+
3648
val onChannelInit: ChannelPipelineConfigurer
3749
get() = ChannelPipelineConfigurer { connectionObserver, channel, remoteAddress ->
3850
// See reactor.netty.transport.TransportConfig$TransportChannelInitializer
@@ -50,4 +62,32 @@ class RpcMetrics(
5062
}
5163
}
5264
}
65+
66+
init {
67+
Gauge.builder("upstream.rpc.queue_size", queueSize.get()::toDouble)
68+
.tags(tags)
69+
.register(Metrics.globalRegistry)
70+
}
71+
72+
/**
73+
* Record the total connection time.
74+
* The difference from #recordConnectTime is that this one is applicable to long
75+
*/
76+
fun recordConnectionTime(time: Duration) {
77+
connectionTime.record(time)
78+
}
79+
80+
/**
81+
* Call when a new request is added to the queue, i.e., is about to be sent to the upstream
82+
*/
83+
fun onMessageEnqueued() {
84+
queueSize.incrementAndGet()
85+
}
86+
87+
/**
88+
* Call when a request it processed, as success, error, timeout or any other status indicating that the request is no longer in queue.
89+
*/
90+
fun onMessageFinished() {
91+
queueSize.decrementAndGet()
92+
}
5393
}

src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class EthereumGrpcUpstreamSpec extends Specification {
4848
MockGrpcServer mockServer = new MockGrpcServer()
4949
ObjectMapper objectMapper = Global.objectMapper
5050
RpcMetrics metrics = new RpcMetrics(
51+
[],
5152
Timer.builder("test1").register(TestingCommons.meterRegistry),
5253
Counter.builder("test2").register(TestingCommons.meterRegistry),
5354
DistributionSummary.builder("test3").register(TestingCommons.meterRegistry),
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Copyright (c) 2023 EmeraldPay, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.emeraldpay.dshackle.upstream.ethereum
17+
18+
import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
19+
import io.kotest.core.spec.style.ShouldSpec
20+
import io.kotest.matchers.comparables.shouldBeGreaterThan
21+
import io.kotest.matchers.shouldBe
22+
import io.kotest.matchers.shouldNotBe
23+
import io.mockk.mockk
24+
import io.mockk.verify
25+
import java.time.Duration
26+
27+
class ConnectionTimeMonitoringTest : ShouldSpec({
28+
29+
should("Update time on CONNECT") {
30+
val monitoring = ConnectionTimeMonitoring(mockk())
31+
32+
monitoring.connectionTime shouldBe null
33+
monitoring.accept(WsConnection.ConnectionStatus.CONNECTED)
34+
Thread.sleep(25)
35+
monitoring.connectionTime shouldNotBe null
36+
monitoring.connectionTime!! shouldBeGreaterThan Duration.ofMillis(20)
37+
}
38+
39+
should("Record time on DISCONNECT") {
40+
val rpc = mockk<RpcMetrics>(relaxed = true)
41+
val monitoring = ConnectionTimeMonitoring(rpc)
42+
43+
monitoring.accept(WsConnection.ConnectionStatus.CONNECTED)
44+
Thread.sleep(25)
45+
monitoring.accept(WsConnection.ConnectionStatus.DISCONNECTED)
46+
47+
// erases the time after recording it
48+
monitoring.connectionTime shouldBe null
49+
50+
verify {
51+
rpc.recordConnectionTime(more(Duration.ofMillis(20)))
52+
}
53+
}
54+
})

src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import io.kotest.matchers.shouldBe
1010
import io.kotest.matchers.types.instanceOf
1111
import io.micrometer.core.instrument.Counter
1212
import io.micrometer.core.instrument.DistributionSummary
13+
import io.micrometer.core.instrument.Tag
1314
import io.micrometer.core.instrument.Timer
1415
import org.mockserver.integration.ClientAndServer
1516
import org.mockserver.model.HttpRequest
@@ -23,6 +24,7 @@ class JsonRpcHttpClientTest : ShouldSpec({
2324

2425
val port = SocketUtils.findAvailableTcpPort(20000)
2526
val metrics = RpcMetrics(
27+
emptyList<Tag>(),
2628
Timer.builder("test1").register(TestingCommonsKotlin.meterRegistry),
2729
Counter.builder("test2").register(TestingCommonsKotlin.meterRegistry),
2830
DistributionSummary.builder("test3").register(TestingCommonsKotlin.meterRegistry),

0 commit comments

Comments
 (0)