Skip to content

Commit 42e36df

Browse files
committed
enhance cluster reshard handling
1 parent 6946e36 commit 42e36df

File tree

4 files changed

+76
-28
lines changed

4 files changed

+76
-28
lines changed

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,8 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
4242
throw new Error('None of the root nodes is available');
4343
}
4444

45-
async discover(startWith: RedisClientType<M, S>): Promise<void> {
46-
if (await this.#discoverNodes(startWith.options)) return;
47-
48-
for (const { client } of this.#nodeByUrl.values()) {
49-
if (client === startWith) continue;
50-
51-
if (await this.#discoverNodes(client.options)) return;
52-
}
53-
54-
throw new Error('None of the cluster nodes is available');
55-
}
56-
5745
async #discoverNodes(clientOptions?: RedisClusterClientOptions): Promise<boolean> {
58-
const client = new this.#Client(clientOptions);
46+
const client = this.#initiateClient(clientOptions);
5947

6048
await client.connect();
6149

@@ -72,6 +60,29 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
7260
}
7361
}
7462

63+
#runningRediscoverPromise?: Promise<void>;
64+
65+
async rediscover(startWith: RedisClientType<M, S>): Promise<void> {
66+
if (!this.#runningRediscoverPromise) {
67+
this.#runningRediscoverPromise = this.#rediscover(startWith)
68+
.finally(() => this.#runningRediscoverPromise = undefined);
69+
}
70+
71+
return this.#runningRediscoverPromise;
72+
}
73+
74+
async #rediscover(startWith: RedisClientType<M, S>): Promise<void> {
75+
if (await this.#discoverNodes(startWith.options)) return;
76+
77+
for (const { client } of this.#nodeByUrl.values()) {
78+
if (client === startWith) continue;
79+
80+
if (await this.#discoverNodes(client.options)) return;
81+
}
82+
83+
throw new Error('None of the cluster nodes is available');
84+
}
85+
7586
async #reset(masters: Array<RedisClusterMasterNode>): Promise<void> {
7687
// Override this.#slots and add not existing clients to this.#nodeByUrl
7788
const promises: Array<Promise<void>> = [],
@@ -103,18 +114,23 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
103114
await Promise.all(promises);
104115
}
105116

106-
#clientOptionsDefaults(options: RedisClusterClientOptions): RedisClusterClientOptions {
117+
#clientOptionsDefaults(options?: RedisClusterClientOptions): RedisClusterClientOptions | undefined {
107118
if (!this.#options.defaults) return options;
108119

109120
const merged = Object.assign({}, this.#options.defaults, options);
110121

111-
if (options.socket && this.#options.defaults.socket) {
122+
if (options?.socket && this.#options.defaults.socket) {
112123
Object.assign({}, this.#options.defaults.socket, options.socket);
113124
}
114125

115126
return merged;
116127
}
117128

129+
#initiateClient(options?: RedisClusterClientOptions): RedisClientType<M, S> {
130+
return new this.#Client(this.#clientOptionsDefaults(options))
131+
.on('error', this.#onError);
132+
}
133+
118134
#initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set<string>, promises: Array<Promise<void>>): ClusterNode<M, S> {
119135
const url = `${nodeData.host}:${nodeData.port}`;
120136
clientsInUse.add(url);
@@ -123,15 +139,13 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
123139
if (!node) {
124140
node = {
125141
id: nodeData.id,
126-
client: new this.#Client(
127-
this.#clientOptionsDefaults({
128-
socket: {
129-
host: nodeData.host,
130-
port: nodeData.port
131-
},
132-
readonly
133-
})
134-
)
142+
client: this.#initiateClient({
143+
socket: {
144+
host: nodeData.host,
145+
port: nodeData.port
146+
},
147+
readonly
148+
})
135149
};
136150
promises.push(node.client.connect());
137151
this.#nodeByUrl.set(url, node);

packages/client/lib/cluster/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ export default class RedisCluster<M extends RedisModules = Record<string, never>
157157
const url = err.message.substring(err.message.lastIndexOf(' ') + 1);
158158
let node = this.#slots.getNodeByUrl(url);
159159
if (!node) {
160-
await this.#slots.discover(client);
160+
await this.#slots.rediscover(client);
161161
node = this.#slots.getNodeByUrl(url);
162162

163163
if (!node) {
@@ -168,7 +168,7 @@ export default class RedisCluster<M extends RedisModules = Record<string, never>
168168
await node.client.asking();
169169
return node.client;
170170
} else if (err.message.startsWith('MOVED')) {
171-
await this.#slots.discover(client);
171+
await this.#slots.rediscover(client);
172172
return true;
173173
}
174174

packages/client/lib/commands/CLUSTER_NODES.spec.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,31 @@ describe('CLUSTER NODES', () => {
4848
);
4949
});
5050

51+
it('should support urls without cport', () => {
52+
assert.deepEqual(
53+
transformReply(
54+
'id 127.0.0.1:30001 master - 0 0 0 connected 0-16384\n'
55+
),
56+
[{
57+
id: 'id',
58+
url: '127.0.0.1:30001',
59+
host: '127.0.0.1',
60+
port: 30001,
61+
cport: null,
62+
flags: ['master'],
63+
pingSent: 0,
64+
pongRecv: 0,
65+
configEpoch: 0,
66+
linkState: RedisClusterNodeLinkStates.CONNECTED,
67+
slots: [{
68+
from: 0,
69+
to: 16384
70+
}],
71+
replicas: []
72+
}]
73+
);
74+
});
75+
5176
it.skip('with importing slots', () => {
5277
assert.deepEqual(
5378
transformReply(

packages/client/lib/commands/CLUSTER_NODES.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export enum RedisClusterNodeLinkStates {
1010
interface RedisClusterNodeTransformedUrl {
1111
host: string;
1212
port: number;
13-
cport: number;
13+
cport: number | null;
1414
}
1515

1616
export interface RedisClusterReplicaNode extends RedisClusterNodeTransformedUrl {
@@ -86,7 +86,16 @@ export function transformReply(reply: string): Array<RedisClusterMasterNode> {
8686

8787
function transformNodeUrl(url: string): RedisClusterNodeTransformedUrl {
8888
const indexOfColon = url.indexOf(':'),
89-
indexOfAt = url.indexOf('@', indexOfColon);
89+
indexOfAt = url.indexOf('@', indexOfColon),
90+
host = url.substring(0, indexOfColon);
91+
92+
if (indexOfAt === -1) {
93+
return {
94+
host,
95+
port: Number(url.substring(indexOfColon + 1)),
96+
cport: null
97+
};
98+
}
9099

91100
return {
92101
host: url.substring(0, indexOfColon),

0 commit comments

Comments
 (0)