Skip to content

Commit a0d15e3

Browse files
committed
fix: safer message handling
1 parent 35c55c5 commit a0d15e3

File tree

1 file changed

+17
-7
lines changed

1 file changed

+17
-7
lines changed

src/RedisQueue.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,7 +1165,7 @@ export class RedisQueue extends EventEmitter<EventMap>
11651165
/**
11661166
* Unreliable but fast way of message handling by the queue
11671167
*/
1168-
private async readUnsafe() {
1168+
private async readUnsafe(): Promise<void> {
11691169
try {
11701170
const key = this.key;
11711171

@@ -1182,6 +1182,7 @@ export class RedisQueue extends EventEmitter<EventMap>
11821182
}
11831183
} catch (err) {
11841184
// istanbul ignore next
1185+
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
11851186
if (err.message.match(/Stream connection ended/)) {
11861187
break;
11871188
}
@@ -1201,7 +1202,7 @@ export class RedisQueue extends EventEmitter<EventMap>
12011202
/**
12021203
* Reliable but slow method of message handling by message queue
12031204
*/
1204-
private async readSafe() {
1205+
private async readSafe(): Promise<void> {
12051206
try {
12061207
const key = this.key;
12071208

@@ -1216,7 +1217,8 @@ export class RedisQueue extends EventEmitter<EventMap>
12161217

12171218
try {
12181219
await this.reader.brpoplpush(this.key, workerKey, 0);
1219-
} catch (err) {
1220+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
1221+
} catch (_) {
12201222
// istanbul ignore next
12211223
break;
12221224
}
@@ -1225,19 +1227,26 @@ export class RedisQueue extends EventEmitter<EventMap>
12251227
workerKey, -1, 1,
12261228
);
12271229

1228-
if (msgArr.length !== 1) {
1230+
if (!msgArr || msgArr?.length !== 1) {
12291231
// noinspection ExceptionCaughtLocallyJS
12301232
throw new Error('Wrong messages count');
12311233
}
12321234

1233-
const msg = msgArr[0];
1235+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
1236+
const [msg] = msgArr as any[];
12341237

12351238
this.process([key, msg]);
1236-
this.writer.del(workerKey);
1239+
this.writer.del(workerKey).catch(e =>
1240+
this.logger.warn('OnReadSafe: del error', e));
12371241
}
12381242
} catch (err) {
12391243
// istanbul ignore next
1240-
this.emitError('OnReadSafe', 'safe reader failed', err);
1244+
this.emitError(
1245+
'OnReadSafe',
1246+
'safe reader failed',
1247+
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
1248+
err,
1249+
);
12411250
}
12421251
}
12431252

@@ -1261,6 +1270,7 @@ export class RedisQueue extends EventEmitter<EventMap>
12611270
? 'readSafe'
12621271
: 'readUnsafe';
12631272

1273+
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
12641274
process.nextTick(this[readMethod].bind(this));
12651275

12661276
return this;

0 commit comments

Comments
 (0)