From 38fcc8515b0f58f3cf154bd283f9a58d6ec4cc0b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 13 Jan 2025 17:26:24 +0000 Subject: [PATCH] Add reason to MarQS acknowledgeMessage to track why it's called --- .../app/v3/marqs/devQueueConsumer.server.ts | 32 +++++++++++++++---- apps/webapp/app/v3/marqs/index.server.ts | 6 +++- .../v3/marqs/sharedQueueConsumer.server.ts | 2 +- apps/webapp/app/v3/requeueTaskRun.server.ts | 10 ++++-- .../v3/services/createCheckpoint.server.ts | 5 ++- .../app/v3/services/finalizeTaskRun.server.ts | 2 +- .../app/v3/services/resumeAttempt.server.ts | 10 ++++-- 7 files changed, 52 insertions(+), 15 deletions(-) diff --git a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts index 5cf2131921..ca520d8199 100644 --- a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts @@ -329,13 +329,16 @@ export class DevQueueConsumer { env: this.env, }); - await marqs?.acknowledgeMessage(message.messageId); + await marqs?.acknowledgeMessage( + message.messageId, + "Failed to parse message.data with MessageBody schema in DevQueueConsumer" + ); setTimeout(() => this.#doWork(), 100); return; } - const existingTaskRun = await prisma.taskRun.findUnique({ + const existingTaskRun = await prisma.taskRun.findFirst({ where: { id: message.messageId, }, @@ -346,7 +349,10 @@ export class DevQueueConsumer { messageId: message.messageId, }); - await marqs?.acknowledgeMessage(message.messageId); + await marqs?.acknowledgeMessage( + message.messageId, + "Failed to find task run in DevQueueConsumer" + ); setTimeout(() => this.#doWork(), 100); return; } @@ -365,7 +371,10 @@ export class DevQueueConsumer { latestWorker: this.#getLatestBackgroundWorker(), }); - await marqs?.acknowledgeMessage(message.messageId); + await marqs?.acknowledgeMessage( + message.messageId, + "Failed to find background worker in DevQueueConsumer" + ); setTimeout(() => this.#doWork(), 100); return; } @@ -382,7 +391,10 @@ export class DevQueueConsumer { taskSlugs: backgroundWorker.tasks.map((task) => task.slug), }); - await marqs?.acknowledgeMessage(message.messageId); + await marqs?.acknowledgeMessage( + message.messageId, + "No matching background task found in DevQueueConsumer" + ); setTimeout(() => this.#doWork(), 100); return; @@ -416,7 +428,10 @@ export class DevQueueConsumer { messageId: message.messageId, }); - await marqs?.acknowledgeMessage(message.messageId); + await marqs?.acknowledgeMessage( + message.messageId, + "Failed to lock task run in DevQueueConsumer" + ); setTimeout(() => this.#doWork(), 100); return; @@ -515,7 +530,10 @@ export class DevQueueConsumer { messageId: message.messageId, backgroundWorker, }); - await marqs?.acknowledgeMessage(message.messageId); + await marqs?.acknowledgeMessage( + message.messageId, + "Non-lazy attempts are no longer supported in DevQueueConsumer" + ); setTimeout(() => this.#doWork(), 100); } diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 6839f7761b..31b1bc2a9c 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -291,6 +291,8 @@ export class MarQS { orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue), messageId: messageData.messageId, }); + + return; } await this.options.visibilityTimeoutStrategy.heartbeat( @@ -411,7 +413,7 @@ export class MarQS { ); } - public async acknowledgeMessage(messageId: string) { + public async acknowledgeMessage(messageId: string, reason: string = "unknown") { return this.#trace( "acknowledgeMessage", async (span) => { @@ -421,6 +423,7 @@ export class MarQS { logger.log(`[${this.name}].acknowledgeMessage() message not found`, { messageId, service: this.name, + reason, }); return; } @@ -430,6 +433,7 @@ export class MarQS { [SemanticAttributes.MESSAGE_ID]: message.messageId, [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, + ["marqs.reason"]: reason, }); await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId); diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index e313510948..b8cd96a6e3 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -975,7 +975,7 @@ export class SharedQueueConsumer { } async #ackAndDoMoreWork(messageId: string, intervalInMs?: number) { - await marqs?.acknowledgeMessage(messageId); + await marqs?.acknowledgeMessage(messageId, "Acking and doing more work in SharedQueueConsumer"); this.#doMoreWork(intervalInMs); } diff --git a/apps/webapp/app/v3/requeueTaskRun.server.ts b/apps/webapp/app/v3/requeueTaskRun.server.ts index 5965fef321..9105588c68 100644 --- a/apps/webapp/app/v3/requeueTaskRun.server.ts +++ b/apps/webapp/app/v3/requeueTaskRun.server.ts @@ -71,7 +71,10 @@ export class RequeueTaskRunService extends BaseService { case "WAITING_FOR_DEPLOY": { logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun }); - await marqs?.acknowledgeMessage(taskRun.id); + await marqs?.acknowledgeMessage( + taskRun.id, + "Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService" + ); break; } @@ -93,7 +96,10 @@ export class RequeueTaskRunService extends BaseService { case "CANCELED": { logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun }); - await marqs?.acknowledgeMessage(taskRun.id); + await marqs?.acknowledgeMessage( + taskRun.id, + "Task run is already completed in RequeueTaskRunService" + ); try { if (taskRun.runtimeEnvironment.type === "DEVELOPMENT") { diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 7290424248..d1b80e6e75 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -307,7 +307,10 @@ export class CreateCheckpointService extends BaseService { checkpointId: checkpoint.id, params, }); - await marqs?.acknowledgeMessage(attempt.taskRunId); + await marqs?.acknowledgeMessage( + attempt.taskRunId, + "No checkpoint event in CreateCheckpointService" + ); return { success: false, diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index 3c87dd5f41..b31546cb52 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -61,7 +61,7 @@ export class FinalizeTaskRunService extends BaseService { expiredAt, completedAt, }); - await marqs?.acknowledgeMessage(id); + await marqs?.acknowledgeMessage(id, "FinalTaskRunService call"); logger.debug("Finalizing run updating run status", { id, diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index daf8d96936..7f927e7517 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -172,7 +172,10 @@ export class ResumeAttemptService extends BaseService { if (!completedAttempt) { this._logger.error("Completed attempt not found", { completedAttemptId }); - await marqs?.acknowledgeMessage(attempt.taskRunId); + await marqs?.acknowledgeMessage( + attempt.taskRunId, + "Cannot find completed attempt in ResumeAttemptService" + ); return; } @@ -186,7 +189,10 @@ export class ResumeAttemptService extends BaseService { if (!resumePayload) { logger.error("Failed to get resume payload"); - await marqs?.acknowledgeMessage(attempt.taskRunId); + await marqs?.acknowledgeMessage( + attempt.taskRunId, + "Failed to get resume payload in ResumeAttemptService" + ); return; }