Skip to content

Commit 1c33bab

Browse files
committed
prevent checkpoint creation for resumed task waits
1 parent 7c51186 commit 1c33bab

File tree

4 files changed

+102
-6
lines changed

4 files changed

+102
-6
lines changed

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,36 +97,77 @@ export class CreateCheckpointService extends BaseService {
9797

9898
// Check if we should accept this checkpoint
9999
switch (reason.type) {
100-
case "MANUAL":
100+
case "MANUAL": {
101101
// Always accept manual checkpoints
102102
break;
103-
case "WAIT_FOR_DURATION":
103+
}
104+
case "WAIT_FOR_DURATION": {
104105
// Always accept duration checkpoints
105106
break;
107+
}
106108
case "WAIT_FOR_TASK": {
107-
// TODO
109+
const childRun = await this._prisma.taskRun.findFirst({
110+
where: {
111+
friendlyId: reason.friendlyId,
112+
},
113+
select: {
114+
dependency: {
115+
select: {
116+
resumedAt: true,
117+
},
118+
},
119+
},
120+
});
121+
122+
if (!childRun) {
123+
logger.error("CreateCheckpointService: Pre-check - WAIT_FOR_TASK child run not found", {
124+
friendlyId: reason.friendlyId,
125+
params,
126+
});
127+
128+
return {
129+
success: false,
130+
keepRunAlive: false,
131+
};
132+
}
133+
134+
if (childRun.dependency?.resumedAt) {
135+
logger.error("CreateCheckpointService: Child run already resumed", {
136+
childRun,
137+
params,
138+
});
139+
140+
return {
141+
success: false,
142+
keepRunAlive: true,
143+
};
144+
}
145+
108146
break;
109147
}
110148
case "WAIT_FOR_BATCH": {
111149
const batchRun = await this._prisma.batchTaskRun.findFirst({
112150
where: {
113151
friendlyId: reason.batchFriendlyId,
114152
},
153+
select: {
154+
resumedAt: true,
155+
},
115156
});
116157

117158
if (!batchRun) {
118-
logger.error("CreateCheckpointService: Batch not found", {
159+
logger.error("CreateCheckpointService: Pre-check - Batch not found", {
119160
batchFriendlyId: reason.batchFriendlyId,
120161
params,
121162
});
122163

123164
return {
124165
success: false,
125-
keepRunAlive: true,
166+
keepRunAlive: false,
126167
};
127168
}
128169

129-
if (batchRun.batchVersion === "v3" && batchRun.resumedAt) {
170+
if (batchRun.resumedAt) {
130171
logger.error("CreateCheckpointService: Batch already resumed", {
131172
batchRun,
132173
params,
@@ -140,6 +181,9 @@ export class CreateCheckpointService extends BaseService {
140181

141182
break;
142183
}
184+
default: {
185+
break;
186+
}
143187
}
144188

145189
//sleep to test slow checkpoints

apps/webapp/app/v3/services/resumeTaskDependency.server.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { workerQueue } from "~/services/worker.server";
33
import { marqs } from "~/v3/marqs/index.server";
44
import { BaseService } from "./baseService.server";
55
import { logger } from "~/services/logger.server";
6+
import { TaskRunDependency } from "@trigger.dev/database";
67

78
export class ResumeTaskDependencyService extends BaseService {
89
public async call(dependencyId: string, sourceTaskAttemptId: string) {
@@ -49,6 +50,21 @@ export class ResumeTaskDependencyService extends BaseService {
4950
runId: dependentRun.id,
5051
}
5152
);
53+
54+
const wasUpdated = await this.#setDependencyToResumedOnce(dependency);
55+
56+
if (!wasUpdated) {
57+
logger.debug("Task dependency resume: Attempt with checkpoint was already resumed", {
58+
attemptId: dependency.id,
59+
dependentAttempt: dependency.dependentAttempt,
60+
checkpointEventId: dependency.checkpointEventId,
61+
hasCheckpointEvent: !!dependency.checkpointEventId,
62+
runId: dependentRun.id,
63+
});
64+
return;
65+
}
66+
67+
5268
await marqs?.enqueueMessage(
5369
dependency.taskRun.runtimeEnvironment,
5470
dependentRun.queue,
@@ -85,6 +101,19 @@ export class ResumeTaskDependencyService extends BaseService {
85101
return;
86102
}
87103

104+
const wasUpdated = await this.#setDependencyToResumedOnce(dependency);
105+
106+
if (!wasUpdated) {
107+
logger.debug("Task dependency resume: Attempt without checkpoint was already resumed", {
108+
attemptId: dependency.id,
109+
dependentAttempt: dependency.dependentAttempt,
110+
checkpointEventId: dependency.checkpointEventId,
111+
hasCheckpointEvent: !!dependency.checkpointEventId,
112+
runId: dependentRun.id,
113+
});
114+
return;
115+
}
116+
88117
await marqs?.replaceMessage(
89118
dependentRun.id,
90119
{
@@ -102,6 +131,26 @@ export class ResumeTaskDependencyService extends BaseService {
102131
}
103132
}
104133

134+
async #setDependencyToResumedOnce(dependency: TaskRunDependency) {
135+
const result = await this._prisma.taskRunDependency.updateMany({
136+
where: {
137+
id: dependency.id,
138+
resumedAt: null,
139+
},
140+
data: {
141+
resumedAt: new Date(),
142+
},
143+
});
144+
145+
// Check if any records were updated
146+
if (result.count > 0) {
147+
// The status was changed, so we return true
148+
return true;
149+
} else {
150+
return false;
151+
}
152+
}
153+
105154
static async enqueue(
106155
dependencyId: string,
107156
sourceTaskAttemptId: string,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "TaskRunDependency" ADD COLUMN "resumedAt" TIMESTAMP(3);

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1915,6 +1915,7 @@ model TaskRunDependency {
19151915
19161916
createdAt DateTime @default(now())
19171917
updatedAt DateTime @updatedAt
1918+
resumedAt DateTime?
19181919
19191920
@@index([dependentAttemptId])
19201921
@@index([dependentBatchRunId])

0 commit comments

Comments
 (0)