diff --git a/src/transports/metrics.ts b/src/transports/metrics.ts index be871bc7..d03fb8c1 100644 --- a/src/transports/metrics.ts +++ b/src/transports/metrics.ts @@ -29,15 +29,12 @@ export const messageSubsLabels = ( } } -// Record WS message and subscription metrics -// Recalculate cacheKey and feedId for metrics -// since avoiding storing extra info in expiring sorted set -export const recordWsMessageMetrics = ( +export const recordWsMessageSentMetrics = ( context: EndpointContext, subscribes: TypeFromDefinition[], unsubscribes: TypeFromDefinition[], -): void => { - const recordMetrics = (params: TypeFromDefinition, type: 'sub' | 'unsub') => { +) => { + for (const params of [...subscribes, ...unsubscribes]) { const baseLabels = messageSubsLabels(context, params) // Record total number of ws messages sent @@ -45,6 +42,19 @@ export const recordWsMessageMetrics = ( .get('wsMessageTotal') .labels({ ...baseLabels, direction: 'sent' }) .inc() + } +} + +// Record WS message and subscription metrics +// Recalculate cacheKey and feedId for metrics +// since avoiding storing extra info in expiring sorted set +export const recordWsMessageSubMetrics = ( + context: EndpointContext, + subscribes: TypeFromDefinition[], + unsubscribes: TypeFromDefinition[], +): void => { + const recordMetrics = (params: TypeFromDefinition, type: 'sub' | 'unsub') => { + const baseLabels = messageSubsLabels(context, params) // Record total number of subscriptions made if (type === 'sub') { diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index 967621f7..eafb3077 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -6,7 +6,11 @@ import { PartialSuccessfulResponse, ProviderResult, TimestampedProviderResult } import { TypeFromDefinition } from '../validation/input-params' import { TransportGenerics } from './' import { StreamingTransport, SubscriptionDeltas } from './abstract/streaming' -import { connectionErrorLabels, recordWsMessageMetrics } from './metrics' +import { + connectionErrorLabels, + recordWsMessageSentMetrics, + recordWsMessageSubMetrics, +} from './metrics' // Aliasing type for use at adapter level export { WebSocket, RawData as WebSocketRawData } @@ -453,6 +457,8 @@ export class WebSocketTransport< connectionClosed = true if (subscriptions.desired.length) { + // Clear subscription metrics for all active subscriptions + recordWsMessageSubMetrics(context, [], subscriptions.desired) censorLogs(() => logger.trace( `Connection will be reopened and will subscribe to new and resubscribe to existing: ${JSON.stringify( @@ -503,6 +509,7 @@ export class WebSocketTransport< ? subscriptions.stale.map((sub) => unsubscribeMessage(sub, context)) : subscriptions.stale, ) + recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale) } else { logger.trace( "This ws transport has no builders configured, so we're not sending any messages", @@ -510,8 +517,8 @@ export class WebSocketTransport< } } - // Record WS message and subscription metrics - recordWsMessageMetrics(context, subscriptions.new, subscriptions.stale) + // Record WS subscription metrics + recordWsMessageSubMetrics(context, subscriptions.new, subscriptions.stale) // The background execute loop no longer sleeps between executions, so we have to do it here logger.trace( diff --git a/test/transports/websocket.test.ts b/test/transports/websocket.test.ts index be21f1ad..529d8c5d 100644 --- a/test/transports/websocket.test.ts +++ b/test/transports/websocket.test.ts @@ -293,6 +293,13 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => { const base = 'ETH' const quote = 'DOGE' const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 1000 + process.env['METRICS_ENABLED'] = 'true' + eaMetrics.clear() + + const labels = { + feed_id: "{'base':'eth','quote':'doge'}", + subscription_key: "test-{'base':'eth','quote':'doge'}", + } // Mock WS mockWebSocketProvider(WebSocketClassProvider) @@ -304,6 +311,12 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => { connectionCounter++ socket.on('message', () => { messageCounter++ + socket.send( + JSON.stringify({ + pair: `${base}/${quote}`, + value: price, + }), + ) }) }) @@ -314,21 +327,45 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => { const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) - const error = await testAdapter.request({ - base, - quote, + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base, quote }, + expectedResponse: { + data: { + result: price, + }, + result: price, + statusCode: 200, + }, }) - t.is(error.statusCode, 504) - // The WS connection should not send any messages to the EA, so we advance the clock until - // we reach the point where the EA will consider it unhealthy and reconnect. - await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 2 + 100) + let metrics = await testAdapter.getMetrics() + metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 }) + metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 1 }) + metrics.assert(t, { + name: 'ws_message_total', + labels: { ...labels, direction: 'sent' }, + expectedValue: 1, + }) + + // Advance to next cycle where connection is unhealthy and reconnect + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100) // The connection was opened twice t.is(connectionCounter, 2) // The subscribe message was sent twice as well, since when we reopened we resubscribed to everything t.is(messageCounter, 2) + // Only one active sub should be recorded + metrics = await testAdapter.getMetrics() + metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 }) + metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 2 }) + metrics.assert(t, { + name: 'ws_message_total', + labels: { ...labels, direction: 'sent' }, + expectedValue: 2, + }) + + process.env['METRICS_ENABLED'] = 'false' testAdapter.api.close() mockWsServer.close() await t.context.clock.runToLastAsync() @@ -432,6 +469,7 @@ test.serial( const base = 'ETH' const quote = 'DOGE' process.env['METRICS_ENABLED'] = 'true' + eaMetrics.clear() let execution = 0