Skip to content

Commit dc094ea

Browse files
committed
WIP
1 parent 0e89c66 commit dc094ea

File tree

11 files changed

+441
-303
lines changed

11 files changed

+441
-303
lines changed

packages/client/lib/RESP/decoder.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ export class Decoder {
555555
this,
556556
length - slice.length,
557557
chunks,
558+
skip,
558559
flag
559560
);
560561
}

packages/client/lib/RESP/encoder.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { RedisArgument } from "./types";
1+
import { RedisArgument } from './types';
22

33
const CRLF = '\r\n';
44

packages/client/lib/client/commands-queue.ts

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import Queue, { QueueNode } from './queue';
1+
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
22
import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_FLAGS, RESP_TYPES } from '../RESP/decoder';
44
import { CommandArguments, Flags, ReplyUnion, RespVersions } from '../RESP/types';
@@ -7,16 +7,19 @@ import { AbortError, ErrorReply } from '../errors';
77
import { EventEmitter } from 'stream';
88

99
export interface QueueCommandOptions {
10-
asap?: boolean;
1110
chainId?: symbol;
12-
signal?: AbortSignal;
11+
asap?: boolean;
12+
abortSignal?: AbortSignal;
1313
flags?: Flags;
1414
}
1515

1616
export interface CommandWaitingToBeSent extends CommandWaitingForReply {
1717
args: CommandArguments;
1818
chainId?: symbol;
19-
removeAbortListener?(): void;
19+
abort?: {
20+
signal: AbortSignal;
21+
listener: () => unknown;
22+
};
2023
}
2124

2225
interface CommandWaitingForReply {
@@ -37,8 +40,8 @@ const RESP2_PUSH_FLAGS = {
3740

3841
export default class RedisCommandsQueue {
3942
private readonly _maxLength: number | null | undefined;
40-
private readonly _waitingToBeSent = new Queue<CommandWaitingToBeSent>();
41-
private readonly _waitingForReply = new Queue<CommandWaitingForReply>();
43+
private readonly _waitingToBeSent = new DoublyLinkedList<CommandWaitingToBeSent>();
44+
private readonly _waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
4245
private readonly _onShardedChannelMoved: OnShardedChannelMoved;
4346

4447
private readonly _pubSub = new PubSub();
@@ -149,30 +152,31 @@ export default class RedisCommandsQueue {
149152
addCommand<T>(args: CommandArguments, options?: QueueCommandOptions): Promise<T> {
150153
if (this._maxLength && this._waitingToBeSent.length + this._waitingForReply.length >= this._maxLength) {
151154
return Promise.reject(new Error('The queue is full'));
152-
} else if (options?.signal?.aborted) {
155+
} else if (options?.abortSignal?.aborted) {
153156
return Promise.reject(new AbortError());
154157
}
155158

156159
return new Promise((resolve, reject) => {
157-
let node: QueueNode<CommandWaitingToBeSent>;
160+
let node: DoublyLinkedNode<CommandWaitingToBeSent>;
158161
const value: CommandWaitingToBeSent = {
159162
args,
160163
chainId: options?.chainId,
161164
flags: options?.flags,
162165
resolve,
163166
reject,
164-
removeAbortListener: undefined
167+
abort: undefined
165168
};
166169

167-
const signal = options?.signal;
170+
const signal = options?.abortSignal;
168171
if (signal) {
169-
const listener = () => {
170-
this._waitingToBeSent.remove(node);
171-
value.reject(new AbortError());
172+
value.abort = {
173+
signal,
174+
listener: () => {
175+
this._waitingToBeSent.remove(node);
176+
value.reject(new AbortError());
177+
}
172178
};
173-
174-
value.removeAbortListener = () => signal.removeEventListener('abort', listener);
175-
signal.addEventListener('abort', listener, { once: true });
179+
signal.addEventListener('abort', value.abort.listener, { once: true });
176180
}
177181

178182
node = options?.asap ?
@@ -264,13 +268,15 @@ export default class RedisCommandsQueue {
264268
return;
265269
}
266270

267-
// TODO
268-
// reuse `toSend`
269-
(toSend.args as any) = undefined;
270-
if (toSend.removeAbortListener) {
271-
toSend.removeAbortListener();
272-
(toSend.removeAbortListener as any) = undefined;
271+
if (toSend.abort) {
272+
RedisCommandsQueue._removeAbortListener(toSend);
273+
toSend.abort = undefined;
273274
}
275+
276+
// TODO reuse `toSend` or create new object?
277+
(toSend as any).args = undefined;
278+
(toSend as any).chainId = undefined;
279+
274280
this._waitingForReply.push(toSend);
275281
this._chainInExecution = toSend.chainId;
276282
return encoded;
@@ -282,9 +288,16 @@ export default class RedisCommandsQueue {
282288
}
283289
}
284290

285-
static #flushWaitingToBeSent(command: CommandWaitingToBeSent, err: Error) {
286-
command.removeAbortListener?.();
287-
command.reject(err);
291+
private static _removeAbortListener(command: CommandWaitingToBeSent) {
292+
command.abort!.signal.removeEventListener('abort', command.abort!.listener);
293+
}
294+
295+
private static _flushWaitingToBeSent(toBeSent: CommandWaitingToBeSent, err: Error) {
296+
if (toBeSent.abort) {
297+
RedisCommandsQueue._removeAbortListener(toBeSent);
298+
}
299+
300+
toBeSent.reject(err);
288301
}
289302

290303
flushWaitingForReply(err: Error): void {
@@ -296,7 +309,7 @@ export default class RedisCommandsQueue {
296309
if (!this._chainInExecution) return;
297310

298311
while (this._waitingToBeSent.head?.value.chainId === this._chainInExecution) {
299-
RedisCommandsQueue.#flushWaitingToBeSent(
312+
RedisCommandsQueue._flushWaitingToBeSent(
300313
this._waitingToBeSent.shift()!,
301314
err
302315
);
@@ -310,7 +323,7 @@ export default class RedisCommandsQueue {
310323
this._pubSub.reset();
311324
this.#flushWaitingForReply(err);
312325
while (this._waitingToBeSent.head) {
313-
RedisCommandsQueue.#flushWaitingToBeSent(
326+
RedisCommandsQueue._flushWaitingToBeSent(
314327
this._waitingToBeSent.shift()!,
315328
err
316329
);

packages/client/lib/client/index.ts

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -137,31 +137,31 @@ export default class RedisClient<
137137
> extends EventEmitter {
138138
private static _createCommand(command: Command, resp: RespVersions) {
139139
const transformReply = getTransformReply(command, resp);
140-
return async function (this: ProxyClient) {
141-
const args = command.transformArguments.apply(undefined, arguments as any),
142-
reply = await this.sendCommand(args, this.commandOptions);
140+
return async function (this: ProxyClient, ...args: Array<unknown>) {
141+
const redisArgs = command.transformArguments(...args),
142+
reply = await this.sendCommand(redisArgs, this.commandOptions);
143143
return transformReply ?
144-
transformReply(reply, args.preserve) :
144+
transformReply(reply, redisArgs.preserve) :
145145
reply;
146146
};
147147
}
148148

149149
private static _createModuleCommand(command: Command, resp: RespVersions) {
150150
const transformReply = getTransformReply(command, resp);
151-
return async function (this: NamespaceProxyClient) {
152-
const args = command.transformArguments.apply(undefined, arguments as any),
153-
reply = await this.self.sendCommand(args, this.self.commandOptions);
151+
return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
152+
const redisArgs = command.transformArguments(...args),
153+
reply = await this.self.sendCommand(redisArgs, this.self.commandOptions);
154154
return transformReply ?
155-
transformReply(reply, args.preserve) :
155+
transformReply(reply, redisArgs.preserve) :
156156
reply;
157157
};
158158
}
159159

160160
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
161161
const prefix = functionArgumentsPrefix(name, fn),
162162
transformReply = getTransformReply(fn, resp);
163-
return async function (this: NamespaceProxyClient) {
164-
const fnArgs = fn.transformArguments.apply(undefined, arguments as any),
163+
return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
164+
const fnArgs = fn.transformArguments(...args),
165165
reply = await this.self.sendCommand(
166166
prefix.concat(fnArgs),
167167
this.self.commandOptions
@@ -175,15 +175,15 @@ export default class RedisClient<
175175
private static _createScriptCommand(script: RedisScript, resp: RespVersions) {
176176
const prefix = scriptArgumentsPrefix(script),
177177
transformReply = getTransformReply(script, resp);
178-
return async function (this: ProxyClient) {
179-
const scriptArgs = script.transformArguments.apply(undefined, arguments as any),
180-
args = prefix.concat(scriptArgs),
181-
reply = await this.sendCommand(args, this.commandOptions).catch((err: unknown) => {
178+
return async function (this: ProxyClient, ...args: Array<unknown>) {
179+
const scriptArgs = script.transformArguments(...args),
180+
redisArgs = prefix.concat(scriptArgs),
181+
reply = await this.sendCommand(redisArgs, this.commandOptions).catch((err: unknown) => {
182182
if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err;
183183

184184
args[0] = 'EVAL';
185185
args[1] = script.SCRIPT;
186-
return this.sendCommand(args, this.commandOptions);
186+
return this.sendCommand(redisArgs, this.commandOptions);
187187
});
188188
return transformReply ?
189189
transformReply(reply, scriptArgs.preserve) :
@@ -470,6 +470,10 @@ export default class RedisClient<
470470
return this._commandOptionsProxy('flags', flags);
471471
}
472472

473+
withAbortSignal(abortSignal: AbortSignal) {
474+
return this._commandOptionsProxy('abortSignal', abortSignal);
475+
}
476+
473477
/**
474478
* Override the `asap` command option to `true`
475479
*/
@@ -792,8 +796,6 @@ export default class RedisClient<
792796
return Promise.resolve(this.destroy());
793797
}
794798

795-
private _resolveClose?: () => unknown;
796-
797799
/**
798800
* Close the client. Wait for pending replies.
799801
*/
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import { SinglyLinkedList, DoublyLinkedList } from './linked-list';
2+
import { equal, deepEqual } from 'assert/strict';
3+
4+
describe.only('DoublyLinkedList', () => {
5+
const list = new DoublyLinkedList();
6+
7+
it('should start empty', () => {
8+
equal(list.length, 0);
9+
equal(list.head, undefined);
10+
equal(list.tail, undefined);
11+
deepEqual(Array.from(list), []);
12+
});
13+
14+
it('shift empty', () => {
15+
equal(list.shift(), undefined);
16+
equal(list.length, 0);
17+
deepEqual(Array.from(list), []);
18+
});
19+
20+
it('push 1', () => {
21+
list.push(1);
22+
equal(list.length, 1);
23+
deepEqual(Array.from(list), [1]);
24+
});
25+
26+
it('push 2', () => {
27+
list.push(2);
28+
equal(list.length, 2);
29+
deepEqual(Array.from(list), [1, 2]);
30+
});
31+
32+
it('unshift 0', () => {
33+
list.unshift(0);
34+
equal(list.length, 3);
35+
deepEqual(Array.from(list), [0, 1, 2]);
36+
});
37+
38+
it('remove middle node', () => {
39+
list.remove(list.head!.next!);
40+
equal(list.length, 2);
41+
deepEqual(Array.from(list), [0, 2]);
42+
});
43+
44+
it('remove head', () => {
45+
list.remove(list.head!);
46+
equal(list.length, 1);
47+
deepEqual(Array.from(list), [2]);
48+
});
49+
50+
it('remove tail', () => {
51+
list.remove(list.tail!);
52+
equal(list.length, 0);
53+
deepEqual(Array.from(list), []);
54+
});
55+
56+
it('unshift empty queue', () => {
57+
list.unshift(0);
58+
equal(list.length, 1);
59+
deepEqual(Array.from(list), [0]);
60+
});
61+
62+
it('push 1', () => {
63+
list.push(1);
64+
equal(list.length, 2);
65+
deepEqual(Array.from(list), [0, 1]);
66+
});
67+
68+
it('shift', () => {
69+
equal(list.shift(), 0);
70+
equal(list.length, 1);
71+
deepEqual(Array.from(list), [1]);
72+
});
73+
74+
it('shift last element', () => {
75+
equal(list.shift(), 1);
76+
equal(list.length, 0);
77+
deepEqual(Array.from(list), []);
78+
});
79+
});
80+
81+
describe.only('SinglyLinkedList', () => {
82+
const list = new SinglyLinkedList();
83+
84+
it('should start empty', () => {
85+
equal(list.length, 0);
86+
equal(list.head, undefined);
87+
equal(list.tail, undefined);
88+
deepEqual(Array.from(list), []);
89+
});
90+
91+
it('shift empty', () => {
92+
equal(list.shift(), undefined);
93+
equal(list.length, 0);
94+
deepEqual(Array.from(list), []);
95+
});
96+
97+
it('push 1', () => {
98+
list.push(1);
99+
equal(list.length, 1);
100+
deepEqual(Array.from(list), [1]);
101+
});
102+
103+
it('push 2', () => {
104+
list.push(2);
105+
equal(list.length, 2);
106+
deepEqual(Array.from(list), [1, 2]);
107+
});
108+
109+
it('push 3', () => {
110+
list.push(3);
111+
equal(list.length, 3);
112+
deepEqual(Array.from(list), [1, 2, 3]);
113+
});
114+
115+
it('shift 1', () => {
116+
equal(list.shift(), 1);
117+
equal(list.length, 2);
118+
deepEqual(Array.from(list), [2, 3]);
119+
});
120+
121+
it('shift 2', () => {
122+
equal(list.shift(), 2);
123+
equal(list.length, 1);
124+
deepEqual(Array.from(list), [3]);
125+
});
126+
127+
it('shift 3', () => {
128+
equal(list.shift(), 3);
129+
equal(list.length, 0);
130+
deepEqual(Array.from(list), []);
131+
});
132+
133+
it('should be empty', () => {
134+
equal(list.length, 0);
135+
equal(list.head, undefined);
136+
equal(list.tail, undefined);
137+
});
138+
});

0 commit comments

Comments
 (0)