Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/TestPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class TestPresenter extends BasePresenter {
JOIN ${sqlDatabaseSchema}."BackgroundWorkerTask" bwt ON bwt."workerId" = latest_workers.id
ORDER BY slug ASC;`;
} else {
const currentDeployment = await findCurrentWorkerDeployment(envId);
const currentDeployment = await findCurrentWorkerDeployment({ environmentId: envId });
return currentDeployment?.worker?.tasks ?? [];
}
}
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class TestTaskPresenter {
}: TestTaskOptions): Promise<TestTaskResult> {
let task: BackgroundWorkerTaskSlim | null = null;
if (environment.type !== "DEVELOPMENT") {
const deployment = await findCurrentWorkerDeployment(environment.id);
const deployment = await findCurrentWorkerDeployment({ environmentId: environment.id });
if (deployment) {
task = deployment.worker?.tasks.find((t) => t.slug === taskIdentifier) ?? null;
}
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,10 @@ export class SharedQueueConsumer {
? await getWorkerDeploymentFromWorkerTask(existingTaskRun.lockedById)
: existingTaskRun.lockedToVersionId
? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId)
: await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId);
: await findCurrentWorkerDeployment({
environmentId: existingTaskRun.runtimeEnvironmentId,
type: "V1",
});
});

const worker = deployment?.worker;
Expand Down
51 changes: 37 additions & 14 deletions apps/webapp/app/v3/models/workerDeployment.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Prettify } from "@trigger.dev/core";
import { BackgroundWorker, RunEngineVersion, WorkerDeployment } from "@trigger.dev/database";
import { BackgroundWorker, RunEngineVersion, WorkerDeploymentType } from "@trigger.dev/database";
import {
CURRENT_DEPLOYMENT_LABEL,
CURRENT_UNMANAGED_DEPLOYMENT_LABEL,
Expand Down Expand Up @@ -56,10 +56,23 @@ type WorkerDeploymentWithWorkerTasks = Prisma.WorkerDeploymentGetPayload<{
};
}>;

export async function findCurrentWorkerDeployment(
environmentId: string,
label = CURRENT_DEPLOYMENT_LABEL
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
/**
* Finds the current worker deployment for a given environment.
*
* @param environmentId - The ID of the environment to find the current worker deployment for.
* @param label - The label of the current worker deployment to find.
* @param type - The type of worker deployment to find. If the current deployment is NOT of this type,
* we will return the latest deployment of the given type.
*/
export async function findCurrentWorkerDeployment({
environmentId,
label = CURRENT_DEPLOYMENT_LABEL,
type,
}: {
environmentId: string;
label?: string;
type?: WorkerDeploymentType;
}): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
const promotion = await prisma.workerDeploymentPromotion.findFirst({
where: {
environmentId,
Expand Down Expand Up @@ -93,16 +106,19 @@ export async function findCurrentWorkerDeployment(
return undefined;
}

if (promotion.deployment.type === "V1") {
// This is a run engine v1 deployment, so return it
if (!type) {
return promotion.deployment;
}

// We need to get the latest run engine v1 deployment
const latestV1Deployment = await prisma.workerDeployment.findFirst({
if (promotion.deployment.type === type) {
return promotion.deployment;
}

// We need to get the latest deployment of the given type
const latestDeployment = await prisma.workerDeployment.findFirst({
where: {
environmentId,
type: "V1",
type,
},
orderBy: {
id: "desc",
Expand All @@ -127,11 +143,11 @@ export async function findCurrentWorkerDeployment(
},
});

if (!latestV1Deployment) {
if (!latestDeployment) {
return undefined;
}

return latestV1Deployment;
return latestDeployment;
}

export async function getCurrentWorkerDeploymentEngineVersion(
Expand Down Expand Up @@ -162,7 +178,11 @@ export async function getCurrentWorkerDeploymentEngineVersion(
export async function findCurrentUnmanagedWorkerDeployment(
environmentId: string
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
return await findCurrentWorkerDeployment(environmentId, CURRENT_UNMANAGED_DEPLOYMENT_LABEL);
return await findCurrentWorkerDeployment({
environmentId,
label: CURRENT_UNMANAGED_DEPLOYMENT_LABEL,
type: "UNMANAGED",
});
}

export async function findCurrentWorkerFromEnvironment(
Expand All @@ -183,7 +203,10 @@ export async function findCurrentWorkerFromEnvironment(
});
return latestDevWorker;
} else {
const deployment = await findCurrentWorkerDeployment(environment.id, label);
const deployment = await findCurrentWorkerDeployment({
environmentId: environment.id,
label,
});
return deployment?.worker ?? null;
}
}
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/triggerScheduledTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ export class TriggerScheduledTaskService extends BaseService {

if (instance.environment.type !== "DEVELOPMENT") {
// Get the current backgroundWorker for this environment
const currentWorkerDeployment = await findCurrentWorkerDeployment(instance.environment.id);
const currentWorkerDeployment = await findCurrentWorkerDeployment({
environmentId: instance.environment.id,
});

if (!currentWorkerDeployment) {
logger.debug("No current worker deployment found, skipping task trigger", {
Expand Down
4 changes: 2 additions & 2 deletions internal-packages/run-engine/src/engine/db/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export async function getRunWithBackgroundWorkerTasks(
} else {
workerWithTasks = workerId
? await getWorkerDeploymentFromWorker(prisma, workerId)
: await getWorkerFromCurrentlyPromotedDeployment(prisma, run.runtimeEnvironmentId);
: await getManagedWorkerFromCurrentlyPromotedDeployment(prisma, run.runtimeEnvironmentId);
}

if (!workerWithTasks) {
Expand Down Expand Up @@ -260,7 +260,7 @@ export async function getWorkerById(
return { worker, tasks: worker.tasks, queues: worker.queues, deployment: worker.deployment };
}

export async function getWorkerFromCurrentlyPromotedDeployment(
export async function getManagedWorkerFromCurrentlyPromotedDeployment(
prisma: PrismaClientOrTransaction,
environmentId: string
): Promise<WorkerDeploymentWithWorkerTasks | null> {
Expand Down