Skip to content

Commit 56421b6

Browse files
committed
fix: rework redis connection make it more stable & straightforward
1 parent 13c9544 commit 56421b6

File tree

2 files changed

+67
-92
lines changed

2 files changed

+67
-92
lines changed

src/RedisQueue.ts

Lines changed: 63 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -811,59 +811,63 @@ export class RedisQueue extends EventEmitter<EventMap>
811811
return context[channel];
812812
}
813813

814-
return new Promise((resolve, reject) => {
815-
const redis = new Redis({
816-
// istanbul ignore next
817-
port: options.port || 6379,
818-
// istanbul ignore next
819-
host: options.host || 'localhost',
820-
// istanbul ignore next
821-
username: options.username,
822-
// istanbul ignore next
823-
password: options.password,
824-
connectionName: this.getChannelName(
825-
context.name + '',
826-
options.prefix || '',
827-
channel,
828-
),
829-
retryStrategy: this.retryStrategy(),
830-
autoResubscribe: true,
831-
enableOfflineQueue: true,
832-
autoResendUnfulfilledCommands: true,
833-
offlineQueue: true,
834-
maxRetriesPerRequest: null,
814+
const redis = new Redis({
815+
// istanbul ignore next
816+
port: options.port || 6379,
817+
// istanbul ignore next
818+
host: options.host || 'localhost',
819+
// istanbul ignore next
820+
username: options.username,
821+
// istanbul ignore next
822+
password: options.password,
823+
connectionName: this.getChannelName(
824+
context.name + '',
825+
options.prefix || '',
826+
channel,
827+
),
828+
retryStrategy: this.retryStrategy(),
829+
autoResubscribe: true,
830+
enableOfflineQueue: true,
831+
autoResendUnfulfilledCommands: true,
832+
offlineQueue: true,
833+
maxRetriesPerRequest: null,
834+
enableReadyCheck: channel !== 'subscription',
835+
lazyConnect: true,
836+
});
837+
838+
context[channel] = redis;
839+
context[channel].__imq = true;
840+
841+
for (const event of [
842+
'wait',
843+
'reconnecting',
844+
'connecting',
845+
'connect',
846+
'close',
847+
]) {
848+
redis.on(event, () => {
849+
context.verbose(`Redis Event fired: ${ event }`);
835850
});
851+
}
836852

837-
context[channel] = redis;
838-
context[channel].__imq = true;
839-
840-
for (const event of [
841-
'wait',
842-
'reconnecting',
843-
'connecting',
844-
'connect',
845-
'close',
846-
]) {
847-
redis.on(event, () => {
848-
context.verbose(`Redis Event fired: ${ event }`);
849-
});
850-
}
853+
redis.setMaxListeners(IMQ_REDIS_MAX_LISTENERS_LIMIT);
854+
redis.on('error', this.onErrorHandler(context, channel));
855+
redis.on('end', this.onCloseHandler(context, channel));
851856

852-
redis.setMaxListeners(IMQ_REDIS_MAX_LISTENERS_LIMIT);
853-
redis.on('ready',
854-
this.onReadyHandler(
855-
context,
856-
channel,
857-
resolve,
858-
) as unknown as () => void,
859-
);
860-
redis.on('error',
861-
this.onErrorHandler(context, channel, reject),
862-
);
863-
redis.on('end',
864-
this.onCloseHandler(context, channel),
865-
);
866-
});
857+
await redis.connect();
858+
859+
this.logger.info(
860+
'%s: %s channel connected, host %s, pid %s',
861+
context.name, channel, this.redisKey, process.pid,
862+
);
863+
864+
switch (channel) {
865+
case 'reader': this.read(); break;
866+
case 'writer': await this.processDelayed(this.key); break;
867+
case 'watcher': await this.initWatcher(); break;
868+
}
869+
870+
return context[channel];
867871
}
868872

869873
// istanbul ignore next
@@ -956,38 +960,6 @@ export class RedisQueue extends EventEmitter<EventMap>
956960
}, delay);
957961
}
958962

959-
/**
960-
* Builds and returns connection ready state handler
961-
*
962-
* @access private
963-
* @param {RedisQueue} context
964-
* @param {RedisConnectionChannel} channel
965-
* @param {(...args: any[]) => void} resolve
966-
* @return {() => Promise<void>}
967-
*/
968-
private onReadyHandler(
969-
context: RedisQueue,
970-
channel: RedisConnectionChannel,
971-
resolve: (...args: any[]) => void,
972-
): () => Promise<void> {
973-
this.verbose(`Redis ${ channel } channel ready!`);
974-
975-
return (async () => {
976-
this.logger.info(
977-
'%s: %s channel connected, host %s, pid %s',
978-
context.name, channel, this.redisKey, process.pid,
979-
);
980-
981-
switch (channel) {
982-
case 'reader': this.read(); break;
983-
case 'writer': await this.processDelayed(this.key); break;
984-
case 'watcher': await this.initWatcher(); break;
985-
}
986-
987-
resolve(context[channel]);
988-
});
989-
}
990-
991963
// noinspection JSMethodCanBeStatic
992964
/**
993965
* Generates channel name
@@ -1013,17 +985,15 @@ export class RedisQueue extends EventEmitter<EventMap>
1013985
* @access private
1014986
* @param {RedisQueue} context
1015987
* @param {RedisConnectionChannel} channel
1016-
* @param {(...args: any[]) => void} reject
1017988
* @return {(err: Error) => void}
1018989
*/
1019990
private onErrorHandler(
1020991
context: RedisQueue,
1021992
channel: RedisConnectionChannel,
1022-
reject: (...args: any[]) => void,
1023-
): (err: Error) => void {
993+
): (error: Error) => void {
1024994
// istanbul ignore next
1025-
return ((err: Error & { code: string }) => {
1026-
this.verbose(`Redis Error: ${ err }`);
995+
return ((error: Error & { code: string }) => {
996+
this.verbose(`Redis Error: ${ error }`);
1027997

1028998
if (this.destroyed) {
1029999
return;
@@ -1033,13 +1003,14 @@ export class RedisQueue extends EventEmitter<EventMap>
10331003
`${context.name}: error connecting redis host ${
10341004
this.redisKey} on ${
10351005
channel}, pid ${process.pid}:`,
1036-
err,
1006+
error,
10371007
);
10381008

1039-
if (!this.initialized) {
1040-
reject(err);
1041-
} else {
1042-
// Try to recover the channel using our reconnection routine
1009+
if (
1010+
error.code === 'ECONNREFUSED' ||
1011+
error.code === 'ETIMEDOUT' ||
1012+
context[channel]?.status !== 'ready'
1013+
) {
10431014
this.scheduleReconnect(channel);
10441015
}
10451016
});

test/mocks/redis.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ export class RedisClientMock extends EventEmitter {
6464
return new Promise(resolve => resolve(undefined));
6565
}
6666

67+
public connect() {
68+
return new Promise(resolve => resolve(undefined));
69+
}
70+
6771
// noinspection JSMethodCanBeStatic
6872
public set(...args: any[]): Promise<number> {
6973
const [key, val] = args;

0 commit comments

Comments
 (0)