diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 7d88f0748e..f6170b542e 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -6,12 +6,24 @@ import { PythLazerClient } from "../src/index.js"; // Ignore debug messages console.debug = () => {}; -const client = await PythLazerClient.create( - ["wss://pyth-lazer.dourolabs.app/v1/stream"], - "access_token", - 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. - console, // Optionally log socket operations (to the console in this case.) -); +const client = await PythLazerClient.create({ + urls: [ + "wss://pyth-lazer-0.dourolabs.app/v1/stream", + "wss://pyth-lazer-1.dourolabs.app/v1/stream", + ], + token: "you-access-token-here", // Replace with your actual access token + numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4. + logger: console, // Optionally log socket operations (to the console in this case.) + onError: (error) => { + console.error("WebSocket error:", error); + }, + // Optional configuration for resilient WebSocket connections + rwsConfig: { + heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds + maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds + logAfterRetryCount: 10, // Optional log after how many retries + }, +}); // Read and process messages from the Lazer stream client.addMessageListener((message) => { @@ -47,7 +59,7 @@ client.addAllConnectionsDownListener(() => { }); // Create and remove one or more subscriptions on the fly -await client.subscribe({ +client.subscribe({ type: "subscribe", subscriptionId: 1, priceFeedIds: [1, 2], @@ -58,7 +70,7 @@ await client.subscribe({ parsed: false, jsonBinaryEncoding: "base64", }); -await client.subscribe({ +client.subscribe({ type: "subscribe", subscriptionId: 2, priceFeedIds: [1, 2, 3, 4, 5], @@ -72,6 +84,6 @@ await client.subscribe({ await new Promise((resolve) => setTimeout(resolve, 10_000)); -await client.unsubscribe(1); -await client.unsubscribe(2); +client.unsubscribe(1); +client.unsubscribe(2); client.shutdown(); diff --git a/lazer/sdk/js/package.json b/lazer/sdk/js/package.json index 8fe7a34600..c2e2a2b764 100644 --- a/lazer/sdk/js/package.json +++ b/lazer/sdk/js/package.json @@ -1,6 +1,6 @@ { "name": "@pythnetwork/pyth-lazer-sdk", - "version": "0.5.1", + "version": "1.0.0", "description": "Pyth Lazer SDK", "publishConfig": { "access": "public" diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index 16dd155144..b09b4eefad 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -1,9 +1,8 @@ import WebSocket from "isomorphic-ws"; -import type { Logger } from "ts-log"; -import { dummyLogger } from "ts-log"; import type { ParsedPayload, Request, Response } from "./protocol.js"; import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js"; +import type { WebSocketPoolConfig } from "./socket/websocket-pool.js"; import { WebSocketPool } from "./socket/websocket-pool.js"; export type BinaryResponse = { @@ -35,13 +34,8 @@ export class PythLazerClient { * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. The connections will round-robin across the provided URLs. * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. */ - static async create( - urls: string[], - token: string, - numConnections = 3, - logger: Logger = dummyLogger, - ): Promise { - const wsp = await WebSocketPool.create(urls, token, numConnections, logger); + static async create(config: WebSocketPoolConfig): Promise { + const wsp = await WebSocketPool.create(config); return new PythLazerClient(wsp); } @@ -102,19 +96,19 @@ export class PythLazerClient { }); } - async subscribe(request: Request): Promise { + subscribe(request: Request) { if (request.type !== "subscribe") { throw new Error("Request must be a subscribe request"); } - await this.wsp.addSubscription(request); + this.wsp.addSubscription(request); } - async unsubscribe(subscriptionId: number): Promise { - await this.wsp.removeSubscription(subscriptionId); + unsubscribe(subscriptionId: number) { + this.wsp.removeSubscription(subscriptionId); } - async send(request: Request): Promise { - await this.wsp.sendRequest(request); + send(request: Request) { + this.wsp.sendRequest(request); } /** diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index 8966804a57..6131100d97 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -3,49 +3,67 @@ import type { ClientRequestArgs } from "node:http"; import type { ClientOptions, ErrorEvent } from "isomorphic-ws"; import WebSocket from "isomorphic-ws"; import type { Logger } from "ts-log"; +import { dummyLogger } from "ts-log"; -const HEARTBEAT_TIMEOUT_DURATION = 10_000; -const CONNECTION_TIMEOUT = 5000; +const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds +const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second' +const DEFAULT_LOG_AFTER_RETRY_COUNT = 10; -export class ResilientWebSocket { +export type ResilientWebSocketConfig = { endpoint: string; + wsOptions?: ClientOptions | ClientRequestArgs | undefined; + logger?: Logger; + heartbeatTimeoutDurationMs?: number; + maxRetryDelayMs?: number; + logAfterRetryCount?: number; +}; + +export class ResilientWebSocket { + private endpoint: string; + private wsOptions?: ClientOptions | ClientRequestArgs | undefined; + private logger: Logger; + private heartbeatTimeoutDurationMs: number; + private maxRetryDelayMs: number; + private logAfterRetryCount: number; + wsClient: undefined | WebSocket; - wsUserClosed: boolean; - private wsOptions: ClientOptions | ClientRequestArgs | undefined; + wsUserClosed = false; private wsFailedAttempts: number; - private heartbeatTimeout: undefined | NodeJS.Timeout; - private logger: undefined | Logger; - private connectionPromise: Promise | undefined; - private resolveConnection: (() => void) | undefined; - private rejectConnection: ((error: Error) => void) | undefined; + private heartbeatTimeout?: NodeJS.Timeout | undefined; + private retryTimeout?: NodeJS.Timeout | undefined; private _isReconnecting = false; - get isReconnecting(): boolean { + isReconnecting(): boolean { return this._isReconnecting; } - get isConnected(): boolean { + isConnected(): this is this & { wsClient: WebSocket } { return this.wsClient?.readyState === WebSocket.OPEN; } + private shouldLogRetry() { + return this.wsFailedAttempts % this.logAfterRetryCount === 0; + } + onError: (error: ErrorEvent) => void; onMessage: (data: WebSocket.Data) => void; onReconnect: () => void; - constructor( - endpoint: string, - wsOptions?: ClientOptions | ClientRequestArgs, - logger?: Logger, - ) { - this.endpoint = endpoint; - this.wsOptions = wsOptions; - this.logger = logger; + constructor(config: ResilientWebSocketConfig) { + this.endpoint = config.endpoint; + this.wsOptions = config.wsOptions; + this.logger = config.logger ?? dummyLogger; + this.heartbeatTimeoutDurationMs = + config.heartbeatTimeoutDurationMs ?? + DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS; + this.maxRetryDelayMs = config.maxRetryDelayMs ?? DEFAULT_MAX_RETRY_DELAY_MS; + this.logAfterRetryCount = + config.logAfterRetryCount ?? DEFAULT_LOG_AFTER_RETRY_COUNT; this.wsFailedAttempts = 0; this.onError = (error: ErrorEvent) => { - this.logger?.error(error.error); + void error; }; - this.wsUserClosed = true; this.onMessage = (data: WebSocket.Data): void => { void data; }; @@ -54,62 +72,67 @@ export class ResilientWebSocket { }; } - async send(data: string | Buffer) { - this.logger?.info(`Sending message`); - - await this.waitForMaybeReadyWebSocket(); + send(data: string | Buffer) { + this.logger.debug(`Sending message`); - if (this.wsClient === undefined) { - this.logger?.error( - "Couldn't connect to the websocket server. Error callback is called.", - ); - } else { + if (this.isConnected()) { this.wsClient.send(data); + } else { + this.logger.warn( + `WebSocket to ${this.endpoint} is not connected. Cannot send message.`, + ); } } - async startWebSocket(): Promise { - if (this.wsClient !== undefined) { - // If there's an existing connection attempt, wait for it - if (this.connectionPromise) { - return this.connectionPromise; - } + startWebSocket() { + if (this.wsUserClosed) { + this.logger.error( + "Connection was explicitly closed by user. Will not reconnect.", + ); return; } - this.logger?.info(`Creating Web Socket client`); + if (this.wsClient !== undefined) { + this.logger.info("WebSocket client already started."); + return; + } - // Create a new promise for this connection attempt - this.connectionPromise = new Promise((resolve, reject) => { - this.resolveConnection = resolve; - this.rejectConnection = reject; - }); + if (this.wsFailedAttempts == 0) { + this.logger.info(`Creating Web Socket client`); + } - // Set a connection timeout - const timeoutId = setTimeout(() => { - if (this.rejectConnection) { - this.rejectConnection( - new Error(`Connection timeout after ${String(CONNECTION_TIMEOUT)}ms`), - ); - } - }, CONNECTION_TIMEOUT); + if (this.retryTimeout !== undefined) { + clearTimeout(this.retryTimeout); + this.retryTimeout = undefined; + } this.wsClient = new WebSocket(this.endpoint, this.wsOptions); - this.wsUserClosed = false; this.wsClient.addEventListener("open", () => { + this.logger.info("WebSocket connection established"); this.wsFailedAttempts = 0; - this.resetHeartbeat(); - clearTimeout(timeoutId); this._isReconnecting = false; - this.resolveConnection?.(); + this.resetHeartbeat(); + this.onReconnect(); + }); + + this.wsClient.addEventListener("close", (e) => { + if (this.wsUserClosed) { + this.logger.info( + `WebSocket connection to ${this.endpoint} closed by user`, + ); + } else { + if (this.shouldLogRetry()) { + this.logger.warn( + `WebSocket connection to ${this.endpoint} closed unexpectedly: Code: ${e.code.toString()}`, + ); + } + this.handleReconnect(); + } }); this.wsClient.addEventListener("error", (event) => { this.onError(event); - if (this.rejectConnection) { - this.rejectConnection(new Error("WebSocket connection failed")); - } }); this.wsClient.addEventListener("message", (event) => { @@ -117,22 +140,12 @@ export class ResilientWebSocket { this.onMessage(event.data); }); - this.wsClient.addEventListener("close", () => { - clearTimeout(timeoutId); - if (this.rejectConnection) { - this.rejectConnection(new Error("WebSocket closed before connecting")); - } - void this.handleClose(); - }); - if ("on" in this.wsClient) { this.wsClient.on("ping", () => { - this.logger?.info("Ping received"); + this.logger.info("Ping received"); this.resetHeartbeat(); }); } - - return this.connectionPromise; } private resetHeartbeat(): void { @@ -141,91 +154,66 @@ export class ResilientWebSocket { } this.heartbeatTimeout = setTimeout(() => { - this.logger?.warn("Connection timed out. Reconnecting..."); + this.logger.warn("Connection timed out. Reconnecting..."); this.wsClient?.terminate(); - void this.restartUnexpectedClosedWebsocket(); - }, HEARTBEAT_TIMEOUT_DURATION); + this.handleReconnect(); + }, this.heartbeatTimeoutDurationMs); } - private async waitForMaybeReadyWebSocket(): Promise { - let waitedTime = 0; - while ( - this.wsClient !== undefined && - this.wsClient.readyState !== this.wsClient.OPEN - ) { - if (waitedTime > 5000) { - this.wsClient.close(); - return; - } else { - waitedTime += 10; - await sleep(10); - } + private handleReconnect() { + if (this.wsUserClosed) { + this.logger.info( + "WebSocket connection closed by user, not reconnecting.", + ); + return; } - } - private async handleClose(): Promise { if (this.heartbeatTimeout !== undefined) { clearTimeout(this.heartbeatTimeout); } - if (this.wsUserClosed) { - this.logger?.info("The connection has been closed successfully."); - } else { - this.wsFailedAttempts += 1; - this.wsClient = undefined; - this.connectionPromise = undefined; - this.resolveConnection = undefined; - this.rejectConnection = undefined; - - const waitTime = expoBackoff(this.wsFailedAttempts); - - this._isReconnecting = true; - this.logger?.error( - "Connection closed unexpectedly or because of timeout. Reconnecting after " + - String(waitTime) + - "ms.", - ); - - await sleep(waitTime); - await this.restartUnexpectedClosedWebsocket(); + if (this.retryTimeout !== undefined) { + clearTimeout(this.retryTimeout); } - } - private async restartUnexpectedClosedWebsocket(): Promise { - if (this.wsUserClosed) { - return; - } + this.wsFailedAttempts += 1; + this.wsClient = undefined; - await this.startWebSocket(); - await this.waitForMaybeReadyWebSocket(); + this._isReconnecting = true; - if (this.wsClient === undefined) { - this.logger?.error( - "Couldn't reconnect to websocket. Error callback is called.", + if (this.shouldLogRetry()) { + this.logger.error( + "Connection closed unexpectedly or because of timeout. Reconnecting after " + + String(this.retryDelayMs()) + + "ms.", ); - return; } - this.onReconnect(); + this.retryTimeout = setTimeout(() => { + this.startWebSocket(); + }, this.retryDelayMs()); } closeWebSocket(): void { if (this.wsClient !== undefined) { - const client = this.wsClient; + this.wsClient.close(); this.wsClient = undefined; - this.connectionPromise = undefined; - this.resolveConnection = undefined; - this.rejectConnection = undefined; - client.close(); } this.wsUserClosed = true; } -} -async function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - -function expoBackoff(attempts: number): number { - return 2 ** attempts * 100; + /** + * Calculates the delay in milliseconds for exponential backoff based on the number of failed attempts. + * + * The delay increases exponentially with each attempt, starting at 20ms for the first attempt, + * and is capped at maxRetryDelayMs for attempts greater than or equal to 10. + * + * @returns The calculated delay in milliseconds before the next retry. + */ + private retryDelayMs(): number { + if (this.wsFailedAttempts >= 10) { + return this.maxRetryDelayMs; + } + return Math.min(2 ** this.wsFailedAttempts * 10, this.maxRetryDelayMs); + } } diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index f7b940bc56..6e57811c0c 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -1,12 +1,23 @@ import TTLCache from "@isaacs/ttlcache"; +import type { ErrorEvent } from "isomorphic-ws"; import WebSocket from "isomorphic-ws"; import type { Logger } from "ts-log"; import { dummyLogger } from "ts-log"; import type { Request, Response } from "../protocol.js"; +import type { ResilientWebSocketConfig } from "./resilient-websocket.js"; import { ResilientWebSocket } from "./resilient-websocket.js"; -const DEFAULT_NUM_CONNECTIONS = 3; +const DEFAULT_NUM_CONNECTIONS = 4; + +export type WebSocketPoolConfig = { + urls: string[]; + token: string; + numConnections?: number; + logger?: Logger; + rwsConfig?: Omit; + onError?: (error: ErrorEvent) => void; +}; export class WebSocketPool { rwsPool: ResilientWebSocket[]; @@ -17,7 +28,7 @@ export class WebSocketPool { private wasAllDown = true; private checkConnectionStatesInterval: NodeJS.Timeout; - private constructor(private readonly logger: Logger = dummyLogger) { + private constructor(private readonly logger: Logger) { this.rwsPool = []; this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds this.subscriptions = new Map(); @@ -38,32 +49,32 @@ export class WebSocketPool { * @param numConnections - Number of parallel WebSocket connections to maintain (default: 3) * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. */ - static async create( - urls: string[], - token: string, - numConnections: number = DEFAULT_NUM_CONNECTIONS, - logger: Logger = dummyLogger, - ): Promise { - if (urls.length === 0) { + static async create(config: WebSocketPoolConfig): Promise { + if (config.urls.length === 0) { throw new Error("No URLs provided"); } + const logger = config.logger ?? dummyLogger; const pool = new WebSocketPool(logger); - - // Create all websocket instances - const connectionPromises: Promise[] = []; + const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS; for (let i = 0; i < numConnections; i++) { - const url = urls[i % urls.length]; + const url = config.urls[i % config.urls.length]; if (!url) { throw new Error(`URLs must not be null or empty`); } const wsOptions = { + ...config.rwsConfig?.wsOptions, headers: { - Authorization: `Bearer ${token}`, + Authorization: `Bearer ${config.token}`, }, }; - const rws = new ResilientWebSocket(url, wsOptions, logger); + const rws = new ResilientWebSocket({ + ...config.rwsConfig, + endpoint: url, + wsOptions, + logger, + }); // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish // the connection and call the onReconnect callback. @@ -73,7 +84,7 @@ export class WebSocketPool { } for (const [, request] of pool.subscriptions) { try { - void rws.send(JSON.stringify(request)); + rws.send(JSON.stringify(request)); } catch (error) { pool.logger.error( "Failed to resend subscription on reconnect:", @@ -83,25 +94,25 @@ export class WebSocketPool { } }; + if (config.onError) { + rws.onError = config.onError; + } // Handle all client messages ourselves. Dedupe before sending to registered message handlers. rws.onMessage = pool.dedupeHandler; pool.rwsPool.push(rws); - - // Start the websocket and collect the promise - connectionPromises.push(rws.startWebSocket()); + rws.startWebSocket(); } - // Wait for all connections to be established - try { - await Promise.all(connectionPromises); - } catch (error) { - // If any connection fails, clean up and throw - pool.shutdown(); - throw error; + pool.logger.info( + `Started WebSocketPool with ${numConnections.toString()} connections. Waiting for at least one to connect...`, + ); + + while (!pool.isAnyConnectionEstablished()) { + await new Promise((resolve) => setTimeout(resolve, 100)); } pool.logger.info( - `Successfully established ${numConnections.toString()} redundant WebSocket connections`, + `At least one WebSocket connection is established. WebSocketPool is ready.`, ); return pool; @@ -149,33 +160,27 @@ export class WebSocketPool { } }; - async sendRequest(request: Request): Promise { - const sendPromises = this.rwsPool.map(async (rws) => { - try { - await rws.send(JSON.stringify(request)); - } catch (error) { - this.logger.error("Failed to send request:", error); - throw error; - } - }); - await Promise.all(sendPromises); + sendRequest(request: Request) { + for (const rws of this.rwsPool) { + rws.send(JSON.stringify(request)); + } } - async addSubscription(request: Request): Promise { + addSubscription(request: Request) { if (request.type !== "subscribe") { throw new Error("Request must be a subscribe request"); } this.subscriptions.set(request.subscriptionId, request); - await this.sendRequest(request); + this.sendRequest(request); } - async removeSubscription(subscriptionId: number): Promise { + removeSubscription(subscriptionId: number) { this.subscriptions.delete(subscriptionId); const request: Request = { type: "unsubscribe", subscriptionId, }; - await this.sendRequest(request); + this.sendRequest(request); } addMessageListener(handler: (data: WebSocket.Data) => void): void { @@ -191,7 +196,11 @@ export class WebSocketPool { } private areAllConnectionsDown(): boolean { - return this.rwsPool.every((ws) => !ws.isConnected || ws.isReconnecting); + return this.rwsPool.every((ws) => !ws.isConnected() || ws.isReconnecting()); + } + + private isAnyConnectionEstablished(): boolean { + return this.rwsPool.some((ws) => ws.isConnected()); } private checkConnectionStates(): void {