diff --git a/apps/webapp/app/presenters/v3/TestPresenter.server.ts b/apps/webapp/app/presenters/v3/TestPresenter.server.ts index 23c181a8cb..af5bb93a7e 100644 --- a/apps/webapp/app/presenters/v3/TestPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/TestPresenter.server.ts @@ -56,7 +56,7 @@ export class TestPresenter extends BasePresenter { JOIN ${sqlDatabaseSchema}."BackgroundWorkerTask" bwt ON bwt."workerId" = latest_workers.id ORDER BY slug ASC;`; } else { - const currentDeployment = await findCurrentWorkerDeployment(envId); + const currentDeployment = await findCurrentWorkerDeployment({ environmentId: envId }); return currentDeployment?.worker?.tasks ?? []; } } diff --git a/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts b/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts index f58a613a75..2519231afe 100644 --- a/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts @@ -88,7 +88,7 @@ export class TestTaskPresenter { }: TestTaskOptions): Promise { let task: BackgroundWorkerTaskSlim | null = null; if (environment.type !== "DEVELOPMENT") { - const deployment = await findCurrentWorkerDeployment(environment.id); + const deployment = await findCurrentWorkerDeployment({ environmentId: environment.id }); if (deployment) { task = deployment.worker?.tasks.find((t) => t.slug === taskIdentifier) ?? null; } diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index c13e5062a0..5926002c91 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -612,7 +612,10 @@ export class SharedQueueConsumer { ? await getWorkerDeploymentFromWorkerTask(existingTaskRun.lockedById) : existingTaskRun.lockedToVersionId ? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId) - : await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId); + : await findCurrentWorkerDeployment({ + environmentId: existingTaskRun.runtimeEnvironmentId, + type: "V1", + }); }); const worker = deployment?.worker; diff --git a/apps/webapp/app/v3/models/workerDeployment.server.ts b/apps/webapp/app/v3/models/workerDeployment.server.ts index b4669a9cc7..12629b274d 100644 --- a/apps/webapp/app/v3/models/workerDeployment.server.ts +++ b/apps/webapp/app/v3/models/workerDeployment.server.ts @@ -1,5 +1,5 @@ import type { Prettify } from "@trigger.dev/core"; -import { BackgroundWorker, RunEngineVersion, WorkerDeployment } from "@trigger.dev/database"; +import { BackgroundWorker, RunEngineVersion, WorkerDeploymentType } from "@trigger.dev/database"; import { CURRENT_DEPLOYMENT_LABEL, CURRENT_UNMANAGED_DEPLOYMENT_LABEL, @@ -56,10 +56,23 @@ type WorkerDeploymentWithWorkerTasks = Prisma.WorkerDeploymentGetPayload<{ }; }>; -export async function findCurrentWorkerDeployment( - environmentId: string, - label = CURRENT_DEPLOYMENT_LABEL -): Promise { +/** + * Finds the current worker deployment for a given environment. + * + * @param environmentId - The ID of the environment to find the current worker deployment for. + * @param label - The label of the current worker deployment to find. + * @param type - The type of worker deployment to find. If the current deployment is NOT of this type, + * we will return the latest deployment of the given type. + */ +export async function findCurrentWorkerDeployment({ + environmentId, + label = CURRENT_DEPLOYMENT_LABEL, + type, +}: { + environmentId: string; + label?: string; + type?: WorkerDeploymentType; +}): Promise { const promotion = await prisma.workerDeploymentPromotion.findFirst({ where: { environmentId, @@ -93,16 +106,19 @@ export async function findCurrentWorkerDeployment( return undefined; } - if (promotion.deployment.type === "V1") { - // This is a run engine v1 deployment, so return it + if (!type) { return promotion.deployment; } - // We need to get the latest run engine v1 deployment - const latestV1Deployment = await prisma.workerDeployment.findFirst({ + if (promotion.deployment.type === type) { + return promotion.deployment; + } + + // We need to get the latest deployment of the given type + const latestDeployment = await prisma.workerDeployment.findFirst({ where: { environmentId, - type: "V1", + type, }, orderBy: { id: "desc", @@ -127,11 +143,11 @@ export async function findCurrentWorkerDeployment( }, }); - if (!latestV1Deployment) { + if (!latestDeployment) { return undefined; } - return latestV1Deployment; + return latestDeployment; } export async function getCurrentWorkerDeploymentEngineVersion( @@ -162,7 +178,11 @@ export async function getCurrentWorkerDeploymentEngineVersion( export async function findCurrentUnmanagedWorkerDeployment( environmentId: string ): Promise { - return await findCurrentWorkerDeployment(environmentId, CURRENT_UNMANAGED_DEPLOYMENT_LABEL); + return await findCurrentWorkerDeployment({ + environmentId, + label: CURRENT_UNMANAGED_DEPLOYMENT_LABEL, + type: "UNMANAGED", + }); } export async function findCurrentWorkerFromEnvironment( @@ -183,7 +203,10 @@ export async function findCurrentWorkerFromEnvironment( }); return latestDevWorker; } else { - const deployment = await findCurrentWorkerDeployment(environment.id, label); + const deployment = await findCurrentWorkerDeployment({ + environmentId: environment.id, + label, + }); return deployment?.worker ?? null; } } diff --git a/apps/webapp/app/v3/services/triggerScheduledTask.server.ts b/apps/webapp/app/v3/services/triggerScheduledTask.server.ts index b2a56f78cc..f2d40725a7 100644 --- a/apps/webapp/app/v3/services/triggerScheduledTask.server.ts +++ b/apps/webapp/app/v3/services/triggerScheduledTask.server.ts @@ -73,7 +73,9 @@ export class TriggerScheduledTaskService extends BaseService { if (instance.environment.type !== "DEVELOPMENT") { // Get the current backgroundWorker for this environment - const currentWorkerDeployment = await findCurrentWorkerDeployment(instance.environment.id); + const currentWorkerDeployment = await findCurrentWorkerDeployment({ + environmentId: instance.environment.id, + }); if (!currentWorkerDeployment) { logger.debug("No current worker deployment found, skipping task trigger", { diff --git a/internal-packages/run-engine/src/engine/db/worker.ts b/internal-packages/run-engine/src/engine/db/worker.ts index 701987581d..8bc2817d63 100644 --- a/internal-packages/run-engine/src/engine/db/worker.ts +++ b/internal-packages/run-engine/src/engine/db/worker.ts @@ -100,7 +100,7 @@ export async function getRunWithBackgroundWorkerTasks( } else { workerWithTasks = workerId ? await getWorkerDeploymentFromWorker(prisma, workerId) - : await getWorkerFromCurrentlyPromotedDeployment(prisma, run.runtimeEnvironmentId); + : await getManagedWorkerFromCurrentlyPromotedDeployment(prisma, run.runtimeEnvironmentId); } if (!workerWithTasks) { @@ -260,7 +260,7 @@ export async function getWorkerById( return { worker, tasks: worker.tasks, queues: worker.queues, deployment: worker.deployment }; } -export async function getWorkerFromCurrentlyPromotedDeployment( +export async function getManagedWorkerFromCurrentlyPromotedDeployment( prisma: PrismaClientOrTransaction, environmentId: string ): Promise {