Skip to content

Commit 7e2059b

Browse files
committed
ref #1789 - reject "hanging" promises when closing connection
1 parent 2203be5 commit 7e2059b

File tree

3 files changed

+29
-8
lines changed

3 files changed

+29
-8
lines changed

packages/client/lib/client/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,11 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
268268
.on('data', data => this.#queue.parseResponse(data))
269269
.on('error', err => {
270270
this.emit('error', err);
271-
this.#queue.flushWaitingForReply(err);
271+
if (!this.#socket.isOpen) {
272+
this.#queue.flushAll(err);
273+
} else {
274+
this.#queue.flushWaitingForReply(err);
275+
}
272276
})
273277
.on('connect', () => this.emit('connect'))
274278
.on('ready', () => {

packages/client/lib/client/socket.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import * as net from 'net';
33
import * as tls from 'tls';
44
import { encodeCommand } from '../commander';
55
import { RedisCommandArguments } from '../commands';
6-
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, AuthError } from '../errors';
6+
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, AuthError, ReconnectStrategyError } from '../errors';
77
import { promiseTimeout } from '../utils';
88

99
export interface RedisSocketCommonOptions {
@@ -93,9 +93,16 @@ export default class RedisSocket extends EventEmitter {
9393
}
9494

9595
async #connect(hadError?: boolean): Promise<void> {
96-
this.#isOpen = true;
97-
this.#socket = await this.#retryConnection(0, hadError);
98-
this.#writableNeedDrain = false;
96+
try {
97+
this.#isOpen = true;
98+
this.#socket = await this.#retryConnection(0, hadError);
99+
this.#writableNeedDrain = false;
100+
} catch (err) {
101+
this.#isOpen = false;
102+
this.emit('error', err);
103+
this.emit('end');
104+
throw err;
105+
}
99106

100107
if (!this.#isOpen) {
101108
this.disconnect();
@@ -134,17 +141,16 @@ export default class RedisSocket extends EventEmitter {
134141
try {
135142
return await this.#createSocket();
136143
} catch (err) {
137-
this.emit('error', err);
138-
139144
if (!this.#isOpen) {
140145
throw err;
141146
}
142147

143148
const retryIn = (this.#options?.reconnectStrategy ?? RedisSocket.#defaultReconnectStrategy)(retries);
144149
if (retryIn instanceof Error) {
145-
throw retryIn;
150+
throw new ReconnectStrategyError(retryIn, err);
146151
}
147152

153+
this.emit('error', err);
148154
await promiseTimeout(retryIn);
149155
return this.#retryConnection(retries + 1);
150156
}

packages/client/lib/errors.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,14 @@ export class RootNodesUnavailableError extends Error {
4545
super('All the root nodes are unavailable');
4646
}
4747
}
48+
49+
export class ReconnectStrategyError extends Error {
50+
originalError: Error;
51+
socketError: unknown;
52+
53+
constructor(originalError: Error, socketError: unknown) {
54+
super(originalError.message);
55+
this.originalError = originalError;
56+
this.socketError = socketError;
57+
}
58+
}

0 commit comments

Comments
 (0)