Skip to content

Commit 12c933f

Browse files
committed
Fix websocket memory leaks and infinite reconnection on error
1 parent eb659ca commit 12c933f

File tree

1 file changed

+19
-1
lines changed
  • zeppelin-web-angular/projects/zeppelin-sdk/src

1 file changed

+19
-1
lines changed

zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export class Message {
4343
public connectedStatus = false;
4444
public connectedStatus$ = new Subject<boolean>();
4545
private ws: WebSocketSubject<WebSocketMessage<MessageDataTypeMap>> | null = null;
46+
private wsSubscription: Subscription | null = null;
4647
private open$ = new Subject<Event>();
4748
private close$ = new Subject<CloseEvent>();
4849
private sent$ = new Subject<WebSocketMessage<MessageSendDataTypeMap>>();
@@ -99,13 +100,26 @@ export class Message {
99100
if (!this.wsUrl) {
100101
throw new Error('WebSocket URL is not set. Please call setWsUrl() before connect()');
101102
}
103+
104+
// Unsubscribe from existing subscription first
105+
if (this.wsSubscription) {
106+
this.wsSubscription.unsubscribe();
107+
this.wsSubscription = null;
108+
}
109+
110+
// Then close existing WebSocket
111+
if (this.ws) {
112+
this.ws.complete();
113+
this.ws = null;
114+
}
115+
102116
this.ws = webSocket<WebSocketMessage<MessageDataTypeMap>>({
103117
url: this.wsUrl,
104118
openObserver: this.open$,
105119
closeObserver: this.close$
106120
});
107121

108-
this.ws
122+
this.wsSubscription = this.ws
109123
.pipe(
110124
// reconnect
111125
retryWhen(errors => errors.pipe(mergeMap(() => this.close$.pipe(take(1), delay(4000)))))
@@ -190,6 +204,10 @@ export class Message {
190204
}
191205

192206
destroy(): void {
207+
if (this.wsSubscription) {
208+
this.wsSubscription.unsubscribe();
209+
this.wsSubscription = null;
210+
}
193211
if (this.ws) {
194212
this.ws.complete();
195213
this.ws = null;

0 commit comments

Comments
 (0)