Skip to content

Commit 83ae95e

Browse files
committed
feat: add promise for all connections down
1 parent af0ba94 commit 83ae95e

File tree

4 files changed

+129
-64
lines changed

4 files changed

+129
-64
lines changed

lazer/sdk/js/examples/index.ts

Lines changed: 67 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -7,78 +7,81 @@ import { PythLazerClient } from "../src/index.js";
77
console.debug = () => {};
88

99
async function main() {
10-
try {
11-
const client = await PythLazerClient.create(
12-
["wss://pyth-lazer.dourolabs.app/v1/stream"],
13-
"access_token",
14-
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
15-
console // Optionally log socket operations (to the console in this case.)
16-
);
10+
const client = await PythLazerClient.create(
11+
["wss://pyth-lazer.dourolabs.app/v1/stream"],
12+
"access_token",
13+
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
14+
console // Optionally log socket operations (to the console in this case.)
15+
);
1716

18-
client.addMessageListener((message) => {
19-
console.info("got message:", message);
20-
switch (message.type) {
21-
case "json": {
22-
if (message.value.type == "streamUpdated") {
23-
console.info(
24-
"stream updated for subscription",
25-
message.value.subscriptionId,
26-
":",
27-
message.value.parsed?.priceFeeds
28-
);
29-
}
30-
break;
17+
// Monitor for all connections being down
18+
client.onAllConnectionsDown().then(() => {
19+
// Handle complete connection failure.
20+
// The connections will keep attempting to reconnect with expo backoff.
21+
// To shutdown the client completely, call shutdown().
22+
console.error("All connections are down!");
23+
});
24+
25+
client.addMessageListener((message) => {
26+
console.info("got message:", message);
27+
switch (message.type) {
28+
case "json": {
29+
if (message.value.type == "streamUpdated") {
30+
console.info(
31+
"stream updated for subscription",
32+
message.value.subscriptionId,
33+
":",
34+
message.value.parsed?.priceFeeds
35+
);
36+
}
37+
break;
38+
}
39+
case "binary": {
40+
if ("solana" in message.value) {
41+
console.info(
42+
"solana message:",
43+
message.value.solana?.toString("hex")
44+
);
3145
}
32-
case "binary": {
33-
if ("solana" in message.value) {
34-
console.info(
35-
"solana message:",
36-
message.value.solana?.toString("hex")
37-
);
38-
}
39-
if ("evm" in message.value) {
40-
console.info("evm message:", message.value.evm?.toString("hex"));
41-
}
42-
break;
46+
if ("evm" in message.value) {
47+
console.info("evm message:", message.value.evm?.toString("hex"));
4348
}
49+
break;
4450
}
45-
});
51+
}
52+
});
4653

47-
// Create and remove one or more subscriptions on the fly
48-
await client.subscribe({
49-
type: "subscribe",
50-
subscriptionId: 1,
51-
priceFeedIds: [1, 2],
52-
properties: ["price"],
53-
chains: ["solana"],
54-
deliveryFormat: "binary",
55-
channel: "fixed_rate@200ms",
56-
parsed: false,
57-
jsonBinaryEncoding: "base64",
58-
});
59-
await client.subscribe({
60-
type: "subscribe",
61-
subscriptionId: 2,
62-
priceFeedIds: [1, 2, 3, 4, 5],
63-
properties: ["price"],
64-
chains: ["evm"],
65-
deliveryFormat: "json",
66-
channel: "fixed_rate@200ms",
67-
parsed: true,
68-
jsonBinaryEncoding: "hex",
69-
});
54+
// Create and remove one or more subscriptions on the fly
55+
await client.subscribe({
56+
type: "subscribe",
57+
subscriptionId: 1,
58+
priceFeedIds: [1, 2],
59+
properties: ["price"],
60+
chains: ["solana"],
61+
deliveryFormat: "binary",
62+
channel: "fixed_rate@200ms",
63+
parsed: false,
64+
jsonBinaryEncoding: "base64",
65+
});
66+
await client.subscribe({
67+
type: "subscribe",
68+
subscriptionId: 2,
69+
priceFeedIds: [1, 2, 3, 4, 5],
70+
properties: ["price"],
71+
chains: ["evm"],
72+
deliveryFormat: "json",
73+
channel: "fixed_rate@200ms",
74+
parsed: true,
75+
jsonBinaryEncoding: "hex",
76+
});
7077

71-
await new Promise((resolve) => setTimeout(resolve, 10_000));
78+
await new Promise((resolve) => setTimeout(resolve, 10_000));
7279

73-
await client.unsubscribe(1);
74-
await client.unsubscribe(2);
80+
await client.unsubscribe(1);
81+
await client.unsubscribe(2);
7582

76-
await new Promise((resolve) => setTimeout(resolve, 10_000));
77-
client.shutdown();
78-
} catch (error) {
79-
console.error("Error initializing client:", error);
80-
process.exit(1);
81-
}
83+
await new Promise((resolve) => setTimeout(resolve, 10_000));
84+
client.shutdown();
8285
}
8386

8487
main().catch((error) => {

lazer/sdk/js/src/client.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@ export class PythLazerClient {
111111
await this.wsp.sendRequest(request);
112112
}
113113

114+
/**
115+
* Returns a promise that resolves when all WebSocket connections are down or attempting to reconnect
116+
*/
117+
onAllConnectionsDown(): Promise<void> {
118+
return this.wsp.onAllConnectionsDown();
119+
}
120+
114121
shutdown(): void {
115122
this.wsp.shutdown();
116123
}

lazer/sdk/js/src/socket/resilient-web-socket.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@ export class ResilientWebSocket {
1616
private connectionPromise: Promise<void> | undefined;
1717
private resolveConnection: (() => void) | undefined;
1818
private rejectConnection: ((error: Error) => void) | undefined;
19+
private _isReconnecting: boolean = false;
20+
21+
get isReconnecting(): boolean {
22+
return this._isReconnecting;
23+
}
24+
25+
get isConnected(): boolean {
26+
return this.wsClient?.readyState === WebSocket.OPEN;
27+
}
1928

2029
onError: (error: ErrorEvent) => void;
2130
onMessage: (data: WebSocket.Data) => void;
@@ -90,6 +99,7 @@ export class ResilientWebSocket {
9099
this.wsFailedAttempts = 0;
91100
this.resetHeartbeat();
92101
clearTimeout(timeoutId);
102+
this._isReconnecting = false;
93103
this.resolveConnection?.();
94104
});
95105

@@ -167,6 +177,7 @@ export class ResilientWebSocket {
167177

168178
const waitTime = expoBackoff(this.wsFailedAttempts);
169179

180+
this._isReconnecting = true;
170181
this.logger?.error(
171182
"Connection closed unexpectedly or because of timeout. Reconnecting after " +
172183
String(waitTime) +

lazer/sdk/js/src/socket/web-socket-pool.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ export class WebSocketPool {
1212
private cache: TTLCache<string, boolean>;
1313
private subscriptions: Map<number, Request>; // id -> subscription Request
1414
private messageListeners: ((event: WebSocket.Data) => void)[];
15+
private allConnectionsDownListeners: (() => void)[];
16+
private wasAllDown: boolean = true;
1517

1618
private constructor(private readonly logger: Logger = dummyLogger) {
1719
this.rwsPool = [];
1820
this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds
1921
this.subscriptions = new Map();
2022
this.messageListeners = [];
23+
this.allConnectionsDownListeners = [];
24+
25+
// Start monitoring connection states
26+
setInterval(() => this.checkConnectionStates(), 100);
2127
}
2228

2329
/**
@@ -172,12 +178,50 @@ export class WebSocketPool {
172178
this.messageListeners.push(handler);
173179
}
174180

181+
/**
182+
* Monitors if all websocket connections are currently down or in reconnecting state
183+
* Returns a promise that resolves when all connections are down
184+
*/
185+
onAllConnectionsDown(): Promise<void> {
186+
return new Promise((resolve) => {
187+
if (this.areAllConnectionsDown()) {
188+
resolve();
189+
} else {
190+
this.allConnectionsDownListeners.push(resolve);
191+
}
192+
});
193+
}
194+
195+
private areAllConnectionsDown(): boolean {
196+
return this.rwsPool.every((ws) => !ws.isConnected || ws.isReconnecting);
197+
}
198+
199+
private checkConnectionStates(): void {
200+
const allDown = this.areAllConnectionsDown();
201+
202+
// If all connections just went down
203+
if (allDown && !this.wasAllDown) {
204+
this.wasAllDown = true;
205+
this.logger.error("All WebSocket connections are down or reconnecting");
206+
// Notify all listeners
207+
while (this.allConnectionsDownListeners.length > 0) {
208+
const listener = this.allConnectionsDownListeners.shift();
209+
listener?.();
210+
}
211+
}
212+
// If at least one connection was restored
213+
if (!allDown && this.wasAllDown) {
214+
this.wasAllDown = false;
215+
}
216+
}
217+
175218
shutdown(): void {
176219
for (const rws of this.rwsPool) {
177220
rws.closeWebSocket();
178221
}
179222
this.rwsPool = [];
180223
this.subscriptions.clear();
181224
this.messageListeners = [];
225+
this.allConnectionsDownListeners = [];
182226
}
183227
}

0 commit comments

Comments
 (0)