Skip to content

Commit af14621

Browse files
authored
Specify a region when triggering (#2366)
* Map new allowedMasterQueues → allowedWorkerQueues * ClickHouse worker_queue on task runs * Added the Region to the run inspector * Pass a region in when triggering * Added a changeset * Added triggering regions docs * Added region to the ctx * Fix for backfiller masterQueue/workerQueue
1 parent 9787120 commit af14621

File tree

22 files changed

+191
-25
lines changed

22 files changed

+191
-25
lines changed

.changeset/thick-poets-yawn.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Specify a region override when triggering a run

apps/webapp/app/presenters/v3/RegionsPresenter.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export class RegionsPresenter extends BasePresenter {
3030
id: true,
3131
organizationId: true,
3232
defaultWorkerGroupId: true,
33-
allowedMasterQueues: true,
33+
allowedWorkerQueues: true,
3434
},
3535
where: {
3636
slug: projectSlug,
@@ -70,9 +70,9 @@ export class RegionsPresenter extends BasePresenter {
7070
where: isAdmin
7171
? undefined
7272
: // Hide hidden unless they're allowed to use them
73-
project.allowedMasterQueues.length > 0
73+
project.allowedWorkerQueues.length > 0
7474
? {
75-
masterQueue: { in: project.allowedMasterQueues },
75+
masterQueue: { in: project.allowedWorkerQueues },
7676
}
7777
: {
7878
hidden: false,

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import {
2-
MachinePreset,
2+
type MachinePreset,
33
prettyPrintPacket,
44
SemanticInternalAttributes,
5-
TaskRunContext,
5+
type TaskRunContext,
66
TaskRunError,
77
TriggerTraceContext,
8-
V3TaskRunContext,
8+
type V3TaskRunContext,
99
} from "@trigger.dev/core/v3";
1010
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
1111
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
@@ -176,6 +176,22 @@ export class SpanPresenter extends BasePresenter {
176176

177177
const externalTraceId = this.#getExternalTraceId(run.traceContext);
178178

179+
let region: { name: string; location: string | null } | null = null;
180+
181+
if (run.runtimeEnvironment.type !== "DEVELOPMENT" && run.engine !== "V1") {
182+
const workerGroup = await this._replica.workerInstanceGroup.findFirst({
183+
select: {
184+
name: true,
185+
location: true,
186+
},
187+
where: {
188+
masterQueue: run.workerQueue,
189+
},
190+
});
191+
192+
region = workerGroup ?? null;
193+
}
194+
179195
return {
180196
id: run.id,
181197
friendlyId: run.friendlyId,
@@ -233,6 +249,7 @@ export class SpanPresenter extends BasePresenter {
233249
maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds),
234250
batch: run.batch ? { friendlyId: run.batch.friendlyId } : undefined,
235251
engine: run.engine,
252+
region,
236253
workerQueue: run.workerQueue,
237254
spanId: run.spanId,
238255
isCached: !!span.originalRun,

apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ export async function action({ request }: ActionFunctionArgs) {
5959
throw new Error("Runs replication instance not found");
6060
}
6161

62-
await runsReplicationInstance.backfill(runs);
62+
await runsReplicationInstance.backfill(
63+
runs.map((run) => ({
64+
...run,
65+
masterQueue: run.workerQueue,
66+
}))
67+
);
6368

6469
logger.info("Backfilled runs", { runs });
6570

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ import {
7676
} from "~/utils/pathBuilder";
7777
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
7878
import { CompleteWaitpointForm } from "../resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.waitpoints.$waitpointFriendlyId.complete/route";
79+
import { FlagIcon } from "~/assets/icons/RegionIcons";
7980

8081
export const loader = async ({ request, params }: LoaderFunctionArgs) => {
8182
const { projectParam, organizationSlug, envParam, runParam, spanParam } =
@@ -701,6 +702,19 @@ function RunBody({
701702
<MachineLabelCombo preset={run.machinePreset} />
702703
</Property.Value>
703704
</Property.Item>
705+
{run.region && (
706+
<Property.Item>
707+
<Property.Label>Region</Property.Label>
708+
<Property.Value>
709+
<span className="flex items-center gap-1">
710+
{run.region.location ? (
711+
<FlagIcon region={run.region.location} className="size-5" />
712+
) : null}
713+
{run.region.name}
714+
</span>
715+
</Property.Value>
716+
</Property.Item>
717+
)}
704718
<Property.Item>
705719
<Property.Label>Run invocation cost</Property.Label>
706720
<Property.Value>

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.serv
1414
import type { RunEngine } from "~/v3/runEngine.server";
1515
import { env } from "~/env.server";
1616
import { EngineServiceValidationError } from "./errors";
17+
import { tryCatch } from "@trigger.dev/core/v3";
1718

1819
export class DefaultQueueManager implements QueueManager {
1920
constructor(
@@ -196,7 +197,10 @@ export class DefaultQueueManager implements QueueManager {
196197
};
197198
}
198199

199-
async getWorkerQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
200+
async getWorkerQueue(
201+
environment: AuthenticatedEnvironment,
202+
regionOverride?: string
203+
): Promise<string | undefined> {
200204
if (environment.type === "DEVELOPMENT") {
201205
return environment.id;
202206
}
@@ -206,9 +210,16 @@ export class DefaultQueueManager implements QueueManager {
206210
engine: this.engine,
207211
});
208212

209-
const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
210-
projectId: environment.projectId,
211-
});
213+
const [error, workerGroup] = await tryCatch(
214+
workerGroupService.getDefaultWorkerGroupForProject({
215+
projectId: environment.projectId,
216+
regionOverride,
217+
})
218+
);
219+
220+
if (error) {
221+
throw new EngineServiceValidationError(error.message);
222+
}
212223

213224
if (!workerGroup) {
214225
throw new EngineServiceValidationError("No worker group found");

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ export class RunEngineTriggerTaskService {
234234

235235
const depth = parentRun ? parentRun.depth + 1 : 0;
236236

237-
const workerQueue = await this.queueConcern.getWorkerQueue(environment);
237+
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
238238

239239
try {
240240
return await this.traceEventConcern.traceRun(triggerRequest, async (event) => {

apps/webapp/app/runEngine/types.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ export interface QueueManager {
6767
): Promise<QueueProperties>;
6868
getQueueName(request: TriggerTaskRequest): Promise<string>;
6969
validateQueueLimits(env: AuthenticatedEnvironment): Promise<QueueValidationResult>;
70-
getWorkerQueue(env: AuthenticatedEnvironment): Promise<string | undefined>;
70+
getWorkerQueue(
71+
env: AuthenticatedEnvironment,
72+
regionOverride?: string
73+
): Promise<string | undefined>;
7174
}
7275

7376
export interface PayloadProcessor {

apps/webapp/app/services/runsBackfiller.server.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,12 @@ export class RunsBackfillerService {
7373
lastCreatedAt: runs[runs.length - 1].createdAt,
7474
});
7575

76-
await this.runsReplicationInstance.backfill(runs);
76+
await this.runsReplicationInstance.backfill(
77+
runs.map((run) => ({
78+
...run,
79+
masterQueue: run.workerQueue,
80+
}))
81+
);
7782

7883
const lastRun = runs[runs.length - 1];
7984

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ export type RunsReplicationServiceOptions = {
5959
insertMaxDelayMs?: number;
6060
};
6161

62-
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
62+
type PostgresTaskRun = TaskRun & { masterQueue: string };
63+
64+
type TaskRunInsert = {
65+
_version: bigint;
66+
run: PostgresTaskRun;
67+
event: "insert" | "update" | "delete";
68+
};
6369

6470
export type RunsReplicationServiceEvents = {
6571
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
@@ -243,7 +249,7 @@ export class RunsReplicationService {
243249
}
244250
}
245251

246-
async backfill(runs: TaskRun[]) {
252+
async backfill(runs: PostgresTaskRun[]) {
247253
// divide into batches of 50 to get data from Postgres
248254
const flushId = nanoid();
249255
// Use current timestamp as LSN (high enough to be above existing data)
@@ -352,7 +358,7 @@ export class RunsReplicationService {
352358
const replicationLagMs = Date.now() - Number(message.commitTime / 1000n);
353359
this._currentTransaction.commitEndLsn = message.commitEndLsn;
354360
this._currentTransaction.replicationLagMs = replicationLagMs;
355-
const transaction = this._currentTransaction as Transaction<TaskRun>;
361+
const transaction = this._currentTransaction as Transaction<PostgresTaskRun>;
356362
this._currentTransaction = null;
357363

358364
if (transaction.commitEndLsn) {
@@ -370,7 +376,7 @@ export class RunsReplicationService {
370376
}
371377
}
372378

373-
#handleTransaction(transaction: Transaction<TaskRun>) {
379+
#handleTransaction(transaction: Transaction<PostgresTaskRun>) {
374380
if (this._isShutDownComplete) return;
375381

376382
if (this._isShuttingDown) {
@@ -764,7 +770,7 @@ export class RunsReplicationService {
764770
}
765771

766772
async #prepareTaskRunInsert(
767-
run: TaskRun,
773+
run: PostgresTaskRun,
768774
organizationId: string,
769775
environmentType: string,
770776
event: "insert" | "update" | "delete",
@@ -814,6 +820,7 @@ export class RunsReplicationService {
814820
output,
815821
concurrency_key: run.concurrencyKey ?? "",
816822
bulk_action_group_ids: run.bulkActionGroupIds ?? [],
823+
worker_queue: run.masterQueue,
817824
_version: _version.toString(),
818825
_is_deleted: event === "delete" ? 1 : 0,
819826
};

0 commit comments

Comments
 (0)