Skip to content

Commit 6fd1fd9

Browse files
committed
WIP post-merge conflicts
1 parent 71356cd commit 6fd1fd9

File tree

4 files changed

+82
-40
lines changed

4 files changed

+82
-40
lines changed

apps/coordinator/src/checkpointer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ExponentialBackoff } from "@trigger.dev/core/v3/apps";
2-
import { testDockerCheckpoint } from "@trigger.dev/core/v3/apps";
2+
import { testDockerCheckpoint } from "@trigger.dev/core/v3/checkpoints";
33
import { nanoid } from "nanoid";
44
import fs from "node:fs/promises";
55
import { ChaosMonkey } from "./chaosMonkey";

apps/webapp/app/v3/services/cancelTaskRunV1.server.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { BaseService } from "./baseService.server";
99
import { CancelAttemptService } from "./cancelAttempt.server";
1010
import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDependencies.server";
1111
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
12+
import { getTaskEventStoreTableForRun } from "../taskEventStore.server";
1213

1314
type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
1415
include: {
@@ -83,9 +84,14 @@ export class CancelTaskRunServiceV1 extends BaseService {
8384
},
8485
});
8586

86-
const inProgressEvents = await eventRepository.queryIncompleteEvents({
87-
runId: taskRun.friendlyId,
88-
});
87+
const inProgressEvents = await eventRepository.queryIncompleteEvents(
88+
getTaskEventStoreTableForRun(taskRun),
89+
{
90+
runId: taskRun.friendlyId,
91+
},
92+
taskRun.createdAt,
93+
taskRun.completedAt ?? undefined
94+
);
8995

9096
logger.debug("Cancelling in-progress events", {
9197
inProgressEvents: inProgressEvents.map((event) => event.id),

apps/webapp/app/v3/services/triggerTaskV1.server.ts

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import {
33
packetRequiresOffloading,
44
QueueOptions,
55
SemanticInternalAttributes,
6+
taskRunErrorToString,
7+
taskRunErrorEnhancer,
68
TriggerTaskRequestBody,
79
} from "@trigger.dev/core/v3";
810
import {
@@ -39,6 +41,8 @@ import {
3941
TriggerTaskServiceOptions,
4042
TriggerTaskServiceResult,
4143
} from "./triggerTask.server";
44+
import { getTaskEventStore } from "../taskEventStore.server";
45+
import { enqueueRun } from "./enqueueRun.server";
4246

4347
/** @deprecated Use TriggerTaskService in `triggerTask.server.ts` instead. */
4448
export class TriggerTaskServiceV1 extends BaseService {
@@ -168,6 +172,8 @@ export class TriggerTaskServiceV1 extends BaseService {
168172
taskIdentifier: true,
169173
rootTaskRunId: true,
170174
depth: true,
175+
queueTimestamp: true,
176+
queue: true,
171177
},
172178
},
173179
},
@@ -224,6 +230,8 @@ export class TriggerTaskServiceV1 extends BaseService {
224230
taskIdentifier: true,
225231
rootTaskRunId: true,
226232
depth: true,
233+
queueTimestamp: true,
234+
queue: true,
227235
},
228236
},
229237
},
@@ -276,7 +284,7 @@ export class TriggerTaskServiceV1 extends BaseService {
276284
: undefined;
277285

278286
try {
279-
return await eventRepository.traceEvent(
287+
const result = await eventRepository.traceEvent(
280288
taskId,
281289
{
282290
context: options.traceContext,
@@ -349,6 +357,12 @@ export class TriggerTaskServiceV1 extends BaseService {
349357
? dependentBatchRun.dependentTaskAttempt.taskRun.depth + 1
350358
: 0;
351359

360+
const queueTimestamp =
361+
dependentAttempt?.taskRun.queueTimestamp ??
362+
dependentBatchRun?.dependentTaskAttempt?.taskRun.queueTimestamp ??
363+
delayUntil ??
364+
new Date();
365+
352366
const taskRun = await tx.taskRun.create({
353367
data: {
354368
status: delayUntil ? "DELAYED" : "PENDING",
@@ -376,7 +390,9 @@ export class TriggerTaskServiceV1 extends BaseService {
376390
isTest: body.options?.test ?? false,
377391
delayUntil,
378392
queuedAt: delayUntil ? undefined : new Date(),
393+
queueTimestamp,
379394
maxAttempts: body.options?.maxAttempts,
395+
taskEventStore: getTaskEventStore(),
380396
ttl,
381397
tags:
382398
tagIds.length === 0
@@ -528,44 +544,61 @@ export class TriggerTaskServiceV1 extends BaseService {
528544
this._prisma
529545
);
530546

531-
//release the concurrency for the env and org, if part of a (batch)triggerAndWait
532-
if (dependentAttempt) {
533-
const isSameTask = dependentAttempt.taskRun.taskIdentifier === taskId;
534-
await marqs?.releaseConcurrency(dependentAttempt.taskRun.id, isSameTask);
535-
}
536-
if (dependentBatchRun?.dependentTaskAttempt) {
537-
const isSameTask =
538-
dependentBatchRun.dependentTaskAttempt.taskRun.taskIdentifier === taskId;
539-
await marqs?.releaseConcurrency(
540-
dependentBatchRun.dependentTaskAttempt.taskRun.id,
541-
isSameTask
542-
);
543-
}
544-
545547
if (!run) {
546548
return;
547549
}
548550

549-
// We need to enqueue the task run into the appropriate queue. This is done after the tx completes to prevent a race condition where the task run hasn't been created yet by the time we dequeue.
551+
// Now enqueue the run if it's not delayed
550552
if (run.status === "PENDING") {
551-
await marqs?.enqueueMessage(
552-
environment,
553-
run.queue,
554-
run.id,
555-
{
556-
type: "EXECUTE",
557-
taskIdentifier: taskId,
558-
projectId: environment.projectId,
559-
environmentId: environment.id,
560-
environmentType: environment.type,
561-
},
562-
body.options?.concurrencyKey
563-
);
553+
const enqueueResult = await enqueueRun({
554+
env: environment,
555+
run,
556+
dependentRun:
557+
dependentAttempt?.taskRun ?? dependentBatchRun?.dependentTaskAttempt?.taskRun,
558+
});
559+
560+
if (!enqueueResult.ok) {
561+
// Now we need to fail the run with enqueueResult.error and make sure and
562+
// set the traced event to failed as well
563+
await this._prisma.taskRun.update({
564+
where: { id: run.id },
565+
data: {
566+
status: "SYSTEM_FAILURE",
567+
completedAt: new Date(),
568+
error: enqueueResult.error,
569+
},
570+
});
571+
572+
event.failWithError(enqueueResult.error);
573+
574+
return {
575+
run,
576+
isCached: false,
577+
error: enqueueResult.error,
578+
};
579+
}
564580
}
565581

566582
return { run, isCached: false };
567583
}
568584
);
585+
586+
if (result?.error) {
587+
throw new ServiceValidationError(
588+
taskRunErrorToString(taskRunErrorEnhancer(result.error))
589+
);
590+
}
591+
592+
const run = result?.run;
593+
594+
if (!run) {
595+
return;
596+
}
597+
598+
return {
599+
run,
600+
isCached: result?.isCached,
601+
};
569602
} catch (error) {
570603
// Detect a prisma transaction Unique constraint violation
571604
if (error instanceof Prisma.PrismaClientKnownRequestError) {

apps/webapp/app/v3/taskEventStore.server.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export type TraceEvent = Pick<
2020
| "level"
2121
| "events"
2222
| "environmentType"
23+
| "isDebug"
2324
>;
2425

2526
export type TaskEventStoreTable = "taskEvent" | "taskEventPartitioned";
@@ -122,7 +123,7 @@ export class TaskEventStore {
122123
) {
123124
if (table === "taskEventPartitioned") {
124125
return await this.readReplica.$queryRaw<TraceEvent[]>`
125-
SELECT
126+
SELECT
126127
"spanId",
127128
"parentId",
128129
"runId",
@@ -136,11 +137,12 @@ export class TaskEventStore {
136137
"isCancelled",
137138
level,
138139
events,
139-
"environmentType"
140+
"environmentType",
141+
"isDebug"
140142
FROM "TaskEventPartitioned"
141-
WHERE
142-
"traceId" = ${traceId}
143-
AND "createdAt" >= ${startCreatedAt.toISOString()}::timestamp
143+
WHERE
144+
"traceId" = ${traceId}
145+
AND "createdAt" >= ${startCreatedAt.toISOString()}::timestamp
144146
AND "createdAt" < ${(endCreatedAt
145147
? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000)
146148
: new Date()
@@ -150,7 +152,7 @@ export class TaskEventStore {
150152
`;
151153
} else {
152154
return await this.readReplica.$queryRaw<TraceEvent[]>`
153-
SELECT
155+
SELECT
154156
id,
155157
"spanId",
156158
"parentId",
@@ -165,7 +167,8 @@ export class TaskEventStore {
165167
"isCancelled",
166168
level,
167169
events,
168-
"environmentType"
170+
"environmentType",
171+
"isDebug"
169172
FROM "TaskEvent"
170173
WHERE "traceId" = ${traceId}
171174
ORDER BY "startTime" ASC

0 commit comments

Comments
 (0)