feat(server): add two admin endpoints for queue and environment concurrency debugging and repairing #2559
…rrency debugging and repairing
Walkthrough
The change set adds two Remix admin API routes: one action for environment/queue repairs and one loader for generating an environment engine report. Both authenticate via personal access tokens, enforce admin access, validate environmentId, reject V1 engines, and operate on V2 queues. The repair route parses dryRun and queue filters, lists eligible queues, runs a dry-run environment repair, and repairs queues in parallel. The report route supports pagination flags and verbose mode, fetches queues, and returns an engine-generated report. The run-engine gains repair and reporting methods plus analysis helpers and a private queue report generator. Run-queue adds concurrency/read accessor methods. A new ReportableQueue type is introduced, and p-map is added as a dependency.
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~75 minutes
Pre-merge checks and finishing touches
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 2
🧹 Nitpick comments (18)
internal-packages/run-engine/src/run-queue/index.ts (5)
358-366: Add explicit return type for clarity
Avoid implicit any/union drift across callers. Return type here is number.
- public async getEnvConcurrencyBurstFactor(env: MinimalAuthenticatedEnvironment) {
+ public async getEnvConcurrencyBurstFactor(
+   env: MinimalAuthenticatedEnvironment
+ ): Promise<number> {
368-370: Type the SMEMBERS return
SMEMBERS returns string[]. Make it explicit to prevent accidental Set usage downstream.
- public async getCurrentConcurrencyOfEnvironment(env: MinimalAuthenticatedEnvironment) {
+ public async getCurrentConcurrencyOfEnvironment(
+   env: MinimalAuthenticatedEnvironment
+ ): Promise<string[]> {
372-374: Type the SMEMBERS return
Same as above for queue scope.
- public async getCurrentConcurrencyOfQueue(env: MinimalAuthenticatedEnvironment, queue: string) {
+ public async getCurrentConcurrencyOfQueue(
+   env: MinimalAuthenticatedEnvironment,
+   queue: string
+ ): Promise<string[]> {
384-395: Minor: accept timestamp to avoid Date allocations in hot paths
Not urgent, but this helper may be used in loops. Accept a number to reduce allocations and string conversions.
- public async lengthOfQueueAvailableMessages(
-   env: MinimalAuthenticatedEnvironment,
-   queue: string,
-   currentTime: Date = new Date(),
-   concurrencyKey?: string
- ) {
-   return this.redis.zcount(
-     this.keys.queueKey(env, queue, concurrencyKey),
-     "-inf",
-     String(currentTime.getTime())
-   );
- }
+ public async lengthOfQueueAvailableMessages(
+   env: MinimalAuthenticatedEnvironment,
+   queue: string,
+   currentTimeMs: number = Date.now(),
+   concurrencyKey?: string
+ ): Promise<number> {
+   return this.redis.zcount(this.keys.queueKey(env, queue, concurrencyKey), "-inf", String(currentTimeMs));
+ }
453-459: Naming consistency: dequeued vs concurrency
Good addition. Consider documenting that “currentDequeued” is the user-facing metric whereas “currentConcurrency” is operational, to avoid confusion when both are exposed.
internal-packages/run-engine/src/engine/index.ts (4)
64-64: p-map default error behavior can abort whole repair
By default, p-map rejects on the first error (stopOnError: true). For admin repair flows, best-effort is preferable.
- await pMap(
+ await pMap(
    completedRuns,
    async (run) => {
      await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
        skipDequeueProcessing: true,
        removeFromWorkerQueue: false,
      });
    },
-   { concurrency: 5 }
+   { concurrency: 5, stopOnError: false }
  );
Repeat the same change in repairQueue.
1172-1209: Repair environment: swallow-and-log per-ack errors to avoid aborting all
Even with stopOnError: false, a throw inside the mapper still makes pMap reject once all items have settled (with an aggregated error). Wrap the ack call to log and continue.
- await pMap(
+ await pMap(
    completedRuns,
    async (run) => {
-     await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
-       skipDequeueProcessing: true,
-       removeFromWorkerQueue: false,
-     });
+     try {
+       await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
+         skipDequeueProcessing: true,
+         removeFromWorkerQueue: false, // consider making this configurable for deep cleanup
+       });
+     } catch (err) {
+       this.logger.error("repairEnvironment acknowledge failed", {
+         runId: run.id,
+         orgId: run.orgId,
+         error: err,
+       });
+     }
    },
-   { concurrency: 5 }
+   { concurrency: 5, stopOnError: false }
  );
1211-1251: Repair queue: same best-effort pattern and error handling
Mirror the defensive wrapping here too.
- await pMap(
+ await pMap(
    completedRuns,
    async (run) => {
-     await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
-       skipDequeueProcessing: true,
-       removeFromWorkerQueue: false,
-     });
+     try {
+       await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
+         skipDequeueProcessing: true,
+         removeFromWorkerQueue: false,
+       });
+     } catch (err) {
+       this.logger.error("repairQueue acknowledge failed", {
+         queue,
+         runId: run.id,
+         orgId: run.orgId,
+         error: err,
+       });
+     }
    },
-   { concurrency: 5 }
+   { concurrency: 5, stopOnError: false }
  );
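The best-effort pattern in the three comments above can be illustrated without p-map. The following is a minimal sketch, assuming a hypothetical helper named mapBestEffort (it is not part of p-map or of this PR): it caps in-flight work at a fixed concurrency and records each failure as data instead of rejecting the whole batch.

```typescript
// Hypothetical helper for illustration only; not p-map and not code from the PR.
type Settled<T> = { ok: true; value: T } | { ok: false; error: string };

async function mapBestEffort<I, O>(
  items: readonly I[],
  mapper: (item: I) => Promise<O>,
  concurrency: number
): Promise<Settled<O>[]> {
  const results: Settled<O>[] = new Array(items.length);
  let next = 0;

  // Each worker claims the next unprocessed index until the list is exhausted.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      try {
        results[index] = { ok: true, value: await mapper(items[index]) };
      } catch (err) {
        // Record the failure and keep going instead of aborting the batch.
        results[index] = {
          ok: false,
          error: err instanceof Error ? err.message : String(err),
        };
      }
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Results keep the input order, so a caller can pair each entry with the run or queue it came from and log only the failed ones.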
1646-1654: Outdated inline comment
The comment hardcodes “10 minutes” while completedAtOffsetMs is parameterized and is passed as 5000ms by the repair methods.
- lte: new Date(Date.now() - completedAtOffsetMs), // This only finds runs that were completed more than 10 minutes ago
+ lte: new Date(Date.now() - completedAtOffsetMs), // Only runs completed earlier than completedAtOffsetMs ago
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts (4)
12-16: Validate and normalize query params (booleans/limits) to avoid negative skip/take
Prevent negative pagination and support common truthy values for verbose.
- const SearchParamsSchema = z.object({
-   verbose: z.string().default("0"),
-   page: z.coerce.number().optional(),
-   per_page: z.coerce.number().optional(),
- });
+ const SearchParamsSchema = z.object({
+   verbose: z
+     .string()
+     .optional()
+     .transform((v) => v === "1" || v === "true"),
+   page: z.coerce.number().int().min(1).default(1),
+   per_page: z.coerce.number().int().min(1).max(500).default(50),
+ });
66-68: Use sanitized defaults directly
Earlier schema change sets safe defaults and bounds.
- const page = searchParams.page ?? 1;
- const perPage = searchParams.per_page ?? 50;
+ const page = searchParams.page;
+ const perPage = searchParams.per_page;
88-92: Pass boolean verbose
Align with normalized schema.
- const report = await engine.generateEnvironmentReport(
-   environment,
-   queues,
-   searchParams.verbose === "1"
- );
+ const report = await engine.generateEnvironmentReport(environment, queues, !!searchParams.verbose);
57-61: determineEngineVersion might throw on missing worker version
Consider wrapping with try/catch and returning a 400 with the message to aid debugging, rather than a 500.
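To make the pagination concern concrete, here is a dependency-free sketch of the same normalization. The function names (clampInt, normalizeReportParams, toSkipTake) are hypothetical, and unlike the zod schema suggested above, which rejects out-of-range values, this version clamps them into range; the skip/take arithmetic is the usual offset-pagination formula and is assumed rather than taken from the route.

```typescript
// Hypothetical stand-in for the suggested schema; names are illustrative only.
type ReportParams = { verbose: boolean; page: number; perPage: number };

// Returns `fallback` for missing or non-integer input, otherwise clamps into [min, max].
function clampInt(raw: string | undefined, fallback: number, min: number, max: number): number {
  if (raw === undefined || raw.trim() === "") return fallback;
  const n = Number(raw);
  if (!Number.isInteger(n)) return fallback;
  return Math.min(max, Math.max(min, n));
}

function normalizeReportParams(query: Record<string, string | undefined>): ReportParams {
  return {
    verbose: query.verbose === "1" || query.verbose === "true",
    page: clampInt(query.page, 1, 1, Number.MAX_SAFE_INTEGER),
    perPage: clampInt(query.per_page, 50, 1, 500),
  };
}

// Offset pagination: page is 1-based, so skip can never go negative after clamping.
function toSkipTake(p: ReportParams): { skip: number; take: number } {
  return { skip: (p.page - 1) * p.perPage, take: p.perPage };
}
```

Clamping keeps the endpoint forgiving for ad hoc admin use, while rejecting with a 400 (the schema approach) makes typos in the query string visible; either choice removes the negative skip/take case.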
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts (5)
86-92: Make per-queue repairs resilient and abortable; capture errors and pass request.signal.
Today, a single rejection aborts the whole batch or bubbles a 500. Capture per-queue errors and tie work to the request’s abort signal.
Apply this diff:
- const repairResults = await pMap(
-   queues,
-   async (queue) => {
-     return engine.repairQueue(environment, queue.name, parsedBody.dryRun);
-   },
-   { concurrency: 5 }
- );
+ const repairResults = await pMap(
+   queues,
+   async (queue) => {
+     try {
+       const result = await engine.repairQueue(environment, queue.name, parsedBody.dryRun);
+       return { queue: queue.name, result };
+     } catch (err) {
+       return {
+         queue: queue.name,
+         error: err instanceof Error ? err.message : String(err),
+       };
+     }
+   },
+   { concurrency: 5, signal: request.signal }
+ );
84-85: Don’t fail the whole request if environment-level repair throws; return a structured error.
Keep the endpoint useful even if the environment repair step fails.
Apply this diff:
- const repairEnvironmentResults = await engine.repairEnvironment(environment, parsedBody.dryRun);
+ let environmentResult: unknown;
+ try {
+   environmentResult = await engine.repairEnvironment(environment, parsedBody.dryRun);
+ } catch (err) {
+   environmentResult = { error: err instanceof Error ? err.message : String(err) };
+ }
@@
- return json({ environment: repairEnvironmentResults, queues: repairResults });
+ return json({ environment: environmentResult, queues: repairResults });
  }
Also applies to: 94-95
46-51: Avoid fetching unused relations on environment (trim include).
organization and orgMember aren’t used. Reduce payload/CPU.
Apply this diff:
  include: {
-   organization: true,
    project: true,
-   orgMember: true,
  },
72-78: Only select queue fields you actually use.
You only need name for repairQueue; drop the rest to reduce transfer.
Apply this diff:
- select: {
-   friendlyId: true,
-   name: true,
-   concurrencyLimit: true,
-   type: true,
-   paused: true,
- },
+ select: { name: true },
66-82: Optional: surface unknown queue names to the caller.
If the request specifies queues that don’t exist, they’re silently ignored. Consider reporting a notFound list alongside queues so operators can catch typos.
Would you like me to add a small diff that computes requestedButMissing = parsedBody.queues.filter(q => !queues.some(x => x.name === q)) and returns it in the response?
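The requestedButMissing idea from the last comment can be sketched as a small pure function. The name findMissingQueues and the input shapes are assumptions for illustration; the version below also uses a Set lookup and de-duplicates the requested names, which the one-line filter in the comment does not do.

```typescript
// Sketch only: `requested` stands for the queue names in the request body and
// `found` for the rows returned by the queue lookup. Both shapes are assumed.
function findMissingQueues(
  requested: readonly string[],
  found: readonly { name: string }[]
): string[] {
  const foundNames = new Set(found.map((q) => q.name));
  // Keep the first occurrence of each requested name that was not found,
  // preserving request order.
  return requested.filter(
    (name, index) => !foundNames.has(name) && requested.indexOf(name) === index
  );
}
```

Returning this list next to the repair results would let an operator spot a mistyped queue name without changing the behavior for valid names.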
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (6)
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts (1 hunks)
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts (1 hunks)
internal-packages/run-engine/package.json (1 hunks)
internal-packages/run-engine/src/engine/index.ts (5 hunks)
internal-packages/run-engine/src/engine/types.ts (1 hunks)
internal-packages/run-engine/src/run-queue/index.ts (4 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
internal-packages/run-engine/src/run-queue/index.ts
internal-packages/run-engine/src/engine/types.ts
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts
internal-packages/run-engine/src/engine/index.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts
🧠 Learnings (2)
📚 Learning: 2025-08-29T10:06:49.293Z
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-08-29T10:06:49.293Z
Learning: Prefer Run Engine 2.0 via internal/run-engine; avoid extending legacy run engine code
Applied to files:
internal-packages/run-engine/src/engine/index.ts
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
PR: triggerdotdev/trigger.dev#2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
Applied to files:
internal-packages/run-engine/src/engine/index.ts
🧬 Code graph analysis (4)
internal-packages/run-engine/src/run-queue/index.ts (2)
internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts (5)
env (501-521), env (560-578), env (580-594), env (596-616), queue (618-624)
internal-packages/run-engine/src/shared/index.ts (1)
MinimalAuthenticatedEnvironment (8-19)
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts (4)
apps/webapp/app/services/personalAccessToken.server.ts (1)
authenticateApiRequestWithPersonalAccessToken (105-114)
apps/webapp/app/v3/engineVersion.server.ts (1)
determineEngineVersion (17-76)
apps/webapp/app/db.server.ts (1)
$replica (103-106)
apps/webapp/app/v3/runEngine.server.ts (1)
engine (9-9)
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts (5)
apps/webapp/app/services/personalAccessToken.server.ts (1)
authenticateApiRequestWithPersonalAccessToken (105-114)
internal-packages/run-engine/src/engine/index.ts (1)
environment (1316-1370)
apps/webapp/app/v3/engineVersion.server.ts (1)
determineEngineVersion (17-76)
apps/webapp/app/db.server.ts (1)
$replica (103-106)
apps/webapp/app/v3/runEngine.server.ts (1)
engine (9-9)
internal-packages/run-engine/src/engine/index.ts (3)
internal-packages/run-engine/src/index.ts (1)
AuthenticatedEnvironment (8-8)
internal-packages/run-engine/src/shared/index.ts (1)
AuthenticatedEnvironment (4-6)
internal-packages/run-engine/src/engine/types.ts (1)
ReportableQueue (154-160)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (9)
internal-packages/run-engine/src/engine/types.ts (1)
153-160: New public type looks good
ReportableQueue fields align with usage in the report generator.
internal-packages/run-engine/src/run-queue/index.ts (1)
544-551: LGTM: operational metric accessor
This mirrors envCurrentConcurrencyKey and complements the existing displayed metric.
internal-packages/run-engine/src/engine/index.ts (6)
33-33: Broadened shared types import is fine
AuthenticatedEnvironment is required by new methods. No concerns.
56-62: Public API surface addition is coherent
Using ReportableQueue from types keeps the report endpoint strongly typed.
1253-1314: Env report: solid snapshot of both displayed and operational metrics
Good coverage (limits, burst, current sets) and optional key exposure under verbose. No issues.
1316-1370: Queue report: aligns with Lua semantics and provides actionable reasons
Effective limit = min(queueLimit || 1e6, envLimit) mirrors script; due messages count via ZCOUNT is correct. LGTM.
1703-1722: Env analysis: concise and sufficient
Computation matches queue logic (burst applied). Looks good.
1734-1788: Queue analysis mirrors dequeue script accurately
Effective limit capping, capacity checks, and optional dueCount reasoning are consistent. LGTM.
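The effective-limit rule quoted in the queue report comment above can be written out as a worked sketch. Only the formula min(queueLimit || 1e6, envLimit) comes from the review; the function names, the reading of a missing or zero queue limit as "no queue-level cap", and the simple capacity check are assumptions, and the burst factor mentioned in the env analysis is deliberately not modelled here.

```typescript
// Assumption: a null, undefined, or zero queue limit means "no queue-level cap",
// which the quoted formula represents with the sentinel 1e6.
function effectiveQueueLimit(queueLimit: number | null | undefined, envLimit: number): number {
  return Math.min(queueLimit || 1e6, envLimit);
}

// Hypothetical capacity check: there is room to dequeue only while the current
// concurrency is strictly below the effective limit.
function hasCapacity(
  currentConcurrency: number,
  queueLimit: number | null | undefined,
  envLimit: number
): boolean {
  return currentConcurrency < effectiveQueueLimit(queueLimit, envLimit);
}
```

For example, a queue limit of 50 under an environment limit of 10 yields an effective limit of 10, and an unset queue limit simply inherits the environment limit.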
internal-packages/run-engine/package.json (1)
33-35: No action needed: p-map v6 is compatible with Node ≥16
CI workflows use Node 20.11.1 (≥16), satisfying p-map’s engine requirement.
const parsedParams = ParamsSchema.parse(params);

const environment = await prisma.runtimeEnvironment.findFirst({
  where: {
    id: parsedParams.environmentId,
  },
Return 400 on invalid route params instead of throwing (use safeParse).
z.parse will throw and produce a 500 on bad/missing params. Prefer safeParse and return a 400 with issues.
Apply this diff:
- const parsedParams = ParamsSchema.parse(params);
+ const parsedParamsResult = ParamsSchema.safeParse(params);
+ if (!parsedParamsResult.success) {
+ return json(
+ { error: "Invalid params", issues: parsedParamsResult.error.flatten() },
+ { status: 400 }
+ );
+ }
+ const parsedParams = parsedParamsResult.data;
const environment = await prisma.runtimeEnvironment.findFirst({
where: {
id: parsedParams.environmentId,
},
🤖 Prompt for AI Agents
In
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
around lines 40 to 45, the code currently uses ParamsSchema.parse(params) which
will throw on invalid/missing params and surface as a 500; change this to
ParamsSchema.safeParse(params), check the result, and if not successful return a
400 response containing the validation issues (e.g., return json({ error:
"Invalid route params", issues: result.error.issues }, { status: 400 })),
otherwise proceed using result.data as parsedParams before calling prisma.
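The parse-then-branch flow described in this prompt can be shown without zod or Remix. This is a minimal sketch under stated assumptions: ParseResult, safeParseParams, and toResponse are hypothetical names, the result shape only imitates the success/data split of a safeParse call, and the plain status/body pair stands in for what a Remix route would return via json(body, { status }).

```typescript
// Hypothetical, dependency-free imitation of a non-throwing parse result.
type ParseResult<T> = { success: true; data: T } | { success: false; issues: string[] };

type RouteParams = { environmentId: string };

function safeParseParams(params: Record<string, string | undefined>): ParseResult<RouteParams> {
  const id = params.environmentId;
  if (typeof id !== "string" || id.length === 0) {
    return { success: false, issues: ["environmentId is required"] };
  }
  return { success: true, data: { environmentId: id } };
}

// Maps the parse result to a status/body pair instead of throwing, so invalid
// input becomes a 400 rather than an unhandled error (500).
function toResponse(result: ParseResult<RouteParams>): { status: number; body: unknown } {
  if (!result.success) {
    return { status: 400, body: { error: "Invalid params", issues: result.issues } };
  }
  return { status: 200, body: { environmentId: result.data.environmentId } };
}
```

The point of the pattern is that validation failure is an ordinary return value the handler must branch on, which keeps client mistakes out of the server error path.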
const body = await request.json();
const parsedBody = BodySchema.parse(body);
Guard JSON parsing and validate body with safeParse; avoid 500s on bad/missing JSON.
request.json() throws on empty/invalid JSON, and z.parse throws; both yield 500. Handle both and return 400 with validation details.
Apply this diff:
- const body = await request.json();
- const parsedBody = BodySchema.parse(body);
+ let body: unknown;
+ try {
+ body = await request.json();
+ } catch {
+ body = {};
+ }
+ const parsedBodyResult = BodySchema.safeParse(body);
+ if (!parsedBodyResult.success) {
+ return json(
+ { error: "Invalid request body", issues: parsedBodyResult.error.flatten() },
+ { status: 400 }
+ );
+ }
+ const parsedBody = parsedBodyResult.data;
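The JSON guard in the diff above can be isolated as a small sketch. tryParseJson and bodyOrEmpty are hypothetical names operating on a raw string rather than a Request. Note the design choice the diff makes: unparseable JSON falls back to an empty object, so whether the caller gets a 400 then depends on whether the body schema accepts {} (for example through defaults); returning a 400 directly from the failure branch is the stricter alternative.

```typescript
type JsonResult = { ok: true; value: unknown } | { ok: false; error: string };

// Non-throwing wrapper around JSON.parse.
function tryParseJson(raw: string): JsonResult {
  try {
    return { ok: true, value: JSON.parse(raw) };
  } catch (err) {
    return { ok: false, error: err instanceof Error ? err.message : String(err) };
  }
}

// Mirrors the fallback in the diff: treat an empty or invalid body as {} and
// leave the accept/reject decision to schema validation.
function bodyOrEmpty(raw: string): unknown {
  const parsed = tryParseJson(raw);
  return parsed.ok ? parsed.value : {};
}
```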
…rrency debugging and repairing (#2559)
No description provided.