Skip to content

Commit 6259b3d

Browse files
committed
Allow blocking a run with multiple waitpoints at once. Made it atomic
1 parent c1306eb commit 6259b3d

File tree

1 file changed

+30
-31
lines changed
  • internal-packages/run-engine/src/engine

1 file changed

+30
-31
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,20 +1577,19 @@ export class RunEngine {
15771577
waitpointId,
15781578
projectId,
15791579
failAfter,
1580-
checkWaitpointIsPending = false,
15811580
tx,
15821581
}: {
15831582
runId: string;
1584-
waitpointId: string;
1583+
waitpointId: string | string[];
15851584
environmentId: string;
15861585
projectId: string;
15871586
failAfter?: Date;
1588-
/** If the waitpoint could be completed, i.e. not inside a run lock and not new */
1589-
checkWaitpointIsPending?: boolean;
15901587
tx?: PrismaClientOrTransaction;
15911588
}): Promise<TaskRunExecutionSnapshot> {
15921589
const prisma = tx ?? this.prisma;
15931590

1591+
let waitpointIds = typeof waitpointId === "string" ? [waitpointId] : waitpointId;
1592+
15941593
return await this.runLock.lock([runId], 5000, async (signal) => {
15951594
let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);
15961595

@@ -1602,28 +1601,26 @@ export class RunEngine {
16021601
newStatus = "EXECUTING_WITH_WAITPOINTS";
16031602
}
16041603

1605-
if (checkWaitpointIsPending) {
1606-
const waitpoint = await prisma.waitpoint.findUnique({
1607-
where: { id: waitpointId },
1608-
});
1609-
1610-
if (!waitpoint) {
1611-
throw new ServiceValidationError("Waitpoint not found", 404);
1612-
}
1613-
1614-
//the waitpoint has been completed since it was retrieved
1615-
if (waitpoint.status !== "PENDING") {
1616-
return snapshot;
1617-
}
1618-
}
1619-
1620-
const taskWaitpoint = await prisma.taskRunWaitpoint.create({
1621-
data: {
1622-
taskRunId: runId,
1623-
waitpointId: waitpointId,
1624-
projectId: projectId,
1604+
const insertedBlockers = await $transaction(
1605+
prisma,
1606+
async (tx) => {
1607+
return tx.$executeRaw`
1608+
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt")
1609+
SELECT
1610+
gen_random_uuid(),
1611+
${runId},
1612+
w.id,
1613+
${projectId},
1614+
NOW(),
1615+
NOW()
1616+
FROM "Waitpoint" w
1617+
WHERE w.id IN (${Prisma.join(waitpointIds)})
1618+
AND w.status = 'PENDING'
1619+
ON CONFLICT DO NOTHING;
1620+
`;
16251621
},
1626-
});
1622+
(error) => {}
1623+
);
16271624

16281625
//if the state has changed, create a new snapshot
16291626
if (newStatus !== snapshot.executionStatus) {
@@ -1641,12 +1638,14 @@ export class RunEngine {
16411638
}
16421639

16431640
if (failAfter) {
1644-
await this.worker.enqueue({
1645-
id: `finishWaitpoint.${waitpointId}`,
1646-
job: "finishWaitpoint",
1647-
payload: { waitpointId, error: "Waitpoint timed out" },
1648-
availableAt: failAfter,
1649-
});
1641+
for (const waitpointId of waitpointIds) {
1642+
await this.worker.enqueue({
1643+
id: `finishWaitpoint.${waitpointId}`,
1644+
job: "finishWaitpoint",
1645+
payload: { waitpointId, error: "Waitpoint timed out" },
1646+
availableAt: failAfter,
1647+
});
1648+
}
16501649
}
16511650

16521651
return snapshot;

0 commit comments

Comments
 (0)