Skip to content

Commit e78bee2

Browse files
committed
Pass a region in when triggering
1 parent 7823a78 commit e78bee2

File tree

8 files changed

+90
-9
lines changed

8 files changed

+90
-9
lines changed

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.serv
1414
import type { RunEngine } from "~/v3/runEngine.server";
1515
import { env } from "~/env.server";
1616
import { EngineServiceValidationError } from "./errors";
17+
import { tryCatch } from "@trigger.dev/core/v3";
1718

1819
export class DefaultQueueManager implements QueueManager {
1920
constructor(
@@ -196,7 +197,10 @@ export class DefaultQueueManager implements QueueManager {
196197
};
197198
}
198199

199-
async getWorkerQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
200+
async getWorkerQueue(
201+
environment: AuthenticatedEnvironment,
202+
regionOverride?: string
203+
): Promise<string | undefined> {
200204
if (environment.type === "DEVELOPMENT") {
201205
return environment.id;
202206
}
@@ -206,9 +210,16 @@ export class DefaultQueueManager implements QueueManager {
206210
engine: this.engine,
207211
});
208212

209-
const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
210-
projectId: environment.projectId,
211-
});
213+
const [error, workerGroup] = await tryCatch(
214+
workerGroupService.getDefaultWorkerGroupForProject({
215+
projectId: environment.projectId,
216+
regionOverride,
217+
})
218+
);
219+
220+
if (error) {
221+
throw new EngineServiceValidationError(error.message);
222+
}
212223

213224
if (!workerGroup) {
214225
throw new EngineServiceValidationError("No worker group found");

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ export class RunEngineTriggerTaskService {
234234

235235
const depth = parentRun ? parentRun.depth + 1 : 0;
236236

237-
const workerQueue = await this.queueConcern.getWorkerQueue(environment);
237+
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
238238

239239
try {
240240
return await this.traceEventConcern.traceRun(triggerRequest, async (event) => {

apps/webapp/app/runEngine/types.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ export interface QueueManager {
6767
): Promise<QueueProperties>;
6868
getQueueName(request: TriggerTaskRequest): Promise<string>;
6969
validateQueueLimits(env: AuthenticatedEnvironment): Promise<QueueValidationResult>;
70-
getWorkerQueue(env: AuthenticatedEnvironment): Promise<string | undefined>;
70+
getWorkerQueue(
71+
env: AuthenticatedEnvironment,
72+
regionOverride?: string
73+
): Promise<string | undefined>;
7174
}
7275

7376
export interface PayloadProcessor {

apps/webapp/app/v3/services/worker/workerGroupService.server.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,10 +195,12 @@ export class WorkerGroupService extends WithRunEngine {
195195

196196
async getDefaultWorkerGroupForProject({
197197
projectId,
198+
regionOverride,
198199
}: {
199200
projectId: string;
201+
regionOverride?: string;
200202
}): Promise<WorkerInstanceGroup | undefined> {
201-
const project = await this._prisma.project.findUnique({
203+
const project = await this._prisma.project.findFirst({
202204
where: {
203205
id: projectId,
204206
},
@@ -208,8 +210,39 @@ export class WorkerGroupService extends WithRunEngine {
208210
});
209211

210212
if (!project) {
211-
logger.error("[WorkerGroupService] Project not found", { projectId });
212-
return;
213+
throw new Error("Project not found.");
214+
}
215+
216+
// If they've specified a region, we need to check they have access to it
217+
if (regionOverride) {
218+
const workerGroup = await this._prisma.workerInstanceGroup.findFirst({
219+
where: {
220+
masterQueue: regionOverride,
221+
},
222+
});
223+
224+
if (!workerGroup) {
225+
throw new Error(`The region you specified doesn't exist ("${regionOverride}").`);
226+
}
227+
228+
// If they're restricted, check they have access
229+
if (project.allowedWorkerQueues.length > 0) {
230+
if (project.allowedWorkerQueues.includes(workerGroup.masterQueue)) {
231+
return workerGroup;
232+
}
233+
234+
throw new Error(
235+
`You don't have access to this region ("${regionOverride}"). You can use the following regions: ${project.allowedWorkerQueues.join(
236+
", "
237+
)}.`
238+
);
239+
}
240+
241+
if (workerGroup.hidden) {
242+
throw new Error(`The region you specified isn't available to you ("${regionOverride}").`);
243+
}
244+
245+
return workerGroup;
213246
}
214247

215248
if (project.defaultWorkerGroup) {

packages/core/src/v3/schemas/api.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ export const TriggerTaskRequestBody = z.object({
134134
ttl: z.string().or(z.number().nonnegative().int()).optional(),
135135
priority: z.number().optional(),
136136
bulkActionId: z.string().optional(),
137+
region: z.string().optional(),
137138
})
138139
.optional(),
139140
});
@@ -181,6 +182,7 @@ export const BatchTriggerTaskItem = z.object({
181182
test: z.boolean().optional(),
182183
ttl: z.string().or(z.number().nonnegative().int()).optional(),
183184
priority: z.number().optional(),
185+
region: z.string().optional(),
184186
})
185187
.optional(),
186188
});

packages/core/src/v3/types/tasks.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,21 @@ export type TriggerOptions = {
855855
* to the same version as the parent task that is triggering the child tasks.
856856
*/
857857
version?: string;
858+
859+
/**
860+
* Specify the region to run the task in. This overrides the default region set for your project in the dashboard.
861+
*
862+
* Check the Regions page in the dashboard for regions that are available to you.
863+
*
864+
* In DEV this won't do anything, so it's fine to set it in your code.
865+
*
866+
* @example
867+
*
868+
* ```ts
869+
* await myTask.trigger({ foo: "bar" }, { region: "us-east-1" });
870+
* ```
871+
*/
872+
region?: string;
858873
};
859874

860875
export type TriggerAndWaitOptions = Omit<TriggerOptions, "version">;

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ export async function batchTriggerById<TTask extends AnyTask>(
627627
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
628628
machine: item.options?.machine,
629629
priority: item.options?.priority,
630+
region: item.options?.region,
630631
lockToVersion: item.options?.version ?? getEnvVar("TRIGGER_VERSION"),
631632
},
632633
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
@@ -796,6 +797,7 @@ export async function batchTriggerByIdAndWait<TTask extends AnyTask>(
796797
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
797798
machine: item.options?.machine,
798799
priority: item.options?.priority,
800+
region: item.options?.region,
799801
},
800802
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
801803
})
@@ -955,6 +957,7 @@ export async function batchTriggerTasks<TTasks extends readonly AnyTask[]>(
955957
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
956958
machine: item.options?.machine,
957959
priority: item.options?.priority,
960+
region: item.options?.region,
958961
lockToVersion: item.options?.version ?? getEnvVar("TRIGGER_VERSION"),
959962
},
960963
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
@@ -1126,6 +1129,7 @@ export async function batchTriggerAndWaitTasks<TTasks extends readonly AnyTask[]
11261129
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
11271130
machine: item.options?.machine,
11281131
priority: item.options?.priority,
1132+
region: item.options?.region,
11291133
},
11301134
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
11311135
})
@@ -1198,6 +1202,7 @@ async function trigger_internal<TRunTypes extends AnyRunTypes>(
11981202
parentRunId: taskContext.ctx?.run.id,
11991203
machine: options?.machine,
12001204
priority: options?.priority,
1205+
region: options?.region,
12011206
lockToVersion: options?.version ?? getEnvVar("TRIGGER_VERSION"),
12021207
},
12031208
},
@@ -1270,6 +1275,7 @@ async function batchTrigger_internal<TRunTypes extends AnyRunTypes>(
12701275
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
12711276
machine: item.options?.machine,
12721277
priority: item.options?.priority,
1278+
region: item.options?.region,
12731279
lockToVersion: item.options?.version ?? getEnvVar("TRIGGER_VERSION"),
12741280
},
12751281
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
@@ -1354,6 +1360,7 @@ async function triggerAndWait_internal<TIdentifier extends string, TPayload, TOu
13541360
idempotencyKeyTTL: options?.idempotencyKeyTTL,
13551361
machine: options?.machine,
13561362
priority: options?.priority,
1363+
region: options?.region,
13571364
},
13581365
},
13591366
{},
@@ -1444,6 +1451,7 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
14441451
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
14451452
machine: item.options?.machine,
14461453
priority: item.options?.priority,
1454+
region: item.options?.region,
14471455
},
14481456
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
14491457
})
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { task } from "@trigger.dev/sdk";
2+
import { fixedLengthTask } from "./batches.js";
3+
4+
export const regionsTask = task({
5+
id: "regions",
6+
run: async ({ region }: { region?: string }, { ctx }) => {
7+
await fixedLengthTask.triggerAndWait({ waitSeconds: 1 }, { region });
8+
},
9+
});

0 commit comments

Comments
 (0)