Skip to content

Commit 4d393b4

Browse files
committed
feat: implmented queueLength method on Redis message queue
1 parent 31168b0 commit 4d393b4

File tree

3 files changed

+42
-7
lines changed

3 files changed

+42
-7
lines changed

src/ClusteredRedisQueue.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
108108
*
109109
* @type {number}
110110
*/
111-
private queueLength: number = 0;
111+
private imqLength: number = 0;
112112

113113
/**
114114
* Template EventEmitter instance used to replicate queue EventEmitters when
@@ -223,7 +223,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
223223
delay?: number,
224224
errorHandler?: (err: Error) => void,
225225
): Promise<string> {
226-
if (!this.queueLength) {
226+
if (!this.imqLength) {
227227
return await new Promise(resolve => this.clusterEmitter.once(
228228
'initialized',
229229
async ({ imq }) => {
@@ -237,7 +237,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
237237
));
238238
}
239239

240-
if (this.currentQueue >= this.queueLength) {
240+
if (this.currentQueue >= this.imqLength) {
241241
this.currentQueue = 0;
242242
}
243243

@@ -285,6 +285,18 @@ export class ClusteredRedisQueue implements IMessageQueue,
285285
'Clearing clustered redis message queue...');
286286
}
287287

288+
public async queueLength(): Promise<number> {
289+
const promises = [];
290+
291+
for (const imq of this.imqs) {
292+
promises.push(imq.queueLength());
293+
}
294+
295+
const lengths = await Promise.all(promises);
296+
297+
return lengths.reduce((total, length) => total + length, 0);
298+
}
299+
288300
/**
289301
* Batch imq action processing on all registered imqs at once
290302
*
@@ -511,7 +523,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
511523
imqToRemove.destroy().catch();
512524
}
513525

514-
this.queueLength = this.imqs.length;
526+
this.imqLength = this.imqs.length;
515527
this.servers = this.servers.filter(
516528
existing => !ClusteredRedisQueue.matchServers(
517529
existing,
@@ -557,7 +569,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
557569
this.imqs.push(imq);
558570
this.servers.push(newServer);
559571
this.clusterEmitter.emit('add', { server: newServer, imq });
560-
this.queueLength = this.imqs.length;
572+
this.imqLength = this.imqs.length;
561573

562574
return newServer;
563575
}
@@ -605,5 +617,4 @@ export class ClusteredRedisQueue implements IMessageQueue,
605617

606618
return sameId || sameAddress;
607619
}
608-
609620
}

src/IMessageQueue.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,4 +421,12 @@ export interface IMessageQueue extends EventEmitter<EventMap> {
421421
* @returns {Promise<IMessageQueue>}
422422
*/
423423
clear(): Promise<IMessageQueue>;
424+
425+
/**
426+
* Retrieves the current count of messages in the queue.
427+
* Supposed to be an async function.
428+
*
429+
* @returns {Promise<number>}
430+
*/
431+
queueLength(): Promise<number>;
424432
}

src/RedisQueue.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ export class RedisQueue extends EventEmitter<EventMap>
512512
}
513513

514514
/**
515-
* Clears queue data in redis;
515+
* Clears queue data in redis
516516
*
517517
* @returns {Promise<void>}
518518
*/
@@ -541,6 +541,20 @@ export class RedisQueue extends EventEmitter<EventMap>
541541
return this;
542542
}
543543

544+
/**
545+
* Retrieves the current count of messages in the queue
546+
*
547+
* @returns {Promise<number>}
548+
*/
549+
@profile()
550+
public async queueLength(): Promise<number> {
551+
if (!this.writer) {
552+
return 0;
553+
}
554+
555+
return this.writer.llen(this.key);
556+
}
557+
544558
/**
545559
* Returns true if publisher mode is enabled on this queue, false otherwise.
546560
*
@@ -698,6 +712,8 @@ export class RedisQueue extends EventEmitter<EventMap>
698712
retryStrategy: this.retryStrategy(context),
699713
autoResubscribe: true,
700714
enableReadyCheck: true,
715+
enableOfflineQueue: true,
716+
autoResendUnfulfilledCommands: true,
701717
});
702718

703719
context[channel] = makeRedisSafe(redis);

0 commit comments

Comments
 (0)