diff --git a/.changeset/eight-keys-impress.md b/.changeset/eight-keys-impress.md new file mode 100644 index 0000000000..7fbf09aca8 --- /dev/null +++ b/.changeset/eight-keys-impress.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Expose concurrencyKey on task run context (ctx.run.concurrencyKey) \ No newline at end of file diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index a00ffa3f3c..12ed751c35 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -559,6 +559,7 @@ export class SpanPresenter extends BasePresenter { tags: run.runTags, isTest: run.isTest, idempotencyKey: run.idempotencyKey ?? undefined, + concurrencyKey: run.concurrencyKey ?? undefined, startedAt: run.startedAt ?? run.createdAt, durationMs: run.usageDurationMs, costInCents: run.costInCents, diff --git a/apps/webapp/app/routes/resources.runs.$runParam.ts b/apps/webapp/app/routes/resources.runs.$runParam.ts index 7b116b31c3..1c767cf30e 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.ts @@ -181,6 +181,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { tags: run.tags.map((tag) => tag.name), isTest: run.isTest, idempotencyKey: run.idempotencyKey ?? undefined, + concurrencyKey: run.concurrencyKey, startedAt: run.startedAt ?? run.createdAt, durationMs: run.usageDurationMs, costInCents: run.costInCents, diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 075732544c..72ff1c5ad9 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -1634,6 +1634,7 @@ export const AttemptForExecutionGetPayload = { metadata: true, metadataType: true, idempotencyKey: true, + concurrencyKey: true, usageDurationMs: true, costInCents: true, baseCostInCents: true, @@ -1717,6 +1718,7 @@ class SharedQueueTasks { tags: taskRun.tags.map((tag) => tag.name), isTest: taskRun.isTest, idempotencyKey: taskRun.idempotencyKey ?? undefined, + concurrencyKey: taskRun.concurrencyKey ?? undefined, durationMs: taskRun.usageDurationMs, costInCents: taskRun.costInCents, baseCostInCents: taskRun.baseCostInCents, diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index df5e4e2b74..8509d82595 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -212,6 +212,7 @@ export class CreateTaskRunAttemptService extends BaseService { tags: taskRun.tags.map((tag) => tag.name), isTest: taskRun.isTest, idempotencyKey: taskRun.idempotencyKey ?? undefined, + concurrencyKey: taskRun.concurrencyKey ?? undefined, startedAt: taskRun.startedAt ?? taskRun.createdAt, durationMs: taskRun.usageDurationMs, costInCents: taskRun.costInCents, diff --git a/docs/context.mdx b/docs/context.mdx index bd9d3547ed..7f2aa048ca 100644 --- a/docs/context.mdx +++ b/docs/context.mdx @@ -90,6 +90,9 @@ export const parentTask = task({ An optional [idempotency key](/idempotency) for the task run. + + An optional [concurrency key](/triggering#concurrencykey) that groups runs by key for concurrency control. + The [maximum number of attempts](/triggering#maxattempts) allowed for this task run. diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 942c22f6be..29ac3e384a 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -193,6 +193,7 @@ export class RunAttemptSystem { runTags: true, isTest: true, idempotencyKey: true, + concurrencyKey: true, startedAt: true, maxAttempts: true, taskVersion: true, @@ -261,6 +262,7 @@ export class RunAttemptSystem { createdAt: run.createdAt, startedAt: run.startedAt ?? run.createdAt, idempotencyKey: run.idempotencyKey ?? undefined, + concurrencyKey: run.concurrencyKey ?? undefined, maxAttempts: run.maxAttempts ?? undefined, version: run.taskVersion ?? "unknown", maxDuration: run.maxDurationInSeconds ?? undefined, @@ -421,6 +423,7 @@ export class RunAttemptSystem { runTags: true, isTest: true, idempotencyKey: true, + concurrencyKey: true, startedAt: true, maxAttempts: true, taskVersion: true, @@ -568,6 +571,7 @@ export class RunAttemptSystem { tags: updatedRun.runTags, isTest: updatedRun.isTest, idempotencyKey: updatedRun.idempotencyKey ?? undefined, + concurrencyKey: updatedRun.concurrencyKey ?? undefined, startedAt: updatedRun.startedAt ?? updatedRun.createdAt, maxAttempts: updatedRun.maxAttempts ?? undefined, version: updatedRun.taskVersion ?? "unknown", diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index 41a095648f..bd1b515f14 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -215,6 +215,7 @@ export const TaskRun = z.object({ createdAt: z.coerce.date(), startedAt: z.coerce.date().default(() => new Date()), idempotencyKey: z.string().optional(), + concurrencyKey: z.string().optional(), maxAttempts: z.number().optional(), version: z.string().optional(), metadata: z.record(DeserializedJsonSchema).optional(), @@ -369,6 +370,7 @@ export const V3TaskRun = z.object({ createdAt: z.coerce.date(), startedAt: z.coerce.date().default(() => new Date()), idempotencyKey: z.string().optional(), + concurrencyKey: z.string().optional(), maxAttempts: z.number().optional(), version: z.string().optional(), metadata: z.record(DeserializedJsonSchema).optional(), diff --git a/packages/core/test/taskExecutor.test.ts b/packages/core/test/taskExecutor.test.ts index 229a952fff..7902a82283 100644 --- a/packages/core/test/taskExecutor.test.ts +++ b/packages/core/test/taskExecutor.test.ts @@ -1840,6 +1840,26 @@ describe("TaskExecutor", () => { }, }); }); + + test("should pass concurrencyKey through to task run context", async () => { + let receivedConcurrencyKey: string | undefined; + + const task = { + id: "test-task", + fns: { + run: async (payload: any, params: RunFnParams) => { + receivedConcurrencyKey = params.ctx.run.concurrencyKey; + return { success: true }; + }, + }, + }; + + const result = await executeTask(task, { test: "data" }, undefined, undefined); + + // Verify that concurrencyKey is passed through to task context + expect(receivedConcurrencyKey).toBe("user-123"); + expect(result.result.ok).toBe(true); + }); }); function executeTask( @@ -1904,6 +1924,7 @@ function executeTask( baseCostInCents: 0, priority: 0, maxDuration: 1000, + concurrencyKey: "user-123", }, machine: { name: "micro",