Skip to content

Commit f2a3076

Browse files
Merge pull request #1 from nkaradzhov/command-timeout
use AbortSignal utilities instead of setTimeout
2 parents dd72fea + 0d16695 commit f2a3076

File tree

4 files changed

+32
-44
lines changed

4 files changed

+32
-44
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ export default class RedisCommandsQueue {
144144
if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
145145
return Promise.reject(new Error('The queue is full'));
146146
} else if (options?.abortSignal?.aborted) {
147-
return Promise.reject(new AbortError());
147+
return Promise.reject(new AbortError(options?.abortSignal?.reason));
148148
}
149149

150150
return new Promise((resolve, reject) => {
@@ -165,7 +165,7 @@ export default class RedisCommandsQueue {
165165
signal,
166166
listener: () => {
167167
this.#toWrite.remove(node);
168-
value.reject(new AbortError());
168+
value.reject(new AbortError(signal.reason));
169169
}
170170
};
171171
signal.addEventListener('abort', value.abort.listener, { once: true });

packages/client/lib/client/index.spec.ts

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { strict as assert } from 'node:assert';
22
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
33
import RedisClient, { RedisClientOptions, RedisClientType } from '.';
4-
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, CommandTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors';
4+
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, WatchError } from '../errors';
55
import { defineScript } from '../lua-script';
66
import { spy } from 'sinon';
77
import { once } from 'node:events';
@@ -264,16 +264,21 @@ describe('Client', () => {
264264
);
265265
}, GLOBAL.SERVERS.OPEN);
266266

267-
testUtils.testWithClient('AbortError with timeout', client => {
268-
const controller = new AbortController();
269-
controller.abort();
267+
testUtils.testWithClient('rejects with AbortError - respects given abortSignal', client => {
270268

271-
return assert.rejects(
272-
client.sendCommand(['PING'], {
273-
abortSignal: controller.signal
274-
}),
269+
const promise = client.sendCommand(['PING'], {
270+
abortSignal: AbortSignal.abort("my reason")
271+
})
272+
273+
assert.rejects(
274+
promise,
275275
AbortError
276276
);
277+
278+
promise.catch((error: unknown) => {
279+
assert.ok((error as string).includes("my reason"));
280+
});
281+
277282
}, {
278283
...GLOBAL.SERVERS.OPEN,
279284
clientOptions: {
@@ -282,19 +287,20 @@ describe('Client', () => {
282287
});
283288
});
284289

285-
testUtils.testWithClient('CommandTimeoutError', async client => {
286-
const promise = assert.rejects(client.sendCommand(['PING']), AbortError);
290+
291+
testUtils.testWithClient('rejects with AbortError on commandTimeout timer', async client => {
287292
const start = process.hrtime.bigint();
293+
const promise = client.ping();
288294

289-
while (process.hrtime.bigint() - start < 50_000_000) {
290-
// block the event loop for 50ms, to make sure the connection will timeout
291-
}
295+
while (process.hrtime.bigint() - start < 10_000_000) {
296+
// block the event loop for 10ms, to make sure the connection will timeout
297+
};
292298

293-
await promise;
299+
assert.rejects(promise, AbortError);
294300
}, {
295301
...GLOBAL.SERVERS.OPEN,
296302
clientOptions: {
297-
commandTimeout: 50,
303+
commandTimeout: 10,
298304
}
299305
});
300306

packages/client/lib/client/index.ts

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsP
44
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
55
import { EventEmitter } from 'node:events';
66
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
7-
import { ClientClosedError, ClientOfflineError, AbortError, DisconnectsClientError, WatchError } from '../errors';
7+
import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors';
88
import { URL } from 'node:url';
99
import { TcpSocketConnectOpts } from 'node:net';
1010
import { PUBSUB_TYPE, PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
@@ -530,7 +530,7 @@ export default class RedisClient<
530530
async #handshake(chainId: symbol, asap: boolean) {
531531
const promises = [];
532532
const commandsWithErrorHandlers = await this.#getHandshakeCommands();
533-
533+
534534
if (asap) commandsWithErrorHandlers.reverse()
535535

536536
for (const { cmd, errorHandler } of commandsWithErrorHandlers) {
@@ -636,7 +636,7 @@ export default class RedisClient<
636636
// since they could be connected to an older version that doesn't support them.
637637
}
638638
});
639-
639+
640640
commands.push({
641641
cmd: [
642642
'CLIENT',
@@ -893,15 +893,13 @@ export default class RedisClient<
893893
return Promise.reject(new ClientOfflineError());
894894
}
895895

896-
let controller: AbortController;
897896
if (this._self.#options?.commandTimeout) {
898-
controller = new AbortController()
899-
let abortSignal = controller.signal;
897+
let abortSignal = AbortSignal.timeout(this._self.#options?.commandTimeout);
900898
if (options?.abortSignal) {
901899
abortSignal = AbortSignal.any([
902900
abortSignal,
903-
options.abortSignal
904-
]);
901+
options.abortSignal
902+
]);
905903
}
906904
options = {
907905
...options,
@@ -911,23 +909,7 @@ export default class RedisClient<
911909
const promise = this._self.#queue.addCommand<T>(args, options);
912910

913911
this._self.#scheduleWrite();
914-
if (!this._self.#options?.commandTimeout) {
915-
return promise;
916-
}
917-
918-
return new Promise<T>((resolve, reject) => {
919-
const timeoutId = setTimeout(() => {
920-
controller.abort();
921-
reject(new AbortError());
922-
}, this._self.#options?.commandTimeout)
923-
promise.then(result => {
924-
clearInterval(timeoutId);
925-
resolve(result)
926-
}).catch(error => {
927-
clearInterval(timeoutId);
928-
reject(error)
929-
});
930-
})
912+
return promise;
931913
}
932914

933915
async SELECT(db: number): Promise<void> {

packages/client/lib/errors.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
export class AbortError extends Error {
2-
constructor() {
3-
super('The command was aborted');
2+
constructor(message = '') {
3+
super(`The command was aborted: ${message}`);
44
}
55
}
66

0 commit comments

Comments
 (0)