Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/thick-poets-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Specify a region override when triggering a run
6 changes: 3 additions & 3 deletions apps/webapp/app/presenters/v3/RegionsPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class RegionsPresenter extends BasePresenter {
id: true,
organizationId: true,
defaultWorkerGroupId: true,
allowedMasterQueues: true,
allowedWorkerQueues: true,
},
where: {
slug: projectSlug,
Expand Down Expand Up @@ -70,9 +70,9 @@ export class RegionsPresenter extends BasePresenter {
where: isAdmin
? undefined
: // Hide hidden unless they're allowed to use them
project.allowedMasterQueues.length > 0
project.allowedWorkerQueues.length > 0
? {
masterQueue: { in: project.allowedMasterQueues },
masterQueue: { in: project.allowedWorkerQueues },
}
: {
hidden: false,
Expand Down
23 changes: 20 additions & 3 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {
MachinePreset,
type MachinePreset,
prettyPrintPacket,
SemanticInternalAttributes,
TaskRunContext,
type TaskRunContext,
TaskRunError,
TriggerTraceContext,
V3TaskRunContext,
type V3TaskRunContext,
} from "@trigger.dev/core/v3";
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
Expand Down Expand Up @@ -176,6 +176,22 @@ export class SpanPresenter extends BasePresenter {

const externalTraceId = this.#getExternalTraceId(run.traceContext);

let region: { name: string; location: string | null } | null = null;

if (run.runtimeEnvironment.type !== "DEVELOPMENT" && run.engine !== "V1") {
const workerGroup = await this._replica.workerInstanceGroup.findFirst({
select: {
name: true,
location: true,
},
where: {
masterQueue: run.workerQueue,
},
});

region = workerGroup ?? null;
}

return {
id: run.id,
friendlyId: run.friendlyId,
Expand Down Expand Up @@ -233,6 +249,7 @@ export class SpanPresenter extends BasePresenter {
maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds),
batch: run.batch ? { friendlyId: run.batch.friendlyId } : undefined,
engine: run.engine,
region,
workerQueue: run.workerQueue,
spanId: run.spanId,
isCached: !!span.originalRun,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import {
} from "~/utils/pathBuilder";
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
import { CompleteWaitpointForm } from "../resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.waitpoints.$waitpointFriendlyId.complete/route";
import { FlagIcon } from "~/assets/icons/RegionIcons";

export const loader = async ({ request, params }: LoaderFunctionArgs) => {
const { projectParam, organizationSlug, envParam, runParam, spanParam } =
Expand Down Expand Up @@ -701,6 +702,19 @@ function RunBody({
<MachineLabelCombo preset={run.machinePreset} />
</Property.Value>
</Property.Item>
{run.region && (
<Property.Item>
<Property.Label>Region</Property.Label>
<Property.Value>
<span className="flex items-center gap-1">
{run.region.location ? (
<FlagIcon region={run.region.location} className="size-5" />
) : null}
{run.region.name}
</span>
</Property.Value>
</Property.Item>
)}
<Property.Item>
<Property.Label>Run invocation cost</Property.Label>
<Property.Value>
Expand Down
19 changes: 15 additions & 4 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.serv
import type { RunEngine } from "~/v3/runEngine.server";
import { env } from "~/env.server";
import { EngineServiceValidationError } from "./errors";
import { tryCatch } from "@trigger.dev/core/v3";

export class DefaultQueueManager implements QueueManager {
constructor(
Expand Down Expand Up @@ -196,7 +197,10 @@ export class DefaultQueueManager implements QueueManager {
};
}

async getWorkerQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
async getWorkerQueue(
environment: AuthenticatedEnvironment,
regionOverride?: string
): Promise<string | undefined> {
if (environment.type === "DEVELOPMENT") {
return environment.id;
}
Expand All @@ -206,9 +210,16 @@ export class DefaultQueueManager implements QueueManager {
engine: this.engine,
});

const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
projectId: environment.projectId,
});
const [error, workerGroup] = await tryCatch(
workerGroupService.getDefaultWorkerGroupForProject({
projectId: environment.projectId,
regionOverride,
})
);

if (error) {
throw new EngineServiceValidationError(error.message);
}

if (!workerGroup) {
throw new EngineServiceValidationError("No worker group found");
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export class RunEngineTriggerTaskService {

const depth = parentRun ? parentRun.depth + 1 : 0;

const workerQueue = await this.queueConcern.getWorkerQueue(environment);
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);

try {
return await this.traceEventConcern.traceRun(triggerRequest, async (event) => {
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ export interface QueueManager {
): Promise<QueueProperties>;
getQueueName(request: TriggerTaskRequest): Promise<string>;
validateQueueLimits(env: AuthenticatedEnvironment): Promise<QueueValidationResult>;
getWorkerQueue(env: AuthenticatedEnvironment): Promise<string | undefined>;
getWorkerQueue(
env: AuthenticatedEnvironment,
regionOverride?: string
): Promise<string | undefined>;
}

export interface PayloadProcessor {
Expand Down
17 changes: 12 additions & 5 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ export type RunsReplicationServiceOptions = {
insertMaxDelayMs?: number;
};

type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
type PostgresTaskRun = TaskRun & { masterQueue: string };

type TaskRunInsert = {
_version: bigint;
run: PostgresTaskRun;
event: "insert" | "update" | "delete";
};

export type RunsReplicationServiceEvents = {
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
Expand Down Expand Up @@ -243,7 +249,7 @@ export class RunsReplicationService {
}
}

async backfill(runs: TaskRun[]) {
async backfill(runs: PostgresTaskRun[]) {
// divide into batches of 50 to get data from Postgres
const flushId = nanoid();
// Use current timestamp as LSN (high enough to be above existing data)
Expand Down Expand Up @@ -352,7 +358,7 @@ export class RunsReplicationService {
const replicationLagMs = Date.now() - Number(message.commitTime / 1000n);
this._currentTransaction.commitEndLsn = message.commitEndLsn;
this._currentTransaction.replicationLagMs = replicationLagMs;
const transaction = this._currentTransaction as Transaction<TaskRun>;
const transaction = this._currentTransaction as Transaction<PostgresTaskRun>;
this._currentTransaction = null;

if (transaction.commitEndLsn) {
Expand All @@ -370,7 +376,7 @@ export class RunsReplicationService {
}
}

#handleTransaction(transaction: Transaction<TaskRun>) {
#handleTransaction(transaction: Transaction<PostgresTaskRun>) {
if (this._isShutDownComplete) return;

if (this._isShuttingDown) {
Expand Down Expand Up @@ -764,7 +770,7 @@ export class RunsReplicationService {
}

async #prepareTaskRunInsert(
run: TaskRun,
run: PostgresTaskRun,
organizationId: string,
environmentType: string,
event: "insert" | "update" | "delete",
Expand Down Expand Up @@ -814,6 +820,7 @@ export class RunsReplicationService {
output,
concurrency_key: run.concurrencyKey ?? "",
bulk_action_group_ids: run.bulkActionGroupIds ?? [],
worker_queue: run.masterQueue,
_version: _version.toString(),
_is_deleted: event === "delete" ? 1 : 0,
};
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/services/setDefaultRegion.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ export class SetDefaultRegionService extends BaseService {

// If their project is restricted, only allow them to set default regions that are allowed
if (!isAdmin) {
if (project.allowedMasterQueues.length > 0) {
if (!project.allowedMasterQueues.includes(workerGroup.masterQueue)) {
if (project.allowedWorkerQueues.length > 0) {
if (!project.allowedWorkerQueues.includes(workerGroup.masterQueue)) {
throw new ServiceValidationError("You're not allowed to set this region as default");
}
} else if (workerGroup.hidden) {
Expand Down
39 changes: 36 additions & 3 deletions apps/webapp/app/v3/services/worker/workerGroupService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,12 @@ export class WorkerGroupService extends WithRunEngine {

async getDefaultWorkerGroupForProject({
projectId,
regionOverride,
}: {
projectId: string;
regionOverride?: string;
}): Promise<WorkerInstanceGroup | undefined> {
const project = await this._prisma.project.findUnique({
const project = await this._prisma.project.findFirst({
where: {
id: projectId,
},
Expand All @@ -208,8 +210,39 @@ export class WorkerGroupService extends WithRunEngine {
});

if (!project) {
logger.error("[WorkerGroupService] Project not found", { projectId });
return;
throw new Error("Project not found.");
}

// If they've specified a region, we need to check they have access to it
if (regionOverride) {
const workerGroup = await this._prisma.workerInstanceGroup.findFirst({
where: {
masterQueue: regionOverride,
},
});

if (!workerGroup) {
throw new Error(`The region you specified doesn't exist ("${regionOverride}").`);
}

// If they're restricted, check they have access
if (project.allowedWorkerQueues.length > 0) {
if (project.allowedWorkerQueues.includes(workerGroup.masterQueue)) {
return workerGroup;
}

throw new Error(
`You don't have access to this region ("${regionOverride}"). You can use the following regions: ${project.allowedWorkerQueues.join(
", "
)}.`
);
}

if (workerGroup.hidden) {
throw new Error(`The region you specified isn't available to you ("${regionOverride}").`);
}

return workerGroup;
}

if (project.defaultWorkerGroup) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
/*
Add worker_queue column.
*/
ALTER TABLE trigger_dev.task_runs_v2
ADD COLUMN worker_queue String DEFAULT '';

-- +goose Down
ALTER TABLE trigger_dev.task_runs_v2
DROP COLUMN worker_queue;
1 change: 1 addition & 0 deletions internal-packages/clickhouse/src/taskRuns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export const TaskRunV2 = z.object({
is_test: z.boolean().default(false),
concurrency_key: z.string().default(""),
bulk_action_group_ids: z.array(z.string()).default([]),
worker_queue: z.string().default(""),
_version: z.string(),
_is_deleted: z.number().int().default(0),
});
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ model Project {
defaultWorkerGroupId String?

/// The master queues they are allowed to use (impacts what they can set as default and trigger runs with)
allowedMasterQueues String[] @default([])
allowedWorkerQueues String[] @default([]) @map("allowedMasterQueues")

environments RuntimeEnvironment[]
backgroundWorkers BackgroundWorker[]
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/v3/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ export const TriggerTaskRequestBody = z.object({
ttl: z.string().or(z.number().nonnegative().int()).optional(),
priority: z.number().optional(),
bulkActionId: z.string().optional(),
region: z.string().optional(),
})
.optional(),
});
Expand Down Expand Up @@ -181,6 +182,7 @@ export const BatchTriggerTaskItem = z.object({
test: z.boolean().optional(),
ttl: z.string().or(z.number().nonnegative().int()).optional(),
priority: z.number().optional(),
region: z.string().optional(),
})
.optional(),
});
Expand Down
15 changes: 15 additions & 0 deletions packages/core/src/v3/types/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,21 @@ export type TriggerOptions = {
* to the same version as the parent task that is triggering the child tasks.
*/
version?: string;

/**
* Specify the region to run the task in. This overrides the default region set for your project in the dashboard.
*
* Check the Regions page in the dashboard for regions that are available to you.
*
* In DEV this won't do anything, so it's fine to set it in your code.
*
* @example
*
* ```ts
* await myTask.trigger({ foo: "bar" }, { region: "us-east-1" });
* ```
*/
region?: string;
};

export type TriggerAndWaitOptions = Omit<TriggerOptions, "version">;
Expand Down
Loading
Loading