Skip to content

Commit fc76b31

Browse files
committed
Added pending count
1 parent 28330c1 commit fc76b31

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

src/workflows/WorkflowContext.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,10 @@ export default class WorkflowContext {
316316
// console.log(... a);
317317
}
318318

319+
public getPendingCount({ taskGroup = void 0} = { }) {
320+
return this.storage.getPendingWorkflowCount({ taskGroup });
321+
}
322+
319323
public async processQueueOnce({ taskGroup = "default", signal = void 0 as AbortSignal } = {}) {
320324
const pending = await this.storage.dequeue(taskGroup, signal);
321325
// run...

src/workflows/WorkflowStorage.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ export default class WorkflowStorage {
2525

2626
}
2727

28+
getPendingWorkflowCount({ taskGroup = void 0 } = { }) {
29+
const db = new WorkflowDbContext(this.driver);
30+
let q = db.workflows.where(void 0, (p) => (x) => x.isWorkflow === true
31+
&& x.state === "queued"
32+
);
33+
if (taskGroup) {
34+
q = q.where({ taskGroup}, (p) => (x) => x.taskGroup === p.taskGroup);
35+
}
36+
return q.count();
37+
}
38+
2839
async getNextEta(throttle: { group: string, maxPerSecond: number }) {
2940

3041
const db = new WorkflowDbContext(this.driver);

0 commit comments

Comments
 (0)