Skip to content

Commit a5be099

Browse files
authored
fix: Error: WebSocket is not open: readyState 2 (CLOSING) (#5846)
1 parent 08490a2 commit a5be099

File tree

2 files changed

+49
-20
lines changed

2 files changed

+49
-20
lines changed

packages/cubejs-backend-shared/src/env.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1315,6 +1315,9 @@ const variables: Record<string, (...args: any) => any> = {
13151315
.asString(),
13161316
cubeStorePass: () => get('CUBEJS_CUBESTORE_PASS')
13171317
.asString(),
1318+
cubeStoreMaxConnectRetries: () => get('CUBEJS_CUBESTORE_MAX_CONNECT_RETRIES')
1319+
.default('5')
1320+
.asInt(),
13181321

13191322
// Redis
13201323
redisPoolMin: () => get('CUBEJS_REDIS_POOL_MIN')

packages/cubejs-cubestore-driver/src/WebSocketConnection.ts

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import WebSocket from 'ws';
22
import { flatbuffers } from 'flatbuffers';
33
import { InlineTable } from '@cubejs-backend/base-driver';
4+
import { getEnv } from '@cubejs-backend/shared';
45
import {
56
HttpCommand,
67
HttpError,
@@ -13,13 +14,19 @@ import {
1314
export class WebSocketConnection {
1415
protected messageCounter: number;
1516

17+
protected maxConnectRetries: number;
18+
19+
protected currentConnectionTry: number;
20+
1621
protected webSocket: any;
1722

1823
private url: string;
1924

2025
public constructor(url: string) {
2126
this.url = url;
2227
this.messageCounter = 1;
28+
this.maxConnectRetries = getEnv('cubeStoreMaxConnectRetries');
29+
this.currentConnectionTry = 0;
2330
}
2431

2532
protected async initWebSocket() {
@@ -38,22 +45,33 @@ export class WebSocketConnection {
3845
}, 5000);
3946

4047
webSocket.sendAsync = async (message) => new Promise<void>((resolveSend, rejectSend) => {
41-
webSocket.send(message, (err) => {
42-
if (err) {
43-
rejectSend(err);
44-
} else {
45-
resolveSend();
46-
}
47-
});
48+
// If socket is closing this message should be resent
49+
if (webSocket.readyState === WebSocket.OPEN) {
50+
webSocket.send(message, (err) => {
51+
if (err) {
52+
rejectSend(err);
53+
} else {
54+
resolveSend();
55+
}
56+
});
57+
}
4858
});
4959
webSocket.on('open', () => resolve(webSocket));
5060
webSocket.on('error', (err) => {
51-
reject(err);
61+
this.currentConnectionTry += 1;
62+
if (this.currentConnectionTry < this.maxConnectRetries) {
63+
setTimeout(async () => {
64+
resolve(this.initWebSocket());
65+
}, this.retryWaitTime());
66+
} else {
67+
reject(err);
68+
}
5269
if (webSocket === this.webSocket) {
5370
this.webSocket = undefined;
5471
}
5572
});
5673
webSocket.on('pong', () => {
74+
this.currentConnectionTry = 0;
5775
webSocket.lastHeartBeat = new Date();
5876
});
5977
webSocket.on('close', () => {
@@ -74,7 +92,7 @@ export class WebSocketConnection {
7492
webSocket.sentMessages[key].reject(e);
7593
}
7694
}
77-
}, 1000);
95+
}, this.retryWaitTime());
7896
}
7997

8098
if (webSocket === this.webSocket) {
@@ -136,20 +154,28 @@ export class WebSocketConnection {
136154
return this.webSocket.readyPromise;
137155
}
138156

157+
private retryWaitTime() {
158+
return 1000 * this.currentConnectionTry;
159+
}
160+
139161
private async sendMessage(messageId: number, buffer: Uint8Array): Promise<any> {
140162
const socket = await this.initWebSocket();
141163
return new Promise((resolve, reject) => {
142-
socket.send(buffer, (err) => {
143-
if (err) {
144-
delete socket.sentMessages[messageId];
145-
reject(err);
146-
}
147-
});
148-
socket.sentMessages[messageId] = {
149-
resolve,
150-
reject,
151-
buffer
152-
};
164+
if (socket.readyState === WebSocket.OPEN) {
165+
socket.send(buffer, (err) => {
166+
if (err) {
167+
delete socket.sentMessages[messageId];
168+
reject(err);
169+
}
170+
});
171+
socket.sentMessages[messageId] = {
172+
resolve,
173+
reject,
174+
buffer
175+
};
176+
} else {
177+
resolve(this.sendMessage(messageId, buffer));
178+
}
153179
});
154180
}
155181

0 commit comments

Comments
 (0)