diff --git a/docs/clustering.md b/docs/clustering.md index 3e4f8446b6..4afd95afd2 100644 --- a/docs/clustering.md +++ b/docs/clustering.md @@ -120,6 +120,24 @@ createCluster({ > This is a common problem when using ElastiCache. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) for more information on that. +### Events + +The Node Redis Cluster class extends Node.js’s EventEmitter and emits the following events: + +| Name | When | Listener arguments | +| ----------------------- | ---------------------------------------------------------------------------------- | --------------------------------------------------------- | +| `connect` | The cluster has successfully connected and is ready to us | _No arguments_ | +| `disconnect` | The cluster has disconnected | _No arguments_ | +| `error` | The cluster has errored | `(error: Error)` | +| `node-ready` | A cluster node is ready to establish a connection | `(node: { host: string, port: number })` | +| `node-connect` | A cluster node has connected | `(node: { host: string, port: number })` | +| `node-reconnecting` | A cluster node is attempting to reconnect after an error | `(node: { host: string, port: number })` | +| `node-disconnect` | A cluster node has disconnected | `(node: { host: string, port: number })` | +| `node-error` | A cluster node has has errored (usually during TCP connection) | `(error: Error, node: { host: string, port: number })` | + +> :warning: You **MUST** listen to `error` events. If a cluster doesn't have at least one `error` listener registered and +> an `error` occurs, that error will be thrown and the Node.js process will exit. See the [ > `EventEmitter` docs](https://nodejs.org/api/events.html#events_error_events) for more details. + ## Command Routing ### Commands that operate on Redis Keys diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 9c75b3ab4b..737413677e 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -80,9 +80,9 @@ type PubSubNode< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = ( - Omit, 'client'> & - Required, 'client'>> -); + Omit, 'client'> & + Required, 'client'>> + ); type PubSubToResubscribe = Record< PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'], @@ -153,6 +153,7 @@ export default class RedisClusterSlots< this.#isOpen = true; try { await this.#discoverWithRootNodes(); + this.#emit('connect'); } catch (err) { this.#isOpen = false; throw err; @@ -333,17 +334,26 @@ export default class RedisClusterSlots< } #createClient(node: ShardNode, readonly = node.readonly) { + const socket = + this.#getNodeAddress(node.address) ?? + { host: node.host, port: node.port, }; + const client = Object.freeze({ + host: socket.host, + port: socket.port, + }); + const emit = this.#emit; return this.#clientFactory( this.#clientOptionsDefaults({ clientSideCache: this.clientSideCache, RESP: this.#options.RESP, - socket: this.#getNodeAddress(node.address) ?? { - host: node.host, - port: node.port - }, - readonly - }) - ).on('error', err => console.error(err)); + socket, + readonly, + })) + .on('error', error => emit('node-error', error, client)) + .on('reconnecting', () => emit('node-reconnecting', client)) + .once('ready', () => emit('node-ready', client)) + .once('connect', () => emit('node-connect', client)) + .once('end', () => emit('node-disconnect', client)); } #createNodeClient(node: ShardNode, readonly?: boolean) { @@ -406,6 +416,7 @@ export default class RedisClusterSlots< this.#resetSlots(); this.nodeByAddress.clear(); + this.#emit('disconnect'); } *#clients() { @@ -443,6 +454,7 @@ export default class RedisClusterSlots< this.nodeByAddress.clear(); await Promise.allSettled(promises); + this.#emit('disconnect'); } getClient( @@ -542,7 +554,7 @@ export default class RedisClusterSlots< node = index < this.masters.length ? this.masters[index] : this.replicas[index - this.masters.length], - client = this.#createClient(node, false); + client = this.#createClient(node, false); this.pubSubNode = { address: node.address, diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index 4db5f32e85..e32bf4fa85 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -339,4 +339,63 @@ describe('Cluster', () => { minimumDockerVersion: [7] }); }); + + describe('clusterEvents', () => { + testUtils.testWithCluster('should fire events', async (cluster) => { + const log: string[] = []; + const numberOfMasters = 2; + const nodeConnect = numberOfMasters; + const nodeReady = nodeConnect + numberOfMasters; + const connect = nodeReady + 1; + const nodeDisconnect = connect + numberOfMasters; + const disconnect = nodeDisconnect + 1; + + cluster + .on('connect', () => log.push('connect')) + .on('disconnect', () => log.push('disconnect')) + .on('error', () => log.push('error')) + .on('node-error', () => log.push('node-error')) + .on('node-reconnecting', () => log.push('node-reconnecting')) + .on('node-ready', () => log.push('node-ready')) + .on('node-connect', () => log.push('node-connect')) + .on('node-disconnect', () => log.push('node-disconnect')) + + await cluster.connect(); + cluster.destroy(); + + assert.equal(log.length, disconnect); + + assert.deepEqual( + log.slice(0, nodeConnect), + new Array(numberOfMasters).fill('node-connect'), + ); + assert.deepEqual( + log.slice(nodeConnect, nodeReady), + new Array(numberOfMasters).fill('node-ready'), + ); + assert.deepEqual( + log.slice(nodeReady, connect), + new Array(1).fill('connect'), + ); + assert.deepEqual( + log.slice(connect, nodeDisconnect), + new Array(numberOfMasters).fill('node-disconnect'), + ); + assert.deepEqual( + log.slice(nodeDisconnect, disconnect), + new Array(1).fill('disconnect'), + ); + + assert.equal(log.includes('error'), false); + assert.equal(log.includes('node-error'), false); + assert.equal(log.includes('node-reconnecting'), false); + + }, { + ...GLOBAL.CLUSTERS.OPEN, + disableClusterSetup: true, + numberOfMasters: 2, + numberOfReplicas: 1, + }); + }); + }); diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 64b9abc7f4..117946a183 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -116,6 +116,7 @@ interface ClusterTestOptions< clusterConfiguration?: Partial>; numberOfMasters?: number; numberOfReplicas?: number; + disableClusterSetup?: boolean; } interface AllTestOptions< @@ -558,6 +559,10 @@ export default class TestUtils { ...options.clusterConfiguration }); + if(options.disableClusterSetup) { + return fn(cluster); + } + await cluster.connect(); try {