diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index d17a212dff..3f32cf2d41 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -1,13 +1,24 @@ +/* eslint-disable no-console */ +/* eslint-disable @typescript-eslint/no-empty-function */ + import { PythLazerClient } from "../src/index.js"; -/* eslint-disable no-console */ -const client = new PythLazerClient("ws://127.0.0.1:1234/v1/stream", "ctoken1"); +// Ignore debug messages +console.debug = () => {}; + +const client = new PythLazerClient( + ["wss://pyth-lazer.dourolabs.app/v1/stream"], + "access_token", + 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. + console // Optionally log socket operations (to the console in this case.) +); + client.addMessageListener((message) => { - console.log("got message:", message); + console.info("got message:", message); switch (message.type) { case "json": { if (message.value.type == "streamUpdated") { - console.log( + console.info( "stream updated for subscription", message.value.subscriptionId, ":", @@ -18,24 +29,42 @@ client.addMessageListener((message) => { } case "binary": { if ("solana" in message.value) { - console.log("solana message:", message.value.solana?.toString("hex")); + console.info("solana message:", message.value.solana?.toString("hex")); } if ("evm" in message.value) { - console.log("evm message:", message.value.evm?.toString("hex")); + console.info("evm message:", message.value.evm?.toString("hex")); } break; } } }); -client.ws.addEventListener("open", () => { - client.send({ - type: "subscribe", - subscriptionId: 1, - priceFeedIds: [1, 2], - properties: ["price"], - chains: ["solana"], - deliveryFormat: "json", - channel: "fixed_rate@200ms", - jsonBinaryEncoding: "hex", - }); + +// Create and remove one or more subscriptions on the fly +await client.subscribe({ + type: "subscribe", + subscriptionId: 1, + priceFeedIds: [1, 2], + properties: ["price"], + chains: ["solana"], + deliveryFormat: "binary", + channel: "fixed_rate@200ms", + parsed: false, + jsonBinaryEncoding: "base64", +}); +await client.subscribe({ + type: "subscribe", + subscriptionId: 2, + priceFeedIds: [1, 2, 3, 4, 5], + properties: ["price"], + chains: ["evm"], + deliveryFormat: "json", + channel: "fixed_rate@200ms", + parsed: true, + jsonBinaryEncoding: "hex", }); + +await new Promise((resolve) => setTimeout(resolve, 10_000)); + +await client.unsubscribe(1); +await client.unsubscribe(2); +client.shutdown(); diff --git a/lazer/sdk/js/package.json b/lazer/sdk/js/package.json index 0abe25dc25..2b975af960 100644 --- a/lazer/sdk/js/package.json +++ b/lazer/sdk/js/package.json @@ -1,6 +1,6 @@ { "name": "@pythnetwork/pyth-lazer-sdk", - "version": "0.1.2", + "version": "0.2.0", "description": "Pyth Lazer SDK", "publishConfig": { "access": "public" @@ -63,6 +63,8 @@ "@solana/buffer-layout": "^4.0.1", "@solana/web3.js": "^1.98.0", "isomorphic-ws": "^5.0.0", - "ws": "^8.18.0" + "ws": "^8.18.0", + "@isaacs/ttlcache": "^1.4.1", + "ts-log": "^2.2.7" } } diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index c887edef16..fc732e5acc 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -1,4 +1,5 @@ import WebSocket from "isomorphic-ws"; +import { dummyLogger, type Logger } from "ts-log"; import { BINARY_UPDATE_FORMAT_MAGIC, @@ -9,6 +10,7 @@ import { type Response, SOLANA_FORMAT_MAGIC_BE, } from "./protocol.js"; +import { WebSocketPool } from "./socket/web-socket-pool.js"; export type BinaryResponse = { subscriptionId: number; @@ -28,52 +30,58 @@ const UINT32_NUM_BYTES = 4; const UINT64_NUM_BYTES = 8; export class PythLazerClient { - ws: WebSocket; + wsp: WebSocketPool; - constructor(url: string, token: string) { - const finalUrl = new URL(url); - finalUrl.searchParams.append("ACCESS_TOKEN", token); - this.ws = new WebSocket(finalUrl); + /** + * Creates a new PythLazerClient instance. + * @param urls - List of WebSocket URLs of the Pyth Lazer service + * @param token - The access token for authentication + * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. + * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. + */ + constructor( + urls: string[], + token: string, + numConnections = 3, + logger: Logger = dummyLogger + ) { + this.wsp = new WebSocketPool(urls, token, numConnections, logger); } addMessageListener(handler: (event: JsonOrBinaryResponse) => void) { - this.ws.addEventListener("message", (event: WebSocket.MessageEvent) => { - if (typeof event.data == "string") { + this.wsp.addMessageListener((data: WebSocket.Data) => { + if (typeof data == "string") { handler({ type: "json", - value: JSON.parse(event.data) as Response, + value: JSON.parse(data) as Response, }); - } else if (Buffer.isBuffer(event.data)) { + } else if (Buffer.isBuffer(data)) { let pos = 0; - const magic = event.data - .subarray(pos, pos + UINT32_NUM_BYTES) - .readUint32BE(); + const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32BE(); pos += UINT32_NUM_BYTES; if (magic != BINARY_UPDATE_FORMAT_MAGIC) { throw new Error("binary update format magic mismatch"); } // TODO: some uint64 values may not be representable as Number. const subscriptionId = Number( - event.data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE() + data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE() ); pos += UINT64_NUM_BYTES; const value: BinaryResponse = { subscriptionId }; - while (pos < event.data.length) { - const len = event.data - .subarray(pos, pos + UINT16_NUM_BYTES) - .readUint16BE(); + while (pos < data.length) { + const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE(); pos += UINT16_NUM_BYTES; - const magic = event.data + const magic = data .subarray(pos, pos + UINT32_NUM_BYTES) .readUint32BE(); if (magic == EVM_FORMAT_MAGIC) { - value.evm = event.data.subarray(pos, pos + len); + value.evm = data.subarray(pos, pos + len); } else if (magic == SOLANA_FORMAT_MAGIC_BE) { - value.solana = event.data.subarray(pos, pos + len); + value.solana = data.subarray(pos, pos + len); } else if (magic == PARSED_FORMAT_MAGIC) { value.parsed = JSON.parse( - event.data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString() + data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString() ) as ParsedPayload; } else { throw new Error("unknown magic: " + magic.toString()); @@ -87,7 +95,22 @@ export class PythLazerClient { }); } - send(request: Request) { - this.ws.send(JSON.stringify(request)); + async subscribe(request: Request): Promise { + if (request.type !== "subscribe") { + throw new Error("Request must be a subscribe request"); + } + await this.wsp.addSubscription(request); + } + + async unsubscribe(subscriptionId: number): Promise { + await this.wsp.removeSubscription(subscriptionId); + } + + async send(request: Request): Promise { + await this.wsp.sendRequest(request); + } + + shutdown(): void { + this.wsp.shutdown(); } } diff --git a/lazer/sdk/js/src/socket/resilient-web-socket.ts b/lazer/sdk/js/src/socket/resilient-web-socket.ts new file mode 100644 index 0000000000..7b221f3245 --- /dev/null +++ b/lazer/sdk/js/src/socket/resilient-web-socket.ts @@ -0,0 +1,195 @@ +import type { ClientRequestArgs } from "node:http"; + +import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws"; +import type { Logger } from "ts-log"; + +// Reconnect with expo backoff if we don't get a message or ping for 10 seconds +const HEARTBEAT_TIMEOUT_DURATION = 10_000; + +/** + * This class wraps websocket to provide a resilient web socket client. + * + * It will reconnect if connection fails with exponential backoff. Also, it will reconnect + * if it receives no ping request or regular message from server within a while as indication + * of timeout (assuming the server sends either regularly). + * + * This class also logs events if logger is given and by replacing onError method you can handle + * connection errors yourself (e.g: do not retry and close the connection). + */ +export class ResilientWebSocket { + endpoint: string; + wsClient: undefined | WebSocket; + wsUserClosed: boolean; + private wsOptions: ClientOptions | ClientRequestArgs | undefined; + private wsFailedAttempts: number; + private heartbeatTimeout: undefined | NodeJS.Timeout; + private logger: undefined | Logger; + + onError: (error: ErrorEvent) => void; + onMessage: (data: WebSocket.Data) => void; + onReconnect: () => void; + constructor( + endpoint: string, + wsOptions?: ClientOptions | ClientRequestArgs, + logger?: Logger + ) { + this.endpoint = endpoint; + this.wsOptions = wsOptions; + this.logger = logger; + + this.wsFailedAttempts = 0; + this.onError = (error: ErrorEvent) => { + this.logger?.error(error.error); + }; + this.wsUserClosed = true; + this.onMessage = (data: WebSocket.Data): void => { + void data; + }; + this.onReconnect = (): void => { + // Empty function, can be set by the user. + }; + } + + async send(data: string | Buffer) { + this.logger?.info(`Sending message`); + + await this.waitForMaybeReadyWebSocket(); + + if (this.wsClient === undefined) { + this.logger?.error( + "Couldn't connect to the websocket server. Error callback is called." + ); + } else { + this.wsClient.send(data); + } + } + + startWebSocket(): void { + if (this.wsClient !== undefined) { + return; + } + + this.logger?.info(`Creating Web Socket client`); + + this.wsClient = new WebSocket(this.endpoint, this.wsOptions); + this.wsUserClosed = false; + + this.wsClient.addEventListener("open", () => { + this.wsFailedAttempts = 0; + this.resetHeartbeat(); + }); + + this.wsClient.addEventListener("error", (event) => { + this.onError(event); + }); + + this.wsClient.addEventListener("message", (event) => { + this.resetHeartbeat(); + this.onMessage(event.data); + }); + + this.wsClient.addEventListener("close", () => { + void this.handleClose(); + }); + + // Handle ping events if supported (Node.js only) + if ("on" in this.wsClient) { + // Ping handler is undefined in browser side + this.wsClient.on("ping", () => { + this.logger?.info("Ping received"); + this.resetHeartbeat(); + }); + } + } + + /** + * Reset the heartbeat timeout. This is called when we receive any message (ping or regular) + * from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION, + * we assume the connection is dead and reconnect. + */ + private resetHeartbeat(): void { + if (this.heartbeatTimeout !== undefined) { + clearTimeout(this.heartbeatTimeout); + } + + this.heartbeatTimeout = setTimeout(() => { + this.logger?.warn("Connection timed out. Reconnecting..."); + this.wsClient?.terminate(); + void this.restartUnexpectedClosedWebsocket(); + }, HEARTBEAT_TIMEOUT_DURATION); + } + + private async waitForMaybeReadyWebSocket(): Promise { + let waitedTime = 0; + while ( + this.wsClient !== undefined && + this.wsClient.readyState !== this.wsClient.OPEN + ) { + if (waitedTime > 5000) { + this.wsClient.close(); + return; + } else { + waitedTime += 10; + await sleep(10); + } + } + } + + private async handleClose(): Promise { + if (this.heartbeatTimeout !== undefined) { + clearTimeout(this.heartbeatTimeout); + } + + if (this.wsUserClosed) { + this.logger?.info("The connection has been closed successfully."); + } else { + this.wsFailedAttempts += 1; + this.wsClient = undefined; + const waitTime = expoBackoff(this.wsFailedAttempts); + + this.logger?.error( + "Connection closed unexpectedly or because of timeout. Reconnecting after " + + String(waitTime) + + "ms." + ); + + await sleep(waitTime); + await this.restartUnexpectedClosedWebsocket(); + } + } + + private async restartUnexpectedClosedWebsocket(): Promise { + if (this.wsUserClosed) { + return; + } + + this.startWebSocket(); + await this.waitForMaybeReadyWebSocket(); + + if (this.wsClient === undefined) { + this.logger?.error( + "Couldn't reconnect to websocket. Error callback is called." + ); + return; + } + + this.onReconnect(); + } + + closeWebSocket(): void { + if (this.wsClient !== undefined) { + const client = this.wsClient; + this.wsClient = undefined; + client.close(); + } + this.wsUserClosed = true; + } +} + +async function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function expoBackoff(attempts: number): number { + return 2 ** attempts * 100; +} diff --git a/lazer/sdk/js/src/socket/web-socket-pool.ts b/lazer/sdk/js/src/socket/web-socket-pool.ts new file mode 100644 index 0000000000..c35d9c6f3c --- /dev/null +++ b/lazer/sdk/js/src/socket/web-socket-pool.ts @@ -0,0 +1,197 @@ +import TTLCache from "@isaacs/ttlcache"; +import WebSocket from "isomorphic-ws"; +import { dummyLogger, type Logger } from "ts-log"; + +import { ResilientWebSocket } from "./resilient-web-socket.js"; +import type { Request, Response } from "../protocol.js"; + +// Number of redundant parallel WebSocket connections +const DEFAULT_NUM_CONNECTIONS = 3; + +export class WebSocketPool { + rwsPool: ResilientWebSocket[]; + private cache: TTLCache; + private subscriptions: Map; // id -> subscription Request + private messageListeners: ((event: WebSocket.Data) => void)[]; + + /** + * Creates a new WebSocketPool instance that uses multiple redundant WebSocket connections for reliability. + * Usage semantics are similar to using a regular WebSocket client. + * @param urls - List of WebSocket URLs to connect to + * @param token - Authentication token to use for the connections + * @param numConnections - Number of parallel WebSocket connections to maintain (default: 3) + * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. + */ + constructor( + urls: string[], + token: string, + numConnections: number = DEFAULT_NUM_CONNECTIONS, + private readonly logger: Logger = dummyLogger + ) { + if (urls.length === 0) { + throw new Error("No URLs provided"); + } + // This cache is used to deduplicate messages received across different websocket clients in the pool. + // A TTL cache is used to prevent unbounded memory usage. A very short TTL of 10 seconds is chosen since + // deduplication only needs to happen between messages received very close together in time. + this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds + this.rwsPool = []; + this.subscriptions = new Map(); + this.messageListeners = []; + for (let i = 0; i < numConnections; i++) { + const url = urls[i % urls.length]; + if (!url) { + throw new Error(`URLs must not be null or empty`); + } + const wsOptions = { + headers: { + Authorization: `Bearer ${token}`, + }, + }; + const rws = new ResilientWebSocket(url, wsOptions, logger); + + // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish + // the connection and call the onReconnect callback. + // When we reconnect, replay all subscription messages to resume the data stream. + rws.onReconnect = () => { + if (rws.wsUserClosed) { + return; + } + for (const [, request] of this.subscriptions) { + try { + void rws.send(JSON.stringify(request)); + } catch (error) { + this.logger.error( + "Failed to resend subscription on reconnect:", + error + ); + } + } + }; + // Handle all client messages ourselves. Dedupe before sending to registered message handlers. + rws.onMessage = this.dedupeHandler; + this.rwsPool.push(rws); + } + + // Let it rip + // TODO: wait for sockets to receive `open` msg before subscribing? + for (const rws of this.rwsPool) { + rws.startWebSocket(); + } + + this.logger.info( + `Using ${numConnections.toString()} redundant WebSocket connections` + ); + } + + /** + * Checks for error responses in JSON messages and throws appropriate errors + */ + private handleErrorMessages(data: string): void { + const message = JSON.parse(data) as Response; + if (message.type === "subscriptionError") { + throw new Error( + `Error occurred for subscription ID ${String( + message.subscriptionId + )}: ${message.error}` + ); + } else if (message.type === "error") { + throw new Error(`Error: ${message.error}`); + } + } + + /** + * Handles incoming websocket messages by deduplicating identical messages received across + * multiple connections before forwarding to registered handlers + */ + dedupeHandler = (data: WebSocket.Data): void => { + // For string data, use the whole string as the cache key. This avoids expensive JSON parsing during deduping. + // For binary data, use the hex string representation as the cache key + const cacheKey = + typeof data === "string" + ? data + : Buffer.from(data as Buffer).toString("hex"); + + // If we've seen this exact message recently, drop it + if (this.cache.has(cacheKey)) { + this.logger.debug("Dropping duplicate message"); + return; + } + + // Haven't seen this message, cache it and forward to handlers + this.cache.set(cacheKey, true); + + // Check for errors in JSON responses + if (typeof data === "string") { + this.handleErrorMessages(data); + } + + for (const handler of this.messageListeners) { + handler(data); + } + }; + + /** + * Sends a message to all websockets in the pool + * @param request - The request to send + */ + async sendRequest(request: Request): Promise { + // Send to all websockets in the pool + const sendPromises = this.rwsPool.map(async (rws) => { + try { + await rws.send(JSON.stringify(request)); + } catch (error) { + this.logger.error("Failed to send request:", error); + throw error; // Re-throw the error + } + }); + await Promise.all(sendPromises); + } + + /** + * Adds a subscription by sending a subscribe request to all websockets in the pool + * and storing it for replay on reconnection + * @param request - The subscription request to send + */ + async addSubscription(request: Request): Promise { + if (request.type !== "subscribe") { + throw new Error("Request must be a subscribe request"); + } + this.subscriptions.set(request.subscriptionId, request); + await this.sendRequest(request); + } + + /** + * Removes a subscription by sending an unsubscribe request to all websockets in the pool + * and removing it from stored subscriptions + * @param subscriptionId - The ID of the subscription to remove + */ + async removeSubscription(subscriptionId: number): Promise { + this.subscriptions.delete(subscriptionId); + const request: Request = { + type: "unsubscribe", + subscriptionId, + }; + await this.sendRequest(request); + } + + /** + * Adds a message handler function to receive websocket messages + * @param handler - Function that will be called with each received message + */ + addMessageListener(handler: (data: WebSocket.Data) => void): void { + this.messageListeners.push(handler); + } + + /** + * Elegantly closes all websocket connections in the pool + */ + shutdown(): void { + for (const rws of this.rwsPool) { + rws.closeWebSocket(); + } + this.rwsPool = []; + this.subscriptions.clear(); + this.messageListeners = []; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8fbeacc3ff..83abc81069 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1525,6 +1525,9 @@ importers: lazer/sdk/js: dependencies: + '@isaacs/ttlcache': + specifier: ^1.4.1 + version: 1.4.1 '@solana/buffer-layout': specifier: ^4.0.1 version: 4.0.1 @@ -1534,6 +1537,9 @@ importers: isomorphic-ws: specifier: ^5.0.0 version: 5.0.0(ws@8.18.0(bufferutil@4.0.8)(utf-8-validate@5.0.10)) + ts-log: + specifier: ^2.2.7 + version: 2.2.7 ws: specifier: ^8.18.0 version: 8.18.0(bufferutil@4.0.8)(utf-8-validate@5.0.10) @@ -21667,6 +21673,9 @@ packages: ts-log@2.2.5: resolution: {integrity: sha512-PGcnJoTBnVGy6yYNFxWVNkdcAuAMstvutN9MgDJIV6L0oG8fB+ZNNy1T+wJzah8RPGor1mZuPQkVfXNDpy9eHA==} + ts-log@2.2.7: + resolution: {integrity: sha512-320x5Ggei84AxzlXp91QkIGSw5wgaLT6GeAH0KsqDmRZdVWW2OiSeVvElVoatk3f7nicwXlElXsoFkARiGE2yg==} + ts-mixer@6.0.4: resolution: {integrity: sha512-ufKpbmrugz5Aou4wcr5Wc1UUFWOLhq+Fm6qa6P0w0K5Qw2yhaUoiWszhCVuNQyNwrlGiscHOmqYoAox1PtvgjA==} @@ -33901,7 +33910,7 @@ snapshots: axios: 1.7.2 axios-retry: 3.9.1 isomorphic-ws: 4.0.1(ws@8.18.0(bufferutil@4.0.8)(utf-8-validate@6.0.4)) - ts-log: 2.2.5 + ts-log: 2.2.7 ws: 8.18.0(bufferutil@4.0.8)(utf-8-validate@6.0.4) transitivePeerDependencies: - bufferutil @@ -35972,13 +35981,13 @@ snapshots: dependencies: '@noble/hashes': 1.1.5 '@noble/secp256k1': 1.6.3 - '@scure/base': 1.1.7 + '@scure/base': 1.1.9 '@scure/bip32@1.1.5': dependencies: '@noble/hashes': 1.2.0 '@noble/secp256k1': 1.7.1 - '@scure/base': 1.1.7 + '@scure/base': 1.1.9 '@scure/bip32@1.3.2': dependencies: @@ -36012,7 +36021,7 @@ snapshots: '@scure/bip39@1.1.1': dependencies: '@noble/hashes': 1.2.0 - '@scure/base': 1.1.7 + '@scure/base': 1.1.9 '@scure/bip39@1.2.1': dependencies: @@ -58744,6 +58753,8 @@ snapshots: ts-log@2.2.5: {} + ts-log@2.2.7: {} + ts-mixer@6.0.4: {} ts-mocha@10.0.0(mocha@9.2.2):