
Conversation


@ericallam ericallam commented Aug 27, 2025

  • Port some of the v3 dequeue performance improvements to the run engine (more eager dequeuing, dequeue cooloff periods)
  • Cache authenticating the worker groups
  • Return the worker queue length now when dequeuing from the RunQueue

This PR introduces 3 new env vars:

  • RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS — the duration for which a queue is no longer dequeued from (on search dequeues only) while it's in a cooloff state. Defaults to 10000.
  • RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD — the number of consecutive failed dequeues that causes a queue to enter a cooloff period. Defaults to 10.
  • RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT — the number of messages we attempt to dequeue from a queue (and add to a worker queue) at a time. Defaults to 10.

As a consequence of this PR, the RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS default is now 1000. You could even increase this value, as fewer "search dequeues" help overall throughput (less hammering of Redis with failed dequeue attempts).
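The cooloff behavior the two new env vars control can be sketched roughly as follows. This is an illustrative state machine only — names like `CooloffTracker` are hypothetical and not the actual run-engine implementation:

```typescript
// Illustrative cooloff bookkeeping: after `countThreshold` consecutive failed
// dequeues, a queue is skipped (on search dequeues only) for `periodMs`.
type CooloffState =
  | { _tag: "normal"; consecutiveFailures: number }
  | { _tag: "cooloff"; cooloffExpiresAt: number };

class CooloffTracker {
  private states = new Map<string, CooloffState>();

  constructor(
    private periodMs = 10_000, // RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS
    private countThreshold = 10 // RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD
  ) {}

  // True while the queue's cooloff period is active; expired entries are pruned.
  shouldSkip(queue: string, now = Date.now()): boolean {
    const state = this.states.get(queue);
    if (state?._tag === "cooloff") {
      if (now < state.cooloffExpiresAt) return true;
      this.states.delete(queue);
    }
    return false;
  }

  recordResult(queue: string, dequeuedCount: number, now = Date.now()): void {
    if (dequeuedCount > 0) {
      this.states.delete(queue); // any success resets failure tracking
      return;
    }
    const state = this.states.get(queue);
    const failures = state?._tag === "normal" ? state.consecutiveFailures + 1 : 1;
    if (failures >= this.countThreshold) {
      this.states.set(queue, { _tag: "cooloff", cooloffExpiresAt: now + this.periodMs });
    } else {
      this.states.set(queue, { _tag: "normal", consecutiveFailures: failures });
    }
  }
}
```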


changeset-bot bot commented Aug 27, 2025

⚠️ No Changeset found

Latest commit: 3f96ae0

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types



coderabbitai bot commented Aug 27, 2025

Walkthrough

This patch threads an optional runnerId from WORKER_HEADERS through worker routes and handler signatures and passes it into engine calls. It adds an in-process cache for authenticated worker instances and simplifies their public surface (removing environment/deployment/runner fields and adding WorkerInstanceEnv). The previously separate DB worker module was removed and its logic inlined into DequeueSystem, which now supports blocking-pop, richer result codes, and emits workerQueueLength. RunQueue gained blocking vs non-blocking worker-queue dequeue paths, per-queue cool-off logic, a new Redis Lua command, test coverage, schema and env config updates, and a package dependency on @internal/cache.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (1)

82-85: Avoid logging raw tokens.

Even on failures, emitting token values is sensitive. Log a hash or last 6 chars instead.

-  logger.warn("[WorkerGroupTokenService] No matching worker group found", { token });
+  logger.warn("[WorkerGroupTokenService] No matching worker group found", { tokenLast6: token.slice(-6) });
...
-  logger.error("[WorkerGroupTokenService] Token does not start with expected prefix", { token, prefix: this.tokenPrefix });
+  logger.error("[WorkerGroupTokenService] Token does not start with expected prefix", { prefix: this.tokenPrefix });

Also applies to: 149-155

🧹 Nitpick comments (21)
apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (4)

863-872: Normalize empty runnerId header to undefined.

As written, an empty header yields runnerId === "". Normalize/trim to avoid surprising downstream behavior.

-      const runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined;
+      const _runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID);
+      const runnerId =
+        _runnerId && _runnerId.trim().length > 0 ? _runnerId.trim() : undefined;
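A standalone, runnable version of the normalization suggested above (helper name is illustrative):

```typescript
// Normalize an optional header value: null, empty, or whitespace-only
// becomes undefined; otherwise return the trimmed value.
function normalizeHeader(value: string | null): string | undefined {
  const trimmed = value?.trim();
  return trimmed ? trimmed : undefined;
}
```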

875-879: Use logger instead of console for consistency.

Prefer the structured logger for observability.

-      console.error("Error in API route:", error);
+      logger.error("Error in API route:", { error });

1030-1040: Normalize empty runnerId header to undefined.

Mirror the loader change to avoid passing an empty string.

-      const runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined;
+      const _runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID);
+      const runnerId =
+        _runnerId && _runnerId.trim().length > 0 ? _runnerId.trim() : undefined;

1043-1046: Use logger instead of console for consistency.

-      console.error("Error in API route:", error);
+      logger.error("Error in API route:", { error });
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (5)

33-43: Typo: Rename RunWithMininimalEnvironment to RunWithMinimalEnvironment.

Minor naming fix for readability and future grep-ability.

-type RunWithMininimalEnvironment = Prisma.TaskRunGetPayload<{
+type RunWithMinimalEnvironment = Prisma.TaskRunGetPayload<{
   include: {
     runtimeEnvironment: {
       select: {
         id: true;
         type: true;
       };
     };
   };
 }>;

Apply the renamed alias at all references in this file.


44-80: Union type duplicates BACKGROUND_WORKER_MISMATCH variant.

Remove duplication so the discriminator narrows cleanly.

 type RunWithBackgroundWorkerTasksResult =
   | {
       success: false;
       code: "NO_RUN";
       message: string;
     }
   | {
       success: false;
-      code:
-        | "NO_WORKER"
-        | "TASK_NOT_IN_LATEST"
-        | "TASK_NEVER_REGISTERED"
-        | "BACKGROUND_WORKER_MISMATCH"
-        | "QUEUE_NOT_FOUND"
-        | "RUN_ENVIRONMENT_ARCHIVED";
+      code: "NO_WORKER" | "TASK_NOT_IN_LATEST" | "TASK_NEVER_REGISTERED" | "QUEUE_NOT_FOUND" | "RUN_ENVIRONMENT_ARCHIVED";
       message: string;
-      run: RunWithMininimalEnvironment;
+      run: RunWithMinimalEnvironment;
     }
   | {
       success: false;
       code: "BACKGROUND_WORKER_MISMATCH";
       message: string;
       backgroundWorker: {
         expected: string;
         received: string;
       };
-      run: RunWithMininimalEnvironment;
+      run: RunWithMinimalEnvironment;
     }
   | {
       success: true;
-      run: RunWithMininimalEnvironment;
+      run: RunWithMinimalEnvironment;
       worker: BackgroundWorker;
       task: BackgroundWorkerTask;
       queue: TaskQueue;
       deployment: WorkerDeployment | null;
     };

105-123: Default and consistently propagate blockingPop options.

You log blocking_pop defaulting to true, but pass possibly-undefined to RunQueue. Pass explicit defaults both to the call and telemetry.

   async dequeueFromWorkerQueue({
     consumerId,
     workerQueue,
     backgroundWorkerId,
     workerId,
     runnerId,
     tx,
-    blockingPop,
-    blockingPopTimeoutSeconds,
+    blockingPop = true,
+    blockingPopTimeoutSeconds = 2,
   }: {
@@
-        const message = await this.$.runQueue.dequeueMessageFromWorkerQueue(
+        const message = await this.$.runQueue.dequeueMessageFromWorkerQueue(
           consumerId,
           workerQueue,
           {
-            blockingPop,
-            blockingPopTimeoutSeconds,
+            blockingPop,
+            blockingPopTimeoutSeconds,
           }
         );
@@
-        span.setAttribute("blocking_pop", blockingPop ?? true);
+        span.setAttribute("blocking_pop", blockingPop);

Adjust the chosen timeout to your intended default if 2s isn’t desired.

Also applies to: 131-138, 146-154


929-956: Most recent worker selection: prefer createdAt over id if available.

If ids aren’t strictly time-ordered in all stores, order by createdAt desc for clarity.

-        orderBy:
-          {
-            id: "desc",
-          },
+        orderBy: { createdAt: "desc" },

1001-1069: Promotion fallback logic: add attribute to distinguish MANAGED vs non-MANAGED path and environment id.

You already set result tags; also consider logging environmentId for correlation.

-        span.setAttribute("result", "SUCCESS_CURRENT_MANAGED");
+        span.setAttribute("result", "SUCCESS_CURRENT_MANAGED");
+        span.setAttribute("environment_id", environmentId);
@@
-        span.setAttribute("result", "SUCCESS_LATEST_V2");
+        span.setAttribute("result", "SUCCESS_LATEST_V2");
+        span.setAttribute("environment_id", environmentId);
apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts (1)

32-38: Fix incorrect log/response message (“suspend” → “continue”).

Minor copy-paste issue in error path.

-      logger.warn("Failed to suspend run", { runFriendlyId, snapshotFriendlyId, error });
+      logger.warn("Failed to continue run", { runFriendlyId, snapshotFriendlyId, error });
@@
-      throw json({ error: "Failed to continue run execution" }, { status: 422 });
+      throw json({ error: "Failed to continue run" }, { status: 422 });
packages/core/src/v3/schemas/runEngine.ts (1)

238-240: Constrain workerQueueLength to non-negative integer.

Aligns schema with Redis LLEN semantics and avoids floats/negatives.

-  workerQueueLength: z.number().optional(),
+  workerQueueLength: z.number().int().nonnegative().optional(),
apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts (1)

26-35: Add runnerId to logs for traceability

Include runnerId in the debug/error logs to correlate actions per runner.

-    logger.debug("Suspending run", { runFriendlyId, snapshotFriendlyId, body });
+    logger.debug("Suspending run", { runFriendlyId, snapshotFriendlyId, runnerId, body });

-      logger.error("Failed to suspend run", {
-        runFriendlyId,
-        snapshotFriendlyId,
-        error: body.error,
-      });
+      logger.error("Failed to suspend run", {
+        runFriendlyId,
+        snapshotFriendlyId,
+        runnerId,
+        error: body.error,
+      });

-      logger.error("Failed to suspend run", { runFriendlyId, snapshotFriendlyId, error });
+      logger.error("Failed to suspend run", { runFriendlyId, snapshotFriendlyId, runnerId, error });

Also applies to: 47-50

internal-packages/run-engine/src/engine/index.ts (2)

569-581: API surface: optional blocking dequeue params — LGTM with a small polish

Parameters are threaded correctly. Consider defaulting blockingPopTimeoutSeconds from configured queue timeout when provided as undefined.

   async dequeueFromWorkerQueue({
@@
-    blockingPop?: boolean;
-    blockingPopTimeoutSeconds?: number;
+    blockingPop?: boolean;
+    blockingPopTimeoutSeconds?: number;
   }): Promise<DequeuedMessage[]> {

Follow-up (optional): inside the method, apply a fallback if desired:

-    const dequeuedMessage = await this.dequeueSystem.dequeueFromWorkerQueue({
+    const dequeuedMessage = await this.dequeueSystem.dequeueFromWorkerQueue({
       consumerId,
       workerQueue,
@@
-      blockingPop,
-      blockingPopTimeoutSeconds,
+      blockingPop,
+      blockingPopTimeoutSeconds:
+        blockingPopTimeoutSeconds ?? this.options.queue?.dequeueBlockingTimeoutSeconds,
     });

595-596: Avoid hardcoding 10s blocking timeout; use configured default

Use this.options.queue?.dequeueBlockingTimeoutSeconds to keep behavior configurable.

   return this.dequeueFromWorkerQueue({
@@
-    blockingPop: true,
-    blockingPopTimeoutSeconds: 10,
+    blockingPop: true,
+    blockingPopTimeoutSeconds: this.options.queue?.dequeueBlockingTimeoutSeconds ?? 10,
   });

Also applies to: 629-630

internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts (1)

472-556: Cooloff test is solid; consider shortening durations to keep the suite fast.

  • The test currently waits ~11s (1s + 10s). You can keep intent while reducing wall time by lowering the cooloff period and wait:

Apply:

-        masterQueueCooloffPeriodMs: 10_000,
+        masterQueueCooloffPeriodMs: 2_000,
...
-        await setTimeout(10_000);
+        await setTimeout(2_000);

Optional: assert the optional workerQueueLength to exercise the new field in DequeuedMessage:

if (dequeued4?.workerQueueLength !== undefined) expect(typeof dequeued4.workerQueueLength).toBe("number");
internal-packages/run-engine/src/run-queue/index.ts (4)

73-76: New cooloff and dequeue sizing options added; document defaults and interactions.

Add JSDoc on RunQueueOptions to clarify default values and how masterQueueConsumerDequeueCount interacts with env/queue concurrency caps. Helps avoid misconfiguration.


148-157: Cooloff state map may grow unbounded; add pruning.

Queues that disappear will leave entries in _queueCooloffStates. Consider:

  • Deleting entries when a queue successfully yields messages.
  • Periodic sweep to drop expired cooloff entries.
-            messagesDequeued += messages.length;
+            messagesDequeued += messages.length;
+            // reset any cooloff tracking on success
+            this._queueCooloffStates.delete(queue);

Also applies to: 174-175


1220-1223: Per-queue cooloff logic is reasonable; add guardrails and tracing.

  • Good use of Math.max(1, …) for threshold.
  • Suggest setting span attribute when a queue is actively cooled off to ease debugging.
  • Consider capping consecutiveFailures to avoid integer growth.
- if (cooloffState._tag === "cooloff") {
+ if (cooloffState._tag === "cooloff") {
+   span.addEvent("queue_in_cooloff", { queue, cooloff_expires_at: cooloffState.cooloffExpiresAt });
    ...
- } else {
+ } else {
    this._queueCooloffStates.delete(queue);
 }
...
- this._queueCooloffStates.set(queue, {
+ this._queueCooloffStates.set(queue, {
   _tag: "normal",
-  consecutiveFailures: cooloffState.consecutiveFailures + 1,
+  consecutiveFailures: Math.min(cooloffState.consecutiveFailures + 1, 1_000_000),
 });

Also applies to: 1230-1232, 1237-1311, 1316-1317, 1322-1325


2261-2280: Lua: rename local variable for clarity.

messageId holds a Redis key, not an id. Use messageKey to avoid confusion.

-local messageId = redis.call('LPOP', workerQueueKey)
+local messageKey = redis.call('LPOP', workerQueueKey)
...
-if not messageId then
+if not messageKey then
     return nil
 end
...
-return {messageId, queueLength}
+return {messageKey, queueLength}
apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (2)

31-47: Cache settings — LGTM; consider metrics.

10m fresh/11m stale is sensible. Consider instrumenting hit/miss to validate efficacy.
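One way to instrument hit/miss, sketched as a thin wrapper. The `CacheLike` shape and metric names are assumptions for illustration, not the webapp's actual cache or metrics APIs:

```typescript
// Wrap a cache's `get` to count hits and misses so the 10m-fresh/11m-stale
// settings can be validated against real traffic.
type CacheLike<T> = {
  get(key: string): Promise<T | undefined>;
  set(key: string, value: T): Promise<void>;
};

function withHitMissCounters<T>(
  cache: CacheLike<T>,
  increment: (metric: string) => void
): CacheLike<T> {
  return {
    async get(key) {
      const value = await cache.get(key);
      increment(value === undefined ? "worker_auth_cache.miss" : "worker_auth_cache.hit");
      return value;
    },
    set: (key, value) => cache.set(key, value),
  };
}
```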


249-259: getOrCreateWorkerInstance race handling — use upsert or fix log field.

  • Current P2002 handling is fine; optionally switch to upsert on the composite unique to avoid the race entirely.
  • The error log references workerInstance, but you likely meant existingWorkerInstance.
-            logger.error("[WorkerGroupTokenService] Failed to find worker instance", {
-              workerGroup,
-              workerInstance,
-            });
+            logger.error("[WorkerGroupTokenService] Failed to find worker instance", {
+              workerGroup,
+              instanceName,
+            });

If the schema has a unique on (workerGroupId, resourceIdentifier), prefer:

// alternative outside selected lines
await this._prisma.workerInstance.upsert({
  where: { workerGroupId_resourceIdentifier: { workerGroupId: workerGroup.id, resourceIdentifier } },
  create: { workerGroupId: workerGroup.id, name: instanceName, resourceIdentifier },
  update: {},
  include: { deployment: true, environment: true },
});

Also applies to: 265-303

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 24a9151 and 3478194.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (15)
  • apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts (1 hunks)
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts (2 hunks)
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts (1 hunks)
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts (2 hunks)
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts (1 hunks)
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts (2 hunks)
  • apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (5 hunks)
  • apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (8 hunks)
  • apps/webapp/package.json (1 hunks)
  • internal-packages/run-engine/src/engine/db/worker.ts (0 hunks)
  • internal-packages/run-engine/src/engine/index.ts (4 hunks)
  • internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (8 hunks)
  • internal-packages/run-engine/src/run-queue/index.ts (12 hunks)
  • internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts (1 hunks)
  • packages/core/src/v3/schemas/runEngine.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • internal-packages/run-engine/src/engine/db/worker.ts
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts
  • packages/core/src/v3/schemas/runEngine.ts
  • internal-packages/run-engine/src/engine/index.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts
  • apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts
  • internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
  • internal-packages/run-engine/src/run-queue/index.ts
  • apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
**/*.test.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Our tests are all vitest

Files:

  • internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • packages/core/src/v3/schemas/runEngine.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts
  • apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts
  • apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: In the webapp, all environment variables must be accessed through the env export of env.server.ts, instead of directly accessing process.env.
When importing from @trigger.dev/core in the webapp, never import from the root @trigger.dev/core path; always use one of the subpath exports as defined in the package's package.json.

Files:

  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts
  • apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
  • apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts
  • apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
apps/webapp/app/services/**/*.server.ts

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

For testable services, separate service logic and configuration, as exemplified by realtimeClient.server.ts (service) and realtimeClientGlobal.server.ts (configuration).

Files:

  • apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
🧬 Code graph analysis (6)
internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts (2)
internal-packages/testcontainers/src/index.ts (3)
  • redisTest (167-167)
  • redisContainer (116-130)
  • assertNonNullable (19-19)
internal-packages/testcontainers/src/utils.ts (1)
  • assertNonNullable (173-176)
apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts (2)
packages/core/src/v3/runEngineWorker/supervisor/schemas.ts (2)
  • WorkerApiDequeueResponseBody (73-73)
  • WorkerApiDequeueResponseBody (74-74)
packages/core/src/v3/apps/http.ts (1)
  • json (65-75)
apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (2)
packages/cli-v3/src/entryPoints/managed/controller.ts (1)
  • runnerId (127-129)
packages/core/src/v3/runEngineWorker/consts.ts (1)
  • WORKER_HEADERS (1-6)
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (5)
internal-packages/database/src/transaction.ts (1)
  • PrismaClientOrTransaction (8-8)
internal-packages/run-engine/src/run-queue/index.ts (6)
  • workerQueue (1542-1650)
  • message (1395-1443)
  • message (1652-1707)
  • message (1709-1757)
  • message (1759-1787)
  • message (1808-1821)
apps/webapp/app/v3/tracing.server.ts (1)
  • startSpan (7-35)
internal-packages/tracing/src/index.ts (1)
  • startSpan (49-79)
packages/core/src/v3/isomorphic/consts.ts (1)
  • CURRENT_DEPLOYMENT_LABEL (1-1)
internal-packages/run-engine/src/run-queue/index.ts (3)
apps/webapp/app/v3/marqs/index.server.ts (10)
  • options (1311-1324)
  • consumerId (759-817)
  • consumerId (822-992)
  • queueKey (994-1050)
  • message (540-546)
  • message (548-554)
  • message (1255-1259)
  • message (1469-1669)
  • message (1745-1813)
  • message (1815-1874)
packages/core/src/v3/schemas/runEngine.ts (2)
  • DequeuedMessage (234-272)
  • DequeuedMessage (273-273)
internal-packages/run-engine/src/run-queue/keyProducer.ts (3)
  • queueKey (99-125)
  • workerQueueKey (41-43)
  • messageKey (254-258)
apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (5)
apps/webapp/app/services/platform.v3.server.ts (1)
  • singleton (116-116)
internal-packages/cache/src/index.ts (4)
  • createCache (2-2)
  • Namespace (4-4)
  • DefaultStatefulContext (3-3)
  • MemoryStore (9-9)
packages/core/src/v3/runEngineWorker/consts.ts (1)
  • WORKER_HEADERS (1-6)
apps/webapp/app/env.server.ts (1)
  • env (1081-1081)
apps/webapp/app/v3/machinePresets.server.ts (1)
  • machinePresetFromName (27-32)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (17)
apps/webapp/package.json (1)

57-59: ✅ Verified “@internal/cache” dependency exists and is in use

  • Monorepo contains a package named @internal/cache.
  • It’s imported in apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (Line 1).

No dead dependency detected—resolving this comment.

apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (2)

799-801: LGTM: runnerId added to worker loader handler signature.


932-934: LGTM: runnerId added to worker action handler signature.

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts (1)

17-19: LGTM: runnerId propagated to continueRunExecution.

Also applies to: 27-28

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts (1)

22-22: RunnerId propagation — LGTM

Accepting and forwarding runnerId is consistent with the new worker-context threading.

Also applies to: 43-44

apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts (1)

12-17: Dequeue is runner-scoped — please verify signature

Forwarding runnerId aligns with the engine API changes. We attempted to locate the signature for authenticatedWorker.dequeue in the codebase but did not find a definition accepting { runnerId?: string }. Please confirm in the @trigger.dev/core/v3/workers package that the method signature for dequeue matches this shape.

• apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts – lines 12–17

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts (1)

16-24: Heartbeat runnerId threading — LGTM

Propagation is correct and consistent with other routes.

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts (1)

21-31: Complete-attempt runnerId threading — LGTM

Passing runnerId through to completeRunAttempt matches the updated engine API.

internal-packages/run-engine/src/engine/index.ts (1)

569-571: Confirm fallback behavior for optional blocking flags in DequeueSystem

Please verify that when blockingPop and blockingPopTimeoutSeconds are left undefined, we fall back to your intended defaults (e.g. non-blocking or a configured timeout) rather than passing undefined through. In particular:

• In
internal-packages/run-engine/src/engine/index.ts (around line 561), the engine’s public dequeueFromWorkerQueue simply forwards these parameters as-is.
• In
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (around lines 105–110), the system calls
this.$.runQueue.dequeueMessageFromWorkerQueue(consumerId, workerQueue, { blockingPop, blockingPopTimeoutSeconds });
without explicit defaults.

Recommendations:
• Either define defaults in the method signature, e.g.

async dequeueFromWorkerQueue({
  // …other params
  blockingPop = false,
  blockingPopTimeoutSeconds = DEFAULT_TIMEOUT,
}: { /* … */ })

• Or, before calling dequeueMessageFromWorkerQueue, normalize to

{
  blockingPop: blockingPop ?? false,
  blockingPopTimeoutSeconds: blockingPopTimeoutSeconds ?? DEFAULT_TIMEOUT,
}

• Also ensure the span.setAttribute("blocking_pop", blockingPop ?? true) aligns with the actual behavior passed to the queue.

Please confirm that the underlying runQueue.dequeueMessageFromWorkerQueue handles undefined values as you expect, or adjust here to enforce your desired defaults.
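As a sketch of the normalization suggested above (the option names follow the diff, but `DEFAULT_BLOCKING_POP_TIMEOUT_SECONDS` is an illustrative stand-in for whatever the engine's real default is):

```typescript
// Hypothetical default, mirroring the shape of the review suggestion; the
// real value would come from engine configuration.
const DEFAULT_BLOCKING_POP_TIMEOUT_SECONDS = 10;

type DequeueOptions = {
  blockingPop?: boolean;
  blockingPopTimeoutSeconds?: number;
};

// Normalize once at the boundary so `undefined` never reaches the queue layer.
function normalizeDequeueOptions(options: DequeueOptions = {}) {
  return {
    blockingPop: options.blockingPop ?? false,
    blockingPopTimeoutSeconds:
      options.blockingPopTimeoutSeconds ?? DEFAULT_BLOCKING_POP_TIMEOUT_SECONDS,
  };
}
```

Normalizing at one place also means the span attribute and the queue call can share the same resolved values, so telemetry can't drift from behavior.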

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts (1)

21-22: runnerId threaded consistently across all worker action routes

I’ve verified that every engine.v1.worker-actions handler now includes runnerId in its parameters and forwards it in the payload:

  • start.ts (lines 21 & 29)
  • suspend.ts (lines 22 & 43)
  • heartbeat.ts (lines 16 & 23)
  • continue.ts (lines 17 & 27)
  • attempts.complete.ts (lines 21 & 30)
  • dequeue.ts (lines 14 & 16)

All routes accept and pass along runnerId for complete telemetry coverage. No further changes needed.

internal-packages/run-engine/src/run-queue/index.ts (5)

103-104: Expose workerQueueLength on dequeues — LGTM.

The field is optional and populated in both blocking and non-blocking paths. Matches schema addition in core.


1102-1108: Pass-through of masterQueueConsumerDequeueCount — LGTM.

Makes dequeue sizing explicit per loop iteration.


1629-1649: Non-blocking path returns messageKey — LGTM.

The Lua command returns the message key and length; conversion to number is correct.


2594-2598: Commander signature for non-blocking dequeue — LGTM.


618-633: Explicitly opt-in to blocking behavior in dequeueMessageFromWorkerQueue

After auditing the codebase, we found over 100 call sites (in both production code and tests) invoking dequeueMessageFromWorkerQueue(…) without passing an options object, meaning blockingPop silently defaults to true for all of them. This implicit blocking can introduce up to 10 seconds of unexpected latency for callers who assumed non-blocking semantics.

To avoid breaking existing behavior while making intent explicit, consider one of the following approaches:

• Require callers to pass a defined options.blockingPop flag (removing the ?? true default).
• Deprecate the implicit blocking default in a major release, switching to false by default and updating all call sites to opt into blocking where needed.

Next steps:

  • Audit these locations and decide on a migration strategy (breaking vs. non-breaking):
    • internal-packages/run-engine/src/run-queue/index.ts (default assignment)
    • All calls to dequeueMessageFromWorkerQueue in tests under src/run-queue/tests and in src/engine/systems/dequeueSystem.ts.
  • Update tests and production code to pass options.blockingPop explicitly.
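If the breaking route is chosen, making the flag a required property is the simplest way to force every call site to state its intent. A sketch of what that surface could look like (type and function names here are illustrative, not the actual run-queue API):

```typescript
// With no default, the compiler rejects any call site that omits blockingPop,
// which surfaces every implicit-blocking caller during the migration.
type WorkerQueueDequeueOptions = {
  blockingPop: boolean; // required: callers must opt in explicitly
  blockingPopTimeoutSeconds?: number;
};

function describeDequeue(options: WorkerQueueDequeueOptions): string {
  return options.blockingPop
    ? `blocking pop (up to ${options.blockingPopTimeoutSeconds ?? 10}s)`
    : "non-blocking pop";
}
```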
apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (2)

369-376: runnerId propagation — LGTM.

All worker-facing methods now accept and pass runnerId through to the engine.

Also applies to: 389-404, 406-427, 455-473, 481-499, 501-516


323-325: Zod enum export — LGTM.

Conforms to the guideline (using zod rather than TS enums).

Comment on lines +25 to 26
import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker";

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Confirm subpath export for WORKER_HEADERS.

Ensure "@trigger.dev/core/v3/runEngineWorker" re-exports WORKER_HEADERS. If not, import from the consts file.

Run:


🏁 Script executed:

#!/bin/bash
rg -nP 'export\s+const\s+WORKER_HEADERS' packages/core/src -n -C2
rg -n '@trigger.dev/core/v3/runEngineWorker' packages/core -n || true

Length of output: 417


Replace import to use the consts subpath for WORKER_HEADERS

The WORKER_HEADERS constant lives in packages/core/src/v3/runEngineWorker/consts.ts and is not re-exported at the module root. Please update the import to pull directly from the consts file:

• File: apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Lines: 25–26

- import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker";
+ import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker/consts";

This ensures you’re importing the actual export location and avoids a broken module resolution.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker";
import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker/consts";
🤖 Prompt for AI Agents
In apps/webapp/app/services/routeBuilders/apiBuilder.server.ts around lines
25–26, the import for WORKER_HEADERS currently points to
"@trigger.dev/core/v3/runEngineWorker" which does not re-export that constant;
update the import to reference the actual file path
"packages/core/src/v3/runEngineWorker/consts" (or the package-relative subpath
that maps to runEngineWorker/consts) so WORKER_HEADERS is imported directly from
its defining module; replace the existing import statement with one that imports
WORKER_HEADERS from the consts subpath and ensure any import path uses the
package's published subpath form (e.g.
"@trigger.dev/core/v3/runEngineWorker/consts") to avoid broken module
resolution.

Comment on lines 196 to 229
`worker-group-token-${token}`,
async () => {
const workerGroup = await this.findWorkerGroup({ token });

if (!workerGroup) {
logger.warn("[WorkerGroupTokenService] Worker group not found", { token });
return;
}

const workerInstance = await this.getOrCreateWorkerInstance({
workerGroup,
instanceName,
});

if (!workerInstance) {
logger.error("[WorkerGroupTokenService] Unable to get or create worker instance", {
workerGroup,
instanceName,
});
return;
}

return new AuthenticatedWorkerInstance({
prisma: this._prisma,
engine: this._engine,
type: WorkerInstanceGroupType.MANAGED,
name: workerGroup.name,
workerGroupId: workerGroup.id,
workerInstanceId: workerInstance.id,
masterQueue: workerGroup.masterQueue,
});
}
);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Cache key should not include the raw token.

If cache keys ever get logged/inspected, raw tokens leak. Use the token hash instead.

-      `worker-group-token-${token}`,
+      `worker-group-token-${await this.hashToken(token)}`,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const result = await authenticatedWorkerInstanceCache.authenticatedWorkerInstance.swr(
`worker-group-token-${token}`,
async () => {
const workerGroup = await this.findWorkerGroup({ token });
if (!workerGroup) {
logger.warn("[WorkerGroupTokenService] Worker group not found", { token });
return;
}
const workerInstance = await this.getOrCreateWorkerInstance({
workerGroup,
instanceName,
});
if (!workerInstance) {
logger.error("[WorkerGroupTokenService] Unable to get or create worker instance", {
workerGroup,
instanceName,
});
return;
}
return new AuthenticatedWorkerInstance({
prisma: this._prisma,
engine: this._engine,
type: WorkerInstanceGroupType.MANAGED,
name: workerGroup.name,
workerGroupId: workerGroup.id,
workerInstanceId: workerInstance.id,
masterQueue: workerGroup.masterQueue,
});
}
);
const result = await authenticatedWorkerInstanceCache.authenticatedWorkerInstance.swr(
`worker-group-token-${await this.hashToken(token)}`,
async () => {
const workerGroup = await this.findWorkerGroup({ token });
if (!workerGroup) {
logger.warn("[WorkerGroupTokenService] Worker group not found", { token });
return;
}
const workerInstance = await this.getOrCreateWorkerInstance({
workerGroup,
instanceName,
});
if (!workerInstance) {
logger.error("[WorkerGroupTokenService] Unable to get or create worker instance", {
workerGroup,
instanceName,
});
return;
}
return new AuthenticatedWorkerInstance({
prisma: this._prisma,
engine: this._engine,
type: WorkerInstanceGroupType.MANAGED,
name: workerGroup.name,
workerGroupId: workerGroup.id,
workerInstanceId: workerInstance.id,
masterQueue: workerGroup.masterQueue,
});
}
);
🤖 Prompt for AI Agents
In apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts around
lines 196 to 229, the cache key currently embeds the raw token which can leak
secrets if keys are logged or inspected; compute a stable cryptographic hash
(e.g., SHA-256) of the token and use that hash in the cache key instead (e.g.,
worker-group-token-<tokenHash>), ensuring you handle null/undefined tokens
safely and compute the hash synchronously via the Node crypto API or a shared
util so the rest of the swr call remains unchanged.
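A minimal sketch of the hashed key, assuming Node's `crypto` module is acceptable here (the key prefix mirrors the existing one; the function name is hypothetical):

```typescript
import { createHash } from "node:crypto";

// Derive the cache key from a SHA-256 digest of the token so the raw secret
// never appears in keys, logs, or metrics.
function workerGroupTokenCacheKey(token: string): string {
  const tokenHash = createHash("sha256").update(token).digest("hex");
  return `worker-group-token-${tokenHash}`;
}
```

Because the digest is deterministic, cache hit rates are unaffected; only the key material changes.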

Comment on lines +870 to +873
const queue = workerWithTasks.queues.find((queue) =>
run.lockedQueueId ? queue.id === run.lockedQueueId : queue.name === run.queue
);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Queue resolution may use wrong field; prefer workerQueue with safe fallback.

TaskRun appears to use workerQueue (see later usage). Comparing to run.queue risks false QUEUE_NOT_FOUND.

-      const queue = workerWithTasks.queues.find((queue) =>
-        run.lockedQueueId ? queue.id === run.lockedQueueId : queue.name === run.queue
-      );
+      const requestedQueueName = (run as any).workerQueue ?? (run as any).queue;
+      const queue = workerWithTasks.queues.find((q) =>
+        run.lockedQueueId ? q.id === run.lockedQueueId : q.name === requestedQueueName
+      );

Verify the canonical field on TaskRun:


🏁 Script executed:

#!/bin/bash
rg -nP 'model\s+TaskRun\b|interface\s+TaskRun\b' internal-packages database packages -n -C3 || true
rg -nP '\bworkerQueue\b' -n || true
rg -nP '\bqueue\b' -n | head -n 50

Length of output: 33505


🏁 Script executed:

#!/bin/bash
# Extract the TaskRun model definition from Prisma schema
sed -n '550,620p' internal-packages/database/prisma/schema.prisma

# Show context around the queue resolution in dequeueSystem.ts
sed -n '830,910p' internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Length of output: 4578


Use workerQueue for queue lookup with fallback to queue

The current lookup uses the deprecated queue field, but TaskRun’s primary queue is stored in workerQueue (mapped to masterQueue in the database). To ensure we match the correct queue, switch to workerQueue first and fall back to queue only if workerQueue is unset.

Affected location:

  • internal-packages/run-engine/src/engine/systems/dequeueSystem.ts around line 870

Proposed change:

-      const queue = workerWithTasks.queues.find((queue) =>
-        run.lockedQueueId ? queue.id === run.lockedQueueId : queue.name === run.queue
-      );
+      const requestedQueueName = run.workerQueue ?? run.queue;
+      const queue = workerWithTasks.queues.find((q) =>
+        run.lockedQueueId ? q.id === run.lockedQueueId : q.name === requestedQueueName
+      );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const queue = workerWithTasks.queues.find((queue) =>
run.lockedQueueId ? queue.id === run.lockedQueueId : queue.name === run.queue
);
const requestedQueueName = run.workerQueue ?? run.queue;
const queue = workerWithTasks.queues.find((q) =>
run.lockedQueueId ? q.id === run.lockedQueueId : q.name === requestedQueueName
);
🤖 Prompt for AI Agents
In internal-packages/run-engine/src/engine/systems/dequeueSystem.ts around lines
870-873, the queue lookup currently falls back to the deprecated run.queue;
change the predicate to prefer run.workerQueue and only fall back to run.queue
when workerQueue is unset, while preserving the existing lockedQueueId check.
Concretely, keep the lockedQueueId branch as-is, and in the non-locked branch
match queue.name to run.workerQueue if present, otherwise to run.queue (i.e. use
workerQueue first then queue).
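The preference order above can be modeled as a small pure function (field names follow the diff; the types are simplified stand-ins for the Prisma models):

```typescript
type QueueRecord = { id: string; name: string };
type RunLike = { lockedQueueId?: string; workerQueue?: string; queue: string };

// Prefer lockedQueueId; otherwise match by workerQueue, falling back to the
// deprecated queue field only when workerQueue is unset.
function resolveQueue(queues: QueueRecord[], run: RunLike): QueueRecord | undefined {
  if (run.lockedQueueId) {
    return queues.find((q) => q.id === run.lockedQueueId);
  }
  const requestedQueueName = run.workerQueue ?? run.queue;
  return queues.find((q) => q.name === requestedQueueName);
}
```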

Comment on lines +1351 to +1359
if (messages.length === 0) {
return;
}

await this.#enqueueMessagesToWorkerQueues(messages);

if (messages.length === 10) {
await this.#processQueueForWorkerQueue(queueKey, environmentId);
}
Copy link
Contributor

@coderabbitai coderabbitai bot Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid recursion when draining batches; use scheduling to prevent deep call stacks.

Recursive self-calls can blow the stack on long drains and block the event loop. Reuse the internal worker (like other paths) to schedule follow-up.

-    if (messages.length === 10) {
-      await this.#processQueueForWorkerQueue(queueKey, environmentId);
-    }
+    if (messages.length === 10) {
+      await this.worker.enqueueOnce({
+        id: queueKey,
+        job: "processQueueForWorkerQueue",
+        payload: { queueKey, environmentId },
+        availableAt: new Date(Date.now() + 100), // tiny delay to yield
+      });
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (messages.length === 0) {
return;
}
await this.#enqueueMessagesToWorkerQueues(messages);
if (messages.length === 10) {
await this.#processQueueForWorkerQueue(queueKey, environmentId);
}
if (messages.length === 0) {
return;
}
await this.#enqueueMessagesToWorkerQueues(messages);
if (messages.length === 10) {
await this.worker.enqueueOnce({
id: queueKey,
job: "processQueueForWorkerQueue",
payload: { queueKey, environmentId },
availableAt: new Date(Date.now() + 100), // tiny delay to yield
});
}
🤖 Prompt for AI Agents
In internal-packages/run-engine/src/run-queue/index.ts around lines 1351 to
1359, the code directly calls await this.#processQueueForWorkerQueue(...) after
enqueuing messages which creates a recursive drain path; change this to schedule
a follow-up drain via the internal worker/event loop (like other codepaths)
instead of direct recursion — e.g., push a drain task onto the worker queue or
use the existing internal scheduler to invoke #processQueueForWorkerQueue
asynchronously (setImmediate/nextTick or the internal worker task API) so the
next batch is processed without growing the call stack or blocking the event
loop.
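For illustration, an iterative drain loop that replaces the recursive call; `dequeueBatch` and `enqueueToWorkerQueues` are stand-ins for the private methods, and the batch size mirrors the hard-coded 10 in the diff:

```typescript
const BATCH_SIZE = 10;

// Loop until a short (or empty) batch signals the queue is drained, keeping
// the call stack flat no matter how deep the backlog is.
async function drainQueue(
  dequeueBatch: (count: number) => Promise<string[]>,
  enqueueToWorkerQueues: (messages: string[]) => Promise<void>
): Promise<number> {
  let total = 0;
  while (true) {
    const messages = await dequeueBatch(BATCH_SIZE);
    if (messages.length === 0) break;
    await enqueueToWorkerQueues(messages);
    total += messages.length;
    // A batch smaller than BATCH_SIZE means the queue is (momentarily) empty.
    if (messages.length < BATCH_SIZE) break;
  }
  return total;
}
```

Whether the stack is actually at risk depends on how the recursion awaits; the loop sidesteps the question entirely without needing the internal worker.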

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ericallam is it right about blowing the stack? It doesn't make sense to schedule it like it's saying

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

@ericallam ericallam force-pushed the fix/v4-dequeue-performance-1 branch from 6dd5d2f to 3f96ae0 Compare August 27, 2025 17:45
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
internal-packages/run-engine/src/run-queue/index.ts (1)

1237-1311: Reset cooloff failure counter on success to avoid premature cooloff

Currently, consecutiveFailures never resets after a successful dequeue, making a queue enter cooloff after N total misses even with intervening hits.

             if (messages.length === 0) {
               ...
             }
 
-            messagesDequeued += messages.length;
+            messagesDequeued += messages.length;
+            // Reset cooloff state on success
+            this._queueCooloffStates.set(queue, { _tag: "normal", consecutiveFailures: 0 });
 
             await this.#enqueueMessagesToWorkerQueues(messages);
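The reset-on-success behavior can be sketched as a small tracker; the `_tag` shape mirrors the diff above, and the thresholds assume the new env var defaults (10 consecutive failures, 10s cooloff):

```typescript
const COOLOFF_COUNT_THRESHOLD = 10;
const COOLOFF_PERIOD_MS = 10_000;

type CooloffState =
  | { _tag: "normal"; consecutiveFailures: number }
  | { _tag: "cooloff"; until: number };

class CooloffTracker {
  private states = new Map<string, CooloffState>();

  recordResult(queue: string, dequeuedCount: number, now = Date.now()): void {
    if (dequeuedCount > 0) {
      // Success: reset, so intervening hits never accumulate toward cooloff.
      this.states.set(queue, { _tag: "normal", consecutiveFailures: 0 });
      return;
    }
    const state = this.states.get(queue) ?? { _tag: "normal", consecutiveFailures: 0 };
    if (state._tag === "cooloff") return;
    const failures = state.consecutiveFailures + 1;
    this.states.set(
      queue,
      failures >= COOLOFF_COUNT_THRESHOLD
        ? { _tag: "cooloff", until: now + COOLOFF_PERIOD_MS }
        : { _tag: "normal", consecutiveFailures: failures }
    );
  }

  isCoolingOff(queue: string, now = Date.now()): boolean {
    const state = this.states.get(queue);
    return state?._tag === "cooloff" && now < state.until;
  }
}
```

Without the reset, "N consecutive failures" silently degrades into "N total failures over the queue's lifetime", which is what the comment above flags.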
apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (2)

82-85: Do not log raw tokens; log a hash or a redacted value

Leaking bearer tokens is a security risk. Use a sha256 token hash or redact.

-      logger.warn("[WorkerGroupTokenService] No matching worker group found", { token });
+      logger.warn("[WorkerGroupTokenService] No matching worker group found", {
+        tokenHash,
+      });
...
-      logger.error("[WorkerGroupTokenService] Token does not start with expected prefix", {
-        token,
-        prefix: this.tokenPrefix,
-      });
+      logger.error("[WorkerGroupTokenService] Token does not start with expected prefix", {
+        tokenPreview: token.slice(0, 8) + "...",
+        prefix: this.tokenPrefix,
+      });

Also applies to: 150-155


312-321: Exclude managed-secret by default in sanitizeHeaders

Prevent accidental disclosure anywhere sanitizeHeaders is used without explicit skips.

-  private sanitizeHeaders(request: Request, skipHeaders = ["authorization"]) {
+  private sanitizeHeaders(
+    request: Request,
+    skipHeaders = ["authorization", WORKER_HEADERS.MANAGED_SECRET]
+  ) {
♻️ Duplicate comments (4)
internal-packages/run-engine/src/run-queue/index.ts (2)

1351-1360: Avoid recursive draining; schedule follow-up batch instead

Recursion can grow the stack and hog the event loop on long drains. Schedule via the internal worker.

-    if (messages.length === 10) {
-      await this.#processQueueForWorkerQueue(queueKey, environmentId);
-    }
+    if (messages.length === 10) {
+      await this.worker.enqueueOnce({
+        id: queueKey,
+        job: "processQueueForWorkerQueue",
+        payload: { queueKey, environmentId },
+        availableAt: new Date(Date.now() + 100),
+      });
+    }

1579-1605: Blocking pop cleanup can run twice; use once-guard and try/finally

Abort listener + post-BLPOP cleanup can both fire. Ensure cleanup runs exactly once and listener is always removed.

-      const blockingClient = this.#createBlockingDequeueClient();
-
-      async function cleanup() {
-        await blockingClient.quit();
-      }
-
-      this.abortController.signal.addEventListener("abort", cleanup);
-
-      const result = await this.#trace("popMessageFromWorkerQueue", async (span) => {
+      const blockingClient = this.#createBlockingDequeueClient();
+      let cleaned = false;
+      const cleanup = async () => {
+        if (cleaned) return;
+        cleaned = true;
+        try { await blockingClient.quit(); } catch {}
+      };
+      const onAbort = () => void cleanup();
+      this.abortController.signal.addEventListener("abort", onAbort);
+      let result: [string, string] | null = null;
+      try {
+        result = await this.#trace("popMessageFromWorkerQueue", async (span) => {
           span.setAttributes({
             workerQueue,
             workerQueueKey,
             blockingPopTimeoutSeconds,
             blocking: true,
           });
-
-        return await blockingClient.blpop(workerQueueKey, blockingPopTimeoutSeconds);
-      });
-
-      this.abortController.signal.removeEventListener("abort", cleanup);
-
-      cleanup().then(() => {
-        this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
-          service: this.name,
-        });
-      });
+          return await blockingClient.blpop(workerQueueKey, blockingPopTimeoutSeconds);
+        });
+      } finally {
+        this.abortController.signal.removeEventListener("abort", onAbort);
+        cleanup().then(() => {
+          this.logger.debug("dequeueMessageFromWorkerQueue cleanup", { service: this.name });
+        });
+      }

Also applies to: 1615-1631, 1634-1641
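The once-guard can be factored into a tiny helper so both the abort listener and the `finally` block can call it safely (a generic sketch, not the actual code in the PR):

```typescript
// Wrap an async cleanup so concurrent or repeated triggers share a single
// invocation; later calls await the same in-flight promise.
function once<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => (pending ??= fn());
}
```

Memoizing the promise (rather than a boolean) also means a caller arriving mid-cleanup waits for it to finish instead of racing past it.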

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (2)

166-195: Never log the managed secret; sanitize headers and remove secret values from logs

Current logs include managedWorkerSecret on length/mismatch. Exclude it from all logs and from sanitizeHeaders by default.

-    if (!managedWorkerSecret) {
-      logger.error("[WorkerGroupTokenService] Managed secret not found in request", {
-        headers: this.sanitizeHeaders(request),
-      });
+    if (!managedWorkerSecret) {
+      logger.error("[WorkerGroupTokenService] Managed secret not found in request", {
+        headers: this.sanitizeHeaders(request, ["authorization", WORKER_HEADERS.MANAGED_SECRET]),
+      });
       return;
     }
...
-    if (a.byteLength !== b.byteLength) {
-      logger.error("[WorkerGroupTokenService] Managed secret length mismatch", {
-        managedWorkerSecret,
-        headers: this.sanitizeHeaders(request),
-      });
+    if (a.byteLength !== b.byteLength) {
+      logger.error("[WorkerGroupTokenService] Managed secret length mismatch", {
+        headers: this.sanitizeHeaders(request, ["authorization", WORKER_HEADERS.MANAGED_SECRET]),
+      });
       return;
     }
 
-    if (!timingSafeEqual(a, b)) {
-      logger.error("[WorkerGroupTokenService] Managed secret mismatch", {
-        managedWorkerSecret,
-        headers: this.sanitizeHeaders(request),
-      });
+    if (!timingSafeEqual(a, b)) {
+      logger.error("[WorkerGroupTokenService] Managed secret mismatch", {
+        headers: this.sanitizeHeaders(request, ["authorization", WORKER_HEADERS.MANAGED_SECRET]),
+      });
       return;
     }

Additionally, update sanitizeHeaders default (see below).
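For reference, the length-check-then-`timingSafeEqual` pattern with no secret material passed to the logger (a standalone sketch of the comparison, assuming Node's `crypto`):

```typescript
import { timingSafeEqual } from "node:crypto";

// Constant-time secret comparison: check lengths first, because
// timingSafeEqual throws when the buffers differ in length.
function secretsMatch(provided: string, expected: string): boolean {
  const a = Buffer.from(provided);
  const b = Buffer.from(expected);
  if (a.byteLength !== b.byteLength) return false;
  return timingSafeEqual(a, b);
}
```

Callers can then log only the boolean outcome plus sanitized headers, never the secret itself.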


196-201: Cache key must not embed raw token

Use a stable token hash in the key to avoid leaking secrets via metrics/logs.

-    const cacheKey = ["worker-group-token", token, instanceName];
+    const cacheKey = ["worker-group-token", await this.hashToken(token), instanceName];
🧹 Nitpick comments (3)
internal-packages/run-engine/src/engine/types.ts (1)

42-44: Document units and intended bounds for new queue options

Add brief JSDoc noting ms units and expected min values (e.g., period ≥100ms, thresholds/counts ≥1) to prevent misconfiguration.

internal-packages/run-engine/src/run-queue/index.ts (2)

73-76: New options look good; add runtime clamps where used

The schema allows undefined; ensure call sites clamp to safe mins (see suggestions below on consumer count and cooloff).


2316-2334: Non-blocking worker-queue pop: API and Lua shape match the call sites

Looks correct; consider renaming Lua local “messageId” to “messageKey” to avoid confusion with the runId.

Also applies to: 2649-2653

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 6dd5d2f and 3f96ae0.

📒 Files selected for processing (6)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/v3/runEngine.server.ts (1 hunks)
  • apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (8 hunks)
  • internal-packages/run-engine/src/engine/locking.ts (2 hunks)
  • internal-packages/run-engine/src/engine/types.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/index.ts (13 hunks)
✅ Files skipped from review due to trivial changes (1)
  • internal-packages/run-engine/src/engine/locking.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • internal-packages/run-engine/src/engine/types.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
  • internal-packages/run-engine/src/run-queue/index.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • apps/webapp/app/v3/runEngine.server.ts
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: In the webapp, all environment variables must be accessed through the env export of env.server.ts, instead of directly accessing process.env.
When importing from @trigger.dev/core in the webapp, never import from the root @trigger.dev/core path; always use one of the subpath exports as defined in the package's package.json.

Files:

  • apps/webapp/app/v3/runEngine.server.ts
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
🧬 Code graph analysis (2)
apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (4)
apps/webapp/app/services/platform.v3.server.ts (1)
  • singleton (116-116)
internal-packages/cache/src/index.ts (4)
  • createCache (2-2)
  • Namespace (4-4)
  • DefaultStatefulContext (3-3)
  • MemoryStore (9-9)
apps/webapp/app/env.server.ts (1)
  • env (1084-1084)
apps/webapp/app/v3/machinePresets.server.ts (1)
  • machinePresetFromName (27-32)
internal-packages/run-engine/src/run-queue/index.ts (4)
apps/webapp/app/v3/marqs/index.server.ts (14)
  • options (1311-1324)
  • consumerId (759-817)
  • consumerId (822-992)
  • queueKey (994-1050)
  • messageQueue (1671-1743)
  • envConcurrencyLimitKey (1955-1966)
  • messageId (1261-1265)
  • messageId (1917-1953)
  • message (540-546)
  • message (548-554)
  • message (1255-1259)
  • message (1469-1669)
  • message (1745-1813)
  • message (1815-1874)
packages/core/src/v3/schemas/runEngine.ts (2)
  • DequeuedMessage (234-272)
  • DequeuedMessage (273-273)
internal-packages/run-engine/src/run-queue/keyProducer.ts (7)
  • queueKey (99-125)
  • queueConcurrencyLimitKey (45-47)
  • queueCurrentConcurrencyKey (157-165)
  • envConcurrencyLimitKey (51-67)
  • envCurrentConcurrencyKey (199-217)
  • envQueueKey (127-129)
  • workerQueueKey (41-43)
internal-packages/run-engine/src/run-queue/types.ts (2)
  • OutputPayload (31-31)
  • OutputPayload (33-33)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
internal-packages/run-engine/src/run-queue/index.ts (2)

103-104: Propagating workerQueueLength is a useful telemetry addition

No issues; aligns with core schema changes.


1454-1555: Trace/log enrichment for dequeue-from-queue looks solid

The additional attributes and structured parsing are clear and low-risk.

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (3)

371-377: runnerId propagation looks correct

Passing runnerId through to engine methods matches the new API. No changes needed.


433-441: Environment lookup and env var resolution post-startRunAttempt is sound

Matches engine payload; handles optional parentEnvironment.


533-541: toJSON always returns MANAGED while the response type still includes UNMANAGED

If UNMANAGED is truly obsolete, narrow the union; otherwise preserve legacy shape for unmanaged callers.

Do you still return UNMANAGED anywhere? If not, I can submit a follow-up to tighten WorkerGroupTokenAuthenticationResponse and remove dead fields.

Also applies to: 571-588

Comment on lines +70 to 73
masterQueueCooloffPeriodMs: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS,
masterQueueCooloffCountThreshold: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD,
masterQueueConsumerDequeueCount: env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT,
concurrencySweeper: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Defensive clamping of queue controls at runtime

Add guards so a bad env can’t zero-out dequeues or set a 0ms cooloff.

-      masterQueueCooloffPeriodMs: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS,
-      masterQueueCooloffCountThreshold: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD,
-      masterQueueConsumerDequeueCount: env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT,
+      masterQueueCooloffPeriodMs: Math.max(100, env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS),
+      masterQueueCooloffCountThreshold: Math.max(1, env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD),
+      masterQueueConsumerDequeueCount: Math.max(1, env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT),
🤖 Prompt for AI Agents
In apps/webapp/app/v3/runEngine.server.ts around lines 70-73, the runtime
options taken directly from env can be set to zero or invalid values; parse the
relevant env vars to numbers, apply sane minimums and fallbacks, and clamp them
before assigning (e.g., masterQueueCooloffPeriodMs = Math.max(parsedValue ||
DEFAULT_MS, MIN_MS), masterQueueCooloffCountThreshold = Math.max(parsedValue ||
DEFAULT_THRESHOLD, 1), masterQueueConsumerDequeueCount = Math.max(parsedValue ||
DEFAULT_DEQUEUE_COUNT, 1)); ensure non-numeric inputs fall back to defaults and
that cooloff ms cannot be 0 (use a small positive min like 1 or a more
conservative min such as 50ms).
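
The clamping the review asks for can be sketched as a small standalone helper. This is a hypothetical illustration, not code from the PR: the name `clampInt` and the defaults below are invented, and the real env vars are parsed by the webapp's schema before they reach this point.

```typescript
// Hypothetical helper: clamp a numeric env value to a sane minimum,
// falling back to a default when the input is missing or not a finite number.
function clampInt(value: number | undefined, fallback: number, min: number): number {
  const parsed = typeof value === "number" && Number.isFinite(value) ? value : fallback;
  return Math.max(min, Math.floor(parsed));
}

// Mirrors the suggestion: a 0 or negative env value can no longer
// zero-out dequeues or produce a 0ms cooloff.
const masterQueueCooloffPeriodMs = clampInt(0, 10_000, 100); // bad env value of 0
const masterQueueConsumerDequeueCount = clampInt(undefined, 10, 1); // unset env value

console.log(masterQueueCooloffPeriodMs, masterQueueConsumerDequeueCount); // → 100 10
```

The same shape works for the threshold: `clampInt(env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD, 10, 1)` guarantees at least one failed dequeue is required before a queue enters cooloff.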

Comment on lines +283 to 306
              try {
                const newWorkerInstance = await tx.workerInstance.create({
                  data: {
                    workerGroupId: workerGroup.id,
                    name: instanceName,
                    resourceIdentifier,
                  },
                  include: {
                    // This will always be empty for shared worker instances, but required for types
                    deployment: true,
                    environment: true,
                  },
                });

                return newWorkerInstance;
              } catch (error) {
                // Gracefully handle race conditions when connecting for the first time
                if (error instanceof Prisma.PrismaClientKnownRequestError) {
                  // Unique constraint violation
                  if (error.code === "P2002") {
                    try {
                      const existingWorkerInstance = await tx.workerInstance.findFirst({
                        where: {
                          workerGroupId: workerGroup.id,
                          resourceIdentifier,
                        },
                        include: {
                          deployment: true,
                          environment: true,
                        },
                      });

                      return existingWorkerInstance;
                    } catch (error) {
                      logger.error("[WorkerGroupTokenService] Failed to find worker instance", {
                        workerGroup,
                        workerInstance,
                        deploymentId,
                      });

                      return;
                    }
                  }
                }
              }
Contributor

⚠️ Potential issue

Fix logging of undefined variable in catch block

workerInstance is not defined in this scope; logging it will throw and mask the original error.

-            logger.error("[WorkerGroupTokenService] Failed to find worker instance", {
-              workerGroup,
-              workerInstance,
-            });
+            logger.error("[WorkerGroupTokenService] Failed to find worker instance", {
+              workerGroupId: workerGroup.id,
+              instanceName,
+              resourceIdentifier,
+            });
🤖 Prompt for AI Agents
In apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts around
lines 283 to 306, the catch block logs an undefined variable workerInstance
which will throw and hide the original error; update the logger call to not
reference workerInstance and instead include the caught error and relevant
context (e.g., workerGroup, resourceIdentifier or the fetched
existingWorkerInstance if available), for example pass the caught error object
and resourceIdentifier in the log message so the original error is preserved and
no undefined variable is referenced.
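
The create-then-find fallback quoted in this comment is a common way to make "get or create" safe under concurrency: optimistically create, and if a unique-constraint error comes back, read the row the concurrent writer just inserted. A minimal sketch, with an in-memory map standing in for Prisma and a hand-rolled error carrying Prisma's `P2002` code:

```typescript
// In-memory stand-in for a table with a unique (workerGroupId, resourceIdentifier) key.
type WorkerInstance = { workerGroupId: string; resourceIdentifier: string };

const store = new Map<string, WorkerInstance>();

function create(row: WorkerInstance): WorkerInstance {
  const key = `${row.workerGroupId}:${row.resourceIdentifier}`;
  if (store.has(key)) {
    // Simulate Prisma's unique-constraint violation
    const err = new Error("Unique constraint failed") as Error & { code: string };
    err.code = "P2002";
    throw err;
  }
  store.set(key, row);
  return row;
}

function findFirst(workerGroupId: string, resourceIdentifier: string): WorkerInstance | undefined {
  return store.get(`${workerGroupId}:${resourceIdentifier}`);
}

function getOrCreate(workerGroupId: string, resourceIdentifier: string): WorkerInstance | undefined {
  try {
    return create({ workerGroupId, resourceIdentifier });
  } catch (error) {
    if ((error as { code?: string }).code === "P2002") {
      // Another connection won the race; return the row it wrote.
      return findFirst(workerGroupId, resourceIdentifier);
    }
    throw error; // anything else is a real failure
  }
}

const a = getOrCreate("wg_1", "host-a");
const b = getOrCreate("wg_1", "host-a"); // second call hits the P2002 fallback path
console.log(a === b); // → true
```

Note that the inner catch should only log values that exist in its scope (the review's point): if the log call itself throws, the original database error is lost.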

@matt-aitken matt-aitken merged commit 82401ec into main Aug 27, 2025
31 checks passed
@matt-aitken matt-aitken deleted the fix/v4-dequeue-performance-1 branch August 27, 2025 20:22