Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 43 additions & 17 deletions lazer/sdk/js/examples/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import { PythLazerClient } from "../src/index.js";

/* eslint-disable no-console */
const client = new PythLazerClient("ws://127.0.0.1:1234/v1/stream", "ctoken1");
// Ignore debug messages
console.debug = () => {};

Check failure on line 4 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected console statement

Check failure on line 4 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected empty arrow function

const client = new PythLazerClient(
["wss://pyth-lazer.dourolabs.app/v1/stream"],
"access_token",
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
console // Optionally log socket operations (to the console in this case.)
);

client.addMessageListener((message) => {
console.log("got message:", message);
console.info("got message:", message);

Check failure on line 14 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected console statement
switch (message.type) {
case "json": {
if (message.value.type == "streamUpdated") {
console.log(
console.info(

Check failure on line 18 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected console statement
"stream updated for subscription",
message.value.subscriptionId,
":",
Expand All @@ -18,24 +26,42 @@
}
case "binary": {
if ("solana" in message.value) {
console.log("solana message:", message.value.solana?.toString("hex"));
console.info("solana message:", message.value.solana?.toString("hex"));

Check failure on line 29 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected console statement
}
if ("evm" in message.value) {
console.log("evm message:", message.value.evm?.toString("hex"));
console.info("evm message:", message.value.evm?.toString("hex"));

Check failure on line 32 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected console statement
}
break;
}
}
});
client.ws.addEventListener("open", () => {
client.send({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
chains: ["solana"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
jsonBinaryEncoding: "hex",
});

// Create and remove one or more subscriptions on the fly
client.subscribe({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
chains: ["solana"],
deliveryFormat: "binary",
channel: "fixed_rate@200ms",
parsed: false,
jsonBinaryEncoding: "base64",
});
client.subscribe({
type: "subscribe",
subscriptionId: 2,
priceFeedIds: [1, 2, 3, 4, 5],
properties: ["price"],
chains: ["evm"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
parsed: true,
jsonBinaryEncoding: "hex",
});

await new Promise((resolve) => setTimeout(resolve, 10000));

Check failure on line 63 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Invalid group length in numeric value

client.unsubscribe(1);
client.unsubscribe(2);
client.shutdown();
6 changes: 4 additions & 2 deletions lazer/sdk/js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/pyth-lazer-sdk",
"version": "0.1.2",
"version": "0.2.0",
"description": "Pyth Lazer SDK",
"publishConfig": {
"access": "public"
Expand Down Expand Up @@ -63,6 +63,8 @@
"@solana/buffer-layout": "^4.0.1",
"@solana/web3.js": "^1.98.0",
"isomorphic-ws": "^5.0.0",
"ws": "^8.18.0"
"ws": "^8.18.0",
"@isaacs/ttlcache": "^1.4.1",
"ts-log": "^2.2.7"
}
}
67 changes: 45 additions & 22 deletions lazer/sdk/js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
type Response,
SOLANA_FORMAT_MAGIC_BE,
} from "./protocol.js";
import { WebSocketPool } from "./socket/WebSocketPool.js";

Check failure on line 12 in lazer/sdk/js/src/client.ts

View workflow job for this annotation

GitHub Actions / test

There should be at least one empty line between import groups
import { dummyLogger, type Logger } from "ts-log";

Check failure on line 13 in lazer/sdk/js/src/client.ts

View workflow job for this annotation

GitHub Actions / test

`ts-log` import should occur before import of `./protocol.js`

export type BinaryResponse = {
subscriptionId: number;
Expand All @@ -28,52 +30,58 @@
const UINT64_NUM_BYTES = 8;

export class PythLazerClient {
ws: WebSocket;
wsp: WebSocketPool;

constructor(url: string, token: string) {
const finalUrl = new URL(url);
finalUrl.searchParams.append("ACCESS_TOKEN", token);
this.ws = new WebSocket(finalUrl);
/**
* Creates a new PythLazerClient instance.
* @param urls List of WebSocket URLs of the Pyth Lazer service

Check failure on line 37 in lazer/sdk/js/src/client.ts

View workflow job for this annotation

GitHub Actions / test

tsdoc-param-tag-missing-hyphen: The @param block should be followed by a parameter name and then a hyphen
* @param token The access token for authentication
* @param numConnections The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
* @param logger Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
*/
constructor(
urls: string[],
token: string,
numConnections: number = 3,
logger: Logger = dummyLogger
) {
this.wsp = new WebSocketPool(urls, token, numConnections, logger);
}

addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
this.ws.addEventListener("message", (event: WebSocket.MessageEvent) => {
if (typeof event.data == "string") {
this.wsp.addMessageListener((data: WebSocket.Data) => {
if (typeof data == "string") {
handler({
type: "json",
value: JSON.parse(event.data) as Response,
value: JSON.parse(data) as Response,
});
} else if (Buffer.isBuffer(event.data)) {
} else if (Buffer.isBuffer(data)) {
let pos = 0;
const magic = event.data
.subarray(pos, pos + UINT32_NUM_BYTES)
.readUint32BE();
const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32BE();
pos += UINT32_NUM_BYTES;
if (magic != BINARY_UPDATE_FORMAT_MAGIC) {
throw new Error("binary update format magic mismatch");
}
// TODO: some uint64 values may not be representable as Number.
const subscriptionId = Number(
event.data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE()
data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE()
);
pos += UINT64_NUM_BYTES;

const value: BinaryResponse = { subscriptionId };
while (pos < event.data.length) {
const len = event.data
.subarray(pos, pos + UINT16_NUM_BYTES)
.readUint16BE();
while (pos < data.length) {
const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
pos += UINT16_NUM_BYTES;
const magic = event.data
const magic = data
.subarray(pos, pos + UINT32_NUM_BYTES)
.readUint32BE();
if (magic == EVM_FORMAT_MAGIC) {
value.evm = event.data.subarray(pos, pos + len);
value.evm = data.subarray(pos, pos + len);
} else if (magic == SOLANA_FORMAT_MAGIC_BE) {
value.solana = event.data.subarray(pos, pos + len);
value.solana = data.subarray(pos, pos + len);
} else if (magic == PARSED_FORMAT_MAGIC) {
value.parsed = JSON.parse(
event.data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString()
data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString()
) as ParsedPayload;
} else {
throw new Error("unknown magic: " + magic.toString());
Expand All @@ -87,7 +95,22 @@
});
}

subscribe(request: Request) {
if (request.type !== "subscribe") {
throw new Error("Request must be a subscribe request");
}
this.wsp.addSubscription(request);
}

unsubscribe(subscriptionId: number) {
this.wsp.removeSubscription(subscriptionId);
}

send(request: Request) {
this.ws.send(JSON.stringify(request));
this.wsp.sendRequest(request);
}

shutdown(): void {
this.wsp.shutdown();
}
}
183 changes: 183 additions & 0 deletions lazer/sdk/js/src/socket/ResillientWebSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import WebSocket, { type ClientOptions } from "isomorphic-ws";
import type { ClientRequestArgs } from "node:http";
import type { Logger } from "ts-log";

// Reconnect with expo backoff if we don't get a message or ping for 10 seconds
const HEARTBEAT_TIMEOUT_DURATION = 10000;

/**
* This class wraps websocket to provide a resilient web socket client.
*
* It will reconnect if connection fails with exponential backoff. Also, it will reconnect
* if it receives no ping request or regular message from server within a while as indication
* of timeout (assuming the server sends either regularly).
*
* This class also logs events if logger is given and by replacing onError method you can handle
* connection errors yourself (e.g: do not retry and close the connection).
*/
export class ResilientWebSocket {
endpoint: string;
wsClient: undefined | WebSocket;
wsUserClosed: boolean;
private wsOptions: ClientOptions | ClientRequestArgs | undefined;
private wsFailedAttempts: number;
private heartbeatTimeout: undefined | NodeJS.Timeout;
private logger: undefined | Logger;

onError: (error: Error) => void;
onMessage: (data: WebSocket.Data) => void;
onReconnect: () => void;
constructor(
endpoint: string,
wsOptions?: ClientOptions | ClientRequestArgs,
logger?: Logger
) {
this.endpoint = endpoint;
this.wsOptions = wsOptions;
this.logger = logger;

this.wsFailedAttempts = 0;
this.onError = (error: Error) => {
this.logger?.error(error);
};
this.wsUserClosed = true;
this.onMessage = () => {};
this.onReconnect = () => {};
}

async send(data: any) {
this.logger?.info(`Sending ${data}`);

await this.waitForMaybeReadyWebSocket();

if (this.wsClient === undefined) {
this.logger?.error(
"Couldn't connect to the websocket server. Error callback is called."
);
} else {
this.wsClient?.send(data);
}
}

async startWebSocket() {
if (this.wsClient !== undefined) {
return;
}

this.logger?.info(`Creating Web Socket client`);

this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
this.wsUserClosed = false;

this.wsClient.onopen = () => {
this.wsFailedAttempts = 0;
this.resetHeartbeat();
};

this.wsClient.onerror = (event) => {
this.onError(event.error);
};

this.wsClient.onmessage = (event) => {
this.resetHeartbeat();
this.onMessage(event.data);
};

this.wsClient.onclose = async () => {
if (this.heartbeatTimeout !== undefined) {
clearTimeout(this.heartbeatTimeout);
}

if (this.wsUserClosed === false) {
this.wsFailedAttempts += 1;
this.wsClient = undefined;
const waitTime = expoBackoff(this.wsFailedAttempts);

this.logger?.error(
`Connection closed unexpectedly or because of timeout. Reconnecting after ${waitTime}ms.`
);

await sleep(waitTime);
this.restartUnexpectedClosedWebsocket();
} else {
this.logger?.info("The connection has been closed successfully.");
}
};

if (this.wsClient.on !== undefined) {
// Ping handler is undefined in browser side
this.wsClient.on("ping", () => {
this.logger?.info("Ping received");
this.resetHeartbeat();
});
}
}

/**
* Reset the heartbeat timeout. This is called when we receive any message (ping or regular)
* from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION,
* we assume the connection is dead and reconnect.
*/
private resetHeartbeat() {
if (this.heartbeatTimeout !== undefined) {
clearTimeout(this.heartbeatTimeout);
}

this.heartbeatTimeout = setTimeout(() => {
this.logger?.warn(`Connection timed out. Reconnecting...`);
this.wsClient?.terminate();
this.restartUnexpectedClosedWebsocket();
}, HEARTBEAT_TIMEOUT_DURATION);
}

private async waitForMaybeReadyWebSocket() {
let waitedTime = 0;
while (
this.wsClient !== undefined &&
this.wsClient.readyState !== this.wsClient.OPEN
) {
if (waitedTime > 5000) {
this.wsClient.close();
return;
} else {
waitedTime += 10;
await sleep(10);
}
}
}

private async restartUnexpectedClosedWebsocket() {
if (this.wsUserClosed === true) {
return;
}

await this.startWebSocket();
await this.waitForMaybeReadyWebSocket();

if (this.wsClient === undefined) {
this.logger?.error(
"Couldn't reconnect to websocket. Error callback is called."
);
return;
}

this.onReconnect();
}

closeWebSocket() {
if (this.wsClient !== undefined) {
const client = this.wsClient;
this.wsClient = undefined;
client.close();
}
this.wsUserClosed = true;
}
}

async function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

function expoBackoff(attempts: number): number {
return 2 ** attempts * 100;
}
Loading
Loading