Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,16 @@ export class DevQueueConsumer {
env: this.env,
});

await marqs?.acknowledgeMessage(message.messageId);
await marqs?.acknowledgeMessage(
message.messageId,
"Failed to parse message.data with MessageBody schema in DevQueueConsumer"
);

setTimeout(() => this.#doWork(), 100);
return;
}

const existingTaskRun = await prisma.taskRun.findUnique({
const existingTaskRun = await prisma.taskRun.findFirst({
where: {
id: message.messageId,
},
Expand All @@ -346,7 +349,10 @@ export class DevQueueConsumer {
messageId: message.messageId,
});

await marqs?.acknowledgeMessage(message.messageId);
await marqs?.acknowledgeMessage(
message.messageId,
"Failed to find task run in DevQueueConsumer"
);
setTimeout(() => this.#doWork(), 100);
return;
}
Expand All @@ -365,7 +371,10 @@ export class DevQueueConsumer {
latestWorker: this.#getLatestBackgroundWorker(),
});

await marqs?.acknowledgeMessage(message.messageId);
await marqs?.acknowledgeMessage(
message.messageId,
"Failed to find background worker in DevQueueConsumer"
);
setTimeout(() => this.#doWork(), 100);
return;
}
Expand All @@ -382,7 +391,10 @@ export class DevQueueConsumer {
taskSlugs: backgroundWorker.tasks.map((task) => task.slug),
});

await marqs?.acknowledgeMessage(message.messageId);
await marqs?.acknowledgeMessage(
message.messageId,
"No matching background task found in DevQueueConsumer"
);

setTimeout(() => this.#doWork(), 100);
return;
Expand Down Expand Up @@ -416,7 +428,10 @@ export class DevQueueConsumer {
messageId: message.messageId,
});

await marqs?.acknowledgeMessage(message.messageId);
await marqs?.acknowledgeMessage(
message.messageId,
"Failed to lock task run in DevQueueConsumer"
);

setTimeout(() => this.#doWork(), 100);
return;
Expand Down Expand Up @@ -515,7 +530,10 @@ export class DevQueueConsumer {
messageId: message.messageId,
backgroundWorker,
});
await marqs?.acknowledgeMessage(message.messageId);
await marqs?.acknowledgeMessage(
message.messageId,
"Non-lazy attempts are no longer supported in DevQueueConsumer"
);

setTimeout(() => this.#doWork(), 100);
}
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ export class MarQS {
orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue),
messageId: messageData.messageId,
});

return;
}

await this.options.visibilityTimeoutStrategy.heartbeat(
Expand Down Expand Up @@ -411,7 +413,7 @@ export class MarQS {
);
}

public async acknowledgeMessage(messageId: string) {
public async acknowledgeMessage(messageId: string, reason: string = "unknown") {
return this.#trace(
"acknowledgeMessage",
async (span) => {
Expand All @@ -421,6 +423,7 @@ export class MarQS {
logger.log(`[${this.name}].acknowledgeMessage() message not found`, {
messageId,
service: this.name,
reason,
});
return;
}
Expand All @@ -430,6 +433,7 @@ export class MarQS {
[SemanticAttributes.MESSAGE_ID]: message.messageId,
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
["marqs.reason"]: reason,
});

await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ export class SharedQueueConsumer {
}

async #ackAndDoMoreWork(messageId: string, intervalInMs?: number) {
await marqs?.acknowledgeMessage(messageId);
await marqs?.acknowledgeMessage(messageId, "Acking and doing more work in SharedQueueConsumer");
this.#doMoreWork(intervalInMs);
}

Expand Down
10 changes: 8 additions & 2 deletions apps/webapp/app/v3/requeueTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ export class RequeueTaskRunService extends BaseService {
case "WAITING_FOR_DEPLOY": {
logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun });

await marqs?.acknowledgeMessage(taskRun.id);
await marqs?.acknowledgeMessage(
taskRun.id,
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService"
);

break;
}
Expand All @@ -93,7 +96,10 @@ export class RequeueTaskRunService extends BaseService {
case "CANCELED": {
logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun });

await marqs?.acknowledgeMessage(taskRun.id);
await marqs?.acknowledgeMessage(
taskRun.id,
"Task run is already completed in RequeueTaskRunService"
);

try {
if (taskRun.runtimeEnvironment.type === "DEVELOPMENT") {
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/services/createCheckpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,10 @@ export class CreateCheckpointService extends BaseService {
checkpointId: checkpoint.id,
params,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
await marqs?.acknowledgeMessage(
attempt.taskRunId,
"No checkpoint event in CreateCheckpointService"
);

return {
success: false,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/finalizeTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class FinalizeTaskRunService extends BaseService {
expiredAt,
completedAt,
});
await marqs?.acknowledgeMessage(id);
await marqs?.acknowledgeMessage(id, "FinalTaskRunService call");

logger.debug("Finalizing run updating run status", {
id,
Expand Down
10 changes: 8 additions & 2 deletions apps/webapp/app/v3/services/resumeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ export class ResumeAttemptService extends BaseService {

if (!completedAttempt) {
this._logger.error("Completed attempt not found", { completedAttemptId });
await marqs?.acknowledgeMessage(attempt.taskRunId);
await marqs?.acknowledgeMessage(
attempt.taskRunId,
"Cannot find completed attempt in ResumeAttemptService"
);
return;
}

Expand All @@ -186,7 +189,10 @@ export class ResumeAttemptService extends BaseService {

if (!resumePayload) {
logger.error("Failed to get resume payload");
await marqs?.acknowledgeMessage(attempt.taskRunId);
await marqs?.acknowledgeMessage(
attempt.taskRunId,
"Failed to get resume payload in ResumeAttemptService"
);
return;
}

Expand Down
Loading