Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/clustering.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ createCluster({

> This is a common problem when using ElastiCache. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) for more information on that.

### Events

The Node Redis Cluster class extends Node.js’s EventEmitter and emits the following events:

| Name | When | Listener arguments |
| ----------------------- | ---------------------------------------------------------------------------------- | --------------------------------------------------------- |
| `connect` | The cluster has successfully connected and is ready to us | _No arguments_ |
| `disconnect` | The cluster has disconnected | _No arguments_ |
| `error` | The cluster has errored | `(error: Error)` |
| `node-ready` | A cluster node is ready to establish a connection | `(node: { host: string, port: number })` |
| `node-connect` | A cluster node has connected | `(node: { host: string, port: number })` |
| `node-reconnecting` | A cluster node is attempting to reconnect after an error | `(node: { host: string, port: number })` |
| `node-disconnect` | A cluster node has disconnected | `(node: { host: string, port: number })` |
| `node-error` | A cluster node has has errored (usually during TCP connection) | `(error: Error, node: { host: string, port: number })` |

> :warning: You **MUST** listen to `error` events. If a cluster doesn't have at least one `error` listener registered and
> an `error` occurs, that error will be thrown and the Node.js process will exit. See the [ > `EventEmitter` docs](https://nodejs.org/api/events.html#events_error_events) for more details.

## Command Routing

### Commands that operate on Redis Keys
Expand Down
34 changes: 23 additions & 11 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ type PubSubNode<
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = (
Omit<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
);
Omit<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
);

type PubSubToResubscribe = Record<
PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'],
Expand Down Expand Up @@ -153,6 +153,7 @@ export default class RedisClusterSlots<
this.#isOpen = true;
try {
await this.#discoverWithRootNodes();
this.#emit('connect');
} catch (err) {
this.#isOpen = false;
throw err;
Expand Down Expand Up @@ -333,17 +334,26 @@ export default class RedisClusterSlots<
}

#createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) {
const socket =
this.#getNodeAddress(node.address) ??
{ host: node.host, port: node.port, };
const client = Object.freeze({
host: socket.host,
port: socket.port,
});
const emit = this.#emit;
return this.#clientFactory(
this.#clientOptionsDefaults({
clientSideCache: this.clientSideCache,
RESP: this.#options.RESP,
socket: this.#getNodeAddress(node.address) ?? {
host: node.host,
port: node.port
},
readonly
})
).on('error', err => console.error(err));
socket,
readonly,
}))
.on('error', error => emit('node-error', error, client))
.on('reconnecting', () => emit('node-reconnecting', client))
.once('ready', () => emit('node-ready', client))
.once('connect', () => emit('node-connect', client))
.once('end', () => emit('node-disconnect', client));
}

#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
Expand Down Expand Up @@ -406,6 +416,7 @@ export default class RedisClusterSlots<

this.#resetSlots();
this.nodeByAddress.clear();
this.#emit('disconnect');
}

*#clients() {
Expand Down Expand Up @@ -443,6 +454,7 @@ export default class RedisClusterSlots<
this.nodeByAddress.clear();

await Promise.allSettled(promises);
this.#emit('disconnect');
}

getClient(
Expand Down Expand Up @@ -542,7 +554,7 @@ export default class RedisClusterSlots<
node = index < this.masters.length ?
this.masters[index] :
this.replicas[index - this.masters.length],
client = this.#createClient(node, false);
client = this.#createClient(node, false);

this.pubSubNode = {
address: node.address,
Expand Down
59 changes: 59 additions & 0 deletions packages/client/lib/cluster/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,63 @@ describe('Cluster', () => {
minimumDockerVersion: [7]
});
});

describe('clusterEvents', () => {
testUtils.testWithCluster('should fire events', async (cluster) => {
const log: string[] = [];
const numberOfMasters = 2;
const nodeConnect = numberOfMasters;
const nodeReady = nodeConnect + numberOfMasters;
const connect = nodeReady + 1;
const nodeDisconnect = connect + numberOfMasters;
const disconnect = nodeDisconnect + 1;

cluster
.on('connect', () => log.push('connect'))
.on('disconnect', () => log.push('disconnect'))
.on('error', () => log.push('error'))
.on('node-error', () => log.push('node-error'))
.on('node-reconnecting', () => log.push('node-reconnecting'))
.on('node-ready', () => log.push('node-ready'))
.on('node-connect', () => log.push('node-connect'))
.on('node-disconnect', () => log.push('node-disconnect'))

await cluster.connect();
cluster.destroy();

assert.equal(log.length, disconnect);

assert.deepEqual(
log.slice(0, nodeConnect),
new Array(numberOfMasters).fill('node-connect'),
);
assert.deepEqual(
log.slice(nodeConnect, nodeReady),
new Array(numberOfMasters).fill('node-ready'),
);
assert.deepEqual(
log.slice(nodeReady, connect),
new Array(1).fill('connect'),
);
assert.deepEqual(
log.slice(connect, nodeDisconnect),
new Array(numberOfMasters).fill('node-disconnect'),
);
assert.deepEqual(
log.slice(nodeDisconnect, disconnect),
new Array(1).fill('disconnect'),
);

assert.equal(log.includes('error'), false);
assert.equal(log.includes('node-error'), false);
assert.equal(log.includes('node-reconnecting'), false);

}, {
...GLOBAL.CLUSTERS.OPEN,
disableClusterSetup: true,
numberOfMasters: 2,
numberOfReplicas: 1,
});
});

});
5 changes: 5 additions & 0 deletions packages/test-utils/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ interface ClusterTestOptions<
clusterConfiguration?: Partial<RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>>;
numberOfMasters?: number;
numberOfReplicas?: number;
disableClusterSetup?: boolean;
}

interface AllTestOptions<
Expand Down Expand Up @@ -558,6 +559,10 @@ export default class TestUtils {
...options.clusterConfiguration
});

if(options.disableClusterSetup) {
return fn(cluster);
}

await cluster.connect();

try {
Expand Down
Loading