Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/clustering.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ createCluster({

> This is a common problem when using ElastiCache. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) for more information on that.

### Events

The Node Redis Cluster class extends Node.js’s EventEmitter and emits the following events:

| Name | When | Listener arguments |
| ----------------------- | ---------------------------------------------------------------------------------- | --------------------------------------------------------- |
| `connect` | The cluster has successfully connected and is ready to us | _No arguments_ |
| `disconnect` | The cluster has disconnected | _No arguments_ |
| `error` | The cluster has errored | `(error: Error)` |
| `node-ready` | A cluster node is ready to establish a connection | `(node: { host: string, port: number })` |
| `node-connect` | A cluster node has connected | `(node: { host: string, port: number })` |
| `node-reconnecting` | A cluster node is attempting to reconnect after an error | `(node: { host: string, port: number })` |
| `node-disconnect` | A cluster node has disconnected | `(node: { host: string, port: number })` |
| `node-error` | A cluster node has has errored (usually during TCP connection) | `(error: Error, node: { host: string, port: number })` |

> :warning: You **MUST** listen to `error` events. If a cluster doesn't have at least one `error` listener registered and
> an `error` occurs, that error will be thrown and the Node.js process will exit. See the [ > `EventEmitter` docs](https://nodejs.org/api/events.html#events_error_events) for more details.

## Command Routing

### Commands that operate on Redis Keys
Expand Down
34 changes: 23 additions & 11 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ type PubSubNode<
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = (
Omit<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
);
Omit<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'> &
Required<Pick<Node<M, F, S, RESP, TYPE_MAPPING>, 'client'>>
);

type PubSubToResubscribe = Record<
PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'],
Expand Down Expand Up @@ -153,6 +153,7 @@ export default class RedisClusterSlots<
this.#isOpen = true;
try {
await this.#discoverWithRootNodes();
this.#emit('connect');
} catch (err) {
this.#isOpen = false;
throw err;
Expand Down Expand Up @@ -333,17 +334,26 @@ export default class RedisClusterSlots<
}

#createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) {
const socket =
this.#getNodeAddress(node.address) ??
{ host: node.host, port: node.port, };
const client = Object.freeze({
host: socket.host,
port: socket.port,
});
const emit = this.#emit;
return this.#clientFactory(
this.#clientOptionsDefaults({
clientSideCache: this.clientSideCache,
RESP: this.#options.RESP,
socket: this.#getNodeAddress(node.address) ?? {
host: node.host,
port: node.port
},
readonly
})
).on('error', err => console.error(err));
socket,
readonly,
}))
.on('error', error => emit('node-error', error, client))
.on('reconnecting', () => emit('node-reconnecting', client))
.once('ready', () => emit('node-ready', client))
.once('connect', () => emit('node-connect', client))
.once('end', () => emit('node-disconnect', client));
}

#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
Expand Down Expand Up @@ -406,6 +416,7 @@ export default class RedisClusterSlots<

this.#resetSlots();
this.nodeByAddress.clear();
this.#emit('disconnect');
}

*#clients() {
Expand Down Expand Up @@ -443,6 +454,7 @@ export default class RedisClusterSlots<
this.nodeByAddress.clear();

await Promise.allSettled(promises);
this.#emit('disconnect');
}

getClient(
Expand Down Expand Up @@ -542,7 +554,7 @@ export default class RedisClusterSlots<
node = index < this.masters.length ?
this.masters[index] :
this.replicas[index - this.masters.length],
client = this.#createClient(node, false);
client = this.#createClient(node, false);

this.pubSubNode = {
address: node.address,
Expand Down
59 changes: 59 additions & 0 deletions packages/client/lib/cluster/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,63 @@ describe('Cluster', () => {
minimumDockerVersion: [7]
});
});

describe('clusterEvents', () => {
testUtils.testWithCluster('should fire events', async (cluster) => {
const log: string[] = [];
const numberOfMasters = 2;
const nodeConnect = numberOfMasters;
const nodeReady = nodeConnect + numberOfMasters;
const connect = nodeReady + 1;
const nodeDisconnect = connect + numberOfMasters;
const disconnect = nodeDisconnect + 1;

cluster
.on('connect', () => log.push('connect'))
.on('disconnect', () => log.push('disconnect'))
.on('error', () => log.push('error'))
.on('node-error', () => log.push('node-error'))
.on('node-reconnecting', () => log.push('node-reconnecting'))
.on('node-ready', () => log.push('node-ready'))
.on('node-connect', () => log.push('node-connect'))
.on('node-disconnect', () => log.push('node-disconnect'))

await cluster.connect();
cluster.destroy();

assert.equal(log.length, disconnect);

assert.deepEqual(
log.slice(0, nodeConnect),
new Array(numberOfMasters).fill('node-connect'),
);
assert.deepEqual(
log.slice(nodeConnect, nodeReady),
new Array(numberOfMasters).fill('node-ready'),
);
assert.deepEqual(
log.slice(nodeReady, connect),
new Array(1).fill('connect'),
);
assert.deepEqual(
log.slice(connect, nodeDisconnect),
new Array(numberOfMasters).fill('node-disconnect'),
);
assert.deepEqual(
log.slice(nodeDisconnect, disconnect),
new Array(1).fill('disconnect'),
);

assert.equal(log.includes('error'), false);
assert.equal(log.includes('node-error'), false);
assert.equal(log.includes('node-reconnecting'), false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can simplify this by just asserting the log is equal to the expected log. For example:

assert.deepEqual(log, [
  'node-connect',
  'node-connect',
  'node-connect',
  'node-ready',
  'node-ready',
  'node-ready',
  'connect',
  'node-disconnect',
  'node-disconnect',
  'node-disconnect',
  'disconnect'
]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Total agree with it. I will make a change.
If I get it correctly, if we have 2 masters it will be 2 event for each node-* event.
So the test should be
[
'node-connect',
'node-connect',
'node-ready',
'node-ready',
'connect',
'node-disconnect',
'node-disconnect',
'disconnect',
]
Correct me please if i'm wrong.


}, {
...GLOBAL.CLUSTERS.OPEN,
disableClusterSetup: true,
numberOfMasters: 2,
numberOfReplicas: 1,
});
});

});
5 changes: 5 additions & 0 deletions packages/test-utils/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ interface ClusterTestOptions<
clusterConfiguration?: Partial<RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>>;
numberOfMasters?: number;
numberOfReplicas?: number;
disableClusterSetup?: boolean;
}

interface AllTestOptions<
Expand Down Expand Up @@ -558,6 +559,10 @@ export default class TestUtils {
...options.clusterConfiguration
});

if(options.disableClusterSetup) {
return fn(cluster);
}

await cluster.connect();

try {
Expand Down
Loading