Skip to content

Commit 1598ca3

Browse files
authored
cleanup websocket connection on reconnect (#41)
The SDK was creating **multiple WebSocket connections** to the same destination when reconnecting after a connection loss, causing: - Resource leaks on both client and server - Duplicate message processing - Event listener accumulation - Memory leaks
1 parent c1224f1 commit 1598ca3

File tree

4 files changed

+3852
-3
lines changed

4 files changed

+3852
-3
lines changed

typescript/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@chainlink/data-streams-sdk",
3-
"version": "1.0.4",
3+
"version": "1.0.5",
44
"description": "TypeScript SDK for Chainlink Data Streams",
55
"main": "dist/src/index.js",
66
"types": "dist/src/index.d.ts",

typescript/src/stream/connection-manager.ts

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,53 @@ export class ConnectionManager extends EventEmitter {
214214
}
215215
}
216216

217+
/**
218+
* Clean up a WebSocket connection without scheduling reconnection
219+
* This is a helper method to prevent duplicate connections
220+
*/
221+
private cleanupWebSocket(connection: ManagedConnection): void {
222+
if (!connection.ws) {
223+
return;
224+
}
225+
226+
this.logger.connectionDebug(
227+
`Cleaning up WebSocket for ${connection.id} (readyState: ${connection.ws.readyState})`
228+
);
229+
230+
// Store reference to WebSocket before cleanup
231+
const ws = connection.ws;
232+
233+
// Add error handler BEFORE removing listeners to catch any async errors during termination
234+
// This prevents "WebSocket was closed before the connection was established" from crashing the process
235+
ws.once("error", (error: Error) => {
236+
// Silently ignore errors during cleanup - connection is being terminated anyway
237+
this.logger.connectionDebug(`Error during WebSocket cleanup for ${connection.id}:`, error);
238+
});
239+
240+
// Remove all other event listeners to prevent memory leaks
241+
ws.removeAllListeners("open");
242+
ws.removeAllListeners("close");
243+
ws.removeAllListeners("message");
244+
ws.removeAllListeners("ping");
245+
ws.removeAllListeners("pong");
246+
ws.removeAllListeners("unexpected-response");
247+
248+
// Terminate the connection if it's still open or connecting
249+
if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
250+
try {
251+
ws.terminate();
252+
} catch (error) {
253+
// Ignore synchronous errors during cleanup
254+
this.logger.connectionDebug(`Synchronous error terminating WebSocket for ${connection.id}:`, error);
255+
}
256+
}
257+
258+
connection.ws = null;
259+
260+
// Stop health monitoring
261+
this.stopHealthMonitoring(connection);
262+
}
263+
217264
/**
218265
* Create and establish a single connection to an origin
219266
*/
@@ -252,8 +299,19 @@ export class ConnectionManager extends EventEmitter {
252299
private async establishConnection(connection: ManagedConnection): Promise<void> {
253300
return new Promise((resolve, reject) => {
254301
try {
302+
// CRITICAL FIX: Clean up old WebSocket before creating new one to prevent multiple connections
303+
const hadExistingConnection = !!connection.ws;
304+
if (hadExistingConnection) {
305+
this.logger.connectionDebug(`Cleaning up existing WebSocket for ${connection.id} before reconnection`);
306+
this.cleanupWebSocket(connection);
307+
}
308+
255309
this.updateConnectionState(connection, ConnectionState.CONNECTING, "WebSocket connection initiated");
256310

311+
this.logger.connectionDebug(
312+
`Creating new WebSocket for ${connection.id} to ${connection.origin} (cleaned up old: ${hadExistingConnection})`
313+
);
314+
257315
// Build WebSocket URL with feed IDs
258316
const feedIdsParam = this.managerConfig.feedIds.join(",");
259317

@@ -641,17 +699,30 @@ export class ConnectionManager extends EventEmitter {
641699

642700
if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
643701
const timeout = setTimeout(() => {
644-
ws.terminate();
702+
// Use cleanup method to ensure proper termination
703+
this.cleanupWebSocket(connection);
645704
resolve();
646705
}, 1000);
647706

648707
ws.once("close", () => {
649708
clearTimeout(timeout);
709+
// Clean up after close
710+
this.cleanupWebSocket(connection);
650711
resolve();
651712
});
652713

653-
ws.close();
714+
try {
715+
ws.close();
716+
} catch (error) {
717+
// If close fails, force cleanup
718+
this.logger.connectionDebug(`Error closing WebSocket for ${connection.id}:`, error);
719+
this.cleanupWebSocket(connection);
720+
clearTimeout(timeout);
721+
resolve();
722+
}
654723
} else {
724+
// For already closed connections, just cleanup
725+
this.cleanupWebSocket(connection);
655726
resolve();
656727
}
657728
});

0 commit comments

Comments
 (0)