diff --git a/apps/webapp/app/routes/resources.runs.$runParam.ts b/apps/webapp/app/routes/resources.runs.$runParam.ts index 7b116b31c3..044f25e372 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.ts @@ -189,6 +189,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { version: run.lockedToVersion?.version, parentTaskRunId: run.parentTaskRun?.friendlyId ?? undefined, rootTaskRunId: run.rootTaskRun?.friendlyId ?? undefined, + concurrencyKey: run.concurrencyKey ?? undefined, }, queue: { name: run.queue, diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 075732544c..f158020a37 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -1639,6 +1639,7 @@ export const AttemptForExecutionGetPayload = { baseCostInCents: true, maxDurationInSeconds: true, tags: true, + concurrencyKey: true, }, }, queue: { @@ -1722,6 +1723,7 @@ class SharedQueueTasks { baseCostInCents: taskRun.baseCostInCents, metadata, maxDuration: taskRun.maxDurationInSeconds ?? undefined, + concurrencyKey: taskRun.concurrencyKey ?? undefined, }, queue: { id: queue.friendlyId, diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index df5e4e2b74..e49b9dce0c 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -220,6 +220,7 @@ export class CreateTaskRunAttemptService extends BaseService { version: lockedBy.worker.version, metadata, maxDuration: taskRun.maxDurationInSeconds ?? undefined, + concurrencyKey: taskRun.concurrencyKey ?? undefined, }, queue: { id: queue.friendlyId, diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index 2928995606..129d5bfe48 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -222,6 +222,7 @@ export const TaskRun = z.object({ /** The priority of the run. Wih a value of 10 it will be dequeued before runs that were triggered 9 seconds before it (assuming they had no priority set). */ priority: z.number().optional(), baseCostInCents: z.number().optional(), + concurrencyKey: z.string().optional(), parentTaskRunId: z.string().optional(), rootTaskRunId: z.string().optional(), @@ -375,6 +376,7 @@ export const V3TaskRun = z.object({ durationMs: z.number(), costInCents: z.number(), baseCostInCents: z.number(), + concurrencyKey: z.string().optional(), }); export type V3TaskRun = z.infer; diff --git a/packages/core/test/taskExecutor.test.ts b/packages/core/test/taskExecutor.test.ts index 229a952fff..0db0fe9e55 100644 --- a/packages/core/test/taskExecutor.test.ts +++ b/packages/core/test/taskExecutor.test.ts @@ -944,6 +944,28 @@ describe("TaskExecutor", () => { }); }); + test("should include concurrencyKey in task context", async () => { + let capturedContext: any; + + const task = { + id: "test-task", + fns: { + run: async (payload: any, params: RunFnParams) => { + capturedContext = params.ctx; + return { + output: "test-output", + }; + }, + }, + }; + + const result = await executeTask(task, { test: "data" }, undefined, undefined, "user-123"); + + expect(result.result.ok).toBe(true); + expect(capturedContext).toBeDefined(); + expect(capturedContext.run.concurrencyKey).toBe("user-123"); + }); + test("should handle middleware errors correctly", async () => { const executionOrder: string[] = []; const expectedError = new Error("Middleware error"); @@ -1846,7 +1868,8 @@ function executeTask( task: TaskMetadataWithFunctions, payload: any, signal?: AbortSignal, - retrySettings?: RetryOptions + retrySettings?: RetryOptions, + concurrencyKey?: string ) { const tracingSDK = new TracingSDK({ url: "http://localhost:4318", @@ -1904,6 +1927,7 @@ function executeTask( baseCostInCents: 0, priority: 0, maxDuration: 1000, + concurrencyKey: concurrencyKey, }, machine: { name: "micro",