Skip to content

Commit 2ca6e1f

Browse files
committed
add support for MULTI in ClientPool
1 parent 6549fa4 commit 2ca6e1f

File tree

5 files changed

+64
-46
lines changed

5 files changed

+64
-46
lines changed

packages/client/lib/client/index.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -794,8 +794,12 @@ export default class RedisClient<
794794
return execResult as Array<unknown>;
795795
}
796796

797-
MULTI(): RedisClientMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
798-
return new (this as any).Multi(this);
797+
MULTI() {
798+
type Multi = new (...args: ConstructorParameters<typeof RedisClientMultiCommand>) => RedisClientMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;;
799+
return new ((this as any).Multi as Multi)(
800+
this._executeMulti.bind(this),
801+
this._executePipeline.bind(this)
802+
);
799803
}
800804

801805
multi = this.MULTI;

packages/client/lib/client/multi-command.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import COMMANDS from '../commands';
2-
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
2+
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType, RedisMultiQueuedCommand } from '../multi-command';
33
import { ReplyWithTypeMapping, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, TypeMapping } from '../RESP/types';
44
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
5-
import { RedisClientType } from '.';
65

76
type CommandSignature<
87
REPLIES extends Array<unknown>,
@@ -84,6 +83,10 @@ export type RedisClientMultiCommandType<
8483
WithScripts<REPLIES, M, F, S, RESP, TYPE_MAPPING>
8584
);
8685

86+
type ExecuteMulti = (commands: Array<RedisMultiQueuedCommand>, selectedDB?: number) => Promise<Array<unknown>>;
87+
88+
type ExecutePipeline = (commands: Array<RedisMultiQueuedCommand>) => Promise<Array<unknown>>;
89+
8790
export default class RedisClientMultiCommand<REPLIES = []> {
8891
private static _createCommand(command: Command, resp: RespVersions) {
8992
const transformReply = getTransformReply(command, resp);
@@ -149,11 +152,14 @@ export default class RedisClientMultiCommand<REPLIES = []> {
149152
}
150153

151154
private readonly _multi = new RedisMultiCommand();
152-
private readonly _client: RedisClientType;
155+
private readonly _executeMulti: ExecuteMulti;
156+
private readonly _executePipeline: ExecutePipeline;
153157
private _selectedDB?: number;
154158

155-
constructor(client: RedisClientType) {
156-
this._client = client;
159+
constructor(executeMulti: ExecuteMulti, executePipeline: ExecutePipeline) {
160+
this._executeMulti = executeMulti;
161+
this._executePipeline = executePipeline;
162+
// this._client = client;
157163
}
158164

159165
SELECT(db: number, transformReply?: TransformReply): this {
@@ -173,7 +179,7 @@ export default class RedisClientMultiCommand<REPLIES = []> {
173179
if (execAsPipeline) return this.execAsPipeline<T>();
174180

175181
return this._multi.transformReplies(
176-
await this._client._executeMulti(this._multi.queue, this._selectedDB)
182+
await this._executeMulti(this._multi.queue, this._selectedDB)
177183
) as MultiReplyType<T, REPLIES>;
178184
}
179185

@@ -187,7 +193,7 @@ export default class RedisClientMultiCommand<REPLIES = []> {
187193
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
188194

189195
return this._multi.transformReplies(
190-
await this._client._executePipeline(this._multi.queue)
196+
await this._executePipeline(this._multi.queue)
191197
) as MultiReplyType<T, REPLIES>;
192198
}
193199

packages/client/lib/client/pool.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-l
66
import { TimeoutError } from '../errors';
77
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
88
import { CommandOptions } from './commands-queue';
9+
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
910

1011
export interface RedisPoolOptions {
1112
/**
@@ -118,7 +119,6 @@ export class RedisClientPool<
118119
clientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>,
119120
options?: Partial<RedisPoolOptions>
120121
) {
121-
// @ts-ignore
122122
const Pool = attachConfig({
123123
BaseClass: RedisClientPool,
124124
commands: COMMANDS,
@@ -129,6 +129,8 @@ export class RedisClientPool<
129129
config: clientOptions
130130
});
131131

132+
Pool.prototype.Multi = RedisClientMultiCommand.extend(clientOptions);
133+
132134
// returning a "proxy" to prevent the namespaces.self to leak between "proxies"
133135
return Object.create(
134136
new Pool(
@@ -327,8 +329,8 @@ export class RedisClientPool<
327329
this._returnClient(node);
328330
}
329331

330-
execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>): Promise<T> {
331-
return new Promise<T>((resolve, reject) => {
332+
execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>) {
333+
return new Promise<Awaited<T>>((resolve, reject) => {
332334
const client = this._idleClients.shift(),
333335
{ tail } = this._tasksQueue;
334336
if (!client) {
@@ -425,6 +427,16 @@ export class RedisClientPool<
425427
return this.execute(client => client.executeScript(script, args, options));
426428
}
427429

430+
MULTI() {
431+
type Multi = new (...args: ConstructorParameters<typeof RedisClientMultiCommand>) => RedisClientMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;
432+
return new ((this as any).Multi as Multi)(
433+
(commands, selectedDB) => this.execute(client => client._executeMulti(commands, selectedDB)),
434+
commands => this.execute(client => client._executePipeline(commands))
435+
);
436+
}
437+
438+
multi = this.MULTI;
439+
428440
async close() {
429441
if (this._isClosing) return; // TODO: throw err?
430442
if (!this._isOpen) return; // TODO: throw err?

packages/client/lib/cluster/index.ts

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -504,33 +504,17 @@ export default class RedisCluster<
504504
);
505505
}
506506

507-
/**
508-
* @internal
509-
*/
510-
async _executePipeline(
511-
firstKey: RedisArgument | undefined,
512-
isReadonly: boolean | undefined,
513-
commands: Array<RedisMultiQueuedCommand>
514-
) {
515-
const client = await this._slots.getClient(firstKey, isReadonly);
516-
return client._executePipeline(commands);
517-
}
518-
519-
/**
520-
* @internal
521-
*/
522-
async _executeMulti(
523-
firstKey: RedisArgument | undefined,
524-
isReadonly: boolean | undefined,
525-
commands: Array<RedisMultiQueuedCommand>
526-
) {
527-
const client = await this._slots.getClient(firstKey, isReadonly);
528-
return client._executeMulti(commands);
529-
}
530-
531-
MULTI(routing?: RedisArgument): RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
532-
return new (this as any).Multi(
533-
this,
507+
MULTI(routing?: RedisArgument) {
508+
type Multi = new (...args: ConstructorParameters<typeof RedisClusterMultiCommand>) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;
509+
return new ((this as any).Multi as Multi)(
510+
async (firstKey, isReadonly, commands) => {
511+
const client = await this._slots.getClient(firstKey, isReadonly);
512+
return client._executeMulti(commands);
513+
},
514+
async (firstKey, isReadonly, commands) => {
515+
const client = await this._slots.getClient(firstKey, isReadonly);
516+
return client._executePipeline(commands);
517+
},
534518
routing
535519
);
536520
}

packages/client/lib/cluster/multi-command.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import COMMANDS from '../commands';
2-
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
2+
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType, RedisMultiQueuedCommand } from '../multi-command';
33
import { ReplyWithTypeMapping, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, TypeMapping, RedisArgument } from '../RESP/types';
44
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
5-
import RedisCluster, { RedisClusterType } from '.';
5+
import RedisCluster from '.';
66

77
type CommandSignature<
88
REPLIES extends Array<unknown>,
@@ -84,6 +84,12 @@ export type RedisClusterMultiCommandType<
8484
WithScripts<REPLIES, M, F, S, RESP, TYPE_MAPPING>
8585
);
8686

87+
export type ClusterMultiExecute = (
88+
firstKey: RedisArgument | undefined,
89+
isReadonly: boolean | undefined,
90+
commands: Array<RedisMultiQueuedCommand>
91+
) => Promise<Array<unknown>>;
92+
8793
export default class RedisClusterMultiCommand<REPLIES = []> {
8894
private static _createCommand(command: Command, resp: RespVersions) {
8995
const transformReply = getTransformReply(command, resp);
@@ -181,12 +187,18 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
181187
}
182188

183189
private readonly _multi = new RedisMultiCommand();
184-
private readonly _cluster: RedisClusterType;
190+
private readonly _executeMulti: ClusterMultiExecute;
191+
private readonly _executePipeline: ClusterMultiExecute;
185192
private _firstKey: RedisArgument | undefined;
186193
private _isReadonly: boolean | undefined = true;
187194

188-
constructor(cluster: RedisClusterType, routing: RedisArgument | undefined) {
189-
this._cluster = cluster;
195+
constructor(
196+
executeMulti: ClusterMultiExecute,
197+
executePipeline: ClusterMultiExecute,
198+
routing: RedisArgument | undefined
199+
) {
200+
this._executeMulti = executeMulti;
201+
this._executePipeline = executePipeline;
190202
this._firstKey = routing;
191203
}
192204

@@ -213,7 +225,7 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
213225
if (execAsPipeline) return this.execAsPipeline<T>();
214226

215227
return this._multi.transformReplies(
216-
await this._cluster._executeMulti(
228+
await this._executeMulti(
217229
this._firstKey,
218230
this._isReadonly,
219231
this._multi.queue
@@ -231,7 +243,7 @@ export default class RedisClusterMultiCommand<REPLIES = []> {
231243
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
232244

233245
return this._multi.transformReplies(
234-
await this._cluster._executePipeline(
246+
await this._executePipeline(
235247
this._firstKey,
236248
this._isReadonly,
237249
this._multi.queue

0 commit comments

Comments
 (0)