Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/transports/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ export const recordWsMessageMetrics = <T extends TransportGenerics>(
context: EndpointContext<T>,
subscribes: TypeFromDefinition<T['Parameters']>[],
unsubscribes: TypeFromDefinition<T['Parameters']>[],
wsMessageTotal: boolean = true,
): void => {
const recordMetrics = (params: TypeFromDefinition<T['Parameters']>, type: 'sub' | 'unsub') => {
const baseLabels = messageSubsLabels(context, params)

// Record total number of ws messages sent
metrics
.get('wsMessageTotal')
.labels({ ...baseLabels, direction: 'sent' })
.inc()
if (wsMessageTotal) {
metrics
.get('wsMessageTotal')
.labels({ ...baseLabels, direction: 'sent' })
.inc()
}

// Record total number of subscriptions made
if (type === 'sub') {
Expand Down
2 changes: 2 additions & 0 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ export class WebSocketTransport<
connectionClosed = true

if (subscriptions.desired.length) {
// Clear wsSubscriptionActive metrics for all active subscriptions
recordWsMessageMetrics(context, [], subscriptions.desired, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: This streamHandler is really long and hard to read. Do you think it would make sense to split up into multiple functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but not sure how to make that happen, I can put in a TODO and create a ticket but I doubt it would be picked up.

I would've done if not code freeze

censorLogs(() =>
logger.trace(
`Connection will be reopened and will subscribe to new and resubscribe to existing: ${JSON.stringify(
Expand Down
47 changes: 40 additions & 7 deletions test/transports/websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
const base = 'ETH'
const quote = 'DOGE'
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 1000
process.env['METRICS_ENABLED'] = 'true'
eaMetrics.clear()

const labels = {
feed_id: "{'base':'eth','quote':'doge'}",
subscription_key: "test-{'base':'eth','quote':'doge'}",
}

// Mock WS
mockWebSocketProvider(WebSocketClassProvider)
Expand All @@ -304,6 +311,12 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {
connectionCounter++
socket.on('message', () => {
messageCounter++
socket.send(
JSON.stringify({
pair: `${base}/${quote}`,
value: price,
}),
)
})
})

Expand All @@ -314,21 +327,40 @@ test.serial('reconnects if connection becomes unresponsive', async (t) => {

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

const error = await testAdapter.request({
base,
quote,
await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base, quote },
expectedResponse: {
data: {
result: price,
},
result: price,
statusCode: 200,
},
})
t.is(error.statusCode, 504)

// The WS connection should not send any messages to the EA, so we advance the clock until
// we reach the point where the EA will consider it unhealthy and reconnect.
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 2 + 100)
let metrics = await testAdapter.getMetrics()
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 1 })

// Advance to next cycle where connection is unhealthy and reconnect
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100)

// The connection was opened twice
t.is(connectionCounter, 2)
// The subscribe message was sent twice as well, since when we reopened we resubscribed to everything
t.is(messageCounter, 2)

// Only one active sub should be recorded
metrics = await testAdapter.getMetrics()
metrics.assert(t, { name: 'ws_subscription_active', labels, expectedValue: 1 })
metrics.assert(t, { name: 'ws_subscription_total', labels, expectedValue: 2 })
metrics.assert(t, {
name: 'ws_message_total',
labels: { ...labels, direction: 'sent' },
expectedValue: 2,
})

process.env['METRICS_ENABLED'] = 'false'
testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()
Expand Down Expand Up @@ -432,6 +464,7 @@ test.serial(
const base = 'ETH'
const quote = 'DOGE'
process.env['METRICS_ENABLED'] = 'true'
eaMetrics.clear()

let execution = 0

Expand Down
Loading