Skip to content

Commit 00dfba8

Browse files
committed
Clear wsSubscriptionActive metrics when wss disconnent
1 parent e33af1a commit 00dfba8

File tree

3 files changed

+49
-11
lines changed

3 files changed

+49
-11
lines changed

src/transports/metrics.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,18 @@ export const recordWsMessageMetrics = <T extends TransportGenerics>(
3636
context: EndpointContext<T>,
3737
subscribes: TypeFromDefinition<T['Parameters']>[],
3838
unsubscribes: TypeFromDefinition<T['Parameters']>[],
39+
wsMessageTotal: boolean = true,
3940
): void => {
4041
const recordMetrics = (params: TypeFromDefinition<T['Parameters']>, type: 'sub' | 'unsub') => {
4142
const baseLabels = messageSubsLabels(context, params)
4243

4344
// Record total number of ws messages sent
44-
metrics
45-
.get('wsMessageTotal')
46-
.labels({ ...baseLabels, direction: 'sent' })
47-
.inc()
45+
if (wsMessageTotal) {
46+
metrics
47+
.get('wsMessageTotal')
48+
.labels({ ...baseLabels, direction: 'sent' })
49+
.inc()
50+
}
4851

4952
// Record total number of subscriptions made
5053
if (type === 'sub') {

src/transports/websocket.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,8 @@ export class WebSocketTransport<
453453
connectionClosed = true
454454

455455
if (subscriptions.desired.length) {
456+
// Clear wsSubscriptionActive metrics for all active subscriptions
457+
recordWsMessageMetrics(context, [], subscriptions.desired, false)
456458
censorLogs(() =>
457459
logger.trace(
458460
`Connection will be reopened and will subscribe to new and resubscribe to existing: ${JSON.stringify(

test/transports/websocket.test.ts

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,13 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
293293
const base = 'ETH'
294294
const quote = 'DOGE'
295295
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 1000
296+
process.env['METRICS_ENABLED'] = 'true'
297+
eaMetrics.clear()
298+
299+
const labels = {
300+
feed_id: "{'base':'eth','quote':'doge'}",
301+
subscription_key: "test-{'base':'eth','quote':'doge'}",
302+
}
296303

297304
// Mock WS
298305
mockWebSocketProvider(WebSocketClassProvider)
@@ -304,6 +311,12 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
304311
connectionCounter++
305312
socket.on('message', () => {
306313
messageCounter++
314+
socket.send(
315+
JSON.stringify({
316+
pair: `${base}/${quote}`,
317+
value: price,
318+
}),
319+
)
307320
})
308321
})
309322

@@ -314,21 +327,40 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
314327

315328
const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)
316329

317-
const error = await testAdapter.request({
318-
base,
319-
quote,
330+
await testAdapter.startBackgroundExecuteThenGetResponse(t, {
331+
requestData: { base, quote },
332+
expectedResponse: {
333+
data: {
334+
result: price,
335+
},
336+
result: price,
337+
statusCode: 200,
338+
},
320339
})
321-
t.is(error.statusCode, 504)
322340

323-
// The WS connection should not send any messages to the EA, so we advance the clock until
324-
// we reach the point where the EA will consider it unhealthy and reconnect.
325-
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 2 + 100)
341+
let metrics = await testAdapter.getMetrics()
342+
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
343+
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 1 })
344+
345+
// Advance to next cycle where connection is unhealthy and reconnect
346+
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100)
326347

327348
// The connection was opened twice
328349
t.is(connectionCounter, 2)
329350
// The subscribe message was sent twice as well, since when we reopened we resubscribed to everything
330351
t.is(messageCounter, 2)
331352

353+
// Only one active sub should be recorded
354+
metrics = await testAdapter.getMetrics()
355+
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
356+
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 2 })
357+
metrics.assert(t, {
358+
name: 'ws_message_total',
359+
labels: { ...labels, direction: 'sent' },
360+
expectedValue: 2,
361+
})
362+
363+
process.env['METRICS_ENABLED'] = 'false'
332364
testAdapter.api.close()
333365
mockWsServer.close()
334366
await t.context.clock.runToLastAsync()
@@ -432,6 +464,7 @@ test.serial(
432464
const base = 'ETH'
433465
const quote = 'DOGE'
434466
process.env['METRICS_ENABLED'] = 'true'
467+
eaMetrics.clear()
435468

436469
let execution = 0
437470

0 commit comments

Comments
 (0)