Skip to content

Commit 91be6ac

Browse files
committed
wip
1 parent c188a5c commit 91be6ac

File tree

6 files changed

+130
-127
lines changed

6 files changed

+130
-127
lines changed

packages/client/lib/RESP/decoder.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ interface DecoderOptions {
4949
}
5050

5151
export class Decoder {
52-
private _config;
52+
private readonly _config;
5353

5454
private _cursor = 0;
5555

packages/client/lib/client/commands-queue.ts

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, Pu
66
import { AbortError, ErrorReply } from '../errors';
77
import { EventEmitter } from 'stream';
88

9-
export interface QueueCommandOptions {
9+
export interface CommandOptions {
1010
chainId?: symbol;
1111
asap?: boolean;
1212
abortSignal?: AbortSignal;
@@ -149,7 +149,7 @@ export default class RedisCommandsQueue {
149149
});
150150
}
151151

152-
addCommand<T>(args: CommandArguments, options?: QueueCommandOptions): Promise<T> {
152+
addCommand<T>(args: CommandArguments, options?: CommandOptions): Promise<T> {
153153
if (this._maxLength && this._waitingToBeSent.length + this._waitingForReply.length >= this._maxLength) {
154154
return Promise.reject(new Error('The queue is full'));
155155
} else if (options?.abortSignal?.aborted) {
@@ -256,30 +256,32 @@ export default class RedisCommandsQueue {
256256
});
257257
}
258258

259-
getCommandToSend(): CommandArguments | undefined {
260-
const toSend = this._waitingToBeSent.shift();
261-
if (!toSend) return;
262-
263-
let encoded: CommandArguments;
264-
try {
265-
encoded = encodeCommand(toSend.args);
266-
} catch (err) {
267-
toSend.reject(err);
268-
return;
269-
}
259+
*waitingToBeSent() {
260+
let toSend = this._waitingToBeSent.shift();
261+
while (toSend) {
262+
let encoded: CommandArguments;
263+
try {
264+
encoded = encodeCommand(toSend.args);
265+
} catch (err) {
266+
toSend.reject(err);
267+
toSend = this._waitingToBeSent.shift();
268+
continue;
269+
}
270270

271-
if (toSend.abort) {
272-
RedisCommandsQueue._removeAbortListener(toSend);
273-
toSend.abort = undefined;
271+
if (toSend.abort) {
272+
RedisCommandsQueue._removeAbortListener(toSend);
273+
toSend.abort = undefined;
274+
}
275+
276+
// TODO reuse `toSend` or create new object?
277+
(toSend as any).args = undefined;
278+
(toSend as any).chainId = undefined;
279+
280+
this._waitingForReply.push(toSend);
281+
this._chainInExecution = toSend.chainId;
282+
yield encoded;
283+
toSend = this._waitingToBeSent.shift();
274284
}
275-
276-
// TODO reuse `toSend` or create new object?
277-
(toSend as any).args = undefined;
278-
(toSend as any).chainId = undefined;
279-
280-
this._waitingForReply.push(toSend);
281-
this._chainInExecution = toSend.chainId;
282-
return encoded;
283285
}
284286

285287
private _flushWaitingForReply(err: Error): void {

0 commit comments

Comments
 (0)