Skip to content

Commit 6365866

Browse files
committed
Comments
1 parent 00dfba8 commit 6365866

File tree

3 files changed

+32
-15
lines changed

3 files changed

+32
-15
lines changed

src/transports/metrics.ts

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,33 @@ export const messageSubsLabels = <T extends TransportGenerics>(
2929
}
3030
}
3131

32+
export const recordWsMessageSentMetrics = <T extends TransportGenerics>(
33+
context: EndpointContext<T>,
34+
subscribes: TypeFromDefinition<T['Parameters']>[],
35+
unsubscribes: TypeFromDefinition<T['Parameters']>[],
36+
) => {
37+
for (const params of [...subscribes, ...unsubscribes]) {
38+
const baseLabels = messageSubsLabels(context, params)
39+
40+
// Record total number of ws messages sent
41+
metrics
42+
.get('wsMessageTotal')
43+
.labels({ ...baseLabels, direction: 'sent' })
44+
.inc()
45+
}
46+
}
47+
3248
// Record WS message and subscription metrics
3349
// Recalculate cacheKey and feedId for metrics
3450
// since avoiding storing extra info in expiring sorted set
35-
export const recordWsMessageMetrics = <T extends TransportGenerics>(
51+
export const recordWsMessageSubMetrics = <T extends TransportGenerics>(
3652
context: EndpointContext<T>,
3753
subscribes: TypeFromDefinition<T['Parameters']>[],
3854
unsubscribes: TypeFromDefinition<T['Parameters']>[],
39-
wsMessageTotal: boolean = true,
4055
): void => {
4156
const recordMetrics = (params: TypeFromDefinition<T['Parameters']>, type: 'sub' | 'unsub') => {
4257
const baseLabels = messageSubsLabels(context, params)
4358

44-
// Record total number of ws messages sent
45-
if (wsMessageTotal) {
46-
metrics
47-
.get('wsMessageTotal')
48-
.labels({ ...baseLabels, direction: 'sent' })
49-
.inc()
50-
}
51-
5259
// Record total number of subscriptions made
5360
if (type === 'sub') {
5461
metrics.get('wsSubscriptionTotal').labels(baseLabels).inc()

src/transports/websocket.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ import { PartialSuccessfulResponse, ProviderResult, TimestampedProviderResult }
66
import { TypeFromDefinition } from '../validation/input-params'
77
import { TransportGenerics } from './'
88
import { StreamingTransport, SubscriptionDeltas } from './abstract/streaming'
9-
import { connectionErrorLabels, recordWsMessageMetrics } from './metrics'
9+
import {
10+
connectionErrorLabels,
11+
recordWsMessageSentMetrics,
12+
recordWsMessageSubMetrics,
13+
} from './metrics'
1014

1115
// Aliasing type for use at adapter level
1216
export { WebSocket, RawData as WebSocketRawData }
@@ -453,8 +457,8 @@ export class WebSocketTransport<
453457
connectionClosed = true
454458

455459
if (subscriptions.desired.length) {
456-
// Clear wsSubscriptionActive metrics for all active subscriptions
457-
recordWsMessageMetrics(context, [], subscriptions.desired, false)
460+
// Clear subscription metrics for all active subscriptions
461+
recordWsMessageSubMetrics(context, [], subscriptions.desired)
458462
censorLogs(() =>
459463
logger.trace(
460464
`Connection will be reopened and will subscribe to new and resubscribe to existing: ${JSON.stringify(
@@ -505,15 +509,16 @@ export class WebSocketTransport<
505509
? subscriptions.stale.map((sub) => unsubscribeMessage(sub, context))
506510
: subscriptions.stale,
507511
)
512+
recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale)
508513
} else {
509514
logger.trace(
510515
"This ws transport has no builders configured, so we're not sending any messages",
511516
)
512517
}
513518
}
514519

515-
// Record WS message and subscription metrics
516-
recordWsMessageMetrics(context, subscriptions.new, subscriptions.stale)
520+
// Record WS subscription metrics
521+
recordWsMessageSubMetrics(context, subscriptions.new, subscriptions.stale)
517522

518523
// The background execute loop no longer sleeps between executions, so we have to do it here
519524
logger.trace(

test/transports/websocket.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,11 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
341341
let metrics = await testAdapter.getMetrics()
342342
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
343343
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 1 })
344+
metrics.assert(t, {
345+
name: 'ws_message_total',
346+
labels: { ...labels, direction: 'sent' },
347+
expectedValue: 1,
348+
})
344349

345350
// Advance to next cycle where connection is unhealthy and reconnect
346351
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100)

0 commit comments

Comments
 (0)