From 92c7341bd3a8bff4dba279ae8fc89e859f31d9b3 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 11 Sep 2024 17:54:49 +0100 Subject: [PATCH 1/7] Ignore /packages/cli-v3/src/package.json --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index b90f17d85b..1dac9aa9c1 100644 --- a/.gitignore +++ b/.gitignore @@ -56,4 +56,5 @@ apps/**/public/build .trigger .tshy* .yarn -*.tsbuildinfo \ No newline at end of file +*.tsbuildinfo +/packages/cli-v3/src/package.json From 639525935c8f3afcda3c78ea846e6f6bfaa8359f Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 12 Sep 2024 11:24:06 +0100 Subject: [PATCH 2/7] Added more logs when resuming a dependency, added the runId --- .../app/v3/services/resumeTaskDependency.server.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts index 1effffc0c8..ac912f9049 100644 --- a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts +++ b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts @@ -39,6 +39,16 @@ export class ResumeTaskDependencyService extends BaseService { const dependentRun = dependency.dependentAttempt.taskRun; if (dependency.dependentAttempt.status === "PAUSED" && dependency.checkpointEventId) { + logger.debug( + "Task dependency resume: Attempt is paused and there's a checkpoint. Enqueuing resume with checkpoint.", + { + attemptId: dependency.id, + dependentAttempt: dependency.dependentAttempt, + checkpointEventId: dependency.checkpointEventId, + hasCheckpointEvent: !!dependency.checkpointEventId, + runId: dependentRun.id, + } + ); await marqs?.enqueueMessage( dependency.taskRun.runtimeEnvironment, dependentRun.queue, @@ -61,6 +71,7 @@ export class ResumeTaskDependencyService extends BaseService { dependentAttempt: dependency.dependentAttempt, checkpointEventId: dependency.checkpointEventId, hasCheckpointEvent: !!dependency.checkpointEventId, + runId: dependentRun.id, }); if (dependency.dependentAttempt.status === "PAUSED" && !dependency.checkpointEventId) { From 7bea9ebfb0a3220dedecf215a1e47e9b74d0c6d8 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 12 Sep 2024 11:25:09 +0100 Subject: [PATCH 3/7] A task for reproducing a race condition with checkpoints --- .../v3-catalog/src/trigger/checkpoints.ts | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/references/v3-catalog/src/trigger/checkpoints.ts b/references/v3-catalog/src/trigger/checkpoints.ts index 16b0b39452..89964dc61c 100644 --- a/references/v3-catalog/src/trigger/checkpoints.ts +++ b/references/v3-catalog/src/trigger/checkpoints.ts @@ -1,4 +1,4 @@ -import { logger, task, wait } from "@trigger.dev/sdk/v3"; +import { logger, queue, task, wait } from "@trigger.dev/sdk/v3"; type Payload = { count?: number; @@ -70,6 +70,7 @@ export const nestedDependencies = task({ maxDepth, waitSeconds, failAttemptChance, + batchSize, }); logger.log(`Triggered complete ${i + 1}/${batchSize}`); @@ -153,3 +154,36 @@ export const bulkPermanentlyFrozen = task({ ); }, }); + +const oneAtATime = queue({ + name: "race-condition", + concurrencyLimit: 1, +}); + +export const raceConditionCheckpointDequeue = task({ + id: "race-condition-checkpoint-dequeue", + queue: oneAtATime, + run: async ({ isBatch = true }: { isBatch?: boolean }) => { + await holdConcurrency.trigger({ waitSeconds: 45 }); + + if (isBatch) { + await fixedLengthTask.batchTriggerAndWait( + Array.from({ length: 1 }, (_, i) => ({ + payload: { waitSeconds: 5 }, + })) + ); + } else { + await fixedLengthTask.triggerAndWait({ waitSeconds: 5 }); + } + + logger.log(`Successfully completed task`); + }, +}); + +export const holdConcurrency = task({ + id: "hold-concurrency", + queue: oneAtATime, + run: async ({ waitSeconds = 60 }: { waitSeconds?: number }) => { + await new Promise((resolve) => setTimeout(resolve, waitSeconds * 1000)); + }, +}); From 296959e115fe8b594c52683c2e8e21488e956f55 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 12 Sep 2024 12:07:07 +0100 Subject: [PATCH 4/7] Fix for doing remote image build when not self-hosting --- apps/webapp/app/v3/services/initializeDeployment.server.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/v3/services/initializeDeployment.server.ts b/apps/webapp/app/v3/services/initializeDeployment.server.ts index c20857a280..e711813692 100644 --- a/apps/webapp/app/v3/services/initializeDeployment.server.ts +++ b/apps/webapp/app/v3/services/initializeDeployment.server.ts @@ -29,9 +29,9 @@ export class InitializeDeploymentService extends BaseService { const nextVersion = calculateNextBuildVersion(latestDeployment?.version); // Try and create a depot build and get back the external build data - const externalBuildData = !!payload.selfHosted - ? await createRemoteImageBuild(environment.project) - : undefined; + const externalBuildData = payload.selfHosted + ? undefined + : await createRemoteImageBuild(environment.project); const triggeredBy = payload.userId ? await this._prisma.user.findUnique({ From 7ffe8c249da2a7e914235d0b5c61e998e0af17ed Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 12 Sep 2024 12:32:31 +0100 Subject: [PATCH 5/7] Set team members, alerts and schedule limits to 100m for self-hosting --- apps/webapp/app/presenters/TeamPresenter.server.ts | 3 ++- .../app/presenters/v3/AlertChannelListPresenter.server.ts | 2 +- apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts | 2 +- apps/webapp/app/v3/services/checkSchedule.server.ts | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/presenters/TeamPresenter.server.ts b/apps/webapp/app/presenters/TeamPresenter.server.ts index 4124dc5fc4..ff56f6537c 100644 --- a/apps/webapp/app/presenters/TeamPresenter.server.ts +++ b/apps/webapp/app/presenters/TeamPresenter.server.ts @@ -1,6 +1,7 @@ import { getTeamMembersAndInvites } from "~/models/member.server"; import { BasePresenter } from "./v3/basePresenter.server"; import { getLimit } from "~/services/platform.v3.server"; +import { number } from "zod"; export class TeamPresenter extends BasePresenter { public async call({ userId, organizationId }: { userId: string; organizationId: string }) { @@ -13,7 +14,7 @@ export class TeamPresenter extends BasePresenter { return; } - const limit = await getLimit(organizationId, "teamMembers", 25); + const limit = await getLimit(organizationId, "teamMembers", 100_000_000); return { ...result, diff --git a/apps/webapp/app/presenters/v3/AlertChannelListPresenter.server.ts b/apps/webapp/app/presenters/v3/AlertChannelListPresenter.server.ts index e359ef97a3..ccca8f0e7d 100644 --- a/apps/webapp/app/presenters/v3/AlertChannelListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/AlertChannelListPresenter.server.ts @@ -43,7 +43,7 @@ export class AlertChannelListPresenter extends BasePresenter { throw new Error(`Project not found: ${projectId}`); } - const limit = await getLimit(organization.organizationId, "alerts", 25); + const limit = await getLimit(organization.organizationId, "alerts", 100_000_000); return { alertChannels: await Promise.all( diff --git a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts index 0fea1ec0d8..db16d742fa 100644 --- a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts @@ -256,7 +256,7 @@ export class ScheduleListPresenter extends BasePresenter { }; }); - const limit = await getLimit(project.organizationId, "schedules", 500); + const limit = await getLimit(project.organizationId, "schedules", 100_000_000); return { currentPage: page, diff --git a/apps/webapp/app/v3/services/checkSchedule.server.ts b/apps/webapp/app/v3/services/checkSchedule.server.ts index 6422c1cf5e..a959d13008 100644 --- a/apps/webapp/app/v3/services/checkSchedule.server.ts +++ b/apps/webapp/app/v3/services/checkSchedule.server.ts @@ -76,7 +76,7 @@ export class CheckScheduleService extends BaseService { throw new ServiceValidationError("Project not found"); } - const limit = await getLimit(project.organizationId, "schedules", 500); + const limit = await getLimit(project.organizationId, "schedules", 100_000_000); const schedulesCount = await this._prisma.taskSchedule.count({ where: { projectId, From c183416ae095f25c62e9eb3dfd4bb5a854049d48 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 12 Sep 2024 12:33:01 +0100 Subject: [PATCH 6/7] Import fix --- apps/webapp/app/presenters/TeamPresenter.server.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/webapp/app/presenters/TeamPresenter.server.ts b/apps/webapp/app/presenters/TeamPresenter.server.ts index ff56f6537c..25300af191 100644 --- a/apps/webapp/app/presenters/TeamPresenter.server.ts +++ b/apps/webapp/app/presenters/TeamPresenter.server.ts @@ -1,7 +1,6 @@ import { getTeamMembersAndInvites } from "~/models/member.server"; -import { BasePresenter } from "./v3/basePresenter.server"; import { getLimit } from "~/services/platform.v3.server"; -import { number } from "zod"; +import { BasePresenter } from "./v3/basePresenter.server"; export class TeamPresenter extends BasePresenter { public async call({ userId, organizationId }: { userId: string; organizationId: string }) { From 7850e442c836b73f949376d4108fe42507b2c1fb Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 12 Sep 2024 13:29:17 +0100 Subject: [PATCH 7/7] Set the checkpointEventId in marqs when the checkpoint is created for batchTriggerAndWait This should fix a horrible race condition when at max concurrency --- apps/webapp/app/v3/services/createCheckpoint.server.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index 221c721c78..f88f0f71f1 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -258,6 +258,16 @@ export class CreateCheckpointService extends BaseService { }; } + //if there's a message in the queue, we make sure the checkpoint event is on it + await marqs?.replaceMessage( + attempt.taskRun.id, + { + checkpointEventId: checkpointEvent.id, + }, + undefined, + true + ); + await ResumeBatchRunService.enqueue(batchRun.id, this._prisma); return {