Skip to content

Commit d0423fd

Browse files
feat: enhance task logging and job options in QueueManager - Add logging of parameters in MonkeyCapture decorator for better traceability - Update TaskGroup logging level from debug to info for improved clarity - Introduce new job options in QueueManager for better task management, including removeOnFail and removeDependencyOnFailure - Refactor Worker class to utilize updated job options and handle timeouts more effectively
1 parent 2e8f07b commit d0423fd

File tree

5 files changed

+38
-10
lines changed

5 files changed

+38
-10
lines changed

packages/core/src/decorators/monkeyLog.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,18 @@ export function MonkeyCapture(fn: Function) {
1717

1818
const taskHistoryKey = `${WORKER_KEY}:${workerId}:task:${job.id}:logs`;
1919

20+
const parameters = args[3];
21+
22+
redis.lpush(
23+
taskHistoryKey,
24+
JSON.stringify({
25+
timestamp: new Date().toISOString(),
26+
level: "info",
27+
message: "parameters",
28+
functionArgs: parameters,
29+
})
30+
);
31+
2032
// grep all the internal functions inside the wrapped function
2133
const internalFunctions = fn.toString().match(/function\s+(\w+)\s*\(/g);
2234

packages/core/src/groups/taskGroup.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,7 @@ export class TaskGroup {
696696

697697
// In client mode, there's no worker and no processing needed
698698
if (!this.worker) {
699-
logger.debug("🔄 TaskGroup: No worker available (client mode), skipping processing", {
699+
logger.info("🔄 TaskGroup: No worker available (client mode), skipping processing", {
700700
file: "taskGroup.ts",
701701
function: "startProcessing",
702702
groupName: this.config.name,

packages/core/src/queue/queueManager.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Queue, QueueOptions, QueueEvents } from "bullmq";
1+
import { Queue, QueueOptions, QueueEvents, JobsOptions } from "bullmq";
22
import { Task, TaskOptions, WorkerConfig } from "../types/interfaces";
33
import {
44
TaskState,
@@ -59,8 +59,6 @@ export class QueueManager {
5959
this.metrics = new QueueMetrics(this.redis);
6060
this.shouldCreateWorkers = createWorkers;
6161

62-
console.log("🔍 QueueManager constructor:", { defaultQueueName, createWorkers, shouldCreateWorkers: this.shouldCreateWorkers });
63-
6462
const finalQueueOptions: QueueOptions = {
6563
connection: this.redis.options,
6664
...queueOptions,
@@ -411,22 +409,32 @@ export class QueueManager {
411409
updatedAt: new Date(),
412410
};
413411

414-
const jobOptions = {
415-
id: task.id,
412+
data['options'] = {
413+
timeout: options.timeout || 300000,
414+
weight: options.weight || 1,
415+
group: options.group || null,
416+
}
417+
418+
const jobOptions: JobsOptions = {
416419
priority: options.priority,
417420
attempts: options.maxRetries,
418421
backoff: {
419422
type: "exponential",
420423
delay: options.retryDelay || 3000,
421424
},
422-
timeout: options.timeout || 300000, // 5 minutes
423425
removeOnComplete: options.removeOnComplete || false,
424426
repeat: options.schedule,
425427
jobId: task.id,
428+
removeOnFail: options.removeOnFail || false,
429+
removeDependencyOnFailure: options.removeDependencyOnFailure || false,
430+
// @ts-ignore
431+
weight: options.weight || 1,
432+
// @ts-ignore
433+
group: options.group || null,
426434
};
427435

428436
const job = jobOptions.repeat?.pattern
429-
? await queue!.upsertJobScheduler(jobOptions.id, jobOptions.repeat, {
437+
? await queue!.upsertJobScheduler(task.id, jobOptions.repeat, {
430438
name,
431439
data,
432440
opts: jobOptions,
@@ -573,7 +581,8 @@ export class QueueManager {
573581

574582
const group = await this.getGroup(groupName);
575583
await group.addTask(methodName, taskOptions, taskData);
576-
await group.startProcessing();
584+
group.startProcessing();
585+
return;
577586
}
578587

579588
async ensureTaskInQueue(task: Task, queueName: string): Promise<void> {

packages/core/src/types/interfaces.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ export interface TaskOptions {
3333
removeOnComplete?: boolean;
3434
/** Weight of the task within its group (higher weight = higher priority) */
3535
weight?: number;
36+
/** Remove the task from the queue after failure */
37+
removeOnFail?: boolean;
38+
/** Remove the task from the queue after failure */
39+
removeDependencyOnFailure?: boolean;
3640
}
3741

3842
/**

packages/core/src/workers/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ export class Worker extends BullWorker {
9494
const startTime = Date.now();
9595
const group = job.data.options?.group;
9696

97+
// @ts-ignore
98+
const timeout = job.data.options?.timeout || job.opts['timeout'] || 300000; // in milliseconds, defaults to 5 minutes
99+
97100
// Track active task
98101
await this.addActiveTask(job.id!, job.name);
99102

@@ -110,6 +113,7 @@ export class Worker extends BullWorker {
110113
name: job.name,
111114
group: group,
112115
data: job.data.args ?? job.data,
116+
timeout: timeout
113117
});
114118

115119
let data = job.data.args ?? job.data;
@@ -125,7 +129,6 @@ export class Worker extends BullWorker {
125129

126130
await job.updateProgress(0);
127131

128-
const timeout = job.data.options?.timeout || 300000; // in milliseconds, defaults to 5 minutes
129132
const timeoutPromise = new Promise((_, reject) => {
130133
setTimeout(() => {
131134
reject(new Error(`Task ${job.name} timed out after ${timeout}ms`));

0 commit comments

Comments
 (0)