Skip to content

Commit 6223e25

Browse files
committed
Merge branch 'code-improvements'
2 parents 566c213 + 1ac1480 commit 6223e25

File tree

10 files changed

+549
-67
lines changed

10 files changed

+549
-67
lines changed

src/ClusterManager.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* <[email protected]> to get commercial licensing options.
2323
*/
2424
import { IMessageQueueConnection, IServerInput } from './IMessageQueue';
25+
import { uuid } from './uuid';
2526

2627
export interface ICluster {
2728
add: (server: IServerInput) => void;
@@ -31,12 +32,39 @@ export interface ICluster {
3132
) => T | undefined;
3233
}
3334

35+
export interface InitializedCluster extends ICluster {
36+
id: string;
37+
}
38+
3439
export abstract class ClusterManager {
35-
protected clusters: ICluster[] = [];
40+
protected clusters: InitializedCluster[] = [];
3641

3742
protected constructor() {}
3843

39-
public init(cluster: ICluster): void {
40-
this.clusters.push(cluster);
44+
public init(cluster: ICluster): InitializedCluster {
45+
const initializedCluster = Object.assign(cluster, { id: uuid() });
46+
47+
this.clusters.push(initializedCluster);
48+
49+
return initializedCluster;
4150
}
51+
52+
public async remove(
53+
cluster: string | InitializedCluster,
54+
destroy: boolean = true,
55+
): Promise<void> {
56+
const id = typeof cluster === 'string' ? cluster : cluster.id;
57+
58+
this.clusters = this.clusters.filter(cluster => cluster.id !== id);
59+
60+
if (
61+
this.clusters.length === 0
62+
&& destroy
63+
&& typeof this.destroy === 'function'
64+
) {
65+
await this.destroy();
66+
}
67+
}
68+
69+
public abstract destroy(): Promise<void>;
4270
}

src/ClusteredRedisQueue.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
IServerInput,
3737
copyEventEmitter,
3838
} from '.';
39+
import { InitializedCluster } from './ClusterManager';
3940

4041
interface ClusterServer extends IMessageQueueConnection {
4142
imq?: RedisQueue;
@@ -130,6 +131,8 @@ export class ClusteredRedisQueue implements IMessageQueue,
130131
subscription: null,
131132
};
132133

134+
private initializedClusters: InitializedCluster[] = [];
135+
133136
/**
134137
* Class constructor
135138
*
@@ -167,11 +170,11 @@ export class ClusteredRedisQueue implements IMessageQueue,
167170

168171
if (this.options.clusterManagers?.length) {
169172
for (const manager of this.options.clusterManagers) {
170-
manager.init({
173+
this.initializedClusters.push(manager.init({
171174
add: this.addServer.bind(this),
172175
remove: this.removeServer.bind(this),
173176
find: this.findServer.bind(this),
174-
});
177+
}));
175178
}
176179
}
177180
}
@@ -247,7 +250,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
247250
}
248251

249252
/**
250-
* Safely destroys current queue, unregistered all set event
253+
* Safely destroys the current queue, unregistered all set event
251254
* listeners and connections.
252255
* Supposed to be an async function.
253256
*
@@ -258,6 +261,16 @@ export class ClusteredRedisQueue implements IMessageQueue,
258261

259262
await this.batch('destroy',
260263
'Destroying clustered redis message queue...');
264+
265+
if (!this.options.clusterManagers?.length) {
266+
return;
267+
}
268+
269+
for await (const manager of this.options.clusterManagers) {
270+
for await (const cluster of this.initializedClusters) {
271+
await manager.remove(cluster);
272+
}
273+
}
261274
}
262275

263276
// noinspection JSUnusedGlobalSymbols

src/RedisQueue.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@ export class RedisQueue extends EventEmitter<EventMap>
503503
await this.stop();
504504
await this.clear();
505505
this.destroyWriter();
506+
await this.unsubscribe();
506507
}
507508

508509
/**
@@ -930,7 +931,7 @@ export class RedisQueue extends EventEmitter<EventMap>
930931
await this.processKeys(keys, now);
931932

932933
if (cursor === '0') {
933-
return ;
934+
return;
934935
}
935936
} catch (err) {
936937
this.emitError('OnSafeDelivery',
@@ -953,7 +954,7 @@ export class RedisQueue extends EventEmitter<EventMap>
953954
*/
954955
private async processKeys(keys: string[], now: number): Promise<void> {
955956
if (!keys.length) {
956-
return ;
957+
return;
957958
}
958959

959960
for (const key of keys) {
@@ -1392,5 +1393,4 @@ export class RedisQueue extends EventEmitter<EventMap>
13921393
}
13931394
});
13941395
}
1395-
13961396
}

src/UDPClusterManager.ts

Lines changed: 93 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ import { ICluster, ClusterManager } from './ClusterManager';
2828
import { Socket, createSocket } from 'dgram';
2929
import { networkInterfaces } from 'os';
3030

31-
enum BroadcastedMessageType {
31+
enum MessageType {
3232
Up = 'up',
3333
Down = 'down',
3434
}
3535

36-
interface BroadcastedMessage {
36+
interface Message {
3737
name: string;
3838
id: string;
39-
type: BroadcastedMessageType;
39+
type: MessageType;
4040
host: string;
4141
port: number;
4242
timeout: number;
@@ -48,7 +48,7 @@ interface ClusterServer extends IMessageQueueConnection {
4848
timer?: NodeJS.Timeout;
4949
}
5050

51-
export const DEFAULT_UDP_BROADCAST_CLUSTER_MANAGER_OPTIONS = {
51+
export const DEFAULT_UDP_CLUSTER_MANAGER_OPTIONS = {
5252
broadcastPort: 63000,
5353
broadcastAddress: '255.255.255.255',
5454
aliveTimeoutCorrection: 1000,
@@ -133,44 +133,63 @@ const LOCALHOST_ADDRESSES = [
133133
export class UDPClusterManager extends ClusterManager {
134134
private static sockets: Record<string, Socket> = {};
135135
private readonly options: UDPClusterManagerOptions;
136+
private socketKey: string;
137+
138+
private get socket(): Socket | undefined {
139+
return UDPClusterManager.sockets[this.socketKey];
140+
}
141+
142+
private set socket(socket: Socket) {
143+
UDPClusterManager.sockets[this.socketKey] = socket;
144+
}
136145

137146
constructor(options?: UDPClusterManagerOptions) {
138147
super();
139148

140149
this.options = {
141-
...DEFAULT_UDP_BROADCAST_CLUSTER_MANAGER_OPTIONS,
150+
...DEFAULT_UDP_CLUSTER_MANAGER_OPTIONS,
142151
...options || {},
143152
};
144153
this.startListening(this.options);
154+
155+
process.on('SIGTERM', UDPClusterManager.free);
156+
process.on('SIGINT', UDPClusterManager.free);
157+
process.on('SIGABRT', UDPClusterManager.free);
158+
}
159+
160+
private static async free(): Promise<void> {
161+
const socketKeys = Object.keys(UDPClusterManager.sockets);
162+
163+
await Promise.all(socketKeys.map(
164+
socketKey => UDPClusterManager.destroySocket(
165+
socketKey,
166+
UDPClusterManager.sockets[socketKey],
167+
)),
168+
);
145169
}
146170

147171
private listenBroadcastedMessages(
148-
listener: (message: BroadcastedMessage) => void,
172+
listener: (message: Message) => void,
149173
options: UDPClusterManagerOptions,
150174
): void {
151-
const address = UDPClusterManager.selectNetworkInterface(
152-
options,
153-
);
154-
const key = `${ address }:${ options.broadcastPort }`;
175+
const address = UDPClusterManager.selectNetworkInterface(options);
155176

156-
if (!UDPClusterManager.sockets[key]) {
157-
const socket = createSocket({ type: 'udp4', reuseAddr: true });
177+
this.socketKey = `${ address }:${ options.broadcastPort }`;
158178

159-
socket.bind(options.broadcastPort, address);
160-
UDPClusterManager.sockets[key] = socket;
179+
if (!this.socket) {
180+
this.socket = createSocket({ type: 'udp4', reuseAddr: true });
181+
this.socket.bind(options.broadcastPort, address);
161182
}
162183

163-
UDPClusterManager.sockets[key].on(
184+
this.socket.on(
164185
'message',
165186
message => listener(
166187
UDPClusterManager.parseBroadcastedMessage(message),
167188
),
168189
);
169190
}
170191

171-
private startListening(
172-
options: UDPClusterManagerOptions = {},
173-
): void {
192+
private startListening(options: UDPClusterManagerOptions = {}): void {
174193
this.listenBroadcastedMessages(
175194
UDPClusterManager.processBroadcastedMessage(this),
176195
options,
@@ -191,18 +210,18 @@ export class UDPClusterManager extends ClusterManager {
191210

192211
private static processMessageOnCluster(
193212
cluster: ICluster,
194-
message: BroadcastedMessage,
213+
message: Message,
195214
aliveTimeoutCorrection?: number,
196215
): void {
197216
const server = cluster.find<ClusterServer>(message);
198217

199-
if (server && message.type === BroadcastedMessageType.Down) {
218+
if (server && message.type === MessageType.Down) {
200219
clearTimeout(server.timer);
201220

202221
return cluster.remove(message);
203222
}
204223

205-
if (!server && message.type === BroadcastedMessageType.Up) {
224+
if (!server && message.type === MessageType.Up) {
206225
cluster.add(message);
207226

208227
const added = cluster.find<ClusterServer>(message);
@@ -218,7 +237,7 @@ export class UDPClusterManager extends ClusterManager {
218237
return;
219238
}
220239

221-
if (server && message.type === BroadcastedMessageType.Up) {
240+
if (server && message.type === MessageType.Up) {
222241
return UDPClusterManager.serverAliveWait(
223242
cluster,
224243
server,
@@ -230,7 +249,7 @@ export class UDPClusterManager extends ClusterManager {
230249

231250
private static processBroadcastedMessage(
232251
context: UDPClusterManager,
233-
): (message: BroadcastedMessage) => void {
252+
): (message: Message) => void {
234253
return message => {
235254
if (
236255
context.options.excludeHosts
@@ -239,7 +258,7 @@ export class UDPClusterManager extends ClusterManager {
239258
context.options.excludeHosts,
240259
)
241260
) {
242-
return ;
261+
return;
243262
}
244263

245264
if (
@@ -249,7 +268,7 @@ export class UDPClusterManager extends ClusterManager {
249268
context.options.includeHosts,
250269
)
251270
) {
252-
return ;
271+
return;
253272
}
254273

255274
for (const cluster of context.clusters) {
@@ -262,9 +281,7 @@ export class UDPClusterManager extends ClusterManager {
262281
};
263282
}
264283

265-
private static parseBroadcastedMessage(
266-
input: Buffer,
267-
): BroadcastedMessage {
284+
private static parseBroadcastedMessage(input: Buffer): Message {
268285
const [
269286
name,
270287
id,
@@ -277,7 +294,7 @@ export class UDPClusterManager extends ClusterManager {
277294
return {
278295
id,
279296
name,
280-
type: type.toLowerCase() as BroadcastedMessageType,
297+
type: type.toLowerCase() as MessageType,
281298
host,
282299
port: parseInt(port),
283300
timeout: parseFloat(timeout) * 1000,
@@ -288,7 +305,7 @@ export class UDPClusterManager extends ClusterManager {
288305
cluster: ICluster,
289306
server: ClusterServer,
290307
aliveTimeoutCorrection?: number,
291-
message?: BroadcastedMessage,
308+
message?: Message,
292309
): void {
293310
clearTimeout(server.timer);
294311
server.timestamp = Date.now();
@@ -319,6 +336,52 @@ export class UDPClusterManager extends ClusterManager {
319336
}, timeout);
320337
}
321338

339+
/**
340+
* Destroys the UDPClusterManager by closing all opened network connections
341+
* and safely destroying all blocking sockets
342+
*
343+
* @returns {Promise<void>}
344+
* @throws {Error}
345+
*/
346+
public async destroy(): Promise<void> {
347+
await UDPClusterManager.destroySocket(this.socketKey, this.socket);
348+
}
349+
350+
private static async destroySocket(
351+
socketKey: string,
352+
socket?: Socket,
353+
): Promise<void> {
354+
if (!socket) {
355+
return;
356+
}
357+
358+
return await new Promise((resolve, reject) => {
359+
try {
360+
if (typeof socket.close === 'function') {
361+
socket.removeAllListeners();
362+
socket.close(() => {
363+
socket?.unref();
364+
365+
if (
366+
socketKey
367+
&& UDPClusterManager.sockets[socketKey]
368+
) {
369+
delete UDPClusterManager.sockets[socketKey];
370+
}
371+
372+
resolve();
373+
});
374+
375+
return;
376+
}
377+
378+
resolve();
379+
} catch (e) {
380+
reject(e);
381+
}
382+
});
383+
}
384+
322385
private static selectNetworkInterface(
323386
options: Pick<
324387
UDPClusterManagerOptions,

src/profile.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ export function logDebugInfo({
165165
if (debugTime) {
166166
// noinspection TypeScriptUnresolvedFunction
167167
const time = parseInt(
168-
((process.hrtime as any).bigint() - start) as any,
168+
((process.hrtime as any).bigint() - BigInt(start)) as any,
169169
10,
170170
) / 1000;
171171
let timeStr: string;

0 commit comments

Comments
 (0)