Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { BaseService, ServiceValidationError } from "./baseService.server";
import { CrashTaskRunService } from "./crashTaskRun.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { findQueueInEnvironment } from "~/models/taskQueue.server";
import { FINAL_RUN_STATUSES } from "../taskStatus";

export class CreateTaskRunAttemptService extends BaseService {
public async call({
Expand Down Expand Up @@ -91,11 +92,17 @@ export class CreateTaskRunAttemptService extends BaseService {

span.setAttribute("taskRunId", taskRun.id);
span.setAttribute("taskRunFriendlyId", taskRun.friendlyId);
span.setAttribute("taskRunStatus", taskRun.status);

if (taskRun.status === "CANCELED") {
throw new ServiceValidationError("Task run is cancelled", 400);
}

// If the run is finalized, it's pointless to create another attempt
if (FINAL_RUN_STATUSES.includes(taskRun.status)) {
throw new ServiceValidationError("Task run is already finished", 400);
}

const lockedBy = taskRun.lockedBy;

if (!lockedBy) {
Expand Down
33 changes: 27 additions & 6 deletions apps/webapp/app/v3/services/resumeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export class ResumeAttemptService extends BaseService {
take: 1,
select: {
id: true,
number: true,
status: true,
},
} satisfies Prisma.TaskRunInclude["attempts"];

Expand All @@ -37,9 +39,9 @@ export class ResumeAttemptService extends BaseService {
include: {
taskRun: true,
dependencies: {
include: {
select: {
taskRun: {
include: {
select: {
attempts: latestAttemptSelect,
},
},
Expand All @@ -50,11 +52,11 @@ export class ResumeAttemptService extends BaseService {
take: 1,
},
batchDependencies: {
include: {
select: {
items: {
include: {
select: {
taskRun: {
include: {
select: {
attempts: latestAttemptSelect,
},
},
Expand Down Expand Up @@ -130,7 +132,26 @@ export class ResumeAttemptService extends BaseService {
return;
}

completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id);
//find the best attempt for each batch item
//it should be the most recent one in a final state
const finalAttempts = dependentBatchItems.map((item) => {
return item.taskRun.attempts
.filter((a) => FINAL_ATTEMPT_STATUSES.includes(a.status))
.sort((a, b) => b.number - a.number)[0];
});

completedAttemptIds = finalAttempts.map((a) => a.id);

if (completedAttemptIds.length !== dependentBatchItems.length) {
this._logger.error("[ResumeAttemptService] not all batch items have attempts", {
runId: attempt.taskRunId,
completedAttemptIds,
finalAttempts,
dependentBatchItems,
});

return;
}
} else {
this._logger.error("No batch dependency");
return;
Expand Down
2 changes: 2 additions & 0 deletions references/hello-world/src/trigger/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ export const batchTask = task({

const results = await childTask.batchTriggerAndWait(items);

logger.info("Batch task complete", { results });

return {
batchCount: payload.count,
results,
Expand Down