Skip to content

Commit 52b7b24

Browse files
committed
fix: alive timeout check & start/destroy queue race condition
1 parent 8655c27 commit 52b7b24

File tree

4 files changed

+69
-39
lines changed

4 files changed

+69
-39
lines changed

src/ClusterManager.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export interface ICluster {
2929
remove: (server: IServerInput) => void;
3030
find: <T extends IMessageQueueConnection>(
3131
server: IServerInput,
32+
strict?: boolean,
3233
) => T | undefined;
3334
}
3435

@@ -49,6 +50,12 @@ export abstract class ClusterManager {
4950
return initializedCluster;
5051
}
5152

53+
public async anyCluster(
54+
fn: (cluster: InitializedCluster) => Promise<void> | void,
55+
): Promise<void> {
56+
await Promise.all(this.clusters.map(cluster => fn(cluster)));
57+
}
58+
5259
public async remove(
5360
cluster: string | InitializedCluster,
5461
destroy: boolean = true,

src/ClusteredRedisQueue.ts

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@
2323
*/
2424
import { EventEmitter } from 'events';
2525
import {
26-
DEFAULT_IMQ_OPTIONS,
2726
buildOptions,
27+
copyEventEmitter,
28+
DEFAULT_IMQ_OPTIONS,
29+
EventMap,
2830
ILogger,
2931
IMessageQueue,
3032
IMessageQueueConnection,
3133
IMQMode,
3234
IMQOptions,
35+
IServerInput,
3336
JsonObject,
3437
RedisQueue,
35-
EventMap,
36-
IServerInput,
37-
copyEventEmitter,
3838
} from '.';
3939
import { InitializedCluster } from './ClusterManager';
4040

@@ -496,7 +496,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
496496
* @returns {void}
497497
*/
498498
protected removeServer(server: IServerInput): void {
499-
const remove = this.findServer(server);
499+
const remove = this.findServer(server, true);
500500

501501
if (!remove) {
502502
return;
@@ -534,14 +534,12 @@ export class ClusteredRedisQueue implements IMessageQueue,
534534
const imq = new RedisQueue(this.name, opts);
535535

536536
if (initializeQueue) {
537-
this.initializeQueue(imq)
538-
.then(() => {
539-
this.clusterEmitter.emit('initialized', {
540-
server: newServer,
541-
imq,
542-
});
543-
})
544-
.catch();
537+
this.initializeQueue(imq).then(() => {
538+
this.clusterEmitter.emit('initialized', {
539+
server: newServer,
540+
imq,
541+
});
542+
});
545543
}
546544

547545
newServer.imq = imq;
@@ -571,18 +569,23 @@ export class ClusteredRedisQueue implements IMessageQueue,
571569
}
572570
}
573571

574-
private findServer(server: IServerInput): ClusterServer | undefined {
572+
private findServer(
573+
server: IServerInput,
574+
strict: boolean = false,
575+
): ClusterServer | undefined {
575576
return this.servers.find(
576577
existing => ClusteredRedisQueue.matchServers(
577578
existing,
578579
server,
580+
strict,
579581
),
580582
);
581583
}
582584

583585
private static matchServers(
584586
source: IServerInput,
585587
target: IServerInput,
588+
strict: boolean = false,
586589
): boolean {
587590
const sameAddress = target.host === source.host
588591
&& target.port === source.port;
@@ -593,6 +596,11 @@ export class ClusteredRedisQueue implements IMessageQueue,
593596

594597
const sameId = target.id === source.id;
595598

599+
if (strict) {
600+
return sameId && sameAddress;
601+
}
602+
596603
return sameId || sameAddress;
597604
}
605+
598606
}

src/RedisQueue.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ export class RedisQueue extends EventEmitter<EventMap>
180180
private destroyed: boolean = false;
181181

182182
/**
183-
* True if the current instance owns watcher connection, false otherwise
183+
* True if the current instance owns a watcher connection, false otherwise
184184
*
185185
* @type {boolean}
186186
*/
@@ -369,6 +369,8 @@ export class RedisQueue extends EventEmitter<EventMap>
369369
return this;
370370
}
371371

372+
this.destroyed = false;
373+
372374
const connPromises = [];
373375

374376
// istanbul ignore next
@@ -408,7 +410,6 @@ export class RedisQueue extends EventEmitter<EventMap>
408410

409411
await this.initWatcher();
410412
this.initialized = true;
411-
this.destroyed = false;
412413

413414
return this;
414415
}
@@ -586,7 +587,7 @@ export class RedisQueue extends EventEmitter<EventMap>
586587

587588
// noinspection JSUnusedLocalSymbols
588589
/**
589-
* Watcher setter, sets the watcher connection property for this
590+
* Watcher setter sets the watcher connection property for this
590591
* queue instance
591592
*
592593
* @param {IRedisClient} conn
@@ -864,13 +865,17 @@ export class RedisQueue extends EventEmitter<EventMap>
864865
}
865866

866867
/**
867-
* Returns number of established watcher connections on redis
868+
* Returns the number of established watcher connections on redis
868869
*
869870
* @access private
870871
* @returns {Promise<number>}
871872
*/
872873
// istanbul ignore next
873874
private async watcherCount(): Promise<number> {
875+
if (!this.writer) {
876+
return 0;
877+
}
878+
874879
const rx = new RegExp(
875880
`\\bname=${this.options.prefix}:[\\S]+?:watcher:`,
876881
);
@@ -883,7 +888,7 @@ export class RedisQueue extends EventEmitter<EventMap>
883888
}
884889

885890
/**
886-
* Processes delayed message by its given redis key
891+
* Processes delayed a message by its given redis key
887892
*
888893
* @access private
889894
* @param {string} key

src/UDPClusterManager.ts

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,6 @@ export const DEFAULT_UDP_CLUSTER_MANAGER_OPTIONS = {
5555
};
5656

5757
export interface UDPClusterManagerOptions {
58-
/**
59-
* Represents the cluster operations that are responsible for managing
60-
* clusters. This includes operations such as adding, removing, or checking
61-
* if a cluster server exists.
62-
*
63-
* @type {ICluster}
64-
*/
65-
cluster?: ICluster;
66-
6758
/**
6859
* Message queue broadcast port
6960
*
@@ -224,7 +215,7 @@ export class UDPClusterManager extends ClusterManager {
224215
if (!server && message.type === MessageType.Up) {
225216
cluster.add(message);
226217

227-
const added = cluster.find<ClusterServer>(message);
218+
const added = cluster.find<ClusterServer>(message, true);
228219

229220
if (added) {
230221
UDPClusterManager.serverAliveWait(
@@ -271,13 +262,13 @@ export class UDPClusterManager extends ClusterManager {
271262
return;
272263
}
273264

274-
for (const cluster of context.clusters) {
265+
context.anyCluster(cluster => {
275266
UDPClusterManager.processMessageOnCluster(
276267
cluster,
277268
message,
278269
context.options.aliveTimeoutCorrection,
279270
);
280-
}
271+
}).then();
281272
};
282273
}
283274

@@ -307,7 +298,11 @@ export class UDPClusterManager extends ClusterManager {
307298
aliveTimeoutCorrection?: number,
308299
message?: Message,
309300
): void {
310-
clearTimeout(server.timer);
301+
if (server.timer) {
302+
clearTimeout(server.timer);
303+
server.timer = undefined;
304+
}
305+
311306
server.timestamp = Date.now();
312307

313308
if (message) {
@@ -317,23 +312,38 @@ export class UDPClusterManager extends ClusterManager {
317312
const correction = aliveTimeoutCorrection || 0;
318313
const timeout = (server.timeout || 0) + correction;
319314

320-
server.timer = setTimeout(() => {
321-
const existing = cluster.find<ClusterServer>(server);
315+
if (timeout <= 0) {
316+
return;
317+
}
318+
319+
const timerId = setTimeout(() => {
320+
const existing = cluster.find<ClusterServer>(server, true);
322321

323-
if (!existing) {
322+
if (!existing || existing.timer !== timerId) {
324323
return;
325324
}
326325

327326
const now = Date.now();
328-
const delta = now - (existing.timestamp || now);
327+
328+
if (!existing.timestamp) {
329+
clearTimeout(existing.timer);
330+
existing.timer = undefined;
331+
cluster.remove(existing);
332+
333+
return;
334+
}
335+
336+
const delta = now - existing.timestamp;
329337
const currentTimeout = (existing.timeout || 0) + correction;
330338

331339
if (delta >= currentTimeout) {
332-
clearTimeout(server.timer);
333-
334-
cluster.remove(server);
340+
clearTimeout(existing.timer);
341+
existing.timer = undefined;
342+
cluster.remove(existing);
335343
}
336344
}, timeout);
345+
346+
server.timer = timerId;
337347
}
338348

339349
/**

0 commit comments

Comments
 (0)