Skip to content

Commit 1875813

Browse files
committed
cluster/node evets (#1855)
1 parent 073db12 commit 1875813

File tree

1 file changed

+22
-11
lines changed

1 file changed

+22
-11
lines changed

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ type PubSubNode<
8080
RESP extends RespVersions,
8181
TYPE_MAPPING extends TypeMapping
8282
> = (
83-
Omit<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
84-
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
85-
);
83+
Omit<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
84+
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
85+
);
8686

8787
type PubSubToResubscribe = Record<
8888
PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'],
@@ -153,6 +153,7 @@ export default class RedisClusterSlots<
153153
this.#isOpen = true;
154154
try {
155155
await this.#discoverWithRootNodes();
156+
this.#emit('connect');
156157
} catch (err) {
157158
this.#isOpen = false;
158159
throw err;
@@ -333,17 +334,26 @@ export default class RedisClusterSlots<
333334
}
334335

335336
#createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) {
337+
const socket =
338+
this.#getNodeAddress(node.address) ??
339+
{ host: node.host, port: node.port, };
340+
const client = Object.freeze({
341+
host: socket.host,
342+
port: socket.port,
343+
});
344+
const emit = this.#emit;
336345
return this.#clientFactory(
337346
this.#clientOptionsDefaults({
338347
clientSideCache: this.clientSideCache,
339348
RESP: this.#options.RESP,
340-
socket: this.#getNodeAddress(node.address) ?? {
341-
host: node.host,
342-
port: node.port
343-
},
344-
readonly
345-
})
346-
).on('error', err => console.error(err));
349+
socket,
350+
readonly,
351+
}))
352+
.on('error', error => emit('node-error', error, client))
353+
.on('reconnecting', () => emit('node-reconnecting', client))
354+
.once('ready', () => emit('node-ready', client))
355+
.once('connect', () => emit('node-connect', client))
356+
.once('end', () => emit('node-disconnect', client));
347357
}
348358

349359
#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
@@ -443,6 +453,7 @@ export default class RedisClusterSlots<
443453
this.nodeByAddress.clear();
444454

445455
await Promise.allSettled(promises);
456+
this.#emit('disconnect');
446457
}
447458

448459
getClient(
@@ -542,7 +553,7 @@ export default class RedisClusterSlots<
542553
node = index < this.masters.length ?
543554
this.masters[index] :
544555
this.replicas[index - this.masters.length],
545-
client = this.#createClient(node, false);
556+
client = this.#createClient(node, false);
546557

547558
this.pubSubNode = {
548559
address: node.address,

0 commit comments

Comments
 (0)