Skip to content

Commit 024e097

Browse files
committed
expose concurrencyKey in task run context
- Schemas: add optional concurrencyKey to TaskRun and V3TaskRun types - Engine: include concurrencyKey in run context building - Services: pass concurrencyKey in execution payload as part of run object - Presenters/Routes: include concurrencyKey in run context returned to UI - Tests: verify concurrencyKey is exposed as ctx.run.concurrencyKey - Docs: document concurrencyKey in docs/context.mdx - Add changeset - No concurrency behavior changes
1 parent 6f29b7d commit 024e097

File tree

9 files changed

+40
-0
lines changed

9 files changed

+40
-0
lines changed

.changeset/eight-keys-impress.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Expose concurrencyKey on task run context (ctx.run.concurrencyKey)

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ export class SpanPresenter extends BasePresenter {
559559
tags: run.runTags,
560560
isTest: run.isTest,
561561
idempotencyKey: run.idempotencyKey ?? undefined,
562+
concurrencyKey: run.concurrencyKey ?? undefined,
562563
startedAt: run.startedAt ?? run.createdAt,
563564
durationMs: run.usageDurationMs,
564565
costInCents: run.costInCents,

apps/webapp/app/routes/resources.runs.$runParam.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
181181
tags: run.tags.map((tag) => tag.name),
182182
isTest: run.isTest,
183183
idempotencyKey: run.idempotencyKey ?? undefined,
184+
concurrencyKey: run.concurrencyKey,
184185
startedAt: run.startedAt ?? run.createdAt,
185186
durationMs: run.usageDurationMs,
186187
costInCents: run.costInCents,

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,6 +1634,7 @@ export const AttemptForExecutionGetPayload = {
16341634
metadata: true,
16351635
metadataType: true,
16361636
idempotencyKey: true,
1637+
concurrencyKey: true,
16371638
usageDurationMs: true,
16381639
costInCents: true,
16391640
baseCostInCents: true,
@@ -1717,6 +1718,7 @@ class SharedQueueTasks {
17171718
tags: taskRun.tags.map((tag) => tag.name),
17181719
isTest: taskRun.isTest,
17191720
idempotencyKey: taskRun.idempotencyKey ?? undefined,
1721+
concurrencyKey: taskRun.concurrencyKey ?? undefined,
17201722
durationMs: taskRun.usageDurationMs,
17211723
costInCents: taskRun.costInCents,
17221724
baseCostInCents: taskRun.baseCostInCents,

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ export class CreateTaskRunAttemptService extends BaseService {
212212
tags: taskRun.tags.map((tag) => tag.name),
213213
isTest: taskRun.isTest,
214214
idempotencyKey: taskRun.idempotencyKey ?? undefined,
215+
concurrencyKey: taskRun.concurrencyKey ?? undefined,
215216
startedAt: taskRun.startedAt ?? taskRun.createdAt,
216217
durationMs: taskRun.usageDurationMs,
217218
costInCents: taskRun.costInCents,

docs/context.mdx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ export const parentTask = task({
9090
<ResponseField name="idempotencyKey" type="string" optional>
9191
An optional [idempotency key](/idempotency) for the task run.
9292
</ResponseField>
93+
<ResponseField name="concurrencyKey" type="string" optional>
94+
An optional [concurrency key](/triggering#concurrencykey) that groups runs by key for concurrency control.
95+
</ResponseField>
9396
<ResponseField name="maxAttempts" type="number" optional>
9497
The [maximum number of attempts](/triggering#maxattempts) allowed for this task run.
9598
</ResponseField>

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ export class RunAttemptSystem {
193193
runTags: true,
194194
isTest: true,
195195
idempotencyKey: true,
196+
concurrencyKey: true,
196197
startedAt: true,
197198
maxAttempts: true,
198199
taskVersion: true,
@@ -261,6 +262,7 @@ export class RunAttemptSystem {
261262
createdAt: run.createdAt,
262263
startedAt: run.startedAt ?? run.createdAt,
263264
idempotencyKey: run.idempotencyKey ?? undefined,
265+
concurrencyKey: run.concurrencyKey ?? undefined,
264266
maxAttempts: run.maxAttempts ?? undefined,
265267
version: run.taskVersion ?? "unknown",
266268
maxDuration: run.maxDurationInSeconds ?? undefined,
@@ -421,6 +423,7 @@ export class RunAttemptSystem {
421423
runTags: true,
422424
isTest: true,
423425
idempotencyKey: true,
426+
concurrencyKey: true,
424427
startedAt: true,
425428
maxAttempts: true,
426429
taskVersion: true,
@@ -568,6 +571,7 @@ export class RunAttemptSystem {
568571
tags: updatedRun.runTags,
569572
isTest: updatedRun.isTest,
570573
idempotencyKey: updatedRun.idempotencyKey ?? undefined,
574+
concurrencyKey: updatedRun.concurrencyKey ?? undefined,
571575
startedAt: updatedRun.startedAt ?? updatedRun.createdAt,
572576
maxAttempts: updatedRun.maxAttempts ?? undefined,
573577
version: updatedRun.taskVersion ?? "unknown",

packages/core/src/v3/schemas/common.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ export const TaskRun = z.object({
215215
createdAt: z.coerce.date(),
216216
startedAt: z.coerce.date().default(() => new Date()),
217217
idempotencyKey: z.string().optional(),
218+
concurrencyKey: z.string().optional(),
218219
maxAttempts: z.number().optional(),
219220
version: z.string().optional(),
220221
metadata: z.record(DeserializedJsonSchema).optional(),
@@ -369,6 +370,7 @@ export const V3TaskRun = z.object({
369370
createdAt: z.coerce.date(),
370371
startedAt: z.coerce.date().default(() => new Date()),
371372
idempotencyKey: z.string().optional(),
373+
concurrencyKey: z.string().optional(),
372374
maxAttempts: z.number().optional(),
373375
version: z.string().optional(),
374376
metadata: z.record(DeserializedJsonSchema).optional(),

packages/core/test/taskExecutor.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1840,6 +1840,26 @@ describe("TaskExecutor", () => {
18401840
},
18411841
});
18421842
});
1843+
1844+
test("should pass concurrencyKey through to task run context", async () => {
1845+
let receivedConcurrencyKey: string | undefined;
1846+
1847+
const task = {
1848+
id: "test-task",
1849+
fns: {
1850+
run: async (payload: any, params: RunFnParams<any>) => {
1851+
receivedConcurrencyKey = params.ctx.run.concurrencyKey;
1852+
return { success: true };
1853+
},
1854+
},
1855+
};
1856+
1857+
const result = await executeTask(task, { test: "data" }, undefined, undefined);
1858+
1859+
// Verify that concurrencyKey is passed through to task context
1860+
expect(receivedConcurrencyKey).toBe("user-123");
1861+
expect(result.result.ok).toBe(true);
1862+
});
18431863
});
18441864

18451865
function executeTask(
@@ -1904,6 +1924,7 @@ function executeTask(
19041924
baseCostInCents: 0,
19051925
priority: 0,
19061926
maxDuration: 1000,
1927+
concurrencyKey: "user-123",
19071928
},
19081929
machine: {
19091930
name: "micro",

0 commit comments

Comments
 (0)