-
-
Notifications
You must be signed in to change notification settings - Fork 803
Expose concurrencyKey in task run context #2397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
🦋 Changeset detectedLatest commit: 024e097 The changes in this PR will be included in the next version bump. Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
WalkthroughAdds an optional concurrencyKey?: string field across schemas, runtime contexts, payloads, docs, and tests. packages/core Zod schemas (TaskRun, V3TaskRun) gain concurrencyKey. run-engine resolves and propagates run.concurrencyKey into TaskRunContext and execution payloads (initial and after start). Webapp layers surface concurrencyKey in payloads (SpanPresenter moves it as a top-level run field; loaders, shared queue consumer, and createTaskRunAttempt include it on execution.run). docs/context.mdx documents context.run.concurrencyKey. A unit test ensures TaskExecutor exposes concurrencyKey in task ctx. Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (2)
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)
1697-1755
: execution payload: concurrencyKey surfaced at the right level — fix remaining queue.concurrencyKey usagesI scanned the repo. No uses of execution.concurrencyKey were found, but there are lingering references to queue.concurrencyKey that must be updated.
Files to fix:
- internal-packages/run-engine/src/run-queue/index.ts:54 — defines CONCURRENCY_KEY: "runqueue.concurrencyKey". Verify whether this string/key should be renamed or kept (it may be a message attribute and intentionally unchanged).
- apps/webapp/app/routes/.../route.tsx:656 — UI uses
run.queue.concurrencyKey
. Update this to read the new location (e.g.run.execution.concurrencyKey
orrun.concurrencyKey
depending on the run object shape).Action: replace consumer accesses of run.queue.concurrencyKey with the new execution-level field and review the run-engine constant for whether the key name needs migration.
apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1)
570-575
: Confirm run payload shape — keep queue.concurrencyKey for now and add top-level concurrencyKeyFound consumers of queue.concurrencyKey in the repo, so removing it now would be breaking. Recommend adding/keeping a top-level concurrencyKey while leaving queue.concurrencyKey in place and migrating callers to prefer the top-level field.
Files to inspect/update:
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx (line ~656) — displays: Concurrency key: {run.queue.concurrencyKey ? run.queue.concurrencyKey : "–"}
- internal-packages/run-engine/src/run-queue/index.ts (line 54) — defines CONCURRENCY_KEY: "runqueue.concurrencyKey"
- apps/webapp/app/presenters/v3/SpanPresenter.server.ts (lines ~570-575) — already includes top-level concurrencyKey: concurrencyKey: run.concurrencyKey ?? undefined
Recommended, backward-compatible path:
- Ensure presenters include a top-level concurrencyKey (if not already).
- Keep queue.concurrencyKey in the payload for now.
- Update UI/consumers to prefer run.concurrencyKey and fall back to run.queue.concurrencyKey. Example change:
Concurrency key: {run.concurrencyKey ?? run.queue.concurrencyKey ?? "–"}- Deprecate and remove queue.concurrencyKey in a future release after consumers have migrated.
🧹 Nitpick comments (4)
apps/webapp/app/routes/resources.runs.$runParam.ts (2)
196-196
: Normalize optional to undefined to match schema expectationsElsewhere in this PR, concurrencyKey is emitted as
string | undefined
(notnull
). Align this context payload to avoid serializingnull
into the JSON string.- concurrencyKey: run.concurrencyKey, + concurrencyKey: run.concurrencyKey ?? undefined,
244-247
: Optional: keep payloads consistent by avoiding nulls for optional fieldsFor consistency with other presenters/services in this PR, avoid returning
null
for optional fields in the loader payload. Use undefined when not set so downstream zod schemas/types don’t need to special-casenull
.queue: { name: run.queue, isCustomQueue: !run.queue.startsWith("task/"), - concurrencyKey: run.concurrencyKey, + concurrencyKey: run.concurrencyKey ?? undefined, },packages/core/test/taskExecutor.test.ts (1)
1844-1862
: Good coverage: verifies ctx.concurrencyKey passthroughThe test effectively asserts propagation into RunFn params. Small nit: you can simplify the call and avoid passing explicit
undefined
parameters.- const result = await executeTask(task, { test: "data" }, undefined, undefined); + const result = await executeTask(task, { test: "data" });Optional follow-up: add a complementary test ensuring
ctx.concurrencyKey
isundefined
when the execution payload omits it, to guard against regressions that might surfacenull
instead ofundefined
.Example test to add (outside this hunk):
test("ctx.concurrencyKey is undefined when not provided", async () => { let receivedConcurrencyKey: string | undefined; const task = { id: "test-task", fns: { run: async (_: any, params: RunFnParams<any>) => { receivedConcurrencyKey = params.ctx.concurrencyKey; return { ok: true }; }, }, }; const result = await executeTask(task, {}); expect(result.result.ok).toBe(true); expect(receivedConcurrencyKey).toBeUndefined(); });packages/core/src/v3/schemas/common.ts (1)
331-341
: TaskRunExecution: concurrencyKey addition looks correct and backward-compatibleAdding concurrencyKey as an optional top-level string aligns with the PR objective and preserves backward compatibility (still using passthrough on attempt/task).
If helpful, consider documenting the field’s semantics (e.g., “Set when a run was triggered with a concurrencyKey; undefined otherwise”) as a code comment to aid SDK consumers.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these settings in your CodeRabbit configuration.
📒 Files selected for processing (9)
.changeset/eight-keys-impress.md
(1 hunks)apps/webapp/app/presenters/v3/SpanPresenter.server.ts
(1 hunks)apps/webapp/app/routes/resources.runs.$runParam.ts
(1 hunks)apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
(2 hunks)apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
(1 hunks)docs/context.mdx
(1 hunks)internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
(4 hunks)packages/core/src/v3/schemas/common.ts
(4 hunks)packages/core/test/taskExecutor.test.ts
(2 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{ts,tsx}
📄 CodeRabbit Inference Engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
packages/core/test/taskExecutor.test.ts
packages/core/src/v3/schemas/common.ts
apps/webapp/app/routes/resources.runs.$runParam.ts
apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
apps/webapp/app/presenters/v3/SpanPresenter.server.ts
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
**/*.test.{ts,tsx}
📄 CodeRabbit Inference Engine (.github/copilot-instructions.md)
Our tests are all vitest
Files:
packages/core/test/taskExecutor.test.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit Inference Engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
packages/core/test/taskExecutor.test.ts
packages/core/src/v3/schemas/common.ts
apps/webapp/app/routes/resources.runs.$runParam.ts
apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
apps/webapp/app/presenters/v3/SpanPresenter.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit Inference Engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}
: In the webapp, all environment variables must be accessed through theenv
export ofenv.server.ts
, instead of directly accessingprocess.env
.
When importing from@trigger.dev/core
in the webapp, never import from the root@trigger.dev/core
path; always use one of the subpath exports as defined in the package's package.json.
Files:
apps/webapp/app/routes/resources.runs.$runParam.ts
apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
apps/webapp/app/presenters/v3/SpanPresenter.server.ts
🧬 Code Graph Analysis (4)
packages/core/test/taskExecutor.test.ts (2)
packages/core/src/v3/workers/taskExecutor.ts (11)
payload
(362-414)payload
(416-452)payload
(680-780)payload
(782-855)payload
(857-930)payload
(932-942)payload
(944-1008)payload
(1010-1018)payload
(1020-1090)payload
(1302-1375)result
(1253-1300)packages/core/src/v3/types/tasks.ts (1)
RunFnParams
(88-95)
apps/webapp/app/routes/resources.runs.$runParam.ts (1)
apps/webapp/app/presenters/v3/SpanPresenter.server.ts (3)
run
(521-527)run
(529-595)run
(597-599)
apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1)
apps/webapp/app/services/runsReplicationService.server.ts (2)
run
(772-827)run
(829-837)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (2)
apps/webapp/app/presenters/v3/SpanPresenter.server.ts (3)
run
(521-527)run
(529-595)run
(597-599)apps/webapp/app/services/runsReplicationService.server.ts (2)
run
(772-827)run
(829-837)
🔇 Additional comments (11)
.changeset/eight-keys-impress.md (1)
1-5
: Changeset looks correct and scopedPatch bump for @trigger.dev/core with a concise public API note. Matches the PR scope (no behavior changes, context shape addition).
apps/webapp/app/v3/services/createTaskRunAttempt.server.ts (1)
228-229
: LGTM: concurrencyKey is correctly propagated and normalizedAdding
concurrencyKey: taskRun.concurrencyKey ?? undefined
to the execution payload matches the updated schema and avoids leaking nulls. This keeps the API shape consistent across services.packages/core/test/taskExecutor.test.ts (1)
1938-1939
: LGTM: execution payload includes concurrencyKey for test harnessSetting
concurrencyKey: "user-123"
on the execution ensures the TaskExecutor receives and maps it toctx
. Matches the updatedTaskRunExecution
type.packages/core/src/v3/schemas/common.ts (3)
385-400
: V3TaskRunExecution: concurrencyKey inclusion is consistent with V3 payloadsConsistent with the V3 execution shape, and properly optional. This keeps the queue clean and exposes concurrencyKey at the correct level.
404-415
: TaskRunContext: concurrencyKey on ctx is exactly what we needExposing ctx.concurrencyKey fulfills the PR’s primary objective. Nice job keeping it optional and at the top level of the context.
427-445
: V3TaskRunContext: concurrencyKey added in parity with non-V3 contextParity across V3 and non-V3 context schemas looks good; optional string matches the rest of the PR changes.
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)
1621-1644
: Selecting concurrencyKey in AttemptForExecutionGetPayload is necessary and sufficientIncluding concurrencyKey in the taskRun selection ensures it’s available wherever execution payloads are derived. Good catch including it in the base selection used by multiple paths.
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (4)
172-221
: resolveTaskRunContext: selecting run.concurrencyKey is necessary for ctx propagationSelecting concurrencyKey in the run query is required to populate ctx.concurrencyKey later. Good placement.
257-293
: resolveTaskRunContext: concurrencyKey exposed on the returned context — correctSetting concurrencyKey: run.concurrencyKey ?? undefined at the top level of the context satisfies the ctx contract.
395-451
: startRunAttempt: selecting concurrencyKey in the update result prevents undefined laterIncluding concurrencyKey in the post-update select ensures execution payloads can include it reliably. Good defensive selection.
553-621
: Execution payload: concurrencyKey passed through with backward compatible typeAdding concurrencyKey to the BackwardsCompatibleTaskRunExecution instance leverages the Omit<TaskRunExecution, ...> type correctly; the field comes from the underlying TaskRunExecution shape. Aligns with the PR scope.
docs/context.mdx
Outdated
<ResponseField name="concurrencyKey" type="string" optional> | ||
If set, the run is queued under this key. Runs with the same key execute sequentially. | ||
See <a href="/queue-concurrency">Queue concurrency</a> and <a href="/triggering#concurrencykey">Triggering: concurrencyKey</a>. | ||
</ResponseField> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Docs addition is accurate; please verify anchors
The description is clear and placed correctly as a top-level context field. Please confirm the referenced anchors exist and match current slugs.
🌐 Web query:
Do the docs pages `/queue-concurrency` and `/triggering#concurrencykey` exist with the exact slugs/anchors referenced here?
💡 Result:
Short answer — yes and sort-of:
- /docs/queue-concurrency exists. (See the "Concurrency & Queues" docs page.) (trigger.dev)
- /docs/triggering exists and it documents a concurrencyKey option (the page includes a "
concurrencyKey
" subsection). (trigger.dev)
About the exact fragment /triggering#concurrencykey:
- The triggering page contains the concurrencyKey subsection, so linking to the triggering page with a fragment for that heading will generally work in browsers. However, I could not find an explicit HTML id="concurrencykey" string in the page source returned — heading IDs are generated by the site generator and may be hyphenated or use different casing (for example "concurrency-key" or "concurrency-keys-and-per-tenant-queuing"). If you need a link that is guaranteed to land exactly on the "concurrencyKey" content, use the Queue/Concurrency page (/docs/queue-concurrency) which has the "Concurrency keys and per-tenant queuing" section. (trigger.dev)
If you want, I can fetch the raw HTML and look for the exact id attribute (or test the fragment URL directly) and confirm the precise anchor string. Which would you prefer?
Citations:
- 1: https://trigger.dev/docs/queue-concurrency
- 2: https://trigger.dev/docs/triggering
- 3: https://trigger.dev/docs/queue-concurrency
Docs addition is accurate; update Triggering anchor
Both pages exist, but the Triggering page's generated heading ID is likely hyphenated (e.g., concurrency-key), so /triggering#concurrencykey may not jump to the section.
- File: docs/context.mdx — lines 128-132
Suggested update:
<ResponseField name="concurrencyKey" type="string" optional>
If set, the run is queued under this key. Runs with the same key execute sequentially.
See <a href="/queue-concurrency">Queue concurrency</a> and <a href="/triggering#concurrency-key">Triggering: concurrencyKey</a>.
</ResponseField>
🤖 Prompt for AI Agents
In docs/context.mdx around lines 128 to 132, the anchor link to the Triggering
page uses "/triggering#concurrencykey" which likely doesn't match the generated
heading ID; update the anchor to the hyphenated form
"/triggering#concurrency-key" so the link correctly jumps to the concurrencyKey
section; ensure the rest of the text remains unchanged.
@omkardongre this looks great! But I think it doesn't make sense as a top level property on ctx, instead it should go on ctx.run.concurrencyKey |
Thanks for the feedback, I will update it accordingly |
0172cca
to
a9e2c95
Compare
- Schemas: add optional concurrencyKey to TaskRun and V3TaskRun types - Engine: include concurrencyKey in run context building - Services: pass concurrencyKey in execution payload as part of run object - Presenters/Routes: include concurrencyKey in run context returned to UI - Tests: verify concurrencyKey is exposed as ctx.run.concurrencyKey - Docs: document concurrencyKey in docs/context.mdx - Add changeset - No concurrency behavior changes
@ericallam Moved concurrencyKey to ctx.run.concurrencyKey |
a9e2c95
to
024e097
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1)
195-258
: Consider also surfacing concurrencyKey at the top level of the run payload (optional)The run object returned by
getRun()
currently exposesqueue.concurrencyKey
and includesconcurrencyKey
inside the stringifiedcontext
, but not as a top-level field. If the UI or APIs benefit from direct access (e.g., filtering/searching without parsingcontext
), consider adding a top-levelconcurrencyKey
as well. This would mirror how other layers surface it in execution payloads.If you decide to add it, here’s a minimal patch:
idempotencyKey: run.idempotencyKey, idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt, + concurrencyKey: run.concurrencyKey ?? undefined, schedule: await this.resolveSchedule(run.scheduleId ?? undefined), queue: { name: run.queue, isCustomQueue: !run.queue.startsWith("task/"), concurrencyKey: run.concurrencyKey, },
Follow-up checks to consider:
- Confirm no conflicting consumers expect
concurrencyKey
exclusively underqueue
or only insidecontext.run
.- Add/adjust a lightweight presenter-level test (or a route test) to assert that
run.concurrencyKey
is present when the DB row has it set.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (9)
.changeset/eight-keys-impress.md
(1 hunks)apps/webapp/app/presenters/v3/SpanPresenter.server.ts
(1 hunks)apps/webapp/app/routes/resources.runs.$runParam.ts
(1 hunks)apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
(2 hunks)apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
(1 hunks)docs/context.mdx
(1 hunks)internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
(4 hunks)packages/core/src/v3/schemas/common.ts
(2 hunks)packages/core/test/taskExecutor.test.ts
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
- .changeset/eight-keys-impress.md
- packages/core/src/v3/schemas/common.ts
- apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
- apps/webapp/app/routes/resources.runs.$runParam.ts
- packages/core/test/taskExecutor.test.ts
- apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
- internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
- docs/context.mdx
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/presenters/v3/SpanPresenter.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/presenters/v3/SpanPresenter.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}
: In the webapp, all environment variables must be accessed through theenv
export ofenv.server.ts
, instead of directly accessingprocess.env
.
When importing from@trigger.dev/core
in the webapp, never import from the root@trigger.dev/core
path; always use one of the subpath exports as defined in the package's package.json.
Files:
apps/webapp/app/presenters/v3/SpanPresenter.server.ts
🧬 Code graph analysis (1)
apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1)
apps/webapp/app/services/runsReplicationService.server.ts (2)
run
(772-827)run
(829-837)
🔇 Additional comments (3)
apps/webapp/app/presenters/v3/SpanPresenter.server.ts (3)
556-570
: Good: concurrencyKey is correctly exposed under ctx.run for V3Placing
concurrencyKey
atcontext.run.concurrencyKey
aligns with the guidance to keep it scoped to the run object. Using?? undefined
avoids leaking empty strings into the context JSON. Looks good.
292-400
: LGTM: Prisma select includes concurrencyKey
concurrencyKey: true
is selected, ensuring availability for both the queue section and the V3 context builder below. No issues spotted.
521-527
: V4 context surfacesconcurrencyKey
as expectedThe V4 code in
runAttemptSystem.resolveTaskRunContext
explicitly mapsrun.concurrencyKey
into the returnedTaskRunContext
(concurrencyKey: run.concurrencyKey ?? undefined
). No changes are required.
Closes #2250
✅ Checklist
Testing
params.ctx.run.concurrencyKey
.run.concurrencyKey
(inside the context.run object).packages/core/test/taskExecutor.test.ts
assertingparams.ctx.run.concurrencyKey
is passed through.concurrencyKey
field under the run object as documented.Changelog
Expose concurrencyKey on task run context (ctx.run.concurrencyKey)
Screenshots
N/A