Skip to content

Commit f8c2f53

Browse files
authored
Clear wsSubscriptionActive metrics when wss disconnects (#643)
* Clear wsSubscriptionActive metrics when wss disconnects * Comments
1 parent e33af1a commit f8c2f53

File tree

3 files changed

+71
-16
lines changed

3 files changed

+71
-16
lines changed

src/transports/metrics.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,32 @@ export const messageSubsLabels = <T extends TransportGenerics>(
2929
}
3030
}
3131

32-
// Record WS message and subscription metrics
33-
// Recalculate cacheKey and feedId for metrics
34-
// since avoiding storing extra info in expiring sorted set
35-
export const recordWsMessageMetrics = <T extends TransportGenerics>(
32+
export const recordWsMessageSentMetrics = <T extends TransportGenerics>(
3633
context: EndpointContext<T>,
3734
subscribes: TypeFromDefinition<T['Parameters']>[],
3835
unsubscribes: TypeFromDefinition<T['Parameters']>[],
39-
): void => {
40-
const recordMetrics = (params: TypeFromDefinition<T['Parameters']>, type: 'sub' | 'unsub') => {
36+
) => {
37+
for (const params of [...subscribes, ...unsubscribes]) {
4138
const baseLabels = messageSubsLabels(context, params)
4239

4340
// Record total number of ws messages sent
4441
metrics
4542
.get('wsMessageTotal')
4643
.labels({ ...baseLabels, direction: 'sent' })
4744
.inc()
45+
}
46+
}
47+
48+
// Record WS message and subscription metrics
49+
// Recalculate cacheKey and feedId for metrics
50+
// since avoiding storing extra info in expiring sorted set
51+
export const recordWsMessageSubMetrics = <T extends TransportGenerics>(
52+
context: EndpointContext<T>,
53+
subscribes: TypeFromDefinition<T['Parameters']>[],
54+
unsubscribes: TypeFromDefinition<T['Parameters']>[],
55+
): void => {
56+
const recordMetrics = (params: TypeFromDefinition<T['Parameters']>, type: 'sub' | 'unsub') => {
57+
const baseLabels = messageSubsLabels(context, params)
4858

4959
// Record total number of subscriptions made
5060
if (type === 'sub') {

src/transports/websocket.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ import { PartialSuccessfulResponse, ProviderResult, TimestampedProviderResult }
66
import { TypeFromDefinition } from '../validation/input-params'
77
import { TransportGenerics } from './'
88
import { StreamingTransport, SubscriptionDeltas } from './abstract/streaming'
9-
import { connectionErrorLabels, recordWsMessageMetrics } from './metrics'
9+
import {
10+
connectionErrorLabels,
11+
recordWsMessageSentMetrics,
12+
recordWsMessageSubMetrics,
13+
} from './metrics'
1014

1115
// Aliasing type for use at adapter level
1216
export { WebSocket, RawData as WebSocketRawData }
@@ -453,6 +457,8 @@ export class WebSocketTransport<
453457
connectionClosed = true
454458

455459
if (subscriptions.desired.length) {
460+
// Clear subscription metrics for all active subscriptions
461+
recordWsMessageSubMetrics(context, [], subscriptions.desired)
456462
censorLogs(() =>
457463
logger.trace(
458464
`Connection will be reopened and will subscribe to new and resubscribe to existing: ${JSON.stringify(
@@ -503,15 +509,16 @@ export class WebSocketTransport<
503509
? subscriptions.stale.map((sub) => unsubscribeMessage(sub, context))
504510
: subscriptions.stale,
505511
)
512+
recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale)
506513
} else {
507514
logger.trace(
508515
"This ws transport has no builders configured, so we're not sending any messages",
509516
)
510517
}
511518
}
512519

513-
// Record WS message and subscription metrics
514-
recordWsMessageMetrics(context, subscriptions.new, subscriptions.stale)
520+
// Record WS subscription metrics
521+
recordWsMessageSubMetrics(context, subscriptions.new, subscriptions.stale)
515522

516523
// The background execute loop no longer sleeps between executions, so we have to do it here
517524
logger.trace(

test/transports/websocket.test.ts

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,13 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
293293
const base = 'ETH'
294294
const quote = 'DOGE'
295295
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 1000
296+
process.env['METRICS_ENABLED'] = 'true'
297+
eaMetrics.clear()
298+
299+
const labels = {
300+
feed_id: "{'base':'eth','quote':'doge'}",
301+
subscription_key: "test-{'base':'eth','quote':'doge'}",
302+
}
296303

297304
// Mock WS
298305
mockWebSocketProvider(WebSocketClassProvider)
@@ -304,6 +311,12 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
304311
connectionCounter++
305312
socket.on('message', () => {
306313
messageCounter++
314+
socket.send(
315+
JSON.stringify({
316+
pair: `${base}/${quote}`,
317+
value: price,
318+
}),
319+
)
307320
})
308321
})
309322

@@ -314,21 +327,45 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
314327

315328
const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)
316329

317-
const error = await testAdapter.request({
318-
base,
319-
quote,
330+
await testAdapter.startBackgroundExecuteThenGetResponse(t, {
331+
requestData: { base, quote },
332+
expectedResponse: {
333+
data: {
334+
result: price,
335+
},
336+
result: price,
337+
statusCode: 200,
338+
},
320339
})
321-
t.is(error.statusCode, 504)
322340

323-
// The WS connection should not send any messages to the EA, so we advance the clock until
324-
// we reach the point where the EA will consider it unhealthy and reconnect.
325-
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 2 + 100)
341+
let metrics = await testAdapter.getMetrics()
342+
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
343+
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 1 })
344+
metrics.assert(t, {
345+
name: 'ws_message_total',
346+
labels: { ...labels, direction: 'sent' },
347+
expectedValue: 1,
348+
})
349+
350+
// Advance to next cycle where connection is unhealthy and reconnect
351+
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100)
326352

327353
// The connection was opened twice
328354
t.is(connectionCounter, 2)
329355
// The subscribe message was sent twice as well, since when we reopened we resubscribed to everything
330356
t.is(messageCounter, 2)
331357

358+
// Only one active sub should be recorded
359+
metrics = await testAdapter.getMetrics()
360+
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
361+
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 2 })
362+
metrics.assert(t, {
363+
name: 'ws_message_total',
364+
labels: { ...labels, direction: 'sent' },
365+
expectedValue: 2,
366+
})
367+
368+
process.env['METRICS_ENABLED'] = 'false'
332369
testAdapter.api.close()
333370
mockWsServer.close()
334371
await t.context.clock.runToLastAsync()
@@ -432,6 +469,7 @@ test.serial(
432469
const base = 'ETH'
433470
const quote = 'DOGE'
434471
process.env['METRICS_ENABLED'] = 'true'
472+
eaMetrics.clear()
435473

436474
let execution = 0
437475

0 commit comments

Comments
 (0)