Skip to content

Commit fd526fa

Browse files
committed
POC partial implementation of request-response routing and aggregation
1 parent 8d6b6d8 commit fd526fa

File tree

3 files changed

+286
-52
lines changed

3 files changed

+286
-52
lines changed

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,20 @@ export default class RedisClusterSlots<
445445
await Promise.allSettled(promises);
446446
}
447447

448+
getAllClients() {
449+
return Array.from(this.#clients());
450+
}
451+
452+
getAllMasterClients() {
453+
const result = [];
454+
for (const master of this.masters) {
455+
if (master.client) {
456+
result.push(master.client);
457+
}
458+
}
459+
return result;
460+
}
461+
448462
getClient(
449463
firstKey: RedisArgument | undefined,
450464
isReadonly: boolean | undefined

packages/client/lib/cluster/index.ts

Lines changed: 146 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ import { PubSubListener } from '../client/pub-sub';
1010
import { ErrorReply } from '../errors';
1111
import { RedisTcpSocketOptions } from '../client/socket';
1212
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
13-
import { BasicCommandParser } from '../client/parser';
13+
import { BasicCommandParser, CommandParser } from '../client/parser';
1414
import { ASKING_CMD } from '../commands/ASKING';
1515
import SingleEntryCache from '../single-entry-cache'
16-
import { POLICIES, PolicyResolver, StaticPolicyResolver } from './request-response-policies';
16+
import { POLICIES, PolicyResolver, REQUEST_POLICIES_WITH_DEFAULTS, RESPONSE_POLICIES_WITH_DEFAULTS, StaticPolicyResolver } from './request-response-policies';
17+
import { aggregateLogicalAnd, aggregateLogicalOr, aggregateMax, aggregateMerge, aggregateMin, aggregateSum } from './request-response-policies/generic-aggregators';
1718
interface ClusterCommander<
1819
M extends RedisModules,
1920
F extends RedisFunctions,
@@ -190,10 +191,9 @@ export default class RedisCluster<
190191
command.parseCommand(parser, ...args);
191192

192193
return this._self._execute(
193-
parser.firstKey,
194+
parser,
194195
command.IS_READ_ONLY,
195196
this._commandOptions,
196-
parser.commandName!,
197197
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
198198
);
199199
};
@@ -207,10 +207,9 @@ export default class RedisCluster<
207207
command.parseCommand(parser, ...args);
208208

209209
return this._self._execute(
210-
parser.firstKey,
210+
parser,
211211
command.IS_READ_ONLY,
212212
this._self._commandOptions,
213-
parser.commandName!,
214213
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
215214
);
216215
};
@@ -226,10 +225,9 @@ export default class RedisCluster<
226225
fn.parseCommand(parser, ...args);
227226

228227
return this._self._execute(
229-
parser.firstKey,
228+
parser,
230229
fn.IS_READ_ONLY,
231230
this._self._commandOptions,
232-
parser.commandName!,
233231
(client, opts) => client._executeCommand(fn, parser, opts, transformReply)
234232
);
235233
};
@@ -245,10 +243,9 @@ export default class RedisCluster<
245243
script.parseCommand(parser, ...args);
246244

247245
return this._self._execute(
248-
parser.firstKey,
246+
parser,
249247
script.IS_READ_ONLY,
250248
this._commandOptions,
251-
parser.commandName!,
252249
(client, opts) => client._executeScript(script, parser, opts, transformReply)
253250
);
254251
};
@@ -459,64 +456,157 @@ export default class RedisCluster<
459456
}
460457

461458
async _execute<T>(
462-
firstKey: RedisArgument | undefined,
459+
parser: CommandParser,
463460
isReadonly: boolean | undefined,
464461
options: ClusterCommandOptions | undefined,
465-
commandName: string,
466462
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
467463
): Promise<T> {
468464

469465
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
470-
const policyResult = this._policyResolver.resolvePolicy(commandName)
471466

472-
if(policyResult.ok) {
473-
//TODO
474-
} else {
475-
//TODO
467+
const policyResult = this._policyResolver.resolvePolicy(parser.commandIdentifier);
468+
469+
if(!policyResult.ok) {
470+
throw new Error(`Policy resolution error for ${parser.commandIdentifier}: ${policyResult.error}`);
476471
}
477-
478-
let client = await this._slots.getClient(firstKey, isReadonly);
479-
let i = 0;
480472

481-
let myFn = fn;
473+
const requestPolicy = policyResult.value.request
474+
const responsePolicy = policyResult.value.response
475+
476+
let clients: Array<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>;
477+
// https://redis.io/docs/latest/develop/reference/command-tips
478+
switch (requestPolicy) {
479+
480+
case REQUEST_POLICIES_WITH_DEFAULTS.ALL_NODES:
481+
clients = this._slots.getAllClients()
482+
break;
483+
484+
case REQUEST_POLICIES_WITH_DEFAULTS.ALL_SHARDS:
485+
clients = this._slots.getAllMasterClients()
486+
break;
487+
488+
case REQUEST_POLICIES_WITH_DEFAULTS.MULTI_SHARD:
489+
clients = await Promise.all(
490+
parser.keys.map((key) => this._slots.getClient(key, isReadonly))
491+
);
492+
break;
493+
494+
case REQUEST_POLICIES_WITH_DEFAULTS.SPECIAL:
495+
throw new Error(`Special request policy not implemented for ${parser.commandIdentifier}`);
496+
497+
case REQUEST_POLICIES_WITH_DEFAULTS.DEFAULT_KEYLESS:
498+
//TODO handle undefined case?
499+
clients = [this._slots.getRandomNode().client!]
500+
break;
501+
502+
case REQUEST_POLICIES_WITH_DEFAULTS.DEFAULT_KEYED:
503+
clients = [await this._slots.getClient(parser.firstKey, isReadonly)]
504+
break;
505+
506+
default:
507+
throw new Error(`Unknown request policy ${requestPolicy}`);
482508

483-
while (true) {
484-
try {
485-
return await myFn(client, options);
486-
} catch (err) {
487-
myFn = fn;
509+
}
488510

489-
// TODO: error class
490-
if (++i > maxCommandRedirections || !(err instanceof Error)) {
491-
throw err;
492-
}
511+
const responsePromises = clients.map(async client => {
493512

494-
if (err.message.startsWith('ASK')) {
495-
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
496-
let redirectTo = await this._slots.getMasterByAddress(address);
497-
if (!redirectTo) {
498-
await this._slots.rediscover(client);
499-
redirectTo = await this._slots.getMasterByAddress(address);
513+
let i = 0;
514+
515+
let myFn = fn;
516+
517+
while (true) {
518+
try {
519+
return await myFn(client, options);
520+
} catch (err) {
521+
myFn = fn;
522+
523+
// TODO: error class
524+
if (++i > maxCommandRedirections || !(err instanceof Error)) {
525+
throw err;
500526
}
501527

502-
if (!redirectTo) {
503-
throw new Error(`Cannot find node ${address}`);
528+
if (err.message.startsWith('ASK')) {
529+
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
530+
let redirectTo = await this._slots.getMasterByAddress(address);
531+
if (!redirectTo) {
532+
await this._slots.rediscover(client);
533+
redirectTo = await this._slots.getMasterByAddress(address);
534+
}
535+
536+
if (!redirectTo) {
537+
throw new Error(`Cannot find node ${address}`);
538+
}
539+
540+
client = redirectTo;
541+
myFn = this._handleAsk(fn);
542+
continue;
543+
}
544+
545+
if (err.message.startsWith('MOVED')) {
546+
await this._slots.rediscover(client);
547+
client = await this._slots.getClient(parser.firstKey, isReadonly);
548+
continue;
504549
}
505550

506-
client = redirectTo;
507-
myFn = this._handleAsk(fn);
508-
continue;
509-
}
510-
511-
if (err.message.startsWith('MOVED')) {
512-
await this._slots.rediscover(client);
513-
client = await this._slots.getClient(firstKey, isReadonly);
514-
continue;
515-
}
551+
throw err;
552+
}
553+
}
516554

517-
throw err;
518-
}
555+
})
556+
557+
switch (responsePolicy) {
558+
case RESPONSE_POLICIES_WITH_DEFAULTS.ONE_SUCCEEDED: {
559+
return Promise.any(responsePromises);
560+
}
561+
562+
case RESPONSE_POLICIES_WITH_DEFAULTS.ALL_SUCCEEDED: {
563+
const responses = await Promise.all(responsePromises);
564+
return responses[0]
565+
}
566+
567+
case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_LOGICAL_AND: {
568+
const responses = await Promise.all(responsePromises)
569+
return aggregateLogicalAnd(responses);
570+
}
571+
572+
case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_LOGICAL_OR: {
573+
const responses = await Promise.all(responsePromises)
574+
return aggregateLogicalOr(responses);
575+
}
576+
577+
case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_MIN: {
578+
const responses = await Promise.all(responsePromises);
579+
return aggregateMin(responses);
580+
}
581+
582+
case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_MAX: {
583+
const responses = await Promise.all(responsePromises);
584+
return aggregateMax(responses);
585+
}
586+
587+
case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_SUM: {
588+
const responses = await Promise.all(responsePromises);
589+
return aggregateSum(responses);
590+
}
591+
592+
case RESPONSE_POLICIES_WITH_DEFAULTS.SPECIAL: {
593+
throw new Error(`Special response policy not implemented for ${parser.commandIdentifier}`);
594+
}
595+
596+
case RESPONSE_POLICIES_WITH_DEFAULTS.DEFAULT_KEYLESS: {
597+
const responses = await Promise.all(responsePromises);
598+
return aggregateMerge(responses);
599+
}
600+
601+
case RESPONSE_POLICIES_WITH_DEFAULTS.DEFAULT_KEYED: {
602+
const responses = await Promise.all(responsePromises);
603+
return responses as T;
604+
}
605+
606+
default:
607+
throw new Error(`Unknown response policy ${responsePolicy}`);
519608
}
609+
520610
}
521611

522612
async sendCommand<T = ReplyUnion>(
@@ -526,11 +616,15 @@ export default class RedisCluster<
526616
options?: ClusterCommandOptions,
527617
// defaultPolicies?: CommandPolicies
528618
): Promise<T> {
619+
620+
const parser = new BasicCommandParser();
621+
firstKey && parser.push(firstKey)
622+
args.forEach(arg => parser.push(arg));
623+
529624
return this._self._execute(
530-
firstKey,
625+
parser,
531626
isReadonly,
532627
options,
533-
args[0] instanceof Buffer ? args[0].toString() : args[0],
534628
(client, opts) => client.sendCommand(args, opts)
535629
);
536630
}

0 commit comments

Comments
 (0)