Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/thick-poets-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Specify a region override when triggering a run
6 changes: 3 additions & 3 deletions apps/webapp/app/presenters/v3/RegionsPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class RegionsPresenter extends BasePresenter {
id: true,
organizationId: true,
defaultWorkerGroupId: true,
allowedMasterQueues: true,
allowedWorkerQueues: true,
},
where: {
slug: projectSlug,
Expand Down Expand Up @@ -70,9 +70,9 @@ export class RegionsPresenter extends BasePresenter {
where: isAdmin
? undefined
: // Hide hidden unless they're allowed to use them
project.allowedMasterQueues.length > 0
project.allowedWorkerQueues.length > 0
? {
masterQueue: { in: project.allowedMasterQueues },
masterQueue: { in: project.allowedWorkerQueues },
}
: {
hidden: false,
Expand Down
23 changes: 20 additions & 3 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {
MachinePreset,
type MachinePreset,
prettyPrintPacket,
SemanticInternalAttributes,
TaskRunContext,
type TaskRunContext,
TaskRunError,
TriggerTraceContext,
V3TaskRunContext,
type V3TaskRunContext,
} from "@trigger.dev/core/v3";
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
Expand Down Expand Up @@ -176,6 +176,22 @@ export class SpanPresenter extends BasePresenter {

const externalTraceId = this.#getExternalTraceId(run.traceContext);

let region: { name: string; location: string | null } | null = null;

if (run.runtimeEnvironment.type !== "DEVELOPMENT" && run.engine !== "V1") {
const workerGroup = await this._replica.workerInstanceGroup.findFirst({
select: {
name: true,
location: true,
},
where: {
masterQueue: run.workerQueue,
},
});

region = workerGroup ?? null;
}

return {
id: run.id,
friendlyId: run.friendlyId,
Expand Down Expand Up @@ -233,6 +249,7 @@ export class SpanPresenter extends BasePresenter {
maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds),
batch: run.batch ? { friendlyId: run.batch.friendlyId } : undefined,
engine: run.engine,
region,
workerQueue: run.workerQueue,
spanId: run.spanId,
isCached: !!span.originalRun,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import {
} from "~/utils/pathBuilder";
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
import { CompleteWaitpointForm } from "../resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.waitpoints.$waitpointFriendlyId.complete/route";
import { FlagIcon } from "~/assets/icons/RegionIcons";

export const loader = async ({ request, params }: LoaderFunctionArgs) => {
const { projectParam, organizationSlug, envParam, runParam, spanParam } =
Expand Down Expand Up @@ -701,6 +702,19 @@ function RunBody({
<MachineLabelCombo preset={run.machinePreset} />
</Property.Value>
</Property.Item>
{run.region && (
<Property.Item>
<Property.Label>Region</Property.Label>
<Property.Value>
<span className="flex items-center gap-1">
{run.region.location ? (
<FlagIcon region={run.region.location} className="size-5" />
) : null}
{run.region.name}
</span>
</Property.Value>
</Property.Item>
)}
<Property.Item>
<Property.Label>Run invocation cost</Property.Label>
<Property.Value>
Expand Down
19 changes: 15 additions & 4 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.serv
import type { RunEngine } from "~/v3/runEngine.server";
import { env } from "~/env.server";
import { EngineServiceValidationError } from "./errors";
import { tryCatch } from "@trigger.dev/core/v3";

export class DefaultQueueManager implements QueueManager {
constructor(
Expand Down Expand Up @@ -196,7 +197,10 @@ export class DefaultQueueManager implements QueueManager {
};
}

async getWorkerQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
async getWorkerQueue(
environment: AuthenticatedEnvironment,
regionOverride?: string
): Promise<string | undefined> {
if (environment.type === "DEVELOPMENT") {
return environment.id;
}
Expand All @@ -206,9 +210,16 @@ export class DefaultQueueManager implements QueueManager {
engine: this.engine,
});

const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
projectId: environment.projectId,
});
const [error, workerGroup] = await tryCatch(
workerGroupService.getDefaultWorkerGroupForProject({
projectId: environment.projectId,
regionOverride,
})
);

if (error) {
throw new EngineServiceValidationError(error.message);
}

if (!workerGroup) {
throw new EngineServiceValidationError("No worker group found");
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export class RunEngineTriggerTaskService {

const depth = parentRun ? parentRun.depth + 1 : 0;

const workerQueue = await this.queueConcern.getWorkerQueue(environment);
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);

try {
return await this.traceEventConcern.traceRun(triggerRequest, async (event) => {
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ export interface QueueManager {
): Promise<QueueProperties>;
getQueueName(request: TriggerTaskRequest): Promise<string>;
validateQueueLimits(env: AuthenticatedEnvironment): Promise<QueueValidationResult>;
getWorkerQueue(env: AuthenticatedEnvironment): Promise<string | undefined>;
getWorkerQueue(
env: AuthenticatedEnvironment,
regionOverride?: string
): Promise<string | undefined>;
}

export interface PayloadProcessor {
Expand Down
17 changes: 12 additions & 5 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ export type RunsReplicationServiceOptions = {
insertMaxDelayMs?: number;
};

type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
type PostgresTaskRun = TaskRun & { masterQueue: string };

type TaskRunInsert = {
_version: bigint;
run: PostgresTaskRun;
event: "insert" | "update" | "delete";
};

export type RunsReplicationServiceEvents = {
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
Expand Down Expand Up @@ -243,7 +249,7 @@ export class RunsReplicationService {
}
}

async backfill(runs: TaskRun[]) {
async backfill(runs: PostgresTaskRun[]) {
// divide into batches of 50 to get data from Postgres
const flushId = nanoid();
// Use current timestamp as LSN (high enough to be above existing data)
Expand Down Expand Up @@ -352,7 +358,7 @@ export class RunsReplicationService {
const replicationLagMs = Date.now() - Number(message.commitTime / 1000n);
this._currentTransaction.commitEndLsn = message.commitEndLsn;
this._currentTransaction.replicationLagMs = replicationLagMs;
const transaction = this._currentTransaction as Transaction<TaskRun>;
const transaction = this._currentTransaction as Transaction<PostgresTaskRun>;
this._currentTransaction = null;

if (transaction.commitEndLsn) {
Expand All @@ -370,7 +376,7 @@ export class RunsReplicationService {
}
}

#handleTransaction(transaction: Transaction<TaskRun>) {
#handleTransaction(transaction: Transaction<PostgresTaskRun>) {
if (this._isShutDownComplete) return;

if (this._isShuttingDown) {
Expand Down Expand Up @@ -764,7 +770,7 @@ export class RunsReplicationService {
}

async #prepareTaskRunInsert(
run: TaskRun,
run: PostgresTaskRun,
organizationId: string,
environmentType: string,
event: "insert" | "update" | "delete",
Expand Down Expand Up @@ -814,6 +820,7 @@ export class RunsReplicationService {
output,
concurrency_key: run.concurrencyKey ?? "",
bulk_action_group_ids: run.bulkActionGroupIds ?? [],
worker_queue: run.masterQueue,
_version: _version.toString(),
_is_deleted: event === "delete" ? 1 : 0,
};
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/services/setDefaultRegion.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ export class SetDefaultRegionService extends BaseService {

// If their project is restricted, only allow them to set default regions that are allowed
if (!isAdmin) {
if (project.allowedMasterQueues.length > 0) {
if (!project.allowedMasterQueues.includes(workerGroup.masterQueue)) {
if (project.allowedWorkerQueues.length > 0) {
if (!project.allowedWorkerQueues.includes(workerGroup.masterQueue)) {
throw new ServiceValidationError("You're not allowed to set this region as default");
}
} else if (workerGroup.hidden) {
Expand Down
39 changes: 36 additions & 3 deletions apps/webapp/app/v3/services/worker/workerGroupService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,12 @@ export class WorkerGroupService extends WithRunEngine {

async getDefaultWorkerGroupForProject({
projectId,
regionOverride,
}: {
projectId: string;
regionOverride?: string;
}): Promise<WorkerInstanceGroup | undefined> {
const project = await this._prisma.project.findUnique({
const project = await this._prisma.project.findFirst({
where: {
id: projectId,
},
Expand All @@ -208,8 +210,39 @@ export class WorkerGroupService extends WithRunEngine {
});

if (!project) {
logger.error("[WorkerGroupService] Project not found", { projectId });
return;
throw new Error("Project not found.");
}

// If they've specified a region, we need to check they have access to it
if (regionOverride) {
const workerGroup = await this._prisma.workerInstanceGroup.findFirst({
where: {
masterQueue: regionOverride,
},
});

if (!workerGroup) {
throw new Error(`The region you specified doesn't exist ("${regionOverride}").`);
}

// If they're restricted, check they have access
if (project.allowedWorkerQueues.length > 0) {
if (project.allowedWorkerQueues.includes(workerGroup.masterQueue)) {
return workerGroup;
}

throw new Error(
`You don't have access to this region ("${regionOverride}"). You can use the following regions: ${project.allowedWorkerQueues.join(
", "
)}.`
);
}

if (workerGroup.hidden) {
throw new Error(`The region you specified isn't available to you ("${regionOverride}").`);
}

return workerGroup;
}

if (project.defaultWorkerGroup) {
Expand Down
12 changes: 12 additions & 0 deletions docs/triggering.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,18 @@ View our [metadata doc](/runs/metadata) for more information.

View our [maxDuration doc](/runs/max-duration) for more information.

### `region`

You can override the default region when you trigger a run:

```ts
await yourTask.trigger(payload, { region: "eu-central-1" });
```

If you don't specify a region it will use the default for your project. Go to the "Regions" page in the dashboard to see available regions or switch your default.

The region is where your runs are executed, it does not change where the run payload, output, tags, logs, or are any other data is stored.

## Large Payloads

We recommend keeping your task payloads as small as possible. We currently have a hard limit on task payloads above 10MB.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
/*
Add worker_queue column.
*/
ALTER TABLE trigger_dev.task_runs_v2
ADD COLUMN worker_queue String DEFAULT '';

-- +goose Down
ALTER TABLE trigger_dev.task_runs_v2
DROP COLUMN worker_queue;
1 change: 1 addition & 0 deletions internal-packages/clickhouse/src/taskRuns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export const TaskRunV2 = z.object({
is_test: z.boolean().default(false),
concurrency_key: z.string().default(""),
bulk_action_group_ids: z.array(z.string()).default([]),
worker_queue: z.string().default(""),
_version: z.string(),
_is_deleted: z.number().int().default(0),
});
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ model Project {
defaultWorkerGroupId String?

/// The master queues they are allowed to use (impacts what they can set as default and trigger runs with)
allowedMasterQueues String[] @default([])
allowedWorkerQueues String[] @default([]) @map("allowedMasterQueues")

environments RuntimeEnvironment[]
backgroundWorkers BackgroundWorker[]
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/v3/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ export const TriggerTaskRequestBody = z.object({
ttl: z.string().or(z.number().nonnegative().int()).optional(),
priority: z.number().optional(),
bulkActionId: z.string().optional(),
region: z.string().optional(),
})
.optional(),
});
Expand Down Expand Up @@ -181,6 +182,7 @@ export const BatchTriggerTaskItem = z.object({
test: z.boolean().optional(),
ttl: z.string().or(z.number().nonnegative().int()).optional(),
priority: z.number().optional(),
region: z.string().optional(),
})
.optional(),
});
Expand Down
15 changes: 15 additions & 0 deletions packages/core/src/v3/types/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,21 @@ export type TriggerOptions = {
* to the same version as the parent task that is triggering the child tasks.
*/
version?: string;

/**
* Specify the region to run the task in. This overrides the default region set for your project in the dashboard.
*
* Check the Regions page in the dashboard for regions that are available to you.
*
* In DEV this won't do anything, so it's fine to set it in your code.
*
* @example
*
* ```ts
* await myTask.trigger({ foo: "bar" }, { region: "us-east-1" });
* ```
*/
region?: string;
};

export type TriggerAndWaitOptions = Omit<TriggerOptions, "version">;
Expand Down
Loading
Loading