Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions src/transports/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,32 @@ export const messageSubsLabels = <T extends TransportGenerics>(
}
}

// Record WS message and subscription metrics
// Recalculate cacheKey and feedId for metrics
// since avoiding storing extra info in expiring sorted set
export const recordWsMessageMetrics = <T extends TransportGenerics>(
export const recordWsMessageSentMetrics = <T extends TransportGenerics>(
context: EndpointContext<T>,
subscribes: TypeFromDefinition<T['Parameters']>[],
unsubscribes: TypeFromDefinition<T['Parameters']>[],
): void => {
const recordMetrics = (params: TypeFromDefinition<T['Parameters']>, type: 'sub' | 'unsub') => {
) => {
for (const params of [...subscribes, ...unsubscribes]) {
const baseLabels = messageSubsLabels(context, params)

// Record total number of ws messages sent
metrics
.get('wsMessageTotal')
.labels({ ...baseLabels, direction: 'sent' })
.inc()
}
}

// Record WS message and subscription metrics
// Recalculate cacheKey and feedId for metrics
// since avoiding storing extra info in expiring sorted set
export const recordWsMessageSubMetrics = <T extends TransportGenerics>(
context: EndpointContext<T>,
subscribes: TypeFromDefinition<T['Parameters']>[],
unsubscribes: TypeFromDefinition<T['Parameters']>[],
): void => {
const recordMetrics = (params: TypeFromDefinition<T['Parameters']>, type: 'sub' | 'unsub') => {
const baseLabels = messageSubsLabels(context, params)

// Record total number of subscriptions made
if (type === 'sub') {
Expand Down
13 changes: 10 additions & 3 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import { PartialSuccessfulResponse, ProviderResult, TimestampedProviderResult }
import { TypeFromDefinition } from '../validation/input-params'
import { TransportGenerics } from './'
import { StreamingTransport, SubscriptionDeltas } from './abstract/streaming'
import { connectionErrorLabels, recordWsMessageMetrics } from './metrics'
import {
connectionErrorLabels,
recordWsMessageSentMetrics,
recordWsMessageSubMetrics,
} from './metrics'

// Aliasing type for use at adapter level
export { WebSocket, RawData as WebSocketRawData }
Expand Down Expand Up @@ -453,6 +457,8 @@ export class WebSocketTransport<
connectionClosed = true

if (subscriptions.desired.length) {
// Clear subscription metrics for all active subscriptions
recordWsMessageSubMetrics(context, [], subscriptions.desired)
censorLogs(() =>
logger.trace(
`Connection will be reopened and will subscribe to new and resubscribe to existing: ${JSON.stringify(
Expand Down Expand Up @@ -503,15 +509,16 @@ export class WebSocketTransport<
? subscriptions.stale.map((sub) => unsubscribeMessage(sub, context))
: subscriptions.stale,
)
recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale)
} else {
logger.trace(
"This ws transport has no builders configured, so we're not sending any messages",
)
}
}

// Record WS message and subscription metrics
recordWsMessageMetrics(context, subscriptions.new, subscriptions.stale)
// Record WS subscription metrics
recordWsMessageSubMetrics(context, subscriptions.new, subscriptions.stale)

// The background execute loop no longer sleeps between executions, so we have to do it here
logger.trace(
Expand Down
52 changes: 45 additions & 7 deletions test/transports/websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
const base = 'ETH'
const quote = 'DOGE'
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 1000
process.env['METRICS_ENABLED'] = 'true'
eaMetrics.clear()

const labels = {
feed_id: "{'base':'eth','quote':'doge'}",
subscription_key: "test-{'base':'eth','quote':'doge'}",
}

// Mock WS
mockWebSocketProvider(WebSocketClassProvider)
Expand All @@ -304,6 +311,12 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
connectionCounter++
socket.on('message', () => {
messageCounter++
socket.send(
JSON.stringify({
pair: `${base}/${quote}`,
value: price,
}),
)
})
})

Expand All @@ -314,21 +327,45 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

const error = await testAdapter.request({
base,
quote,
await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base, quote },
expectedResponse: {
data: {
result: price,
},
result: price,
statusCode: 200,
},
})
t.is(error.statusCode, 504)

// The WS connection should not send any messages to the EA, so we advance the clock until
// we reach the point where the EA will consider it unhealthy and reconnect.
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 2 + 100)
let metrics = await testAdapter.getMetrics()
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 1 })
metrics.assert(t, {
name: 'ws_message_total',
labels: { ...labels, direction: 'sent' },
expectedValue: 1,
})

// Advance to next cycle where connection is unhealthy and reconnect
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100)

// The connection was opened twice
t.is(connectionCounter, 2)
// The subscribe message was sent twice as well, since when we reopened we resubscribed to everything
t.is(messageCounter, 2)

// Only one active sub should be recorded
metrics = await testAdapter.getMetrics()
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 2 })
metrics.assert(t, {
name: 'ws_message_total',
labels: { ...labels, direction: 'sent' },
expectedValue: 2,
})

process.env['METRICS_ENABLED'] = 'false'
testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()
Expand Down Expand Up @@ -432,6 +469,7 @@ test.serial(
const base = 'ETH'
const quote = 'DOGE'
process.env['METRICS_ENABLED'] = 'true'
eaMetrics.clear()

let execution = 0

Expand Down
Loading