Skip to content

Commit e7a46f7

Browse files
committed
feat: implemented own reconnection handler instead of ioredis native retry strategy
1 parent 76db725 commit e7a46f7

File tree

1 file changed

+123
-25
lines changed

1 file changed

+123
-25
lines changed

src/RedisQueue.ts

Lines changed: 123 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,14 @@ export class RedisQueue extends EventEmitter<EventMap>
201201
*/
202202
private safeCheckInterval: any;
203203

204+
/**
205+
* Internal per-channel reconnection state
206+
*/
207+
private reconnectTimers: Partial<Record<RedisConnectionChannel, any>> = {};
208+
private reconnectAttempts: Partial<Record<RedisConnectionChannel, number>>
209+
= {};
210+
private reconnecting: Partial<Record<RedisConnectionChannel, boolean>> = {};
211+
204212
/**
205213
* This queue instance unique key (identifier), for internal use
206214
*/
@@ -558,9 +566,7 @@ export class RedisQueue extends EventEmitter<EventMap>
558566

559567
if (this.reader) {
560568
this.verbose('Destroying reader...');
561-
this.reader.removeAllListeners();
562-
this.reader.quit();
563-
this.reader.disconnect(false);
569+
this.destroyChannel('reader', this);
564570

565571
delete this.reader;
566572
}
@@ -737,11 +743,7 @@ export class RedisQueue extends EventEmitter<EventMap>
737743
private destroyWatcher(): void {
738744
if (this.watcher) {
739745
this.verbose('Destroying watcher...');
740-
this.watcher.removeAllListeners();
741-
this.watcher.quit().catch(e => {
742-
this.verbose(`Error quitting watcher: ${ e }`);
743-
});
744-
this.watcher.disconnect(false);
746+
this.destroyChannel('watcher', this);
745747
delete RedisQueue.watchers[this.redisKey];
746748
this.verbose('Watcher destroyed!');
747749
}
@@ -756,17 +758,38 @@ export class RedisQueue extends EventEmitter<EventMap>
756758
private destroyWriter(): void {
757759
if (this.writer) {
758760
this.verbose('Destroying writer...');
759-
this.writer.removeAllListeners();
760-
this.writer.quit().catch(e => {
761-
this.verbose(`Error quitting writer: ${ e }`);
762-
});
763-
this.writer.disconnect(false);
764-
761+
this.destroyChannel('writer', this);
765762
delete RedisQueue.writers[this.redisKey];
766763
this.verbose('Writer destroyed!');
767764
}
768765
}
769766

767+
/**
768+
* Destroys any channel
769+
*
770+
* @access private
771+
*/
772+
@profile()
773+
private destroyChannel(
774+
channel: RedisConnectionChannel,
775+
context: RedisQueue = this,
776+
): void {
777+
const client = context[channel];
778+
779+
if (client) {
780+
try {
781+
client.removeAllListeners();
782+
client.quit().then(() => {
783+
client.disconnect(false);
784+
}).catch(e => {
785+
this.verbose(`Error quitting ${ channel }: ${ e }`);
786+
});
787+
} catch (error) {
788+
this.verbose(`Error destroying ${ channel }: ${ error }`);
789+
}
790+
}
791+
}
792+
770793
/**
771794
* Establishes a given connection channel by its name
772795
*
@@ -803,7 +826,7 @@ export class RedisQueue extends EventEmitter<EventMap>
803826
options.prefix || '',
804827
channel,
805828
),
806-
retryStrategy: this.retryStrategy(context),
829+
retryStrategy: this.retryStrategy(),
807830
autoResubscribe: true,
808831
enableOfflineQueue: true,
809832
autoResendUnfulfilledCommands: true,
@@ -847,22 +870,90 @@ export class RedisQueue extends EventEmitter<EventMap>
847870
/**
848871
* Builds and returns redis reconnection strategy
849872
*
850-
* @param {RedisQueue} context
851873
* @returns {() => (number | void | null)}
852874
* @private
853875
*/
854-
private retryStrategy(
855-
context: RedisQueue,
856-
): () => number | void | null {
876+
private retryStrategy(): () => null {
857877
return () => {
858-
if (context.destroyed) {
859-
return null;
878+
return null;
879+
};
880+
}
881+
882+
/**
883+
* Schedules custom reconnection for a given channel with capped
884+
* exponential backoff
885+
*
886+
* @param {RedisConnectionChannel} channel
887+
* @private
888+
*/
889+
private scheduleReconnect(channel: RedisConnectionChannel): void {
890+
if (this.destroyed) {
891+
return;
892+
}
893+
894+
if (this.reconnecting[channel]) {
895+
return;
896+
}
897+
898+
this.reconnecting[channel] = true;
899+
900+
const attempts = (this.reconnectAttempts[channel] || 0) + 1;
901+
this.reconnectAttempts[channel] = attempts;
902+
903+
const base = Math.min(30000, 1000 * Math.pow(2, attempts - 1));
904+
const jitter = Math.floor(base * 0.2 * Math.random());
905+
const delay = base + jitter;
906+
907+
this.verbose(`Scheduling ${ channel } reconnect in ${
908+
delay } ms (attempt ${ attempts })`);
909+
910+
if (this.reconnectTimers[channel]) {
911+
clearTimeout(this.reconnectTimers[channel] as any);
912+
}
913+
914+
this.reconnectTimers[channel] = setTimeout(async () => {
915+
if (this.destroyed) {
916+
this.reconnecting[channel] = false;
917+
918+
return;
860919
}
861920

862-
this.verbose('Redis connection error, retrying...');
921+
try {
922+
switch (channel) {
923+
case 'watcher':
924+
this.destroyWatcher();
925+
break;
926+
case 'writer':
927+
this.destroyWriter();
928+
break;
929+
case 'reader':
930+
this.destroyChannel(channel, this);
931+
this.reader = undefined;
932+
933+
break;
934+
case 'subscription':
935+
this.destroyChannel(channel, this);
936+
this.subscription = undefined;
863937

864-
return null;
865-
};
938+
break;
939+
}
940+
941+
await this.connect(channel, this.options);
942+
this.reconnectAttempts[channel] = 0;
943+
this.reconnecting[channel] = false;
944+
945+
if (this.reconnectTimers[channel]) {
946+
clearTimeout(this.reconnectTimers[channel] as any);
947+
this.reconnectTimers[channel] = undefined;
948+
}
949+
950+
this.verbose(`Reconnected ${ channel } channel`);
951+
} catch (err) {
952+
this.reconnecting[channel] = false;
953+
this.verbose(`Reconnect ${ channel } failed: ${ err }`);
954+
this.scheduleReconnect(channel);
955+
}
956+
}, delay);
866957
}
867958

868959
/**
@@ -946,8 +1037,10 @@ export class RedisQueue extends EventEmitter<EventMap>
9461037
);
9471038

9481039
if (!this.initialized) {
949-
this.initialized = false;
9501040
reject(err);
1041+
} else {
1042+
// Try to recover the channel using our reconnection routine
1043+
this.scheduleReconnect(channel);
9511044
}
9521045
});
9531046
}
@@ -969,10 +1062,15 @@ export class RedisQueue extends EventEmitter<EventMap>
9691062
// istanbul ignore next
9701063
return (() => {
9711064
this.initialized = false;
1065+
9721066
this.logger.warn(
9731067
'%s: redis connection %s closed on host %s, pid %s!',
9741068
context.name, channel, this.redisKey, process.pid,
9751069
);
1070+
1071+
if (!this.destroyed) {
1072+
this.scheduleReconnect(channel);
1073+
}
9761074
});
9771075
}
9781076

0 commit comments

Comments
 (0)