Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion lazer/sdk/js/examples/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import { PythLazerClient } from "../src/index.js";
// Ignore debug messages
console.debug = () => {};

const client = new PythLazerClient(
const client = await PythLazerClient.create(
["wss://pyth-lazer.dourolabs.app/v1/stream"],
"access_token",
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
console // Optionally log socket operations (to the console in this case.)
);

// Read and process messages from the Lazer stream
client.addMessageListener((message) => {
console.info("got message:", message);
switch (message.type) {
Expand All @@ -39,6 +40,12 @@ client.addMessageListener((message) => {
}
});

// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down)
// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown().
client.addAllConnectionsDownListener(() => {
console.error("All connections are down!");
});

// Create and remove one or more subscriptions on the fly
await client.subscribe({
type: "subscribe",
Expand Down
2 changes: 1 addition & 1 deletion lazer/sdk/js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/pyth-lazer-sdk",
"version": "0.2.1",
"version": "0.3.0",
"description": "Pyth Lazer SDK",
"publishConfig": {
"access": "public"
Expand Down
28 changes: 22 additions & 6 deletions lazer/sdk/js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
type Response,
SOLANA_FORMAT_MAGIC_BE,
} from "./protocol.js";
import { WebSocketPool } from "./socket/web-socket-pool.js";
import { WebSocketPool } from "./socket/websocket-pool.js";

export type BinaryResponse = {
subscriptionId: number;
Expand All @@ -30,24 +30,31 @@ const UINT32_NUM_BYTES = 4;
const UINT64_NUM_BYTES = 8;

export class PythLazerClient {
wsp: WebSocketPool;
private constructor(private readonly wsp: WebSocketPool) {}

/**
* Creates a new PythLazerClient instance.
* @param urls - List of WebSocket URLs of the Pyth Lazer service
* @param token - The access token for authentication
* @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
* @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. The connections will round-robin across the provided URLs.
* @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
*/
constructor(
static async create(
urls: string[],
token: string,
numConnections = 3,
logger: Logger = dummyLogger
) {
this.wsp = new WebSocketPool(urls, token, numConnections, logger);
): Promise<PythLazerClient> {
const wsp = await WebSocketPool.create(urls, token, numConnections, logger);
return new PythLazerClient(wsp);
}

/**
* Adds a message listener that receives either JSON or binary responses from the WebSocket connections.
* The listener will be called for each message received, with deduplication across redundant connections.
* @param handler - Callback function that receives the parsed message. The message can be either a JSON response
* or a binary response containing EVM, Solana, or parsed payload data.
*/
addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
this.wsp.addMessageListener((data: WebSocket.Data) => {
if (typeof data == "string") {
Expand Down Expand Up @@ -110,6 +117,15 @@ export class PythLazerClient {
await this.wsp.sendRequest(request);
}

/**
* Registers a handler function that will be called whenever all WebSocket connections are down or attempting to reconnect.
* The connections may still try to reconnect in the background. To shut down the pool, call `shutdown()`.
* @param handler - Function to be called when all connections are down
*/
addAllConnectionsDownListener(handler: () => void): void {
this.wsp.addAllConnectionsDownListener(handler);
}

shutdown(): void {
this.wsp.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,9 @@ import type { ClientRequestArgs } from "node:http";
import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws";
import type { Logger } from "ts-log";

// Reconnect with expo backoff if we don't get a message or ping for 10 seconds
const HEARTBEAT_TIMEOUT_DURATION = 10_000;
const CONNECTION_TIMEOUT = 5000;

/**
* This class wraps websocket to provide a resilient web socket client.
*
* It will reconnect if connection fails with exponential backoff. Also, it will reconnect
* if it receives no ping request or regular message from server within a while as indication
* of timeout (assuming the server sends either regularly).
*
* This class also logs events if logger is given and by replacing onError method you can handle
* connection errors yourself (e.g: do not retry and close the connection).
*/
export class ResilientWebSocket {
endpoint: string;
wsClient: undefined | WebSocket;
Expand All @@ -24,10 +14,23 @@ export class ResilientWebSocket {
private wsFailedAttempts: number;
private heartbeatTimeout: undefined | NodeJS.Timeout;
private logger: undefined | Logger;
private connectionPromise: Promise<void> | undefined;
private resolveConnection: (() => void) | undefined;
private rejectConnection: ((error: Error) => void) | undefined;
private _isReconnecting = false;

get isReconnecting(): boolean {
return this._isReconnecting;
}

get isConnected(): boolean {
return this.wsClient?.readyState === WebSocket.OPEN;
}

onError: (error: ErrorEvent) => void;
onMessage: (data: WebSocket.Data) => void;
onReconnect: () => void;

constructor(
endpoint: string,
wsOptions?: ClientOptions | ClientRequestArgs,
Expand Down Expand Up @@ -64,23 +67,48 @@ export class ResilientWebSocket {
}
}

startWebSocket(): void {
async startWebSocket(): Promise<void> {
if (this.wsClient !== undefined) {
// If there's an existing connection attempt, wait for it
if (this.connectionPromise) {
return this.connectionPromise;
}
return;
}

this.logger?.info(`Creating Web Socket client`);

// Create a new promise for this connection attempt
this.connectionPromise = new Promise((resolve, reject) => {
this.resolveConnection = resolve;
this.rejectConnection = reject;
});

// Set a connection timeout
const timeoutId = setTimeout(() => {
if (this.rejectConnection) {
this.rejectConnection(
new Error(`Connection timeout after ${String(CONNECTION_TIMEOUT)}ms`)
);
}
}, CONNECTION_TIMEOUT);

this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
this.wsUserClosed = false;

this.wsClient.addEventListener("open", () => {
this.wsFailedAttempts = 0;
this.resetHeartbeat();
clearTimeout(timeoutId);
this._isReconnecting = false;
this.resolveConnection?.();
});

this.wsClient.addEventListener("error", (event) => {
this.onError(event);
if (this.rejectConnection) {
this.rejectConnection(new Error("WebSocket connection failed"));
}
});

this.wsClient.addEventListener("message", (event) => {
Expand All @@ -89,24 +117,23 @@ export class ResilientWebSocket {
});

this.wsClient.addEventListener("close", () => {
clearTimeout(timeoutId);
if (this.rejectConnection) {
this.rejectConnection(new Error("WebSocket closed before connecting"));
}
void this.handleClose();
});

// Handle ping events if supported (Node.js only)
if ("on" in this.wsClient) {
// Ping handler is undefined in browser side
this.wsClient.on("ping", () => {
this.logger?.info("Ping received");
this.resetHeartbeat();
});
}

return this.connectionPromise;
}

/**
* Reset the heartbeat timeout. This is called when we receive any message (ping or regular)
* from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION,
* we assume the connection is dead and reconnect.
*/
private resetHeartbeat(): void {
if (this.heartbeatTimeout !== undefined) {
clearTimeout(this.heartbeatTimeout);
Expand Down Expand Up @@ -145,8 +172,13 @@ export class ResilientWebSocket {
} else {
this.wsFailedAttempts += 1;
this.wsClient = undefined;
this.connectionPromise = undefined;
this.resolveConnection = undefined;
this.rejectConnection = undefined;

const waitTime = expoBackoff(this.wsFailedAttempts);

this._isReconnecting = true;
this.logger?.error(
"Connection closed unexpectedly or because of timeout. Reconnecting after " +
String(waitTime) +
Expand All @@ -163,7 +195,7 @@ export class ResilientWebSocket {
return;
}

this.startWebSocket();
await this.startWebSocket();
await this.waitForMaybeReadyWebSocket();

if (this.wsClient === undefined) {
Expand All @@ -180,6 +212,9 @@ export class ResilientWebSocket {
if (this.wsClient !== undefined) {
const client = this.wsClient;
this.wsClient = undefined;
this.connectionPromise = undefined;
this.resolveConnection = undefined;
this.rejectConnection = undefined;
client.close();
}
this.wsUserClosed = true;
Expand Down
Loading
Loading