Skip to content

Commit 6d5d449

Browse files
authored
enhance ws reconnect logic (#547)
1 parent 620e8eb commit 6d5d449

File tree

1 file changed

+58
-18
lines changed

1 file changed

+58
-18
lines changed

src/transports/websocket.ts

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import { connectionErrorLabels, recordWsMessageMetrics } from './metrics'
1010

1111
// Aliasing type for use at adapter level
1212
export { WebSocket, RawData as WebSocketRawData }
13-
13+
// Explicit initial grace (1s) after open for web socket; after that, rely on "time since last message"
14+
const INITIAL_WS_GRACE_MS = 1000
1415
const logger = makeLogger('WebSocketTransport')
1516

1617
type WebSocketClass = new (
@@ -159,7 +160,11 @@ export class WebSocketTransport<
159160
}
160161

161162
connectionClosed(): boolean {
162-
return !this.wsConnection || this.wsConnection.readyState === WebSocket.CLOSED
163+
return (
164+
!this.wsConnection ||
165+
this.wsConnection.readyState === WebSocket.CLOSED ||
166+
this.wsConnection.readyState === WebSocket.CLOSING
167+
)
163168
}
164169

165170
serializeMessage(payload: unknown): string {
@@ -187,6 +192,7 @@ export class WebSocketTransport<
187192

188193
// Called when any message is received by the open connection
189194
message: async (event: WebSocket.MessageEvent) => {
195+
this.lastMessageReceivedAt = Date.now()
190196
const parsed = this.deserializeMessage(event.data)
191197
censorLogs(() => logger.trace(`Got ws message: ${event.data}`))
192198
const providerDataReceived = Date.now()
@@ -202,8 +208,6 @@ export class WebSocketTransport<
202208
})
203209
logger.trace(`Writing ${results?.length ?? 0} responses to cache`)
204210
if (Array.isArray(results) && results.length > 0) {
205-
// Updating the last message received time here, to only care about messages we use
206-
this.lastMessageReceivedAt = Date.now()
207211
await this.responseCache.write(this.name, results)
208212
}
209213

@@ -338,20 +342,14 @@ export class WebSocketTransport<
338342
const now = Date.now()
339343
const timeSinceLastMessage = Math.max(0, now - this.lastMessageReceivedAt)
340344
const timeSinceConnectionOpened = Math.max(0, now - this.connectionOpenedAt)
341-
const timeSinceLastActivity = Math.min(timeSinceLastMessage, timeSinceConnectionOpened)
342345
const connectionUnresponsive =
343-
timeSinceLastActivity > 0 &&
344-
timeSinceLastActivity > context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL
346+
timeSinceConnectionOpened >= INITIAL_WS_GRACE_MS &&
347+
timeSinceLastMessage > context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL
345348

346349
let connectionClosed = this.connectionClosed()
347-
logger.trace(`WS conn staleness info:
348-
now: ${now} |
349-
timeSinceLastMessage: ${timeSinceLastMessage} |
350-
timeSinceConnectionOpened: ${timeSinceConnectionOpened} |
351-
timeSinceLastActivity: ${timeSinceLastActivity} |
352-
subscriptionUnresponsiveTtl: ${context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL} |
353-
connectionUnresponsive: ${connectionUnresponsive} |
354-
`)
350+
logger.trace(
351+
`WS conn staleness info: now: ${now} | timeSinceLastMessage: ${timeSinceLastMessage} | timeSinceConnectionOpened: ${timeSinceConnectionOpened} | subscriptionUnresponsiveTtl: ${context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL} | connectionUnresponsive: ${connectionUnresponsive} |`,
352+
)
355353

356354
// Check if we should close the current connection
357355
if (!connectionClosed && (urlChanged || connectionUnresponsive)) {
@@ -372,13 +370,18 @@ export class WebSocketTransport<
372370
// Check if connection was opened very recently; if so, wait a bit before continuing.
373371
// This is so if we just opened the connection and are waiting to receive some messages,
374372
// we don't close is immediately after and miss the chance to receive them
375-
if (timeSinceConnectionOpened < 1000) {
373+
if (timeSinceConnectionOpened < INITIAL_WS_GRACE_MS) {
376374
logger.info(
377375
`Connection was opened only ${timeSinceConnectionOpened}ms ago, waiting for that to get to 1s before continuing...`,
378376
)
379-
await sleep(1000 - timeSinceConnectionOpened)
377+
await sleep(INITIAL_WS_GRACE_MS - timeSinceConnectionOpened)
378+
}
379+
if (this.wsConnection && this.wsConnection.readyState !== WebSocket.CLOSED) {
380+
const old = this.wsConnection
381+
// Prevent any further sends/reads via this reference
382+
this.wsConnection = undefined
383+
await waitForGracefulClose(old, 1500)
380384
}
381-
this.wsConnection?.close(1000)
382385
connectionClosed = true
383386

384387
if (subscriptions.desired.length) {
@@ -504,3 +507,40 @@ export class WebsocketReverseMappingTransport<
504507
return this.requestMapping.get(value)
505508
}
506509
}
510+
511+
/**
512+
* Waits for a WebSocket connection to close gracefully.
513+
*
514+
* @param ws - The WebSocket instance (from the `ws` package).
515+
* @param gracefulMs - Max time in milliseconds to wait for a graceful close before terminating. Defaults to 1500 ms.
516+
* @returns A promise that resolves when the socket has closed or been terminated.
517+
*/
518+
async function waitForGracefulClose(ws: WebSocket, gracefulMs = 1500): Promise<void> {
519+
if (!ws || ws.readyState === WebSocket.CLOSED) {
520+
return
521+
}
522+
523+
const closed = new Promise<void>((resolve) => {
524+
ws.once('close', resolve)
525+
})
526+
527+
try {
528+
ws.close(1000)
529+
} catch (err) {
530+
logger.debug(`ws.close threw: ${(err as Error).message}`)
531+
}
532+
533+
try {
534+
await Promise.race([closed, sleep(gracefulMs)])
535+
} catch (err) {
536+
logger.debug(`waiting for close failed: ${(err as Error).message}`)
537+
}
538+
const state: number = ws.readyState
539+
if (state !== WebSocket.CLOSED) {
540+
try {
541+
ws.terminate()
542+
} catch (err) {
543+
logger.debug(`ws.terminate threw: ${(err as Error).message}`)
544+
}
545+
}
546+
}

0 commit comments

Comments
 (0)