Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions docs/components/transport-types/websocket-transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,55 @@ In the example above, after the connection is established a custom authenticatio

As shown in the first example, **builders** object contains two methods, **subscribeMessage** and **unsubscribeMessage**. These methods can be provided for the WS transport to use to send subscription/unsubscription messages to Data Provider. Both accept _params_ which is the current input parameters of the request and should return object or string as payload that will be sent to Data Provider. If the payload is object it will be automatically stringified to JSON.

### Heartbeat messages

Some WebSocket providers require periodic heartbeat messages to keep the connection alive. The `WebSocketTransport` supports sending heartbeat messages automatically at a configurable interval.

To enable heartbeat functionality, provide a `heartbeat` handler in the `handlers` object. The heartbeat will automatically start when the connection is opened and stop when the connection is closed.

The `heartbeat` handler receives the WebSocket connection and adapter context, allowing you to implement any heartbeat logic you need. You can use WebSocket protocol-level ping, send custom messages, or perform any other heartbeat-related operations.

**Using WebSocket protocol-level ping:**

```typescript
handlers: {
heartbeat: (connection) => {
connection.ping()
},
}
```

**Using custom heartbeat message:**

```typescript
handlers: {
heartbeat: (connection, context) => {
connection.send(JSON.stringify({
type: 'ping',
timestamp: Date.now(),
}))
},
}
```

**Using ping with optional data:**

```typescript
handlers: {
heartbeat: (connection) => {
connection.ping('heartbeat-data')
},
}
```

The heartbeat interval is controlled by the `WS_HEARTBEAT_INTERVAL_MS` adapter setting (default: 10000ms). The heartbeat will automatically stop if:

- The connection is closed
- The connection state is no longer `OPEN`
- A new heartbeat is started (replaces the previous one)

**Note:** The heartbeat only starts if the `heartbeat` handler is provided. If you don't need heartbeat functionality, simply omit the `heartbeat` handler.

### Retrieving and storing the response or errors

As shown in the first example, the **handlers** object accepts a function called **message** which will be executed when Data Provider sends a message through the WS connection. It takes this message as its first argument and the adapter context as the second, and should build and return a list of response objects (_ProviderResult_) that will be stored in the response cache for the endpoint.
Expand Down
1 change: 1 addition & 0 deletions docs/reference-tables/ea-settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,6 @@
| TLS_PUBLIC_KEY | string | undefined | Base64 Public Key of TSL/SSL certificate | - Value must be a valid base64 string | |
| WARMUP_SUBSCRIPTION_TTL | number | 300000 | TTL for batch warmer subscriptions | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 0 | 3600000 |
| WS_CONNECTION_OPEN_TIMEOUT | number | 10000 | The maximum amount of time in milliseconds to wait for the websocket connection to open (including custom open handler) | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 500 | 30000 |
| WS_HEARTBEAT_INTERVAL_MS | number | 10000 | The number of ms between each hearbeat message that EA sends to server, only works if heartbeat handler is provided | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 5000 | 300000 |
| WS_SUBSCRIPTION_TTL | number | 120000 | The time in ms a request will live in the subscription set before becoming stale | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 0 | 3600000 |
| WS_SUBSCRIPTION_UNRESPONSIVE_TTL | number | 120000 | The maximum acceptable time (in milliseconds) since the last message was received and stored in the cache on a WebSocket connection before it is considered unresponsive, causing the adapter to close and attempt to reopen it. | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 1000 | 180000 |
7 changes: 7 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,13 @@ export const BaseSettingsDefinition = {
default: 10_000,
validate: validator.integer({ min: 500, max: 30_000 }),
},
WS_HEARTBEAT_INTERVAL_MS: {
description:
'The number of ms between each hearbeat message that EA sends to server, only works if heartbeat handler is provided',
type: 'number',
default: 10_000,
validate: validator.integer({ min: 5_000, max: 300_000 }),
},
CACHE_POLLING_MAX_RETRIES: {
description:
'Max amount of times to attempt to find EA response in the cache after the Transport has been set up',
Expand Down
46 changes: 46 additions & 0 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,20 @@ export interface WebSocketTransportConfig<T extends WebsocketTransportGenerics>
* Note: any listeners set in this method will be cleared after its execution.
*
* @param wsConnection - the WebSocket with an established connection
* @param context - the background context for the Adapter
* @returns an empty Promise, or void
*/
open?: (wsConnection: WebSocket, context: EndpointContext<T>) => Promise<void> | void

/**
* Handles when client is ready to send a heartbeat to server
*
* @param wsConnection - the WebSocket with an established connection
* @param context - the background context for the Adapter
* @returns an empty Promise, or void
*/
heartbeat?: (wsConnection: WebSocket, context: EndpointContext<T>) => Promise<void> | void

/**
* Handles when the websocket connection dispatches an error event
* Optional to let the adapter handle the event in its own way if it decides to
Expand Down Expand Up @@ -159,6 +169,7 @@ export class WebSocketTransport<
lastMessageReceivedAt = 0
connectionOpenedAt = 0
streamHandlerInvocationsWithNoConnection = 0
heartbeatInterval?: NodeJS.Timeout

constructor(private config: WebSocketTransportConfig<T>) {
super()
Expand All @@ -179,6 +190,39 @@ export class WebSocketTransport<
return JSON.parse(data.toString()) as T['Provider']['WsMessage']
}

startHeartbeat(context: EndpointContext<T>): void {
if (this.config.handlers.heartbeat) {
this.stopHeartbeat()

const intervalId = setInterval(async () => {
if (this.heartbeatInterval !== intervalId) {
clearInterval(intervalId)
return
}

if (this.wsConnection && this.wsConnection.readyState === WebSocket.OPEN) {
try {
logger.debug('Calling heartbeat handler')
await this.config.handlers.heartbeat?.(this.wsConnection, context)
} catch (error) {
logger.warn({ error }, 'Heartbeat handler failed, will be tried later.')
}
} else {
this.stopHeartbeat()
}
}, context.adapterSettings.WS_HEARTBEAT_INTERVAL_MS)

this.heartbeatInterval = intervalId
}
}

stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval)
this.heartbeatInterval = undefined
}
}

buildConnectionHandlers(
context: EndpointContext<T>,
connection: WebSocket,
Expand All @@ -192,6 +236,7 @@ export class WebSocketTransport<
await this.config.handlers.open(connection, context)
logger.debug('Successfully executed connection opened handler')
}
this.startHeartbeat(context)
connectionReadyResolve(event.target)
},

Expand Down Expand Up @@ -262,6 +307,7 @@ export class WebSocketTransport<
this.config.handlers.close(event, context)
logger.debug('Successfully executed connection close handler')
}
this.stopHeartbeat()
},
}
}
Expand Down
Loading
Loading