Skip to content

Commit 0172cca

Browse files
committed
expose concurrencyKey in task context
- Schemas: add optional concurrencyKey to TaskRunExecution and TaskRunContext types - Engine: include concurrencyKey in execution/context building - Services: pass concurrencyKey in execution payload - Presenters/Routes: include concurrencyKey in context returned to UI - Tests: verify execution.concurrencyKey is exposed as ctx.concurrencyKey - Docs: document in docs/context.mdx - Add changeset - No concurrency behavior changes
1 parent 6f29b7d commit 0172cca

File tree

9 files changed

+44
-0
lines changed

9 files changed

+44
-0
lines changed

.changeset/eight-keys-impress.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Expose concurrencyKey on task run context (ctx.concurrencyKey)

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ export class SpanPresenter extends BasePresenter {
571571
name: run.queue,
572572
id: run.queue,
573573
},
574+
concurrencyKey: run.concurrencyKey ?? undefined,
574575
environment: {
575576
id: run.runtimeEnvironment.id,
576577
slug: run.runtimeEnvironment.slug,

apps/webapp/app/routes/resources.runs.$runParam.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
193193
queue: {
194194
name: run.queue,
195195
},
196+
concurrencyKey: run.concurrencyKey,
196197
environment: {
197198
id: run.runtimeEnvironment.id,
198199
slug: run.runtimeEnvironment.slug,

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,6 +1639,7 @@ export const AttemptForExecutionGetPayload = {
16391639
baseCostInCents: true,
16401640
maxDurationInSeconds: true,
16411641
tags: true,
1642+
concurrencyKey: true,
16421643
},
16431644
},
16441645
queue: {
@@ -1727,6 +1728,7 @@ class SharedQueueTasks {
17271728
id: queue.friendlyId,
17281729
name: queue.name,
17291730
},
1731+
concurrencyKey: taskRun.concurrencyKey ?? undefined,
17301732
environment: {
17311733
id: attempt.runtimeEnvironment.id,
17321734
slug: attempt.runtimeEnvironment.slug,

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ export class CreateTaskRunAttemptService extends BaseService {
225225
id: queue.friendlyId,
226226
name: queue.name,
227227
},
228+
concurrencyKey: taskRun.concurrencyKey ?? undefined,
228229
environment: {
229230
id: environment.id,
230231
slug: environment.slug,

docs/context.mdx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ export const parentTask = task({
125125
</Expandable>
126126
</ResponseField>
127127

128+
<ResponseField name="concurrencyKey" type="string" optional>
129+
If set, the run is queued under this key. Runs with the same key execute sequentially.
130+
See <a href="/queue-concurrency">Queue concurrency</a> and <a href="/triggering#concurrencykey">Triggering: concurrencyKey</a>.
131+
</ResponseField>
132+
128133
<ResponseField name="environment" type="object">
129134
<Expandable title="properties">
130135
<ResponseField name="id" type="string">

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ export class RunAttemptSystem {
186186
lockedById: true,
187187
lockedQueueId: true,
188188
queue: true,
189+
concurrencyKey: true,
189190
attemptNumber: true,
190191
status: true,
191192
ttl: true,
@@ -275,6 +276,7 @@ export class RunAttemptSystem {
275276
},
276277
task,
277278
queue,
279+
concurrencyKey: run.concurrencyKey ?? undefined,
278280
organization,
279281
project,
280282
machine: machinePreset,
@@ -410,6 +412,7 @@ export class RunAttemptSystem {
410412
lockedById: true,
411413
lockedQueueId: true,
412414
queue: true,
415+
concurrencyKey: true,
413416
attemptNumber: true,
414417
status: true,
415418
ttl: true,
@@ -596,6 +599,7 @@ export class RunAttemptSystem {
596599
},
597600
task,
598601
queue,
602+
concurrencyKey: updatedRun.concurrencyKey ?? undefined,
599603
environment: {
600604
id: updatedRun.runtimeEnvironment.id,
601605
slug: updatedRun.runtimeEnvironment.slug,

packages/core/src/v3/schemas/common.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ export const TaskRunExecution = z.object({
336336
traceContext: z.record(z.unknown()).optional(),
337337
})
338338
),
339+
concurrencyKey: z.string().optional(),
339340
...StaticTaskRunExecutionShape,
340341
});
341342

@@ -390,6 +391,7 @@ export const V3TaskRunExecution = z.object({
390391
})
391392
),
392393
queue: TaskRunExecutionQueue,
394+
concurrencyKey: z.string().optional(),
393395
environment: TaskRunExecutionEnvironment,
394396
organization: TaskRunExecutionOrganization,
395397
project: TaskRunExecutionProject,
@@ -408,6 +410,7 @@ export const TaskRunContext = z.object({
408410
durationMs: true,
409411
costInCents: true,
410412
}),
413+
concurrencyKey: z.string().optional(),
411414
...StaticTaskRunExecutionShape,
412415
});
413416

@@ -433,6 +436,7 @@ export const V3TaskRunContext = z.object({
433436
}),
434437
task: V3TaskRunExecutionTask,
435438
queue: TaskRunExecutionQueue,
439+
concurrencyKey: z.string().optional(),
436440
environment: V3TaskRunExecutionEnvironment,
437441
organization: TaskRunExecutionOrganization,
438442
project: TaskRunExecutionProject,

packages/core/test/taskExecutor.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1840,6 +1840,26 @@ describe("TaskExecutor", () => {
18401840
},
18411841
});
18421842
});
1843+
1844+
test("should pass concurrencyKey through to task context", async () => {
1845+
let receivedConcurrencyKey: string | undefined;
1846+
1847+
const task = {
1848+
id: "test-task",
1849+
fns: {
1850+
run: async (payload: any, params: RunFnParams<any>) => {
1851+
receivedConcurrencyKey = params.ctx.concurrencyKey;
1852+
return { success: true };
1853+
},
1854+
},
1855+
};
1856+
1857+
const result = await executeTask(task, { test: "data" }, undefined, undefined);
1858+
1859+
// Verify that concurrencyKey is passed through to task context
1860+
expect(receivedConcurrencyKey).toBe("user-123");
1861+
expect(result.result.ok).toBe(true);
1862+
});
18431863
});
18441864

18451865
function executeTask(
@@ -1915,6 +1935,7 @@ function executeTask(
19151935
name: "test-queue",
19161936
id: "test-queue-id",
19171937
},
1938+
concurrencyKey: "user-123",
19181939
environment: {
19191940
type: "PRODUCTION",
19201941
id: "test-environment-id",

0 commit comments

Comments
 (0)