Skip to content

Commit a5a0c24

Browse files
committed
wip: server alive timeout
1 parent 23bebb0 commit a5a0c24

File tree

5 files changed

+104
-223
lines changed

5 files changed

+104
-223
lines changed

src/ClusterManager.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ import { IMessageQueueConnection, IServerInput } from './IMessageQueue';
2525
import { uuid } from './uuid';
2626

2727
export interface ICluster {
28-
add: (server: IServerInput) => void;
28+
add: <T extends IMessageQueueConnection>(server: IServerInput) => T;
2929
remove: (server: IServerInput) => void;
3030
find: <T extends IMessageQueueConnection>(
3131
server: IServerInput,
32-
strict?: boolean,
3332
) => T | undefined;
3433
}
3534

src/ClusteredRedisQueue.ts

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
485485
* @param {IServerInput} server
486486
* @returns {void}
487487
*/
488-
protected addServer(server: IServerInput): void {
488+
protected addServer(server: IServerInput): ClusterServer {
489489
return this.addServerWithQueueInitializing(server, true);
490490
}
491491

@@ -496,7 +496,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
496496
* @returns {void}
497497
*/
498498
protected removeServer(server: IServerInput): void {
499-
const remove = this.findServer(server, true);
499+
const remove = this.findServer(server);
500500

501501
if (!remove) {
502502
return;
@@ -524,12 +524,13 @@ export class ClusteredRedisQueue implements IMessageQueue,
524524
private addServerWithQueueInitializing(
525525
server: ClusterServer,
526526
initializeQueue: boolean = true,
527-
): void {
527+
): ClusterServer {
528528
const newServer: ClusterServer = {
529529
id: server.id,
530530
host: server.host,
531531
port: server.port,
532532
};
533+
533534
const opts = { ...this.mqOptions, ...newServer };
534535
const imq = new RedisQueue(this.name, opts);
535536

@@ -548,6 +549,8 @@ export class ClusteredRedisQueue implements IMessageQueue,
548549
this.servers.push(newServer);
549550
this.clusterEmitter.emit('add', { server: newServer, imq });
550551
this.queueLength = this.imqs.length;
552+
553+
return newServer;
551554
}
552555

553556
private eventEmitters(): EventEmitter[] {
@@ -569,23 +572,18 @@ export class ClusteredRedisQueue implements IMessageQueue,
569572
}
570573
}
571574

572-
private findServer(
573-
server: IServerInput,
574-
strict: boolean = false,
575-
): ClusterServer | undefined {
575+
private findServer(server: IServerInput): ClusterServer | undefined {
576576
return this.servers.find(
577577
existing => ClusteredRedisQueue.matchServers(
578578
existing,
579579
server,
580-
strict,
581580
),
582581
);
583582
}
584583

585584
private static matchServers(
586585
source: IServerInput,
587586
target: IServerInput,
588-
strict: boolean = false,
589587
): boolean {
590588
const sameAddress = target.host === source.host
591589
&& target.port === source.port;
@@ -596,10 +594,6 @@ export class ClusteredRedisQueue implements IMessageQueue,
596594

597595
const sameId = target.id === source.id;
598596

599-
if (strict) {
600-
return sameId && sameAddress;
601-
}
602-
603597
return sameId || sameAddress;
604598
}
605599

0 commit comments

Comments
 (0)