Skip to content

Commit 8f88eb2

Browse files
committed
fix ASK and MOVED errors in multi as well
1 parent 29ff6c8 commit 8f88eb2

File tree

1 file changed

+55
-61
lines changed

1 file changed

+55
-61
lines changed

packages/client/lib/cluster/index.ts

Lines changed: 55 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import COMMANDS from './commands';
2-
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands';
2+
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands';
33
import { ClientCommandOptions, RedisClientCommandSignature, RedisClientOptions, RedisClientType, WithModules, WithScripts } from '../client';
44
import RedisClusterSlots, { ClusterNode } from './cluster-slots';
55
import { extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, extendWithCommands } from '../commander';
@@ -82,27 +82,17 @@ export default class RedisCluster<M extends RedisModules, S extends RedisScripts
8282
);
8383
}
8484

85-
async sendCommand<C extends RedisCommand>(
85+
async sendCommand<T = RedisCommandRawReply>(
8686
firstKey: RedisCommandArgument | undefined,
8787
isReadonly: boolean | undefined,
8888
args: RedisCommandArguments,
89-
options?: ClientCommandOptions,
90-
redirections = 0
91-
): Promise<RedisCommandReply<C>> {
92-
const client = this.#slots.getClient(firstKey, isReadonly);
93-
94-
try {
95-
return await client.sendCommand(args, options);
96-
} catch (err: any) {
97-
const shouldRetry = await this.#handleCommandError(err, client, redirections);
98-
if (shouldRetry === true) {
99-
return this.sendCommand(firstKey, isReadonly, args, options, redirections + 1);
100-
} else if (shouldRetry) {
101-
return shouldRetry.sendCommand(args, options);
102-
}
103-
104-
throw err;
105-
}
89+
options?: ClientCommandOptions
90+
): Promise<T> {
91+
return this.#execute(
92+
firstKey,
93+
isReadonly,
94+
client => client.sendCommand<T>(args, options)
95+
);
10696
}
10797

10898
async scriptsExecutor(script: RedisScript, args: Array<unknown>): Promise<RedisCommandReply<typeof script>> {
@@ -124,61 +114,65 @@ export default class RedisCluster<M extends RedisModules, S extends RedisScripts
124114
script: RedisScript,
125115
originalArgs: Array<unknown>,
126116
redisArgs: RedisCommandArguments,
127-
options?: ClientCommandOptions,
128-
redirections = 0
117+
options?: ClientCommandOptions
129118
): Promise<RedisCommandReply<typeof script>> {
130-
const client = this.#slots.getClient(
119+
return this.#execute(
131120
RedisCluster.extractFirstKey(script, originalArgs, redisArgs),
132-
script.IS_READ_ONLY
121+
script.IS_READ_ONLY,
122+
client => client.executeScript(script, redisArgs, options)
133123
);
134-
135-
try {
136-
return await client.executeScript(script, redisArgs, options);
137-
} catch (err: any) {
138-
const shouldRetry = await this.#handleCommandError(err, client, redirections);
139-
if (shouldRetry === true) {
140-
return this.executeScript(script, originalArgs, redisArgs, options, redirections + 1);
141-
} else if (shouldRetry) {
142-
return shouldRetry.executeScript(script, redisArgs, options);
143-
}
144-
145-
throw err;
146-
}
147124
}
148125

149-
async #handleCommandError(err: Error, client: RedisClientType<M, S>, redirections: number): Promise<boolean | RedisClientType<M, S>> {
150-
if (redirections > (this.#options.maxCommandRedirections ?? 16)) {
151-
throw err;
152-
}
153-
154-
if (err.message.startsWith('ASK')) {
155-
const url = err.message.substring(err.message.lastIndexOf(' ') + 1);
156-
let node = this.#slots.getNodeByUrl(url);
157-
if (!node) {
158-
await this.#slots.rediscover(client);
159-
node = this.#slots.getNodeByUrl(url);
126+
async #execute<Reply>(
127+
firstKey: RedisCommandArgument | undefined,
128+
isReadonly: boolean | undefined,
129+
executor: (client: RedisClientType<M, S>) => Promise<Reply>
130+
): Promise<Reply> {
131+
const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
132+
let client = this.#slots.getClient(firstKey, isReadonly);
133+
for (let i = 0;; i++) {
134+
try {
135+
return await executor(client);
136+
} catch (err) {
137+
if (++i > maxCommandRedirections || !(err instanceof Error)) {
138+
throw err;
139+
}
160140

161-
if (!node) {
162-
throw new Error(`Cannot find node ${url}`);
141+
if (err.message.startsWith('ASK')) {
142+
const url = err.message.substring(err.message.lastIndexOf(' ') + 1);
143+
if (this.#slots.getNodeByUrl(url)?.client === client) {
144+
await client.asking();
145+
continue;
146+
}
147+
148+
await this.#slots.rediscover(client);
149+
const redirectTo = this.#slots.getNodeByUrl(url);
150+
if (!redirectTo) {
151+
throw new Error(`Cannot find node ${url}`);
152+
}
153+
154+
await redirectTo.client.asking();
155+
client = redirectTo.client;
156+
continue;
157+
} else if (err.message.startsWith('MOVED')) {
158+
await this.#slots.rediscover(client);
159+
client = this.#slots.getClient(firstKey, isReadonly);
160+
continue;
163161
}
164-
}
165162

166-
await node.client.asking();
167-
return node.client;
168-
} else if (err.message.startsWith('MOVED')) {
169-
await this.#slots.rediscover(client);
170-
return true;
163+
throw err;
164+
}
171165
}
172-
173-
throw err;
174166
}
175167

176168
multi(routing?: RedisCommandArgument): RedisClusterMultiCommandType<M, S> {
177169
return new this.#Multi(
178-
async (commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
179-
return this.#slots
180-
.getClient(firstKey)
181-
.multiExecutor(commands, chainId);
170+
(commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
171+
return this.#execute(
172+
firstKey,
173+
false,
174+
client => client.multiExecutor(commands, chainId)
175+
);
182176
},
183177
routing
184178
);

0 commit comments

Comments
 (0)