Skip to content

Commit fcb3a01

Browse files
committed
cluster multi
1 parent 2fa33db commit fcb3a01

File tree

6 files changed

+373
-251
lines changed

6 files changed

+373
-251
lines changed

docs/v4-to-v5.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ legacyClient.set('key', 'value', (err, reply) => {
7171
TODO
7272
The `isolationPool` has been moved to it's on class `ClientPool`. You can create pool from a client using `client.createPool()`.
7373

74+
## Cluster MULTI
75+
76+
Cluster MULTI supports readonly/replicas
77+
`cluster.multi.addCommand` now requires `isReadonly` as the second argument, to match `cluster.sendCommand`
78+
79+
TODO
80+
7481
## Commands
7582

7683
Some command arguments/replies have changed to align more closely to data types returned by Redis:

packages/client/lib/client/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import RedisSocket, { RedisSocketOptions, RedisTlsSocketOptions } from './socket
33
import RedisCommandsQueue, { QueueCommandOptions } from './commands-queue';
44
import { EventEmitter } from 'events';
55
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
6-
import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '../errors';
6+
import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors';
77
import { URL } from 'url';
88
import { TcpSocketConnectOpts } from 'net';
99
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
@@ -692,7 +692,7 @@ export default class RedisClient<
692692
/**
693693
* @internal
694694
*/
695-
async executePipeline(commands: Array<RedisMultiQueuedCommand>) {
695+
executePipeline(commands: Array<RedisMultiQueuedCommand>) {
696696
if (!this._socket.isOpen) {
697697
return Promise.reject(new ClientClosedError());
698698
}

packages/client/lib/client/multi-command.ts

Lines changed: 42 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import COMMANDS from '../commands';
2-
import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command';
2+
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
33
import { ReplyWithFlags, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, Flags, ReplyUnion } from '../RESP/types';
44
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
55
import { RedisClientType } from '.';
@@ -84,64 +84,50 @@ export type RedisClientMultiCommandType<
8484
WithScripts<REPLIES, M, F, S, RESP, FLAGS>
8585
);
8686

87-
type MULTI_REPLY = {
88-
GENERIC: 'generic';
89-
TYPED: 'typed';
90-
};
91-
92-
type MultiReply = MULTI_REPLY[keyof MULTI_REPLY];
93-
94-
type ReplyType<T extends MultiReply, REPLIES> = T extends MULTI_REPLY['TYPED'] ? REPLIES : Array<unknown>;
95-
96-
export type RedisClientMultiExecutor = (
97-
queue: Array<RedisMultiQueuedCommand>,
98-
selectedDB?: number,
99-
chainId?: symbol
100-
) => Promise<Array<unknown>>;
101-
102-
export default class RedisClientMultiCommand<REPLIES = []> extends RedisMultiCommand {
103-
static #createCommand(command: Command, resp: RespVersions) {
87+
export default class RedisClientMultiCommand<REPLIES = []> {
88+
private static _createCommand(command: Command, resp: RespVersions) {
10489
const transformReply = getTransformReply(command, resp);
105-
return function (this: RedisClientMultiCommand) {
106-
return this.addCommand(
107-
command.transformArguments.apply(undefined, arguments as any),
90+
return function (this: RedisClientMultiCommand, ...args: Array<unknown>) {
91+
return this._multi.addCommand(
92+
command.transformArguments(...args),
10893
transformReply
10994
);
11095
};
11196
}
11297

113-
static #createModuleCommand(command: Command, resp: RespVersions) {
98+
private static _createModuleCommand(command: Command, resp: RespVersions) {
11499
const transformReply = getTransformReply(command, resp);
115-
return function (this: { self: RedisClientMultiCommand }) {
116-
return this.self.addCommand(
117-
command.transformArguments.apply(undefined, arguments as any),
100+
return function (this: { self: RedisClientMultiCommand }, ...args: Array<unknown>) {
101+
return this.self._multi.addCommand(
102+
command.transformArguments(...args),
118103
transformReply
119104
);
120105
};
121106
}
122107

123-
static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
108+
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
124109
const prefix = functionArgumentsPrefix(name, fn),
125110
transformReply = getTransformReply(fn, resp);
126-
return function (this: { self: RedisClientMultiCommand }) {
127-
const fnArgs = fn.transformArguments.apply(undefined, arguments as any),
128-
args: CommandArguments = prefix.concat(fnArgs);
129-
args.preserve = fnArgs.preserve;
130-
return this.self.addCommand(
131-
args,
111+
return function (this: { self: RedisClientMultiCommand }, ...args: Array<unknown>) {
112+
const fnArgs = fn.transformArguments(...args),
113+
redisArgs: CommandArguments = prefix.concat(fnArgs);
114+
redisArgs.preserve = fnArgs.preserve;
115+
return this.self._multi.addCommand(
116+
redisArgs,
132117
transformReply
133118
);
134119
};
135120
}
136121

137-
static #createScriptCommand(script: RedisScript, resp: RespVersions) {
122+
private static _createScriptCommand(script: RedisScript, resp: RespVersions) {
138123
const transformReply = getTransformReply(script, resp);
139-
return function (this: RedisClientMultiCommand) {
140-
return this.addScript(
124+
return function (this: RedisClientMultiCommand, ...args: Array<unknown>) {
125+
this._multi.addScript(
141126
script,
142-
script.transformArguments.apply(undefined, arguments as any),
127+
script.transformArguments(...args),
143128
transformReply
144129
);
130+
return this;
145131
};
146132
}
147133

@@ -154,35 +140,36 @@ export default class RedisClientMultiCommand<REPLIES = []> extends RedisMultiCom
154140
return attachConfig({
155141
BaseClass: RedisClientMultiCommand,
156142
commands: COMMANDS,
157-
createCommand: RedisClientMultiCommand.#createCommand,
158-
createModuleCommand: RedisClientMultiCommand.#createModuleCommand,
159-
createFunctionCommand: RedisClientMultiCommand.#createFunctionCommand,
160-
createScriptCommand: RedisClientMultiCommand.#createScriptCommand,
143+
createCommand: RedisClientMultiCommand._createCommand,
144+
createModuleCommand: RedisClientMultiCommand._createModuleCommand,
145+
createFunctionCommand: RedisClientMultiCommand._createFunctionCommand,
146+
createScriptCommand: RedisClientMultiCommand._createScriptCommand,
161147
config
162148
});
163149
}
164150

165-
readonly #client: RedisClientType;
166-
#selectedDB?: number;
151+
private readonly _multi = new RedisMultiCommand();
152+
private readonly _client: RedisClientType;
153+
private _selectedDB?: number;
167154

168155
constructor(client: RedisClientType) {
169-
super();
170-
this.#client = client;
156+
this._client = client;
171157
}
172158

173159
SELECT(db: number, transformReply?: TransformReply): this {
174-
this.#selectedDB = db;
175-
return this.addCommand(['SELECT', db.toString()], transformReply);
160+
this._selectedDB = db;
161+
this._multi.addCommand(['SELECT', db.toString()], transformReply);
162+
return this;
176163
}
177164

178165
select = this.SELECT;
179166

180-
async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false): Promise<ReplyType<T, REPLIES>> {
167+
async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false): Promise<MultiReplyType<T, REPLIES>> {
181168
if (execAsPipeline) return this.execAsPipeline<T>();
182169

183-
return this.transformReplies(
184-
await this.#client.executeMulti(this.queue, this.#selectedDB)
185-
) as ReplyType<T, REPLIES>;
170+
return this._multi.transformReplies(
171+
await this._client.executeMulti(this._multi.queue, this._selectedDB)
172+
) as MultiReplyType<T, REPLIES>;
186173
}
187174

188175
EXEC = this.exec;
@@ -191,12 +178,12 @@ export default class RedisClientMultiCommand<REPLIES = []> extends RedisMultiCom
191178
return this.exec<MULTI_REPLY['TYPED']>(execAsPipeline);
192179
}
193180

194-
async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>(): Promise<ReplyType<T, REPLIES>> {
195-
if (this.queue.length === 0) return [] as ReplyType<T, REPLIES>;
181+
async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>(): Promise<MultiReplyType<T, REPLIES>> {
182+
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
196183

197-
return this.transformReplies(
198-
await this.#client.executePipeline(this.queue)
199-
) as ReplyType<T, REPLIES>;
184+
return this._multi.transformReplies(
185+
await this._client.executePipeline(this._multi.queue)
186+
) as MultiReplyType<T, REPLIES>;
200187
}
201188

202189
execAsPipelineTyped() {

0 commit comments

Comments
 (0)