Skip to content

Commit b0223e7

Browse files
committed
--wip-- [skip ci]
1 parent 6f3380b commit b0223e7

File tree

2 files changed

+41
-10
lines changed

2 files changed

+41
-10
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
6-
import { AbortError, ErrorReply } from '../errors';
6+
import { AbortError, ErrorReply, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
88

99
export interface CommandOptions<T = TypeMapping> {
@@ -14,6 +14,10 @@ export interface CommandOptions<T = TypeMapping> {
1414
* Maps between RESP and JavaScript types
1515
*/
1616
typeMapping?: T;
17+
/**
18+
* Timeout for the command in milliseconds
19+
*/
20+
timeout?: number;
1721
}
1822

1923
export interface CommandToWrite extends CommandWaitingForReply {
@@ -23,6 +27,10 @@ export interface CommandToWrite extends CommandWaitingForReply {
2327
signal: AbortSignal;
2428
listener: () => unknown;
2529
} | undefined;
30+
timeout: {
31+
signal: AbortSignal;
32+
listener: () => unknown;
33+
} | undefined;
2634
}
2735

2836
interface CommandWaitingForReply {
@@ -80,7 +88,7 @@ export default class RedisCommandsQueue {
8088
#onPush(push: Array<any>) {
8189
// TODO: type
8290
if (this.#pubSub.handleMessageReply(push)) return true;
83-
91+
8492
const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push);
8593
if (isShardedUnsubscribe && !this.#waitingForReply.length) {
8694
const channel = push[1].toString();
@@ -144,6 +152,7 @@ export default class RedisCommandsQueue {
144152
if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
145153
return Promise.reject(new Error('The queue is full'));
146154
} else if (options?.abortSignal?.aborted) {
155+
console.log('eeeeeeeee', args)
147156
return Promise.reject(new AbortError());
148157
}
149158

@@ -153,17 +162,35 @@ export default class RedisCommandsQueue {
153162
args,
154163
chainId: options?.chainId,
155164
abort: undefined,
165+
timeout: undefined,
156166
resolve,
157167
reject,
158168
channelsCounter: undefined,
159169
typeMapping: options?.typeMapping
160170
};
161171

172+
const timeout = options?.timeout;
173+
if (timeout) {
174+
console.log('set timeout', timeout);
175+
const signal = AbortSignal.timeout(timeout);
176+
value.timeout = {
177+
signal,
178+
listener: () => {
179+
console.log('TIMEOUT OCCURRED', node);
180+
this.#toWrite.remove(node);
181+
value.reject(new TimeoutError());
182+
}
183+
};
184+
signal.addEventListener('abort', value.timeout.listener, { once: true });
185+
}
186+
162187
const signal = options?.abortSignal;
163188
if (signal) {
189+
console.log('signal', signal)
164190
value.abort = {
165191
signal,
166192
listener: () => {
193+
console.log('ABORT OCCURRED', node);
167194
this.#toWrite.remove(node);
168195
value.reject(new AbortError());
169196
}
@@ -181,6 +208,7 @@ export default class RedisCommandsQueue {
181208
args: command.args,
182209
chainId,
183210
abort: undefined,
211+
timeout: undefined,
184212
resolve() {
185213
command.resolve();
186214
resolve();
@@ -202,7 +230,7 @@ export default class RedisCommandsQueue {
202230
this.decoder.onReply = (reply => {
203231
if (Array.isArray(reply)) {
204232
if (this.#onPush(reply)) return;
205-
233+
206234
if (PONG.equals(reply[0] as Buffer)) {
207235
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
208236
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
@@ -250,7 +278,7 @@ export default class RedisCommandsQueue {
250278
if (!this.#pubSub.isActive) {
251279
this.#resetDecoderCallbacks();
252280
}
253-
281+
254282
resolve();
255283
};
256284
}
@@ -299,6 +327,7 @@ export default class RedisCommandsQueue {
299327
args: ['MONITOR'],
300328
chainId: options?.chainId,
301329
abort: undefined,
330+
timeout: undefined,
302331
// using `resolve` instead of using `.then`/`await` to make sure it'll be called before processing the next reply
303332
resolve: () => {
304333
// after running `MONITOR` only `MONITOR` and `RESET` replies are expected
@@ -317,7 +346,7 @@ export default class RedisCommandsQueue {
317346
reject,
318347
channelsCounter: undefined,
319348
typeMapping
320-
}, options?.asap);
349+
}, options?.asap);
321350
});
322351
}
323352

@@ -340,18 +369,19 @@ export default class RedisCommandsQueue {
340369
this.#resetDecoderCallbacks();
341370
this.#resetFallbackOnReply = undefined;
342371
this.#pubSub.reset();
343-
372+
344373
this.#waitingForReply.shift()!.resolve(reply);
345374
return;
346375
}
347-
376+
348377
this.#resetFallbackOnReply!(reply);
349378
}) as Decoder['onReply'];
350379

351380
this.#toWrite.push({
352381
args: ['RESET'],
353382
chainId,
354383
abort: undefined,
384+
timeout: undefined,
355385
resolve,
356386
reject,
357387
channelsCounter: undefined,
@@ -376,7 +406,7 @@ export default class RedisCommandsQueue {
376406
continue;
377407
}
378408

379-
// TODO reuse `toSend` or create new object?
409+
// TODO reuse `toSend` or create new object?
380410
(toSend as any).args = undefined;
381411
if (toSend.abort) {
382412
RedisCommandsQueue.#removeAbortListener(toSend);
@@ -385,7 +415,7 @@ export default class RedisCommandsQueue {
385415
this.#chainInExecution = toSend.chainId;
386416
toSend.chainId = undefined;
387417
this.#waitingForReply.push(toSend);
388-
418+
389419
yield encoded;
390420
toSend = this.#toWrite.shift();
391421
}
@@ -406,7 +436,7 @@ export default class RedisCommandsQueue {
406436
if (toBeSent.abort) {
407437
RedisCommandsQueue.#removeAbortListener(toBeSent);
408438
}
409-
439+
410440
toBeSent.reject(err);
411441
}
412442

packages/client/lib/client/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,7 @@ export default class RedisClient<
889889
return Promise.reject(new ClientOfflineError());
890890
}
891891

892+
console.log('sendCommand', args, options)
892893
const promise = this._self.#queue.addCommand<T>(args, options);
893894
this._self.#scheduleWrite();
894895
return promise;

0 commit comments

Comments
 (0)