diff --git a/docs/components/transport-types/websocket-transport.md b/docs/components/transport-types/websocket-transport.md index 78481228..9b4a628b 100644 --- a/docs/components/transport-types/websocket-transport.md +++ b/docs/components/transport-types/websocket-transport.md @@ -109,6 +109,55 @@ In the example above, after the connection is established a custom authenticatio As shown in the first example, **builders** object contains two methods, **subscribeMessage** and **unsubscribeMessage**. These methods can be provided for the WS transport to use to send subscription/unsubscription messages to Data Provider. Both accept _params_ which is the current input parameters of the request and should return object or string as payload that will be sent to Data Provider. If the payload is object it will be automatically stringified to JSON. +### Heartbeat messages + +Some WebSocket providers require periodic heartbeat messages to keep the connection alive. The `WebSocketTransport` supports sending heartbeat messages automatically at a configurable interval. + +To enable heartbeat functionality, provide a `heartbeat` handler in the `handlers` object. The heartbeat will automatically start when the connection is opened and stop when the connection is closed. + +The `heartbeat` handler receives the WebSocket connection and adapter context, allowing you to implement any heartbeat logic you need. You can use WebSocket protocol-level ping, send custom messages, or perform any other heartbeat-related operations. + +**Using WebSocket protocol-level ping:** + +```typescript +handlers: { + heartbeat: (connection) => { + connection.ping() + }, +} +``` + +**Using custom heartbeat message:** + +```typescript +handlers: { + heartbeat: (connection, context) => { + connection.send(JSON.stringify({ + type: 'ping', + timestamp: Date.now(), + })) + }, +} +``` + +**Using ping with optional data:** + +```typescript +handlers: { + heartbeat: (connection) => { + connection.ping('heartbeat-data') + }, +} +``` + +The heartbeat interval is controlled by the `WS_HEARTBEAT_INTERVAL_MS` adapter setting (default: 10000ms). The heartbeat will automatically stop if: + +- The connection is closed +- The connection state is no longer `OPEN` +- A new heartbeat is started (replaces the previous one) + +**Note:** The heartbeat only starts if the `heartbeat` handler is provided. If you don't need heartbeat functionality, simply omit the `heartbeat` handler. + ### Retrieving and storing the response or errors As shown in the first example, the **handlers** object accepts a function called **message** which will be executed when Data Provider sends a message through the WS connection. It takes this message as its first argument and the adapter context as the second, and should build and return a list of response objects (_ProviderResult_) that will be stored in the response cache for the endpoint. diff --git a/docs/reference-tables/ea-settings.md b/docs/reference-tables/ea-settings.md index 16034ee2..dfb14bc9 100644 --- a/docs/reference-tables/ea-settings.md +++ b/docs/reference-tables/ea-settings.md @@ -63,5 +63,6 @@ | TLS_PUBLIC_KEY | string | undefined | Base64 Public Key of TSL/SSL certificate | - Value must be a valid base64 string | | | WARMUP_SUBSCRIPTION_TTL | number | 300000 | TTL for batch warmer subscriptions | - Value must be an integer
- Value must be above the minimum
- Value must be below the maximum | 0 | 3600000 | | WS_CONNECTION_OPEN_TIMEOUT | number | 10000 | The maximum amount of time in milliseconds to wait for the websocket connection to open (including custom open handler) | - Value must be an integer
- Value must be above the minimum
- Value must be below the maximum | 500 | 30000 | +| WS_HEARTBEAT_INTERVAL_MS | number | 10000 | The number of ms between each hearbeat message that EA sends to server, only works if heartbeat handler is provided | - Value must be an integer
- Value must be above the minimum
- Value must be below the maximum | 5000 | 300000 | | WS_SUBSCRIPTION_TTL | number | 120000 | The time in ms a request will live in the subscription set before becoming stale | - Value must be an integer
- Value must be above the minimum
- Value must be below the maximum | 0 | 3600000 | | WS_SUBSCRIPTION_UNRESPONSIVE_TTL | number | 120000 | The maximum acceptable time (in milliseconds) since the last message was received and stored in the cache on a WebSocket connection before it is considered unresponsive, causing the adapter to close and attempt to reopen it. | - Value must be an integer
- Value must be above the minimum
- Value must be below the maximum | 1000 | 180000 | diff --git a/src/config/index.ts b/src/config/index.ts index 2251aad9..bed422b0 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -249,6 +249,13 @@ export const BaseSettingsDefinition = { default: 10_000, validate: validator.integer({ min: 500, max: 30_000 }), }, + WS_HEARTBEAT_INTERVAL_MS: { + description: + 'The number of ms between each hearbeat message that EA sends to server, only works if heartbeat handler is provided', + type: 'number', + default: 10_000, + validate: validator.integer({ min: 5_000, max: 300_000 }), + }, CACHE_POLLING_MAX_RETRIES: { description: 'Max amount of times to attempt to find EA response in the cache after the Transport has been set up', diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index 2a53cfca..967621f7 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -64,10 +64,20 @@ export interface WebSocketTransportConfig * Note: any listeners set in this method will be cleared after its execution. * * @param wsConnection - the WebSocket with an established connection + * @param context - the background context for the Adapter * @returns an empty Promise, or void */ open?: (wsConnection: WebSocket, context: EndpointContext) => Promise | void + /** + * Handles when client is ready to send a heartbeat to server + * + * @param wsConnection - the WebSocket with an established connection + * @param context - the background context for the Adapter + * @returns an empty Promise, or void + */ + heartbeat?: (wsConnection: WebSocket, context: EndpointContext) => Promise | void + /** * Handles when the websocket connection dispatches an error event * Optional to let the adapter handle the event in its own way if it decides to @@ -159,6 +169,7 @@ export class WebSocketTransport< lastMessageReceivedAt = 0 connectionOpenedAt = 0 streamHandlerInvocationsWithNoConnection = 0 + heartbeatInterval?: NodeJS.Timeout constructor(private config: WebSocketTransportConfig) { super() @@ -179,6 +190,39 @@ export class WebSocketTransport< return JSON.parse(data.toString()) as T['Provider']['WsMessage'] } + startHeartbeat(context: EndpointContext): void { + if (this.config.handlers.heartbeat) { + this.stopHeartbeat() + + const intervalId = setInterval(async () => { + if (this.heartbeatInterval !== intervalId) { + clearInterval(intervalId) + return + } + + if (this.wsConnection && this.wsConnection.readyState === WebSocket.OPEN) { + try { + logger.debug('Calling heartbeat handler') + await this.config.handlers.heartbeat?.(this.wsConnection, context) + } catch (error) { + logger.warn({ error }, 'Heartbeat handler failed, will be tried later.') + } + } else { + this.stopHeartbeat() + } + }, context.adapterSettings.WS_HEARTBEAT_INTERVAL_MS) + + this.heartbeatInterval = intervalId + } + } + + stopHeartbeat(): void { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval) + this.heartbeatInterval = undefined + } + } + buildConnectionHandlers( context: EndpointContext, connection: WebSocket, @@ -192,6 +236,7 @@ export class WebSocketTransport< await this.config.handlers.open(connection, context) logger.debug('Successfully executed connection opened handler') } + this.startHeartbeat(context) connectionReadyResolve(event.target) }, @@ -262,6 +307,7 @@ export class WebSocketTransport< this.config.handlers.close(event, context) logger.debug('Successfully executed connection close handler') } + this.stopHeartbeat() }, } } diff --git a/test/transports/websocket.test.ts b/test/transports/websocket.test.ts index 646023a0..be21f1ad 100644 --- a/test/transports/websocket.test.ts +++ b/test/transports/websocket.test.ts @@ -2,7 +2,8 @@ import { InstalledClock } from '@sinonjs/fake-timers' import { installTimers } from '../helper' import untypedTest, { TestFn } from 'ava' import { Server } from 'mock-socket' -import { Adapter, AdapterEndpoint } from '../../src/adapter' +import WebSocket from 'ws' +import { Adapter, AdapterEndpoint, EndpointContext } from '../../src/adapter' import { AdapterConfig, EmptyCustomSettings } from '../../src/config' import { metrics as eaMetrics } from '../../src/metrics' import { @@ -57,7 +58,13 @@ type WebSocketTypes = { const BACKGROUND_EXECUTE_MS_WS = 5000 -const createAdapter = (envDefaultOverrides: Record): Adapter => { +const createAdapter = ( + envDefaultOverrides: Record, + heartbeatHandler?: ( + connection: WebSocket, + context: EndpointContext, + ) => Promise | void, +): Adapter => { const websocketTransport = new WebSocketTransport({ url: () => ENDPOINT_URL, options: () => { @@ -88,6 +95,7 @@ const createAdapter = (envDefaultOverrides: Record `S:${params.base}/${params.quote}`, @@ -532,7 +540,7 @@ test.serial( process.env['METRICS_ENABLED'] = 'false' await testAdapter.api.close() mockWsServer.close() - await t.context.clock.runAllAsync() + await t.context.clock.runToLastAsync() }, ) @@ -632,7 +640,7 @@ test.serial('does not crash the server when new connection errors', async (t) => t.pass() await testAdapter.api.close() mockWsServer.close() - await t.context.clock.runAllAsync() + await t.context.clock.runToLastAsync() }) test.serial('closed ws connection should have a 1000 status code', async (t) => { @@ -788,7 +796,7 @@ test.serial('does not hang the background execution if the open handler hangs', process.env['METRICS_ENABLED'] = 'false' await testAdapter.api.close() mockWsServer.close() - await t.context.clock.runAllAsync() + await t.context.clock.runToLastAsync() }) test.serial('if defined the close handler is called when the websocket is closed', async (t) => { @@ -884,7 +892,7 @@ test.serial('if defined the close handler is called when the websocket is closed await testAdapter.api.close() mockWsServer.close() - await t.context.clock.runAllAsync() + await t.context.clock.runToLastAsync() t.true(handlerCalled) }) @@ -988,7 +996,7 @@ test.serial( await testAdapter.api.close() mockWsServer.close() - await t.context.clock.runAllAsync() + await t.context.clock.runToLastAsync() t.true(handlerCalled) }, @@ -1099,4 +1107,170 @@ test.serial('can set reverse mapping and read from it', async (t) => { statusCode: 200, }, }) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() +}) + +test.serial('sends heartbeat using ping at configured interval', async (t) => { + const base = 'ETH' + const quote = 'DOGE' + const HEARTBEAT_INTERVAL = 5000 + + // Mock WS + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + let heartbeatCallCount = 0 + + mockWsServer.on('connection', (socket) => { + socket.on('message', () => { + socket.send( + JSON.stringify({ + pair: `${base}/${quote}`, + value: price, + }), + ) + }) + }) + + const adapter = createAdapter( + { + WS_HEARTBEAT_INTERVAL_MS: HEARTBEAT_INTERVAL, + }, + () => { + heartbeatCallCount++ + }, + ) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base, quote }, + expectedResponse: { + data: { + result: price, + }, + result: price, + statusCode: 200, + }, + }) + + const heartBeatRounds = 2 + + await runAllUntilTime(t.context.clock, HEARTBEAT_INTERVAL * heartBeatRounds) + + t.is(heartbeatCallCount, heartBeatRounds) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() +}) + +test.serial('stops heartbeat when connection closes', async (t) => { + const base = 'ETH' + const quote = 'DOGE' + const HEARTBEAT_INTERVAL = 5000 + // Mock WS + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + let heartbeatCallCount = 0 + + mockWsServer.on('connection', (socket) => { + socket.on('message', () => { + socket.send( + JSON.stringify({ + pair: `${base}/${quote}`, + value: price, + }), + ) + }) + }) + + const adapter = createAdapter( + { + WS_HEARTBEAT_INTERVAL_MS: HEARTBEAT_INTERVAL, + }, + () => { + heartbeatCallCount++ + }, + ) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base, quote }, + expectedResponse: { + data: { + result: price, + }, + result: price, + statusCode: 200, + }, + }) + + const heartbeatCountBeforeClose = heartbeatCallCount + + testAdapter.api.close() + mockWsServer.close() + + await runAllUntilTime(t.context.clock, HEARTBEAT_INTERVAL * 2) + + t.is(heartbeatCallCount, heartbeatCountBeforeClose) + + await t.context.clock.runToLastAsync() +}) + +test.serial('does not heartbeat when handler throws an error', async (t) => { + const base = 'ETH' + const quote = 'DOGE' + const HEARTBEAT_INTERVAL = 5000 + // Mock WS + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + let heartbeatCallCount = 0 + + mockWsServer.on('connection', (socket) => { + socket.on('message', () => { + socket.send( + JSON.stringify({ + pair: `${base}/${quote}`, + value: price, + }), + ) + }) + }) + + const adapter = createAdapter( + { + WS_HEARTBEAT_INTERVAL_MS: HEARTBEAT_INTERVAL, + }, + () => { + heartbeatCallCount++ + throw new Error('Heartbeat handler error') + }, + ) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base, quote }, + expectedResponse: { + data: { + result: price, + }, + result: price, + statusCode: 200, + }, + }) + + const heartBeatRounds = 2 + + await runAllUntilTime(t.context.clock, HEARTBEAT_INTERVAL * heartBeatRounds) + + t.is(heartbeatCallCount, heartBeatRounds) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() })