Skip to content

Commit af0ba94

Browse files
committed
feat: add promise for connection open
1 parent eb7f460 commit af0ba94

File tree

4 files changed

+160
-133
lines changed

4 files changed

+160
-133
lines changed

lazer/sdk/js/examples/index.ts

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,65 +6,82 @@ import { PythLazerClient } from "../src/index.js";
66
// Ignore debug messages
77
console.debug = () => {};
88

9-
const client = new PythLazerClient(
10-
["wss://pyth-lazer.dourolabs.app/v1/stream"],
11-
"access_token",
12-
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
13-
console // Optionally log socket operations (to the console in this case.)
14-
);
9+
async function main() {
10+
try {
11+
const client = await PythLazerClient.create(
12+
["wss://pyth-lazer.dourolabs.app/v1/stream"],
13+
"access_token",
14+
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
15+
console // Optionally log socket operations (to the console in this case.)
16+
);
1517

16-
client.addMessageListener((message) => {
17-
console.info("got message:", message);
18-
switch (message.type) {
19-
case "json": {
20-
if (message.value.type == "streamUpdated") {
21-
console.info(
22-
"stream updated for subscription",
23-
message.value.subscriptionId,
24-
":",
25-
message.value.parsed?.priceFeeds
26-
);
18+
client.addMessageListener((message) => {
19+
console.info("got message:", message);
20+
switch (message.type) {
21+
case "json": {
22+
if (message.value.type == "streamUpdated") {
23+
console.info(
24+
"stream updated for subscription",
25+
message.value.subscriptionId,
26+
":",
27+
message.value.parsed?.priceFeeds
28+
);
29+
}
30+
break;
31+
}
32+
case "binary": {
33+
if ("solana" in message.value) {
34+
console.info(
35+
"solana message:",
36+
message.value.solana?.toString("hex")
37+
);
38+
}
39+
if ("evm" in message.value) {
40+
console.info("evm message:", message.value.evm?.toString("hex"));
41+
}
42+
break;
43+
}
2744
}
28-
break;
29-
}
30-
case "binary": {
31-
if ("solana" in message.value) {
32-
console.info("solana message:", message.value.solana?.toString("hex"));
33-
}
34-
if ("evm" in message.value) {
35-
console.info("evm message:", message.value.evm?.toString("hex"));
36-
}
37-
break;
38-
}
39-
}
40-
});
45+
});
4146

42-
// Create and remove one or more subscriptions on the fly
43-
await client.subscribe({
44-
type: "subscribe",
45-
subscriptionId: 1,
46-
priceFeedIds: [1, 2],
47-
properties: ["price"],
48-
chains: ["solana"],
49-
deliveryFormat: "binary",
50-
channel: "fixed_rate@200ms",
51-
parsed: false,
52-
jsonBinaryEncoding: "base64",
53-
});
54-
await client.subscribe({
55-
type: "subscribe",
56-
subscriptionId: 2,
57-
priceFeedIds: [1, 2, 3, 4, 5],
58-
properties: ["price"],
59-
chains: ["evm"],
60-
deliveryFormat: "json",
61-
channel: "fixed_rate@200ms",
62-
parsed: true,
63-
jsonBinaryEncoding: "hex",
64-
});
47+
// Create and remove one or more subscriptions on the fly
48+
await client.subscribe({
49+
type: "subscribe",
50+
subscriptionId: 1,
51+
priceFeedIds: [1, 2],
52+
properties: ["price"],
53+
chains: ["solana"],
54+
deliveryFormat: "binary",
55+
channel: "fixed_rate@200ms",
56+
parsed: false,
57+
jsonBinaryEncoding: "base64",
58+
});
59+
await client.subscribe({
60+
type: "subscribe",
61+
subscriptionId: 2,
62+
priceFeedIds: [1, 2, 3, 4, 5],
63+
properties: ["price"],
64+
chains: ["evm"],
65+
deliveryFormat: "json",
66+
channel: "fixed_rate@200ms",
67+
parsed: true,
68+
jsonBinaryEncoding: "hex",
69+
});
70+
71+
await new Promise((resolve) => setTimeout(resolve, 10_000));
6572

66-
await new Promise((resolve) => setTimeout(resolve, 10_000));
73+
await client.unsubscribe(1);
74+
await client.unsubscribe(2);
6775

68-
await client.unsubscribe(1);
69-
await client.unsubscribe(2);
70-
client.shutdown();
76+
await new Promise((resolve) => setTimeout(resolve, 10_000));
77+
client.shutdown();
78+
} catch (error) {
79+
console.error("Error initializing client:", error);
80+
process.exit(1);
81+
}
82+
}
83+
84+
main().catch((error) => {
85+
console.error("Unhandled error:", error);
86+
process.exit(1);
87+
});

lazer/sdk/js/src/client.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const UINT32_NUM_BYTES = 4;
3030
const UINT64_NUM_BYTES = 8;
3131

3232
export class PythLazerClient {
33-
wsp: WebSocketPool;
33+
private constructor(private readonly wsp: WebSocketPool) {}
3434

3535
/**
3636
* Creates a new PythLazerClient instance.
@@ -39,13 +39,14 @@ export class PythLazerClient {
3939
* @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
4040
* @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
4141
*/
42-
constructor(
42+
static async create(
4343
urls: string[],
4444
token: string,
4545
numConnections = 3,
4646
logger: Logger = dummyLogger
47-
) {
48-
this.wsp = new WebSocketPool(urls, token, numConnections, logger);
47+
): Promise<PythLazerClient> {
48+
const wsp = await WebSocketPool.create(urls, token, numConnections, logger);
49+
return new PythLazerClient(wsp);
4950
}
5051

5152
addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {

lazer/sdk/js/src/socket/resilient-web-socket.ts

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,10 @@
11
import type { ClientRequestArgs } from "node:http";
2-
32
import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws";
43
import type { Logger } from "ts-log";
54

6-
// Reconnect with expo backoff if we don't get a message or ping for 10 seconds
75
const HEARTBEAT_TIMEOUT_DURATION = 10_000;
6+
const CONNECTION_TIMEOUT = 5000;
87

9-
/**
10-
* This class wraps websocket to provide a resilient web socket client.
11-
*
12-
* It will reconnect if connection fails with exponential backoff. Also, it will reconnect
13-
* if it receives no ping request or regular message from server within a while as indication
14-
* of timeout (assuming the server sends either regularly).
15-
*
16-
* This class also logs events if logger is given and by replacing onError method you can handle
17-
* connection errors yourself (e.g: do not retry and close the connection).
18-
*/
198
export class ResilientWebSocket {
209
endpoint: string;
2110
wsClient: undefined | WebSocket;
@@ -24,10 +13,14 @@ export class ResilientWebSocket {
2413
private wsFailedAttempts: number;
2514
private heartbeatTimeout: undefined | NodeJS.Timeout;
2615
private logger: undefined | Logger;
16+
private connectionPromise: Promise<void> | undefined;
17+
private resolveConnection: (() => void) | undefined;
18+
private rejectConnection: ((error: Error) => void) | undefined;
2719

2820
onError: (error: ErrorEvent) => void;
2921
onMessage: (data: WebSocket.Data) => void;
3022
onReconnect: () => void;
23+
3124
constructor(
3225
endpoint: string,
3326
wsOptions?: ClientOptions | ClientRequestArgs,
@@ -64,23 +57,47 @@ export class ResilientWebSocket {
6457
}
6558
}
6659

67-
startWebSocket(): void {
60+
async startWebSocket(): Promise<void> {
6861
if (this.wsClient !== undefined) {
69-
return;
62+
// If there's an existing connection attempt, wait for it
63+
if (this.connectionPromise) {
64+
return this.connectionPromise;
65+
}
66+
return Promise.resolve();
7067
}
7168

7269
this.logger?.info(`Creating Web Socket client`);
7370

71+
// Create a new promise for this connection attempt
72+
this.connectionPromise = new Promise((resolve, reject) => {
73+
this.resolveConnection = resolve;
74+
this.rejectConnection = reject;
75+
});
76+
77+
// Set a connection timeout
78+
const timeoutId = setTimeout(() => {
79+
if (this.rejectConnection) {
80+
this.rejectConnection(
81+
new Error(`Connection timeout after ${CONNECTION_TIMEOUT}ms`)
82+
);
83+
}
84+
}, CONNECTION_TIMEOUT);
85+
7486
this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
7587
this.wsUserClosed = false;
7688

7789
this.wsClient.addEventListener("open", () => {
7890
this.wsFailedAttempts = 0;
7991
this.resetHeartbeat();
92+
clearTimeout(timeoutId);
93+
this.resolveConnection?.();
8094
});
8195

8296
this.wsClient.addEventListener("error", (event) => {
8397
this.onError(event);
98+
if (this.rejectConnection) {
99+
this.rejectConnection(new Error("WebSocket connection failed"));
100+
}
84101
});
85102

86103
this.wsClient.addEventListener("message", (event) => {
@@ -89,24 +106,23 @@ export class ResilientWebSocket {
89106
});
90107

91108
this.wsClient.addEventListener("close", () => {
109+
clearTimeout(timeoutId);
110+
if (this.rejectConnection) {
111+
this.rejectConnection(new Error("WebSocket closed before connecting"));
112+
}
92113
void this.handleClose();
93114
});
94115

95-
// Handle ping events if supported (Node.js only)
96116
if ("on" in this.wsClient) {
97-
// Ping handler is undefined in browser side
98117
this.wsClient.on("ping", () => {
99118
this.logger?.info("Ping received");
100119
this.resetHeartbeat();
101120
});
102121
}
122+
123+
return this.connectionPromise;
103124
}
104125

105-
/**
106-
* Reset the heartbeat timeout. This is called when we receive any message (ping or regular)
107-
* from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION,
108-
* we assume the connection is dead and reconnect.
109-
*/
110126
private resetHeartbeat(): void {
111127
if (this.heartbeatTimeout !== undefined) {
112128
clearTimeout(this.heartbeatTimeout);
@@ -145,6 +161,10 @@ export class ResilientWebSocket {
145161
} else {
146162
this.wsFailedAttempts += 1;
147163
this.wsClient = undefined;
164+
this.connectionPromise = undefined;
165+
this.resolveConnection = undefined;
166+
this.rejectConnection = undefined;
167+
148168
const waitTime = expoBackoff(this.wsFailedAttempts);
149169

150170
this.logger?.error(
@@ -163,7 +183,7 @@ export class ResilientWebSocket {
163183
return;
164184
}
165185

166-
this.startWebSocket();
186+
await this.startWebSocket();
167187
await this.waitForMaybeReadyWebSocket();
168188

169189
if (this.wsClient === undefined) {
@@ -180,6 +200,9 @@ export class ResilientWebSocket {
180200
if (this.wsClient !== undefined) {
181201
const client = this.wsClient;
182202
this.wsClient = undefined;
203+
this.connectionPromise = undefined;
204+
this.resolveConnection = undefined;
205+
this.rejectConnection = undefined;
183206
client.close();
184207
}
185208
this.wsUserClosed = true;

0 commit comments

Comments
 (0)