Skip to content

Commit 1ac1480

Browse files
committed
feat: implemented destroy method on UDPClusterManager
1 parent de06d8d commit 1ac1480

File tree

5 files changed

+131
-103
lines changed

5 files changed

+131
-103
lines changed

src/ClusterManager.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* <[email protected]> to get commercial licensing options.
2323
*/
2424
import { IMessageQueueConnection, IServerInput } from './IMessageQueue';
25+
import { uuid } from './uuid';
2526

2627
export interface ICluster {
2728
add: (server: IServerInput) => void;
@@ -31,12 +32,39 @@ export interface ICluster {
3132
) => T | undefined;
3233
}
3334

35+
export interface InitializedCluster extends ICluster {
36+
id: string;
37+
}
38+
3439
export abstract class ClusterManager {
35-
protected clusters: ICluster[] = [];
40+
protected clusters: InitializedCluster[] = [];
3641

3742
protected constructor() {}
3843

39-
public init(cluster: ICluster): void {
40-
this.clusters.push(cluster);
44+
public init(cluster: ICluster): InitializedCluster {
45+
const initializedCluster = Object.assign(cluster, { id: uuid() });
46+
47+
this.clusters.push(initializedCluster);
48+
49+
return initializedCluster;
4150
}
51+
52+
public async remove(
53+
cluster: string | InitializedCluster,
54+
destroy: boolean = true,
55+
): Promise<void> {
56+
const id = typeof cluster === 'string' ? cluster : cluster.id;
57+
58+
this.clusters = this.clusters.filter(cluster => cluster.id !== id);
59+
60+
if (
61+
this.clusters.length === 0
62+
&& destroy
63+
&& typeof this.destroy === 'function'
64+
) {
65+
await this.destroy();
66+
}
67+
}
68+
69+
public abstract destroy(): Promise<void>;
4270
}

src/ClusteredRedisQueue.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
IServerInput,
3737
copyEventEmitter,
3838
} from '.';
39+
import { InitializedCluster } from './ClusterManager';
3940

4041
interface ClusterServer extends IMessageQueueConnection {
4142
imq?: RedisQueue;
@@ -130,6 +131,8 @@ export class ClusteredRedisQueue implements IMessageQueue,
130131
subscription: null,
131132
};
132133

134+
private initializedClusters: InitializedCluster[] = [];
135+
133136
/**
134137
* Class constructor
135138
*
@@ -167,11 +170,11 @@ export class ClusteredRedisQueue implements IMessageQueue,
167170

168171
if (this.options.clusterManagers?.length) {
169172
for (const manager of this.options.clusterManagers) {
170-
manager.init({
173+
this.initializedClusters.push(manager.init({
171174
add: this.addServer.bind(this),
172175
remove: this.removeServer.bind(this),
173176
find: this.findServer.bind(this),
174-
});
177+
}));
175178
}
176179
}
177180
}
@@ -247,7 +250,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
247250
}
248251

249252
/**
250-
* Safely destroys current queue, unregistered all set event
253+
* Safely destroys the current queue, unregistered all set event
251254
* listeners and connections.
252255
* Supposed to be an async function.
253256
*
@@ -258,6 +261,16 @@ export class ClusteredRedisQueue implements IMessageQueue,
258261

259262
await this.batch('destroy',
260263
'Destroying clustered redis message queue...');
264+
265+
if (!this.options.clusterManagers?.length) {
266+
return;
267+
}
268+
269+
for await (const manager of this.options.clusterManagers) {
270+
for await (const cluster of this.initializedClusters) {
271+
await manager.remove(cluster);
272+
}
273+
}
261274
}
262275

263276
// noinspection JSUnusedGlobalSymbols

src/RedisQueue.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,7 @@ export class RedisQueue extends EventEmitter<EventMap>
931931
await this.processKeys(keys, now);
932932

933933
if (cursor === '0') {
934-
return ;
934+
return;
935935
}
936936
} catch (err) {
937937
this.emitError('OnSafeDelivery',
@@ -954,7 +954,7 @@ export class RedisQueue extends EventEmitter<EventMap>
954954
*/
955955
private async processKeys(keys: string[], now: number): Promise<void> {
956956
if (!keys.length) {
957-
return ;
957+
return;
958958
}
959959

960960
for (const key of keys) {
@@ -1393,5 +1393,4 @@ export class RedisQueue extends EventEmitter<EventMap>
13931393
}
13941394
});
13951395
}
1396-
13971396
}

0 commit comments

Comments
 (0)