Skip to content

Commit 05f85b9

Browse files
committed
feat: improve js sdk reliability via redundant parallel websocket cxns
1 parent 7d68135 commit 05f85b9

File tree

6 files changed

+451
-44
lines changed

6 files changed

+451
-44
lines changed

lazer/sdk/js/examples/index.ts

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
import { PythLazerClient } from "../src/index.js";
22

3-
/* eslint-disable no-console */
4-
const client = new PythLazerClient("ws://127.0.0.1:1234/v1/stream", "ctoken1");
3+
// Ignore debug messages
4+
console.debug = () => {};
5+
6+
const client = new PythLazerClient(
7+
["wss://pyth-lazer.dourolabs.app/v1/stream"],
8+
"access_token",
9+
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. Default is 3.
10+
console // Optionally log socket operations (to the console in this case.)
11+
);
12+
513
client.addMessageListener((message) => {
6-
console.log("got message:", message);
14+
console.info("got message:", message);
715
switch (message.type) {
816
case "json": {
917
if (message.value.type == "streamUpdated") {
10-
console.log(
18+
console.info(
1119
"stream updated for subscription",
1220
message.value.subscriptionId,
1321
":",
@@ -18,24 +26,42 @@ client.addMessageListener((message) => {
1826
}
1927
case "binary": {
2028
if ("solana" in message.value) {
21-
console.log("solana message:", message.value.solana?.toString("hex"));
29+
console.info("solana message:", message.value.solana?.toString("hex"));
2230
}
2331
if ("evm" in message.value) {
24-
console.log("evm message:", message.value.evm?.toString("hex"));
32+
console.info("evm message:", message.value.evm?.toString("hex"));
2533
}
2634
break;
2735
}
2836
}
2937
});
30-
client.ws.addEventListener("open", () => {
31-
client.send({
32-
type: "subscribe",
33-
subscriptionId: 1,
34-
priceFeedIds: [1, 2],
35-
properties: ["price"],
36-
chains: ["solana"],
37-
deliveryFormat: "json",
38-
channel: "fixed_rate@200ms",
39-
jsonBinaryEncoding: "hex",
40-
});
38+
39+
// Create and remove one or more subscriptions on the fly
40+
client.subscribe({
41+
type: "subscribe",
42+
subscriptionId: 1,
43+
priceFeedIds: [1, 2],
44+
properties: ["price"],
45+
chains: ["solana"],
46+
deliveryFormat: "binary",
47+
channel: "fixed_rate@200ms",
48+
parsed: false,
49+
jsonBinaryEncoding: "base64",
50+
});
51+
client.subscribe({
52+
type: "subscribe",
53+
subscriptionId: 2,
54+
priceFeedIds: [1, 2, 3, 4, 5],
55+
properties: ["price"],
56+
chains: ["evm"],
57+
deliveryFormat: "json",
58+
channel: "fixed_rate@200ms",
59+
parsed: true,
60+
jsonBinaryEncoding: "hex",
4161
});
62+
63+
await new Promise((resolve) => setTimeout(resolve, 10000));
64+
65+
client.unsubscribe(1);
66+
client.unsubscribe(2);
67+
client.shutdown();

lazer/sdk/js/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
"@solana/buffer-layout": "^4.0.1",
6464
"@solana/web3.js": "^1.98.0",
6565
"isomorphic-ws": "^5.0.0",
66-
"ws": "^8.18.0"
66+
"ws": "^8.18.0",
67+
"@isaacs/ttlcache": "^1.4.1",
68+
"ts-log": "^2.2.7"
6769
}
6870
}

lazer/sdk/js/src/client.ts

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import {
99
type Response,
1010
SOLANA_FORMAT_MAGIC_BE,
1111
} from "./protocol.js";
12+
import { WebSocketPool } from "./socket/WebSocketPool.js";
13+
import { dummyLogger, type Logger } from "ts-log";
1214

1315
export type BinaryResponse = {
1416
subscriptionId: number;
@@ -28,52 +30,58 @@ const UINT32_NUM_BYTES = 4;
2830
const UINT64_NUM_BYTES = 8;
2931

3032
export class PythLazerClient {
31-
ws: WebSocket;
33+
wsp: WebSocketPool;
3234

33-
constructor(url: string, token: string) {
34-
const finalUrl = new URL(url);
35-
finalUrl.searchParams.append("ACCESS_TOKEN", token);
36-
this.ws = new WebSocket(finalUrl);
35+
/**
36+
* Creates a new PythLazerClient instance.
37+
* @param urls List of WebSocket URLs of the Pyth Lazer service
38+
* @param token The access token for authentication
39+
* @param numConnections The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
40+
* @param logger Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
41+
*/
42+
constructor(
43+
urls: string[],
44+
token: string,
45+
numConnections: number = 3,
46+
logger: Logger = dummyLogger
47+
) {
48+
this.wsp = new WebSocketPool(urls, token, numConnections, logger);
3749
}
3850

3951
addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
40-
this.ws.addEventListener("message", (event: WebSocket.MessageEvent) => {
41-
if (typeof event.data == "string") {
52+
this.wsp.addMessageListener((data: WebSocket.Data) => {
53+
if (typeof data == "string") {
4254
handler({
4355
type: "json",
44-
value: JSON.parse(event.data) as Response,
56+
value: JSON.parse(data) as Response,
4557
});
46-
} else if (Buffer.isBuffer(event.data)) {
58+
} else if (Buffer.isBuffer(data)) {
4759
let pos = 0;
48-
const magic = event.data
49-
.subarray(pos, pos + UINT32_NUM_BYTES)
50-
.readUint32BE();
60+
const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32BE();
5161
pos += UINT32_NUM_BYTES;
5262
if (magic != BINARY_UPDATE_FORMAT_MAGIC) {
5363
throw new Error("binary update format magic mismatch");
5464
}
5565
// TODO: some uint64 values may not be representable as Number.
5666
const subscriptionId = Number(
57-
event.data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE()
67+
data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE()
5868
);
5969
pos += UINT64_NUM_BYTES;
6070

6171
const value: BinaryResponse = { subscriptionId };
62-
while (pos < event.data.length) {
63-
const len = event.data
64-
.subarray(pos, pos + UINT16_NUM_BYTES)
65-
.readUint16BE();
72+
while (pos < data.length) {
73+
const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
6674
pos += UINT16_NUM_BYTES;
67-
const magic = event.data
75+
const magic = data
6876
.subarray(pos, pos + UINT32_NUM_BYTES)
6977
.readUint32BE();
7078
if (magic == EVM_FORMAT_MAGIC) {
71-
value.evm = event.data.subarray(pos, pos + len);
79+
value.evm = data.subarray(pos, pos + len);
7280
} else if (magic == SOLANA_FORMAT_MAGIC_BE) {
73-
value.solana = event.data.subarray(pos, pos + len);
81+
value.solana = data.subarray(pos, pos + len);
7482
} else if (magic == PARSED_FORMAT_MAGIC) {
7583
value.parsed = JSON.parse(
76-
event.data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString()
84+
data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString()
7785
) as ParsedPayload;
7886
} else {
7987
throw new Error("unknown magic: " + magic.toString());
@@ -87,7 +95,22 @@ export class PythLazerClient {
8795
});
8896
}
8997

98+
subscribe(request: Request) {
99+
if (request.type !== "subscribe") {
100+
throw new Error("Request must be a subscribe request");
101+
}
102+
this.wsp.addSubscription(request);
103+
}
104+
105+
unsubscribe(subscriptionId: number) {
106+
this.wsp.removeSubscription(subscriptionId);
107+
}
108+
90109
send(request: Request) {
91-
this.ws.send(JSON.stringify(request));
110+
this.wsp.sendRequest(request);
111+
}
112+
113+
shutdown(): void {
114+
this.wsp.shutdown();
92115
}
93116
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
import WebSocket, { type ClientOptions } from "isomorphic-ws";
2+
import type { ClientRequestArgs } from "node:http";
3+
import type { Logger } from "ts-log";
4+
5+
// Reconnect with expo backoff if we don't get a message or ping for 10 seconds
6+
const HEARTBEAT_TIMEOUT_DURATION = 10000;
7+
8+
/**
9+
* This class wraps websocket to provide a resilient web socket client.
10+
*
11+
* It will reconnect if connection fails with exponential backoff. Also, it will reconnect
12+
* if it receives no ping request or regular message from server within a while as indication
13+
* of timeout (assuming the server sends either regularly).
14+
*
15+
* This class also logs events if logger is given and by replacing onError method you can handle
16+
* connection errors yourself (e.g: do not retry and close the connection).
17+
*/
18+
export class ResilientWebSocket {
19+
endpoint: string;
20+
wsClient: undefined | WebSocket;
21+
wsUserClosed: boolean;
22+
private wsOptions: ClientOptions | ClientRequestArgs | undefined;
23+
private wsFailedAttempts: number;
24+
private heartbeatTimeout: undefined | NodeJS.Timeout;
25+
private logger: undefined | Logger;
26+
27+
onError: (error: Error) => void;
28+
onMessage: (data: WebSocket.Data) => void;
29+
onReconnect: () => void;
30+
constructor(
31+
endpoint: string,
32+
wsOptions?: ClientOptions | ClientRequestArgs,
33+
logger?: Logger
34+
) {
35+
this.endpoint = endpoint;
36+
this.wsOptions = wsOptions;
37+
this.logger = logger;
38+
39+
this.wsFailedAttempts = 0;
40+
this.onError = (error: Error) => {
41+
this.logger?.error(error);
42+
};
43+
this.wsUserClosed = true;
44+
this.onMessage = () => {};
45+
this.onReconnect = () => {};
46+
}
47+
48+
async send(data: any) {
49+
this.logger?.info(`Sending ${data}`);
50+
51+
await this.waitForMaybeReadyWebSocket();
52+
53+
if (this.wsClient === undefined) {
54+
this.logger?.error(
55+
"Couldn't connect to the websocket server. Error callback is called."
56+
);
57+
} else {
58+
this.wsClient?.send(data);
59+
}
60+
}
61+
62+
async startWebSocket() {
63+
if (this.wsClient !== undefined) {
64+
return;
65+
}
66+
67+
this.logger?.info(`Creating Web Socket client`);
68+
69+
this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
70+
this.wsUserClosed = false;
71+
72+
this.wsClient.onopen = () => {
73+
this.wsFailedAttempts = 0;
74+
this.resetHeartbeat();
75+
};
76+
77+
this.wsClient.onerror = (event) => {
78+
this.onError(event.error);
79+
};
80+
81+
this.wsClient.onmessage = (event) => {
82+
this.resetHeartbeat();
83+
this.onMessage(event.data);
84+
};
85+
86+
this.wsClient.onclose = async () => {
87+
if (this.heartbeatTimeout !== undefined) {
88+
clearTimeout(this.heartbeatTimeout);
89+
}
90+
91+
if (this.wsUserClosed === false) {
92+
this.wsFailedAttempts += 1;
93+
this.wsClient = undefined;
94+
const waitTime = expoBackoff(this.wsFailedAttempts);
95+
96+
this.logger?.error(
97+
`Connection closed unexpectedly or because of timeout. Reconnecting after ${waitTime}ms.`
98+
);
99+
100+
await sleep(waitTime);
101+
this.restartUnexpectedClosedWebsocket();
102+
} else {
103+
this.logger?.info("The connection has been closed successfully.");
104+
}
105+
};
106+
107+
if (this.wsClient.on !== undefined) {
108+
// Ping handler is undefined in browser side
109+
this.wsClient.on("ping", () => {
110+
this.logger?.info("Ping received");
111+
this.resetHeartbeat();
112+
});
113+
}
114+
}
115+
116+
/**
117+
* Reset the heartbeat timeout. This is called when we receive any message (ping or regular)
118+
* from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION,
119+
* we assume the connection is dead and reconnect.
120+
*/
121+
private resetHeartbeat() {
122+
if (this.heartbeatTimeout !== undefined) {
123+
clearTimeout(this.heartbeatTimeout);
124+
}
125+
126+
this.heartbeatTimeout = setTimeout(() => {
127+
this.logger?.warn(`Connection timed out. Reconnecting...`);
128+
this.wsClient?.terminate();
129+
this.restartUnexpectedClosedWebsocket();
130+
}, HEARTBEAT_TIMEOUT_DURATION);
131+
}
132+
133+
private async waitForMaybeReadyWebSocket() {
134+
let waitedTime = 0;
135+
while (
136+
this.wsClient !== undefined &&
137+
this.wsClient.readyState !== this.wsClient.OPEN
138+
) {
139+
if (waitedTime > 5000) {
140+
this.wsClient.close();
141+
return;
142+
} else {
143+
waitedTime += 10;
144+
await sleep(10);
145+
}
146+
}
147+
}
148+
149+
private async restartUnexpectedClosedWebsocket() {
150+
if (this.wsUserClosed === true) {
151+
return;
152+
}
153+
154+
await this.startWebSocket();
155+
await this.waitForMaybeReadyWebSocket();
156+
157+
if (this.wsClient === undefined) {
158+
this.logger?.error(
159+
"Couldn't reconnect to websocket. Error callback is called."
160+
);
161+
return;
162+
}
163+
164+
this.onReconnect();
165+
}
166+
167+
closeWebSocket() {
168+
if (this.wsClient !== undefined) {
169+
const client = this.wsClient;
170+
this.wsClient = undefined;
171+
client.close();
172+
}
173+
this.wsUserClosed = true;
174+
}
175+
}
176+
177+
async function sleep(ms: number) {
178+
return new Promise((resolve) => setTimeout(resolve, ms));
179+
}
180+
181+
function expoBackoff(attempts: number): number {
182+
return 2 ** attempts * 100;
183+
}

0 commit comments

Comments
 (0)