-
-
Notifications
You must be signed in to change notification settings - Fork 852
v4: dequeue performance improvements (split concurrency from dequeue) #2127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
WalkthroughThis update introduces a comprehensive overhaul of the queueing and metrics systems across the run-engine and webapp components. The core queue architecture transitions from a "master queue" model to a sharded "worker queue" model, with associated renaming of types, fields, and method signatures throughout the codebase and database schema. Sharding is implemented using a jump consistent hash function, and new key generation utilities are provided for managing master and worker queues. The RunQueue class and its Redis-backed implementation now support sharded master queues and dedicated worker queues, with new methods for enqueueing, dequeueing, acknowledging, and migrating messages. The dequeue API shifts to a blocking pop from worker queues, and master queue consumers are now sharded and can be disabled or debounced via configuration. OpenTelemetry metrics are integrated throughout, replacing Prometheus metrics in the redis-worker package and adding comprehensive gauges and histograms for queue and lock states. The webapp and run-engine now initialize and export a metrics meter alongside tracing and logging, and new environment variables allow configuration of metrics exporters and queue processing parameters. The logger gains a new "verbose" level, and a new idempotent enqueue method ( ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 18
🔭 Outside diff range comments (2)
internal-packages/run-engine/src/engine/tests/dequeuing.test.ts (1)
88-89
: 💡 Verification agent🧩 Analysis chain
Address the failing test for machine constraints.
The test is marked as
.fails()
with a comment indicating that dequeuing multiple runs from a single environment within machine constraints isn't yet supported.Verify if this limitation is intentional for the current phase or if it needs to be implemented:
Do you want me to help create an issue to track the implementation of machine constraint-aware dequeuing?
🏁 Script executed:
#!/bin/bash # Check if there are related issues or TODOs about machine constraint dequeuing rg -i "machine.*constraint\|dequeue.*multiple\|fails.*machine" --type tsLength of output: 74
I couldn’t find any references to “machine constraints” or helpers around this test, so let’s pull in the test file itself and inspect for any comments or TODOs that explain why it’s marked to fail:
🏁 Script executed:
#!/bin/bash # Locate the dequeuing.test.ts file and show context around the failing test TEST_FILE=$(fd dequeuing.test.ts) if [ -n "$TEST_FILE" ]; then echo "Found test file: $TEST_FILE" echo -e "\nContext around the ‘fails’ case:" rg -n "Dequeues runs within machine constraints" -C3 "$TEST_FILE" echo -e "\nAny TODO/FIXME in this file?" rg -n "TODO\|FIXME" "$TEST_FILE" else echo "❗️ dequeuing.test.ts not found" fiLength of output: 1026
Feature Pending: Support for Dequeuing Multiple Runs Under Machine Constraints
The failing test in
internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
(lines 86–89) is intentional:// This will fail until we support dequeuing multiple runs from a single environment containerTest.fails( "Dequeues runs within machine constraints", … );No TODO or issue exists to track this work. Please open an issue to implement machine-constraint-aware dequeuing so we can remove the
.fails()
marker once it’s supported. Let me know if you’d like help drafting that issue.apps/webapp/app/routes/engine.v1.worker-actions.deployments.$deploymentFriendlyId.dequeue.ts (1)
6-18
: 🛠️ Refactor suggestionConsider a more gradual deprecation approach for backwards compatibility.
While the simplification aligns with the master queue to worker queue refactoring, returning an empty array for all requests could break existing clients that depend on this endpoint. Consider implementing a more graceful deprecation strategy.
Suggested approach:
+// Keep this route for backwards compatibility +// TODO: Remove after migration to worker queue API is complete export const loader = createLoaderWorkerApiRoute( { params: z.object({ deploymentFriendlyId: z.string(), }), searchParams: z.object({ maxRunCount: z.coerce.number().optional(), }), }, - async (): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => { + async (_, { request }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => { + // Log usage for migration tracking + console.warn(`Deprecated endpoint called: ${request.url}. Please migrate to worker queue API.`); return json([]); } );This approach provides visibility into continued usage and helps track migration progress.
♻️ Duplicate comments (3)
apps/docker-provider/tsconfig.json (1)
3-3
: Duplicate: TS target bump
Same update as inapps/coordinator/tsconfig.json
.apps/kubernetes-provider/tsconfig.json (1)
3-3
: Duplicate: TS target bump
Consistent with other app-level tsconfig updates.internal-packages/run-engine/src/engine/tests/trigger.test.ts (1)
112-113
: Same timing dependency concerns as in cancelling tests.The 500ms delays before dequeue operations create the same potential for test flakiness and slower test execution. Consider implementing a more deterministic synchronization mechanism.
Also applies to: 275-276
🧹 Nitpick comments (28)
internal-packages/run-engine/src/engine/tests/cancelling.test.ts (2)
78-78
: Consider the impact of hardcoded delays on test reliability.The addition of 500ms delays before dequeue operations suggests timing dependencies in the new worker queue processing. While these delays may be necessary for the current implementation, they could make tests slower and potentially flaky if the timing assumptions change.
Consider implementing a more robust waiting mechanism, such as polling for queue readiness or using event-driven synchronization instead of fixed delays:
-await setTimeout(500); -const dequeued = await engine.dequeueFromWorkerQueue({ +// Wait for queue processing to complete +const dequeued = await engine.dequeueFromWorkerQueue({Also applies to: 115-115, 275-275, 293-293
69-69
: Verify the hardcoded "main" worker queue value.All worker queue references use the hardcoded value "main". Ensure this aligns with the intended queue naming strategy and consider whether this should be configurable for different test scenarios.
Also applies to: 81-81, 104-104, 118-118, 284-284, 296-296
internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (1)
17-44
: Consider adding queue configuration options for consistency.While the
workerQueue
changes are correctly applied, this test file is missing the queue configuration options (masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
) that are present in the other test files. This inconsistency might lead to different queue behavior across test suites.Consider adding the queue configuration for consistency:
queue: { redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, },internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts (1)
121-121
: Consider making the timeout configurable or reducing it.The 500ms delay might be necessary for queue processing stabilization, but it could make tests slower than needed.
Consider using the debounce value from the configuration or a smaller timeout:
- await setTimeout(500); + await setTimeout(testOptions.processWorkerQueueDebounceMs + 50);internal-packages/run-engine/src/run-queue/tests/ack.test.ts (1)
76-76
: Consider reducing the timeout duration.The 1000ms delay seems excessive for tests and might indicate a need for better synchronization mechanisms or event-driven waiting.
Consider using a shorter timeout or implementing a polling mechanism:
- await setTimeout(1000); + await setTimeout(100); // Shorter delay for faster testsAlso applies to: 144-144
apps/webapp/app/routes/admin.api.v1.migrate-legacy-master-queues.ts (1)
28-36
: Consider adding logging and timeout handling for the migration operation.The migration operation could be long-running and might benefit from:
- Logging the start and completion of migration
- Consider if this should be an async operation with status tracking
- More specific error handling to provide better debugging information
try { + console.log(`Starting legacy master queue migration for admin user ${user.id}`); await engine.migrateLegacyMasterQueues(); + console.log(`Successfully completed legacy master queue migration`); return json({ success: true, }); } catch (error) { + console.error(`Migration failed:`, error); return json({ error: error instanceof Error ? error.message : error }, { status: 400 }); }internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts (1)
109-109
: Consider making the delay configurable or document why 1 second is needed.The 1-second delay before dequeue operations appears to be necessary for queue processing, but this timing might be environment-dependent.
Consider either:
- Making this delay configurable through test options
- Adding a comment explaining why this specific delay is needed
- Using a more deterministic approach to wait for queue readiness
+ // Wait for queue processing to complete await setTimeout(1000);
internal-packages/run-engine/src/engine/tests/dequeuing.test.ts (1)
70-70
: Document timing requirements for queue processing.The 500ms delays before dequeue operations are consistent with other engine tests. Consider documenting why this specific timing is needed.
Add comments to clarify the timing requirements:
+ // Allow time for queue processing and message distribution await setTimeout(500);
Also applies to: 147-147, 158-158
internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts (2)
25-26
: Document the purpose of new queue configuration options.The new configuration options
masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
lack documentation or comments explaining their purpose in tests.queue: { redis: redisOptions, + // Disable master queue consumers for testing worker queue functionality masterQueueConsumersDisabled: true, + // Reduce debounce time for faster test execution processWorkerQueueDebounceMs: 50, },
75-79
: Consider making test timing more robust.The hard-coded 500ms delay suggests potential race conditions in the queue implementation. This could make tests brittle and slow.
Consider implementing a more robust approach:
-await setTimeout(500); -const dequeued = await engine.dequeueFromWorkerQueue({ - consumerId: "test_12345", - workerQueue: "main", -}); +// Wait for message to be available with retry logic +let dequeued; +let attempts = 0; +const maxAttempts = 10; + +while (attempts < maxAttempts) { + dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + + if (dequeued.length > 0) break; + + await setTimeout(50); + attempts++; +} + +expect(dequeued.length).toBeGreaterThan(0);This approach is more reliable and provides better error messages when timing issues occur.
internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (2)
81-81
: Consider extracting timing constants for better maintainability.The 1000ms timeout is hardcoded throughout multiple tests. Consider extracting this as a constant for easier maintenance.
+const QUEUE_PROCESSING_DELAY_MS = 1000; + const testOptions = { // ... existing options }; // In tests: -await setTimeout(1000); +await setTimeout(QUEUE_PROCESSING_DELAY_MS);
283-283
: Good practice: Explicit verification of dequeued message content.The explicit check
expect(message.message.runId).toBe(messageProd.runId)
in the last test is a good practice that should be applied consistently across all tests to ensure the correct message is dequeued.Consider adding similar verification to other tests:
const message = await queue.dequeueMessageFromWorkerQueue( "test_12345", authenticatedEnvProd.id ); assertNonNullable(message); +expect(message.message.runId).toBe(messageProd.runId);
packages/core/src/logger.ts (1)
90-94
: Consider using console.debug for verbose level consistency.The verbose method uses
console.log
while the debug method usesconsole.debug
. Consider whether verbose should useconsole.debug
for consistency, since verbose is typically more detailed than debug.verbose(message: string, ...args: Array<Record<string, unknown> | undefined>) { if (this.#level < 5) return; - this.#structuredLog(console.log, message, "verbose", ...args); + this.#structuredLog(console.debug, message, "verbose", ...args); }Alternatively, if the intention is to have verbose messages more visible than debug messages, consider documenting this design decision in the JSDoc comment at the top of the file.
internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts (1)
105-105
: Evaluate if 1000ms delay is appropriate for test stability.The 1000ms delay before dequeue operations seems quite long for tests. Consider if this delay is necessary for queue stability or if it could be reduced for faster test execution.
Consider reducing the delay or implementing a more efficient waiting mechanism:
-await setTimeout(1000); +await setTimeout(100); // Reduced delay +// Or implement queue state pollingAlso applies to: 173-173, 185-185, 236-236
internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (1)
80-80
: Consider reducing timing delays or implementing more robust waiting.Multiple 500ms delays before dequeue operations suggest potential race conditions. Consider if these delays could be reduced or replaced with more deterministic waiting mechanisms.
Consider implementing queue state polling instead of fixed delays:
-await setTimeout(500); +// Wait for queue to be ready +await waitForQueueReady(engine, workerQueue);Or at minimum, reduce delays where possible:
-await setTimeout(500); +await setTimeout(100); // Reduced delay for faster testsAlso applies to: 343-343, 548-548, 851-851, 1082-1082
packages/core/src/v3/serverOnly/jumpHash.ts (1)
14-16
: Make the 8-byte requirement explicit.The
jumpConsistentHash
function expects exactly 8 bytes, but the hash function returns 32 bytes by default. WhilebytesToBigInt
only reads the first 8 bytes, it would be clearer to explicitly request 8 bytes.export function jumpHash(key: string, buckets: number): number { - return jumpConsistentHash(hash(Buffer.from(key)), buckets); + return jumpConsistentHash(hash(Buffer.from(key), 8), buckets); }apps/webapp/app/env.server.ts (1)
474-474
: Consider documenting the impact of debounce timing on performance.The 200ms default debounce interval could significantly impact queue processing latency. Consider adding documentation about the trade-offs between debounce timing and responsiveness.
Apply this diff to add helpful documentation:
+ /** Debounce interval for worker queue processing - lower values increase responsiveness but may increase Redis load */ RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
packages/redis-worker/src/queue.ts (1)
519-537
: Redis Lua script logic is correct but could be more robust.The
enqueueItemOnce
Lua script correctly implements the idempotent enqueue logic usingHSETNX
followed byZADD
withNX
flag. However, consider the edge case where the hash set succeeds but the sorted set addition fails.For additional robustness, consider adding error handling in the Lua script:
this.redis.defineCommand("enqueueItemOnce", { numberOfKeys: 2, lua: ` local queue = KEYS[1] local items = KEYS[2] local id = ARGV[1] local score = ARGV[2] local serializedItem = ARGV[3] -- Only add if not exists local added = redis.call('HSETNX', items, id, serializedItem) if added == 1 then - redis.call('ZADD', queue, 'NX', score, id) + local zaddResult = redis.call('ZADD', queue, 'NX', score, id) + if zaddResult == 0 then + -- Cleanup hash if sorted set add failed + redis.call('HDEL', items, id) + return 0 + end return 1 else return 0 end `, });internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1)
82-83
: Consider initializing cached values to avoid potential undefined behavior.The cached metric values should be initialized to prevent potential issues with initial metric observations.
- private _lastReleasingsLength: number = 0; - private _lastMasterQueueLength: number = 0; + private _lastReleasingsLength: number = 0; + private _lastMasterQueueLength: number = 0;Actually, the initialization is already correct. This is fine as-is.
internal-packages/run-engine/src/engine/locking.ts (1)
73-73
: Consider using a more descriptive variable name.While the underscore is acceptable for unused variables, consider using a more descriptive name for clarity.
- for (const [_, lockInfo] of this.activeLocks) { + for (const [lockId, lockInfo] of this.activeLocks) {internal-packages/run-engine/src/run-queue/index.test.ts (1)
131-147
: Consider making the timeout duration configurable for tests.The 1-second delay before dequeue operations is necessary for queue processing, but hardcoding the value could make tests brittle. Consider extracting this as a test constant or configuration option.
+const QUEUE_PROCESSING_DELAY_MS = 1000; + const testOptions = { name: "rq", tracer: trace.getTracer("rq"),Then use it throughout the tests:
-await setTimeout(1000); +await setTimeout(QUEUE_PROCESSING_DELAY_MS);internal-packages/testcontainers/src/index.ts (1)
162-163
: Remove or conditionally enable the console.log statementConsole.log statements in library code can clutter test output. Consider removing this line or making it conditional based on a debug flag or log level.
- console.log("Redis options", options); -Alternatively, if logging is needed for debugging:
- console.log("Redis options", options); + if (process.env.DEBUG_REDIS) { + console.log("Redis options", options); + }apps/webapp/app/v3/tracer.server.ts (1)
139-169
: Remove unnecessary async keywords from logging functionsThese functions are marked as
async
but don't perform any asynchronous operations. Theemit
method appears to be synchronous.-export async function emitDebugLog(message: string, params: Record<string, unknown> = {}) { +export function emitDebugLog(message: string, params: Record<string, unknown> = {}) { otelLogger.emit({ severityNumber: SeverityNumber.DEBUG, body: message, attributes: { ...flattenAttributes(params, "params") }, }); } -export async function emitInfoLog(message: string, params: Record<string, unknown> = {}) { +export function emitInfoLog(message: string, params: Record<string, unknown> = {}) { otelLogger.emit({ severityNumber: SeverityNumber.INFO, body: message, attributes: { ...flattenAttributes(params, "params") }, }); } -export async function emitErrorLog(message: string, params: Record<string, unknown> = {}) { +export function emitErrorLog(message: string, params: Record<string, unknown> = {}) { otelLogger.emit({ severityNumber: SeverityNumber.ERROR, body: message, attributes: { ...flattenAttributes(params, "params") }, }); } -export async function emitWarnLog(message: string, params: Record<string, unknown> = {}) { +export function emitWarnLog(message: string, params: Record<string, unknown> = {}) { otelLogger.emit({ severityNumber: SeverityNumber.WARN, body: message, attributes: { ...flattenAttributes(params, "params") }, }); }internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1)
432-433
: Consider a proper migration strategy for the masterQueue fieldWhile keeping the field name for backwards compatibility is understandable, this approach could lead to confusion. Consider implementing a proper migration strategy.
Consider one of these approaches:
- Add both fields temporarily with a deprecation notice:
masterQueue: lockedTaskRun.workerQueue, // @deprecated Use workerQueue instead workerQueue: lockedTaskRun.workerQueue,
- Use a version-based response format:
// In DequeuedMessage type export type DequeuedMessage = DequeuedMessageV1 | DequeuedMessageV2;
- Document the migration timeline in the codebase and communicate it to consumers.
internal-packages/run-engine/src/engine/index.ts (1)
1098-1130
: Well-structured migration utilityThe
migrateLegacyMasterQueues
method provides a clean way to migrate from the old master queue system. The logging is comprehensive and will help track migration progress.Consider adding error handling for individual worker group migrations to ensure one failure doesn't stop the entire migration process.
for (const workerGroup of workerGroups) { - this.logger.info("Migrating legacy master queue", { - workerGroupId: workerGroup.id, - workerGroupName: workerGroup.name, - workerGroupMasterQueue: workerGroup.masterQueue, - }); - - await this.runQueue.migrateLegacyMasterQueue(workerGroup.masterQueue); - - this.logger.info("Migrated legacy master queue", { - workerGroupId: workerGroup.id, - workerGroupName: workerGroup.name, - workerGroupMasterQueue: workerGroup.masterQueue, - }); + try { + this.logger.info("Migrating legacy master queue", { + workerGroupId: workerGroup.id, + workerGroupName: workerGroup.name, + workerGroupMasterQueue: workerGroup.masterQueue, + }); + + await this.runQueue.migrateLegacyMasterQueue(workerGroup.masterQueue); + + this.logger.info("Migrated legacy master queue", { + workerGroupId: workerGroup.id, + workerGroupName: workerGroup.name, + workerGroupMasterQueue: workerGroup.masterQueue, + }); + } catch (error) { + this.logger.error("Failed to migrate legacy master queue", { + workerGroupId: workerGroup.id, + workerGroupName: workerGroup.name, + workerGroupMasterQueue: workerGroup.masterQueue, + error, + }); + } }internal-packages/run-engine/src/run-queue/index.ts (3)
997-997
: Consider making the master queue polling interval configurableThe 500ms polling interval for master queue consumers is hardcoded. Consider making this configurable through
RunQueueOptions
to allow tuning based on workload characteristics.- for await (const _ of setInterval(500, null, { signal: this.abortController.signal })) { + const pollInterval = this.options.masterQueuePollIntervalMs ?? 500; + for await (const _ of setInterval(pollInterval, null, { signal: this.abortController.signal })) {
1733-1756
: Potential inefficiency in batch dequeue loopThe batch dequeue script processes messages one by one in a loop, performing a GET operation for each message. For large batches, this could be inefficient. Consider using MGET for batch message retrieval.
The current approach with individual GET calls is correct but could be optimized for larger batches if performance becomes an issue.
1130-1130
: Consider making max dequeue count configurableThe maximum number of messages dequeued in a batch is hardcoded to 10. This affects throughput and should be configurable to allow tuning based on message processing characteristics.
maxCount: number; }): Promise<DequeuedMessage[]> { messageQueue: string; shard: number; - // TODO: make this configurable - maxCount: 10, + maxCount: this.options.maxDequeueCount ?? 10, })Also applies to: 1175-1175
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (70)
.vscode/launch.json
(1 hunks)apps/coordinator/tsconfig.json
(1 hunks)apps/docker-provider/tsconfig.json
(1 hunks)apps/kubernetes-provider/tsconfig.json
(1 hunks)apps/webapp/app/env.server.ts
(3 hunks)apps/webapp/app/presenters/v3/SpanPresenter.server.ts
(2 hunks)apps/webapp/app/routes/admin.api.v1.migrate-legacy-master-queues.ts
(1 hunks)apps/webapp/app/routes/engine.v1.dev.dequeue.ts
(1 hunks)apps/webapp/app/routes/engine.v1.worker-actions.deployments.$deploymentFriendlyId.dequeue.ts
(2 hunks)apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
(1 hunks)apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
(1 hunks)apps/webapp/app/runEngine/concerns/queues.server.ts
(1 hunks)apps/webapp/app/runEngine/services/triggerTask.server.ts
(2 hunks)apps/webapp/app/runEngine/types.ts
(1 hunks)apps/webapp/app/services/deleteProject.server.ts
(1 hunks)apps/webapp/app/v3/runEngine.server.ts
(4 hunks)apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
(4 hunks)apps/webapp/app/v3/tracer.server.ts
(8 hunks)apps/webapp/package.json
(1 hunks)apps/webapp/test/engine/triggerTask.test.ts
(12 hunks)internal-packages/database/prisma/schema.prisma
(1 hunks)internal-packages/run-engine/src/engine/index.ts
(13 hunks)internal-packages/run-engine/src/engine/locking.ts
(3 hunks)internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts
(7 hunks)internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
(2 hunks)internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/systems/systems.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
(19 hunks)internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts
(4 hunks)internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts
(13 hunks)internal-packages/run-engine/src/engine/tests/cancelling.test.ts
(8 hunks)internal-packages/run-engine/src/engine/tests/checkpoints.test.ts
(26 hunks)internal-packages/run-engine/src/engine/tests/delays.test.ts
(5 hunks)internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
(6 hunks)internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
(15 hunks)internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts
(6 hunks)internal-packages/run-engine/src/engine/tests/priority.test.ts
(5 hunks)internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts
(13 hunks)internal-packages/run-engine/src/engine/tests/trigger.test.ts
(6 hunks)internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts
(9 hunks)internal-packages/run-engine/src/engine/tests/ttl.test.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/utils/engineTest.ts
(1 hunks)internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/waitpoints.test.ts
(23 hunks)internal-packages/run-engine/src/engine/types.ts
(5 hunks)internal-packages/run-engine/src/run-queue/index.test.ts
(24 hunks)internal-packages/run-engine/src/run-queue/index.ts
(40 hunks)internal-packages/run-engine/src/run-queue/keyProducer.ts
(2 hunks)internal-packages/run-engine/src/run-queue/tests/ack.test.ts
(4 hunks)internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts
(3 hunks)internal-packages/run-engine/src/run-queue/tests/migrateLegacyMasterQueue.test.ts
(1 hunks)internal-packages/run-engine/src/run-queue/tests/nack.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts
(4 hunks)internal-packages/run-engine/src/run-queue/types.ts
(2 hunks)internal-packages/run-engine/tsconfig.build.json
(1 hunks)internal-packages/run-engine/tsconfig.src.json
(1 hunks)internal-packages/run-engine/tsconfig.test.json
(1 hunks)internal-packages/testcontainers/src/index.ts
(7 hunks)internal-packages/tracing/src/index.ts
(1 hunks)packages/core/src/logger.ts
(2 hunks)packages/core/src/v3/serverOnly/index.ts
(1 hunks)packages/core/src/v3/serverOnly/jumpHash.ts
(1 hunks)packages/core/test/jumpHash.test.ts
(1 hunks)packages/redis-worker/package.json
(1 hunks)packages/redis-worker/src/queue.test.ts
(1 hunks)packages/redis-worker/src/queue.ts
(3 hunks)packages/redis-worker/src/worker.ts
(7 hunks)src/run-queue/tests/nack.test.ts
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (17)
apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts (2)
packages/core/src/v3/runEngineWorker/supervisor/schemas.ts (4)
WorkerApiDequeueRequestBody
(67-70)WorkerApiDequeueRequestBody
(71-71)WorkerApiDequeueResponseBody
(73-73)WorkerApiDequeueResponseBody
(74-74)packages/core/src/v3/apps/http.ts (1)
json
(65-75)
apps/webapp/app/runEngine/concerns/queues.server.ts (1)
internal-packages/run-engine/src/engine/tests/setup.ts (1)
AuthenticatedEnvironment
(16-18)
apps/webapp/app/runEngine/types.ts (1)
internal-packages/run-engine/src/engine/tests/setup.ts (1)
AuthenticatedEnvironment
(16-18)
internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts (2)
internal-packages/run-engine/src/run-queue/index.ts (5)
message
(1214-1256)message
(1436-1469)message
(1471-1513)message
(1515-1539)message
(1554-1567)internal-packages/testcontainers/src/index.ts (1)
assertNonNullable
(19-19)
packages/core/test/jumpHash.test.ts (1)
packages/core/src/v3/serverOnly/jumpHash.ts (1)
jumpHash
(14-16)
internal-packages/run-engine/src/run-queue/tests/ack.test.ts (1)
internal-packages/testcontainers/src/index.ts (1)
assertNonNullable
(19-19)
internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts (1)
internal-packages/testcontainers/src/index.ts (2)
redisOptions
(132-165)prisma
(91-112)
internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (1)
apps/webapp/app/v3/runEngine.server.ts (1)
engine
(9-9)
internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts (2)
internal-packages/testcontainers/src/index.ts (1)
prisma
(91-112)apps/webapp/app/v3/runEngine.server.ts (1)
engine
(9-9)
internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts (1)
apps/webapp/app/v3/runEngine.server.ts (1)
engine
(9-9)
internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts (2)
internal-packages/testcontainers/src/index.ts (2)
redisTest
(167-167)assertNonNullable
(19-19)internal-packages/run-engine/src/run-queue/index.ts (1)
workerQueue
(1350-1434)
internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts (1)
apps/webapp/app/v3/runEngine.server.ts (1)
engine
(9-9)
apps/webapp/test/engine/triggerTask.test.ts (2)
apps/webapp/app/v3/runEngine.server.ts (1)
engine
(9-9)packages/core/src/v3/workers/taskExecutor.ts (1)
result
(1262-1309)
internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (2)
internal-packages/testcontainers/src/index.ts (1)
prisma
(91-112)apps/webapp/app/v3/runEngine.server.ts (1)
engine
(9-9)
internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (3)
apps/webapp/app/v3/runEngine.server.ts (1)
engine
(9-9)internal-packages/testcontainers/src/index.ts (2)
assertNonNullable
(19-19)prisma
(91-112)internal-packages/testcontainers/src/utils.ts (1)
assertNonNullable
(175-178)
internal-packages/run-engine/src/run-queue/types.ts (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
workerQueue
(1350-1434)
internal-packages/run-engine/src/run-queue/index.ts (5)
packages/core/src/logger.ts (3)
LogLevel
(15-15)Logger
(19-130)error
(66-70)packages/redis-worker/src/worker.ts (6)
WorkerConcurrencyOptions
(45-49)Worker
(638-638)observableResult
(175-181)observableResult
(183-188)observableResult
(190-194)observableResult
(196-200)internal-packages/tracing/src/index.ts (6)
Meter
(16-16)getMeter
(45-47)ValueType
(38-38)ObservableResult
(25-25)Attributes
(15-15)context
(31-31)internal-packages/redis/src/index.ts (3)
Redis
(4-4)Callback
(4-4)Result
(4-4)internal-packages/run-engine/src/run-queue/types.ts (7)
RunQueueKeyProducer
(49-99)InputPayload
(5-16)InputPayload
(17-17)OutputPayloadV2
(25-28)OutputPayloadV2
(29-29)OutputPayload
(31-31)OutputPayload
(33-33)
🪛 Biome (1.9.4)
internal-packages/run-engine/src/engine/tests/utils/engineTest.ts
[error] 48-48: Unexpected empty object pattern.
(lint/correctness/noEmptyPattern)
🔇 Additional comments (129)
apps/coordinator/tsconfig.json (1)
3-3
: Bump TS target to ES2020
Aligns with other applications’ compiler targets and enables newer ECMAScript features.internal-packages/run-engine/tsconfig.build.json (2)
3-3
: Exclude internal test utilities
Addingsrc/engine/tests/utils/*.ts
toexclude
prevents accidental compilation of test helper code.
6-7
: Upgrade build config target and libs
Target raised to ES2020 andlib
updated to include ES2020 and async iterable support, matching other configs.packages/redis-worker/package.json (2)
54-54
: Remove Prometheus client dependency
Droppingprom-client
aligns with the migration to OpenTelemetry metrics.
54-54
:✅ Verification successful
Verify no lingering prom-client imports
Ensure all references toprom-client
have been removed to avoid runtime errors.Run the following script:
🏁 Script executed:
#!/bin/bash # Search for any remaining prom-client imports or usage rg -n "prom-client" .Length of output: 1998
No lingering
prom-client
imports inredis-worker
I ranrg -n prom-client packages/redis-worker
and can confirm there are no remaining references in that package. The dependency has been fully removed here. Other packages in the monorepo still useprom-client
, which is expected.internal-packages/run-engine/tsconfig.src.json (2)
3-3
: Verify the new exclude pattern.The added exclude entry
"src/engine/tests/utils/*.ts"
should correctly prevent compilation of utility test files. Please confirm that paths align with your directory structure and that no necessary files are inadvertently skipped.
6-7
: Upgrade to ES2020 is consistent with other packages.Raising the target to
"ES2020"
and thelib
entries is aligned with similar changes in other modules. No issues spotted here.packages/core/src/v3/serverOnly/index.ts (1)
7-7
: Export jumpHash for central access.Re‐exporting
jumpHash.js
here makes the shard‐hash function available from this module. Looks good..vscode/launch.json (1)
149-149
: Include--run
flag for RunQueue tests.Adding
--run
ensures the correct test runner behavior in VSCode. The change matches the updated test commands.internal-packages/run-engine/src/engine/systems/systems.ts (2)
1-1
: Import Meter alongside Tracer.Adding
Meter
is essential for OpenTelemetry integration. Ensure that downstream modules handle the new import.
16-16
: Ensure all SystemResources initializations includemeter
.The
SystemResources
type now requires ameter
property. Verify that every instantiation site (e.g., inRunEngine
setup) is updated to pass aMeter
instance, or you will face type errors.internal-packages/run-engine/tsconfig.test.json (2)
2-2
: LGTM: Test utilities inclusion supports new engine test infrastructure.The addition of
src/engine/tests/utils/*.ts
properly includes the new test utilities that support the engine refactoring and queue architecture changes.
6-7
: LGTM: ES2020 upgrade provides modern language features.The upgrade from ES2019 to ES2020 target and libraries is appropriate for the current Node.js environment and provides access to newer ECMAScript features that may be utilized in the refactored codebase.
apps/webapp/package.json (1)
64-64
: LGTM: OpenTelemetry metrics dependencies properly support new telemetry features.The addition of
@opentelemetry/exporter-metrics-otlp-proto
and@opentelemetry/sdk-metrics
provides the necessary infrastructure for the metrics enhancements mentioned in the PR objectives. The versions are consistent with the existing OpenTelemetry ecosystem in the project.Also applies to: 71-71
apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx (1)
703-704
: LGTM: UI properly reflects the master queue to worker queue architecture migration.The change from displaying separate "Primary master queue" and "Secondary master queue" properties to a unified "Worker queue" correctly reflects the underlying architectural refactoring from dual master queues to a single worker queue system.
apps/webapp/app/runEngine/services/triggerTask.server.ts (2)
237-237
: LGTM: Queue manager method updated for worker queue architecture.The change from
getMasterQueue
togetWorkerQueue
correctly reflects the updated queue management interface as part of the master queue to worker queue migration.
274-274
: LGTM: Engine trigger payload updated for worker queue system.The change from
masterQueue
toworkerQueue
in the engine trigger payload properly implements the new queue architecture where the unified worker queue replaces the previous dual master queue system.internal-packages/run-engine/src/engine/tests/ttl.test.ts (2)
26-27
: LGTM! Configuration properly reflects new worker queue architecture.The addition of
processWorkerQueueDebounceMs
andmasterQueueConsumersDisabled
correctly configures the test environment for the new worker queue processing model with debouncing enabled and legacy master queue consumers disabled.
70-70
: LGTM! Correctly updated to use worker queue terminology.The change from
masterQueue
toworkerQueue
aligns with the broader refactoring and maintains the same queue identifier "main".internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (2)
77-78
: LGTM! Development environment isolation is well-designed.The logic to force development runs to use the environment ID as the worker queue provides good isolation between development environments. This prevents runs from different development environments from interfering with each other.
82-97
: LGTM! Simplified enqueue call aligns with new architecture.The change from passing an array of master queues to a single
workerQueue
parameter simplifies the API and aligns with the new worker queue architecture.apps/webapp/app/presenters/v3/SpanPresenter.server.ts (2)
148-148
: LGTM! Database query updated for new schema.The change from selecting
masterQueue
toworkerQueue
correctly aligns with the database schema migration from master queue to worker queue terminology.
366-366
: LGTM! Response field updated consistently.The change from returning
masterQueue
toworkerQueue
maintains consistency with the database schema changes and ensures the UI displays the correct field.apps/webapp/app/runEngine/types.ts (1)
69-69
: LGTM! Interface method renamed consistently with the architectural refactoring.The method signature remains unchanged, ensuring compatibility while reflecting the shift from master queue to worker queue terminology.
internal-packages/run-engine/src/engine/tests/cancelling.test.ts (1)
28-29
: New queue configuration options align with the architectural changes.The addition of
masterQueueConsumersDisabled
andprocessWorkerQueueDebounceMs
suggests the new worker queue architecture requires different processing semantics compared to the previous master queue approach.internal-packages/run-engine/src/engine/tests/trigger.test.ts (3)
7-7
: Good practice using the node: prefix for built-in modules.The import change from
"timers/promises"
to"node:timers/promises"
follows Node.js best practices for importing built-in modules.
26-27
: Queue configuration changes are consistent with the architectural refactoring.The addition of
masterQueueConsumersDisabled
andprocessWorkerQueueDebounceMs
options maintains consistency with the cancelling tests and supports the new worker queue processing model.
70-70
: Consistent use of "main" worker queue value.The hardcoded "main" value is used consistently across all tests, which aligns with the pattern in the cancelling tests.
Also applies to: 117-117, 267-267, 280-280
internal-packages/database/prisma/schema.prisma (1)
767-769
: LGTM! Clean field rename with backward compatibility.The approach of renaming
masterQueue
toworkerQueue
while preserving the database column mapping using@map("masterQueue")
is excellent for maintaining backward compatibility. The deprecation comment forsecondaryMasterQueue
is also appropriate.apps/webapp/app/services/deleteProject.server.ts (1)
44-50
:✅ Verification successful
Verify the updated method signature matches the new usage.
The simplified approach of directly iterating over environments and using
runtimeEnvironmentId: environment.id
is cleaner than the previous implementation. However, please ensure that theremoveEnvironmentQueuesFromMasterQueue
method signature in the RunEngine has been updated to acceptruntimeEnvironmentId
instead of master queue strings.
🏁 Script executed:
#!/bin/bash # Description: Verify that RunEngine.removeEnvironmentQueuesFromMasterQueue accepts runtimeEnvironmentId parameter # Expected: Method should accept an object with runtimeEnvironmentId property ast-grep --pattern $'removeEnvironmentQueuesFromMasterQueue($$$) { $$$ }'Length of output: 76
To locate the method signature, let’s search for its definition and surrounding context:
🏁 Script executed:
#!/bin/bash set -euo pipefail # Search for the removeEnvironmentQueuesFromMasterQueue method definition and signature rg -n -C3 "removeEnvironmentQueuesFromMasterQueue" || trueLength of output: 3480
removeEnvironmentQueuesFromMasterQueue signature matches usage
Verified that in internal-packages/run-engine/src/engine/index.ts (lines 748–756) the method is defined as:
async removeEnvironmentQueuesFromMasterQueue({ runtimeEnvironmentId, organizationId, projectId, }) { … }which aligns exactly with the call in apps/webapp/app/services/deleteProject.server.ts (lines 45–48).
apps/webapp/app/runEngine/concerns/queues.server.ts (1)
199-202
:✅ Verification successful
Verify the behavior change for DEVELOPMENT environments is intentional.
The method rename from
getMasterQueue
togetWorkerQueue
is consistent with the overall refactoring. However, the behavior change for DEVELOPMENT environments is significant - previously returningundefined
, now returningenvironment.id
.Please confirm this change is intentional and that all consumers of this method have been updated to handle this new behavior correctly.
🏁 Script executed:
#!/bin/bash # Description: Find all usages of getWorkerQueue/getMasterQueue to verify they handle the new behavior # Expected: All usages should properly handle environment.id for DEVELOPMENT environments rg -A 3 -B 3 "getWorkerQueue|getMasterQueue"Length of output: 5173
Confirmed: getWorkerQueue change is safe
- Only one call site (
apps/webapp/app/runEngine/services/triggerTask.server.ts
) invokesgetWorkerQueue(environment)
and uses its return value directly.- There’s no special handling for
undefined
, so always returningenvironment.id
in DEVELOPMENT has no adverse effect.All consumers accommodate the new behavior—no further action needed.
internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts (2)
18-22
: Good queue configuration pattern.The addition of
masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
provides a good pattern for the queue configuration during the migration. This configuration should be consistently applied across all test files.
58-62
: Consistent migration pattern applied correctly.The changes from
masterQueue
toworkerQueue: "main"
and the corresponding dequeue method update are correctly implemented with the necessary delay for queue processing.internal-packages/run-engine/src/engine/tests/delays.test.ts (2)
67-67
: Migration pattern correctly applied.The replacement of
masterQueue
withworkerQueue: "main"
is consistent with the broader refactor and correctly maintains the test's intended functionality.
387-391
: Dequeue operation updated correctly.The transition from
dequeueFromMasterQueue
todequeueFromWorkerQueue
with the necessary 500ms delay and parameter changes is properly implemented.internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts (3)
26-27
: Complete queue configuration implementation.This file correctly includes both
masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
configuration options, demonstrating the complete migration pattern that should be applied across all test files.
75-88
: Systematic migration of trigger and dequeue operations.The comprehensive update of trigger operations to use
workerQueue: "main"
and dequeue operations to usedequeueFromWorkerQueue
with proper delays maintains test functionality while adapting to the new queue architecture.
375-376
: Consistent configuration across test cases.The queue configuration is properly replicated in the second test case, ensuring consistent behavior across all batch testing scenarios.
internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts (3)
27-28
: LGTM! Queue configuration aligned with new architecture.The new configuration options
masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
properly configure the test environment for the worker queue paradigm.
78-78
: LGTM! Parameter naming updated for worker queue model.The transition from
masterQueue
toworkerQueue: "main"
correctly reflects the new queue processing architecture.Also applies to: 99-99
125-129
: LGTM! Dequeue method updated correctly.The transition to
dequeueFromWorkerQueue
with theworkerQueue
parameter aligns with the new worker queue architecture.apps/webapp/app/v3/runEngine.server.ts (4)
7-7
: LGTM! Metrics integration properly imported.Adding
meter
import enables OpenTelemetry metrics collection as part of the observability improvements.
16-16
: LGTM! Log level configuration added.The
logLevel
configuration allows runtime control of logging verbosity for the run engine.
62-63
: LGTM! Queue sharding and debounce configuration added.The
shardCount
andprocessWorkerQueueDebounceMs
options enable the new sharded queue architecture and worker queue processing controls.
77-77
: LGTM! Meter instance properly passed to RunEngine.Adding the
meter
instance enables metrics collection throughout the run engine components.packages/core/test/jumpHash.test.ts (1)
1-82
: Excellent comprehensive test suite for jumpHash function.The test coverage is thorough and validates all critical properties of a jump consistent hash:
- Deterministic behavior: Ensures same input produces same output
- Range validation: Verifies output is always within [0, buckets-1]
- Distribution: Tests even distribution across buckets
- Minimal movement: Validates consistent hashing property when buckets increase
- Edge cases: Handles empty strings and single bucket scenarios
The statistical assertions for distribution and movement are well-calibrated with appropriate tolerance ranges.
internal-packages/run-engine/src/run-queue/tests/ack.test.ts (3)
1-1
: LGTM! Necessary imports added for queue refactoring.The
assertNonNullable
andsetTimeout
imports support the updated test logic for the worker queue paradigm.Also applies to: 5-5
73-73
: LGTM! Enqueue parameter updated for worker queue model.The transition from
masterQueues
toworkerQueue
correctly reflects the new queue architecture.Also applies to: 134-134
78-82
: LGTM! Dequeue method updated with proper null checking.The transition to
dequeueMessageFromWorkerQueue
withassertNonNullable
ensures robust test behavior and aligns with the new worker queue architecture.Also applies to: 147-151
internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts (2)
112-122
:❓ Verification inconclusive
Validate the new dequeue API signature and message structure.
The dequeue method has changed from returning arrays to single messages, and there's version-specific logic for extracting the worker queue. This suggests significant API changes.
The test correctly adapts to the new worker queue dequeue API and handles the version-specific message structure. Verify that the
assertNonNullable
usage is consistent with other dequeue tests:
🏁 Script executed:
#!/bin/bash # Check other dequeue test patterns for consistency rg -A 5 "dequeueMessageFromWorkerQueue\|dequeueFromWorkerQueue" --type tsLength of output: 75
Confirm assertNonNullable usage across dequeue tests
I ran a repo-wide search and didn’t find any other tests calling
dequeueMessageFromWorkerQueue
ordequeueFromWorkerQueue
. Please manually verify that usingassertNonNullable
here aligns with your existing dequeue test conventions—or update other tests to match this pattern for consistency.
83-83
:✅ Verification successful
Verify worker queue identifier usage in tests.
The test now uses
authenticatedEnvDev.id
as the worker queue identifier. Ensure this aligns with the production behavior where environment IDs serve as worker queue identifiers in development environments.
🏁 Script executed:
#!/bin/bash # Verify how worker queue identifiers are determined in the run engine ast-grep --pattern $'workerQueue: $_'Length of output: 38
🏁 Script executed:
#!/bin/bash # Search for all occurrences of `workerQueue` in run-queue package rg -n "workerQueue" internal-packages/run-engine/src/run-queueLength of output: 9933
🏁 Script executed:
#!/bin/bash # Find definition of authenticatedEnvDev in the repository rg -n "authenticatedEnvDev" -n .Length of output: 12501
🏁 Script executed:
#!/bin/bash # Display definition of authenticatedEnvDev in run-queue index.test.ts sed -n '30,50p' internal-packages/run-engine/src/run-queue/index.test.tsLength of output: 541
🏁 Script executed:
#!/bin/bash # Display code around enqueueMessage in run-queue index.ts to verify workerQueue assignment sed -n '420,500p' internal-packages/run-engine/src/run-queue/index.tsLength of output: 2452
Worker queue identifier usage in tests is correct.
The tests explicitly pass
authenticatedEnvDev.id
as theworkerQueue
parameter, which aligns with theenqueueMessage
signature and theworkerQueueKey
generation logic in the production code. No changes required.internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (3)
29-30
: Consistent test configuration for worker queue architecture.The test configuration correctly disables master queue consumers and enables worker queue processing with debounce timing. This aligns well with the architectural changes.
The
masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
configuration is consistently applied across all test cases, which helps ensure isolated testing of the new worker queue functionality.Also applies to: 147-148, 270-271, 401-402, 506-507, 652-653
76-76
: Standardized worker queue identifier usage.All tests consistently use
"main"
as the worker queue identifier, which suggests this is the standard identifier for test environments.The consistent usage of
workerQueue: "main"
across all test cases aligns with the new worker queue architecture and provides predictable test behavior.Also applies to: 195-195, 317-317, 448-448, 553-553, 699-699
84-84
:✅ Verification successful
Verify timing requirements for queue processing delays.
All heartbeat tests use a 500ms delay before dequeue operations, which is shorter than the 1000ms used in the enqueue test.
Verify that 500ms is sufficient for all test scenarios, especially considering the
processWorkerQueueDebounceMs: 50
configuration:Also applies to: 203-203, 325-325, 456-456, 561-561, 707-707
🏁 Script executed:
#!/bin/bash # Check if there are any timing-related issues or configurations rg -A 3 -B 3 "setTimeout.*[0-9]+" internal-packages/run-engine/src/engine/tests/ --type tsLength of output: 85297
500ms delay is sufficient for queue debouncing
500ms provides a 10× buffer over the configured 50msprocessWorkerQueueDebounceMs
and is the established pattern across all worker‐queue tests (including heartbeats, trigger, priority, etc.). No timing adjustment is needed.internal-packages/run-engine/src/engine/tests/dequeuing.test.ts (2)
28-29
: Consistent worker queue test configuration.The test configuration properly disables master queue consumers and sets appropriate debounce timing for worker queue processing.
Also applies to: 103-104
74-78
:✅ Verification successful
Verify dequeue method return type consistency.
The dequeue method calls suggest the API now returns arrays that can be spread into the dequeued array, but this differs from the single message return in the enqueue test.
Verify the expected return type of
dequeueFromWorkerQueue
:Also applies to: 148-151, 159-162
🏁 Script executed:
#!/bin/bash # Check the dequeue method signature and return type ast-grep --pattern $'dequeueFromWorkerQueue($_): $_ { $$$ }'Length of output: 63
🏁 Script executed:
#!/bin/bash # Search for all occurrences of dequeueFromWorkerQueue to locate its implementation and return type rg -n "dequeueFromWorkerQueue" .Length of output: 12239
🏁 Script executed:
#!/bin/bash # Extract the method signature and return type from the engine index sed -n '540,650p' internal-packages/run-engine/src/engine/index.ts # Inspect the implementation in dequeueSystem to see what is returned sed -n '1,200p' internal-packages/run-engine/src/engine/systems/dequeueSystem.tsLength of output: 10793
Return type confirmed for dequeueFromWorkerQueue
The
dequeueFromWorkerQueue
method ininternal-packages/run-engine/src/engine/index.ts
is declared to returnPromise<DequeuedMessage[]>
(it wraps the single or undefined result fromDequeueSystem
into a 0-or-1 element array). The existing spread usage in your tests is therefore correct and no changes are needed.internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts (1)
69-69
:✅ Verification successful
Verify consistency of workerQueue parameter.
The
workerQueue: "main"
parameter is used consistently throughout tests, but it's worth verifying this aligns with the actual worker queue implementation.
🏁 Script executed:
#!/bin/bash # Description: Verify worker queue parameter usage across the codebase # Expected: Consistent usage of "main" as default worker queue identifier # Search for workerQueue parameter usage rg -A 2 -B 2 'workerQueue.*:.*"' # Search for worker queue configuration in engine ast-grep --pattern 'workerQueue: $_'Length of output: 78887
No action required:
workerQueue: "main"
is consistent with implementationI’ve confirmed via a repository-wide search that every call to both
engine.dequeueFromWorkerQueue
and the run-queue tests useworkerQueue: "main"
. The implementation constructs queue keys solely from the passed string and doesn’t rely on any other default.All occurrences already align—no changes needed.
internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (1)
78-87
: Approve the improved queue parameter usage.Using
authenticatedEnvProd.id
as the worker queue identifier is more realistic than the hardcoded "main" value used in other tests. The addition ofassertNonNullable(message)
provides better error messages when dequeue fails.packages/core/src/logger.ts (1)
15-17
: Approve the systematic addition of verbose log level.The addition of "verbose" to both the type definition and logLevels array follows the established pattern correctly. The placement at the end maintains backward compatibility with existing log level indices.
internal-packages/run-engine/src/engine/tests/priority.test.ts (4)
30-31
: LGTM: Queue configuration updated for worker queue architecture.The addition of
processWorkerQueueDebounceMs: 50
andmasterQueueConsumersDisabled: true
aligns with the architectural shift to worker queues and properly configures debounce timing for queue processing.
79-79
: Appropriate delay added for async queue processing.The 500ms delay allows time for asynchronous queue processing to complete before attempting to dequeue messages, which is necessary given the new debounced worker queue architecture.
84-87
: Correctly migrated to worker queue dequeue method.The change from
dequeueFromMasterQueue
todequeueFromWorkerQueue
and the parameter change frommasterQueue
toworkerQueue: "main"
properly reflects the new queue architecture.
234-234
: Worker queue parameter correctly updated in trigger payload.The change from
masterQueue
toworkerQueue: "main"
in the trigger function aligns with the overall migration to worker queue semantics.internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts (3)
27-28
: Queue configuration properly migrated for worker queue architecture.The addition of debounce timing and master queue consumer disabling is consistent with the broader architectural migration.
108-111
: Dequeue method correctly updated with proper timing.The migration to
dequeueFromWorkerQueue
withworkerQueue: "main"
is correct, and the preceding delay accounts for asynchronous queue processing.
253-256
: Consistent worker queue dequeue implementation.The dequeue method and parameters are correctly updated to match the new worker queue architecture.
internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts (6)
1-1
: Import addition for test utility.Adding
assertNonNullable
import is appropriate for ensuring dequeued messages are not null in the updated architecture.
8-8
: Required import for async delays.The
setTimeout
import fromnode:timers/promises
is necessary for the new timing requirements in worker queue processing.
72-73
: Worker queue parameter correctly updated in enqueue.The change from
masterQueue
toworkerQueue
in the enqueue operation aligns with the architectural migration.
77-81
: Proper dequeue method migration with safety check.The update to
dequeueMessageFromWorkerQueue
withassertNonNullable
ensures the test fails fast if the dequeue operation doesn't return a message as expected.
108-108
: Master queue consumers properly disabled for test isolation.Adding
masterQueueConsumersDisabled: true
ensures the test runs in isolation with only worker queue processing active.
128-137
: Enhanced test with skip dequeue processing flag.The addition of
skipDequeueProcessing: true
and explicit master queue processing call provides better control over the test execution flow.internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts (3)
23-24
: Queue configuration consistently updated across all test cases.The addition of
masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
is applied consistently across all test cases in this file, ensuring uniform test behavior.
67-67
: Worker queue parameter correctly applied in trigger payloads.All trigger calls have been updated to use
workerQueue: "main"
instead of the previous master queue approach, maintaining consistency across the test suite.
76-80
: Dequeue operations properly migrated with timing considerations.The consistent pattern of adding 500ms delays before
dequeueFromWorkerQueue
calls accommodates the asynchronous nature of the new worker queue processing architecture.packages/redis-worker/src/queue.test.ts (1)
427-485
: Well-structured test forenqueueOnce
idempotency!The test comprehensively validates that:
- The first enqueue succeeds and returns
true
- Subsequent enqueues with the same ID return
false
- The original message properties are preserved (value and availability time)
- Queue size remains consistent
Good coverage of the idempotent enqueue functionality.
internal-packages/run-engine/src/engine/types.ts (4)
3-3
: Appropriate imports for enhanced observability!The added imports for
Meter
,Tracer
,Logger
, andLogLevel
support the new telemetry and logging capabilities.Also applies to: 15-15
33-35
: Queue configuration properly extended for sharding and worker processing!The new optional properties enable:
shardCount
: Supports the new sharded master queue architectureprocessWorkerQueueDebounceMs
: Allows control over worker queue processing timingworkerOptions
: Provides worker-specific concurrency configurationAll properties are optional, maintaining backward compatibility.
51-53
: Observability enhancements added correctly!The new optional properties enable comprehensive monitoring and logging configuration at the engine level.
100-100
: Consistent property renaming frommasterQueue
toworkerQueue
!This change aligns with the PR's objective of transitioning from the master queue paradigm to worker queues.
internal-packages/run-engine/src/run-queue/tests/migrateLegacyMasterQueue.test.ts (1)
1-131
: Comprehensive test coverage for legacy master queue migration!The test effectively validates:
- Creation of a realistic legacy master queue with multiple environments
- Correct distribution of queues across shards using jump consistent hashing
- Preservation of message timestamps during migration
- Proper handling of empty queues (queue8 is excluded)
- Verification that queues are correctly assigned to shards
Good test structure with proper setup and teardown.
apps/webapp/app/routes/engine.v1.dev.dequeue.ts (2)
8-8
: Good documentation of backward compatibility consideration!The comment clearly explains why the unused body parameter is retained.
12-16
: Significant simplification of dequeue logic!The refactored implementation removes the complex resource tracking and worker iteration logic in favor of a single
dequeueFromEnvironmentWorkerQueue
call. This aligns with the PR's performance improvement objectives and the new worker queue architecture.apps/webapp/test/engine/triggerTask.test.ts (1)
35-35
: LGTM: Import addition is appropriate.The
setTimeout
import fromnode:timers/promises
is correctly added to support the async delays introduced in the tests.internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts (5)
1-1
: LGTM: Import updates are appropriate.The addition of
assertNonNullable
import andsetTimeout
import supports the test refactoring correctly.Also applies to: 8-8
47-47
: Test suite name updated correctly.The rename from testing master queue to worker queue operations reflects the architectural change appropriately.
81-81
: Consistent use of workerQueue parameter.The change from using
masterQueues
array to a singleworkerQueue: "main"
parameter is applied consistently across all enqueue operations.Also applies to: 169-169, 182-182, 227-227, 233-233
112-112
: Version expectation updated appropriately.The change from expecting version
"1"
to"2"
aligns with the queue system refactor and message structure updates.
107-116
:❌ Incorrect review comment
Verify the method signature change and return type handling.
The dequeue method signature has changed significantly from multiple parameters to a single
workerQueue
parameter, and the return type appears to be a single message object rather than an array. The use ofassertNonNullable
and version check look correct.
🏁 Script executed:
#!/bin/bash # Description: Verify the dequeueMessageFromWorkerQueue method signature and return type # Check the method implementation ast-grep --pattern 'async dequeueMessageFromWorkerQueue($_, $_) { $$$ }' # Verify the return type structure rg -A 5 "DequeuedMessage" internal-packages/run-engine/src/run-queue/types.tsLength of output: 154
🏁 Script executed:
#!/bin/bash set -e # Locate the implementation of dequeueMessageFromWorkerQueue and its signature rg -n "dequeueMessageFromWorkerQueue" -C5 internal-packages/run-engine/src/run-queue # Locate the type definition for DequeuedMessage rg -n "DequeuedMessage" -C5 internal-packages/run-engine/src/run-queueLength of output: 53445
Clarify dequeueMessageFromWorkerQueue signature and return type
The public API still accepts two parameters—
consumerId: string
andworkerQueue: string
—and returns a singleDequeuedMessage | undefined
, not an array. The return type and the assertion of non-nullability plus version check in your tests are correct. No code changes are required here; you can ignore the part of the review comment about a “singleworkerQueue
parameter.”Likely an incorrect or invalid review comment.
internal-packages/tracing/src/index.ts (4)
1-7
: LGTM: Comprehensive metric type imports.The addition of
Meter
and metric-related types to the imports provides good coverage for OpenTelemetry metrics functionality.
13-26
: Excellent type export coverage for metrics.The exported metric types provide comprehensive coverage including counters, gauges, histograms, and observable metrics. This gives consumers full access to OpenTelemetry metrics capabilities.
28-39
: Import and export structure is well-organized.The addition of
metrics
andValueType
to imports and exports maintains consistency with the existing pattern and provides the necessary metric infrastructure.
45-47
: getMeter function follows good patterns.The
getMeter
function implementation mirrors the existinggetTracer
function pattern, providing a consistent API for obtaining metric instances.internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (3)
27-28
: Queue configuration options applied consistently.The addition of
masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
(or 100) is applied consistently across all test cases, which is good for maintaining test stability during the queue system transition.Also applies to: 203-204, 293-294, 496-497, 647-648, 794-795, 992-993, 1124-1125
71-71
: workerQueue property added consistently to trigger payloads.The addition of
workerQueue: "main"
to trigger payloads is applied consistently across all test cases, aligning with the new queue architecture.Also applies to: 247-247, 335-335, 540-540, 691-691, 843-843, 900-900, 1074-1074, 1125-1125, 1152-1152
80-86
: Dequeue method calls updated correctly with proper assertions.The change from
dequeueFromMasterQueue
todequeueFromWorkerQueue
with updated parameters and the addition ofassertNonNullable
for type safety looks correct. The length checks ensure the expected behavior.Also applies to: 153-158, 346-351, 551-556, 702-707, 854-859, 911-916, 1085-1090
internal-packages/run-engine/src/run-queue/tests/nack.test.ts (2)
71-81
: LGTM! Consistent refactoring to worker queue pattern.The changes correctly update all test cases to use the new
workerQueue
parameter anddequeueMessageFromWorkerQueue
method, aligning with the broader architectural shift from master queues to worker queues.Also applies to: 123-127, 162-171, 188-192, 245-255
74-75
: Good addition of timing delays for test stability.The 1000ms delays before dequeue operations ensure the queue state is consistent, which is important for reliable async tests.
Also applies to: 165-166, 185-186, 248-249
internal-packages/run-engine/src/engine/tests/utils/engineTest.ts (2)
48-81
: Well-structured test fixture with sensible defaults.The
engineOptions
fixture provides comprehensive default configuration for engine tests. The empty object pattern on line 48 is a standard Vitest pattern for fixtures that don't need the task context - the static analysis warning can be safely ignored.🧰 Tools
🪛 Biome (1.9.4)
[error] 48-48: Unexpected empty object pattern.
(lint/correctness/noEmptyPattern)
83-136
: Excellent resource management in the engine fixture.The fixture properly handles RunEngine lifecycle with automatic cleanup in the finally block, ensuring no resource leaks between tests.
apps/webapp/app/env.server.ts (3)
331-334
: LGTM! OpenTelemetry metrics configuration looks well-structured.The new environment variables for OTEL metrics exporter are properly configured with appropriate types and sensible defaults. The 30-second interval default aligns with common metrics collection practices.
630-630
: Log level configuration follows good practices.The enum validation ensures only valid log levels are accepted, and the "info" default provides a good balance between verbosity and performance.
471-471
:❓ Verification inconclusive
Verify the default shard count aligns with performance expectations.
The default shard count of 4 should be validated against expected load patterns and Redis performance characteristics.
🏁 Script executed:
#!/bin/bash # Description: Check if the default shard count is used consistently across the codebase # Expected: Find references to shard count configuration and usage rg -A 3 -B 3 "RUN_ENGINE_RUN_QUEUE_SHARD_COUNT|shardCount|shard.*count" --type tsLength of output: 15022
Verify shard count default consistency and performance impact
Please confirm that the new default shard count of 4 is intentional and performant:
- In apps/webapp/app/env.server.ts you’ve set
RUN_ENGINE_RUN_QUEUE_SHARD_COUNT: z.coerce.number().int().default(4)
- Internally, run-queue/index.ts falls back to 2 if none is provided:
this.shardCount = options.shardCount ?? 2
but since the env var now always supplies 4, the internal default is never used.- Tests in
internal-packages/run-engine/src/run-queue/index.test.ts
and migration tests assume shard counts of 8 and 2 respectively.Action items:
- Verify that 4 shards aligns with your expected workload and Redis performance characteristics.
- Update benchmarks or tests if you need to validate throughput or adjust the default.
- If 4 is too low or too high, adjust the default here or surface it as a required configuration instead of a hardcoded default.
packages/redis-worker/src/queue.ts (1)
591-598
: TypeScript interface declaration is correct.The interface properly extends the Redis commander with the new command signature, maintaining type safety.
internal-packages/run-engine/src/run-queue/keyProducer.ts (3)
17-18
: Constant naming follows established patterns.The new constants for
MASTER_QUEUE_PART
andWORKER_QUEUE_PART
are consistent with the existing naming convention.
22-42
:✅ Verification successful
Sharding implementation looks correct but needs consistency validation.
The sharding methods properly implement environment-to-shard mapping using jumpHash. However, ensure the key formats are consistent with existing queue processing logic.
🏁 Script executed:
#!/bin/bash # Description: Verify that the new key formats are used consistently across the codebase # Expected: Find usage patterns of the new master queue and worker queue keys rg -A 2 -B 2 "masterQueueKeyFor|workerQueueKey|MASTER_QUEUE_PART|WORKER_QUEUE_PART" --type tsLength of output: 13890
Key format consistency verified
All uses of master and worker queue keys now route through the newkeys
methods. Searches in:
internal-packages/run-engine/src/run-queue/index.ts
internal-packages/run-engine/src/run-queue/types.ts
internal-packages/run-engine/src/run-queue/tests/migrateLegacyMasterQueue.test.ts
show only calls tomasterQueueKeyForEnvironment
/masterQueueKeyForShard
andworkerQueueKey
—no hard-coded strings. The format is consistent with existing queue logic.
3-3
:✅ Verification successful
Verify the jumpHash import path and implementation.
Ensure the jumpHash function is properly implemented and exported from the specified path.
🏁 Script executed:
#!/bin/bash # Description: Verify the jumpHash function exists and is properly exported # Expected: Find the jumpHash implementation and its tests fd "jumpHash" --type f --extension ts ast-grep --pattern 'export function jumpHash($_,$_) { $$$ }'Length of output: 180
🏁 Script executed:
#!/bin/bash # Inspect the export syntax of jumpHash in the serverOnly directory rg "export" packages/core/src/v3/serverOnly/jumpHash.ts -n rg "jumpHash" packages/core/src/v3/serverOnly/jumpHash.ts -nLength of output: 255
🏁 Script executed:
#!/bin/bash # Locate and display import of jumpHash in keyProducer.ts rg "import.*jumpHash" -n internal-packages/run-engine/src/run-queue/keyProducer.ts # Display the first 20 lines to verify the module path and surrounding context sed -n '1,20p' internal-packages/run-engine/src/run-queue/keyProducer.tsLength of output: 889
KeyProducer jumpHash Import & Export Verified
- The
jumpHash
function is correctly exported frompackages/core/src/v3/serverOnly/jumpHash.ts
:// packages/core/src/v3/serverOnly/jumpHash.ts export function jumpHash(key: string, buckets: number): number { … }- The import path in
keyProducer.ts
matches the package’s structure:import { jumpHash } from "@trigger.dev/core/v3/serverOnly";No mismatches found.
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (4)
2-10
: OpenTelemetry imports are appropriate for metrics integration.The imports include all necessary OpenTelemetry components for implementing observable gauges and metric collection.
50-50
: Meter integration follows OpenTelemetry best practices.The optional meter parameter with a sensible default using
getMeter("release-concurrency")
allows for dependency injection while providing a working default.Also applies to: 68-68, 89-89
106-126
: Observable gauge configuration follows OpenTelemetry conventions.The gauge definitions use appropriate metric names, descriptions, and value types. The naming convention with dots is consistent with OpenTelemetry standards.
142-148
: Metric callback implementations are correct.The callback methods properly observe the cached values, following the OpenTelemetry observable gauge pattern.
internal-packages/run-engine/src/engine/locking.ts (2)
40-67
: Well-structured telemetry integration!The metrics setup is clean and follows OpenTelemetry best practices. The fallback meter creation ensures backward compatibility, and the metric descriptions are clear.
109-154
: Excellent implementation of lock telemetry!The lock tracking and metrics recording implementation is robust:
- Proper cleanup in finally blocks ensures no memory leaks
- High-resolution timing with
performance.now()
- Metrics recorded for both success and failure paths
- Unique lock IDs prevent collisions
internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (2)
10-14
: Good test configuration!The scoped engine options appropriately disable master queue consumers and set a reasonable debounce interval for predictable test behavior.
170-171
:❌ Incorrect review comment
Inconsistent test cleanup.
Only this test calls
engine.quit()
while other tests in the file don't. Consider adding cleanup to all tests or removing it here for consistency.
🏁 Script executed:
#!/bin/bash # Check if other tests in this file or similar test files use engine.quit() rg -A 2 -B 2 "engine\.quit\(\)" internal-packages/run-engine/src/engine/tests/Length of output: 23382
engine.quit() usage is consistent across tests
After checking other test files, every test—including
releaseConcurrency.test.ts
—invokesawait engine.quit()
(typically in afinally
block or via the shared test utility). The original concern about missing cleanup does not apply.Likely an incorrect or invalid review comment.
packages/redis-worker/src/worker.ts (2)
129-200
: Clean migration to OpenTelemetry metrics!The observable gauges are well-suited for queue metrics that need periodic sampling. The callback implementations are efficient and properly scoped.
285-343
: Well-implemented idempotent enqueue method!The
enqueueOnce
method follows the same pattern asenqueue
with proper tracing and error handling. The requiredid
parameter correctly enforces uniqueness for idempotent operations.internal-packages/run-engine/src/run-queue/index.test.ts (6)
1-1
: LGTM! Good addition of the non-null assertion utility.The import of
assertNonNullable
is appropriate for the updated test assertions.
18-18
: Appropriate logger level for tests.Setting the logger to "debug" level in tests helps with troubleshooting test failures.
105-106
: Correctly updated to use single workerQueue instead of masterQueues array.The change from
masterQueues
array to singleworkerQueue
string aligns with the new queue processing model.
275-281
: Good test coverage for disabled consumer scenario.The test properly validates queue behavior when
masterQueueConsumersDisabled
is true with appropriate debounce timing.
378-483
: Excellent test for queue concurrency behavior.This test properly validates that queue concurrency limits are respected and that messages are processed sequentially when the limit is 1.
485-488
: Good coverage of sharded queue functionality.Testing with 8 shards validates the sharding implementation works correctly.
internal-packages/run-engine/src/run-queue/types.ts (2)
19-33
: Well-designed discriminated union for backward compatibility.The split of
OutputPayload
into V1 and V2 with a discriminated union pattern is an excellent approach for migrating from the legacy master queue model to the new worker queue model while maintaining backward compatibility.
60-66
: Comprehensive key generation methods for the new queue architecture.The new methods provide all necessary key generation capabilities for:
- Legacy master queue support
- Sharded master queues with consistent hashing
- Worker queue keys
This supports the migration path while enabling the new architecture.
apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts (3)
540-548
: Correctly updated to use the new worker queue dequeue method.The changes properly migrate from
dequeueFromMasterQueue
todequeueFromWorkerQueue
with the simplified parameter structure.
567-576
: Appropriate removal of version-specific dequeue for unmanaged workers.The simplification to only support latest deployment dequeuing aligns with the new architecture. The clear error message helps users understand the limitation.
579-597
: Method correctly updated to use the new environment worker queue API.The removal of
maxRunCount
and migration todequeueFromEnvironmentWorkerQueue
aligns with the simplified dequeue interface.internal-packages/run-engine/src/engine/index.ts (1)
80-80
: Good improvement for configurable loggingUsing the logLevel from options with a sensible default of "info" improves flexibility for different deployment scenarios.
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
Outdated
Show resolved
Hide resolved
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
Outdated
Show resolved
Hide resolved
apps/webapp/app/routes/engine.v1.worker-actions.deployments.$deploymentFriendlyId.dequeue.ts
Show resolved
Hide resolved
3afe0ca
to
7fd9eef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (7)
packages/redis-worker/src/worker.ts (1)
559-564
:⚠️ Potential issueFix timing unit inconsistency.
The histogram records duration in seconds (line 563) but the metrics are defined with unit "ms". Also, consider using
performance.now()
for better timing precision.- const start = Date.now(); + const start = performance.now(); try { return await promise; } finally { - const duration = (Date.now() - start) / 1000; // Convert to seconds - histogram.record(duration, { worker_name: this.options.name, ...labels }); + const duration = performance.now() - start; // Keep in milliseconds + histogram.record(duration, { worker_name: this.options.name, ...labels }); }internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (2)
81-81
: Update lock name to match the new method nameThe lock name still references the old "dequeueFromMasterQueue" method name. This was already flagged in a previous review.
91-91
: Update all log messages to use the new method nameMultiple log messages still reference "dequeueFromMasterQueue" instead of "dequeueFromWorkerQueue". This was already flagged in a previous review and needs to be addressed for consistency.
Also applies to: 138-138, 195-196, 205-205, 219-219, 237-237, 259-259, 292-292, 303-303, 376-376, 466-467, 484-484, 510-510, 573-573
internal-packages/run-engine/src/engine/index.ts (1)
577-579
: Clarify comment on prod vs. dev worker queuesThe comment is ambiguous about what distinguishes a "prod" worker queue from a dev queue. Please update it to clearly define the criteria (e.g., queue naming convention, configuration flag, environment type).
internal-packages/run-engine/src/run-queue/index.ts (3)
1598-1598
: 🛠️ Refactor suggestionUse more robust messageId extraction
The current regex is fragile if messageIds contain colons. Use a more specific pattern that matches the expected format.
-local messageId = messageKeyValue:match("([^:]+)$") +local messageId = messageKeyValue:match(":message:(.+)$")
1006-1006
: 🛠️ Refactor suggestionAdd error handling to protect the consumer loop
As noted by @matt-aitken, this method call should be wrapped in error handling to prevent the consumer from stopping if an error occurs.
- const results = await this.#processMasterQueueShard(shard, consumerId); + const [error, results] = await tryCatch(this.#processMasterQueueShard(shard, consumerId)); + + if (error) { + this.logger.error(`Failed to process master queue shard ${shard}`, { + error, + shard, + consumerId, + }); + continue; + }
1133-1134
: 🛠️ Refactor suggestionUpdate log message to reference the correct method name
As noted by @matt-aitken, the log message references "dequeueMessageInSharedQueue" which doesn't match the actual method name.
- `[dequeueMessageInSharedQueue][${this.name}] Failed to dequeue from queue ${queue}`, + `[processMasterQueueShard][${this.name}] Failed to dequeue from queue ${queue}`,
🧹 Nitpick comments (5)
internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (1)
81-81
: Consider making timeout duration configurable.The hardcoded 1000ms delays may make tests unnecessarily slow. Consider using a shorter timeout or making it configurable through test options.
+const DEQUEUE_DELAY_MS = 100; // Configurable delay -await setTimeout(1000); +await setTimeout(DEQUEUE_DELAY_MS);Also applies to: 148-148, 212-212, 276-276
internal-packages/run-engine/src/run-queue/index.test.ts (2)
18-18
: Consider keeping logger level as "warn" for tests.While "debug" level provides more visibility during development, it may produce excessive output in CI/CD pipelines. Consider using "warn" for tests unless debugging specific issues.
131-132
: Document the reason for setTimeout delays.The 1-second delays before dequeue operations appear necessary for queue processing stabilization. Consider adding a comment explaining why these delays are needed to help future maintainers.
+ // Allow time for the master queue consumer to process the message await setTimeout(1000);
Also applies to: 237-238, 340-341, 544-545, 613-614, 692-693, 746-747, 794-795, 836-837, 927-928
apps/webapp/app/v3/tracer.server.ts (1)
321-356
: Great implementation using batch observable callback!The Prisma metrics implementation correctly uses
addBatchObservableCallback
to ensure only one database query per scrape cycle, which addresses the optimization concern from previous reviews.Consider adding error handling to prevent metrics collection failures from affecting the application:
meter.addBatchObservableCallback( async (res) => { - const { total, busy, free } = await readPoolCounters(); - res.observe(totalGauge, total); - res.observe(busyGauge, busy); - res.observe(freeGauge, free); + try { + const { total, busy, free } = await readPoolCounters(); + res.observe(totalGauge, total); + res.observe(busyGauge, busy); + res.observe(freeGauge, free); + } catch (error) { + console.error("Failed to collect Prisma metrics:", error); + // Optionally observe 0 values to maintain metric continuity + res.observe(totalGauge, 0); + res.observe(busyGauge, 0); + res.observe(freeGauge, 0); + } }, [totalGauge, busyGauge, freeGauge] );internal-packages/run-engine/src/run-queue/index.ts (1)
1376-1378
: Remove obsolete TODO commentThe blocking dequeue timeout is now configurable via
options.dequeueBlockingTimeoutSeconds
, so this TODO comment is no longer needed.//args this.options.redis.keyPrefix ?? "", - // TODO: make this configurable String(this.options.dequeueBlockingTimeoutSeconds ?? 10)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (69)
.vscode/launch.json
(1 hunks)apps/coordinator/tsconfig.json
(1 hunks)apps/docker-provider/tsconfig.json
(1 hunks)apps/kubernetes-provider/tsconfig.json
(1 hunks)apps/webapp/app/env.server.ts
(3 hunks)apps/webapp/app/presenters/v3/SpanPresenter.server.ts
(2 hunks)apps/webapp/app/routes/admin.api.v1.migrate-legacy-master-queues.ts
(1 hunks)apps/webapp/app/routes/engine.v1.dev.dequeue.ts
(1 hunks)apps/webapp/app/routes/engine.v1.worker-actions.deployments.$deploymentFriendlyId.dequeue.ts
(2 hunks)apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
(1 hunks)apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
(1 hunks)apps/webapp/app/runEngine/concerns/queues.server.ts
(1 hunks)apps/webapp/app/runEngine/services/triggerTask.server.ts
(2 hunks)apps/webapp/app/runEngine/types.ts
(1 hunks)apps/webapp/app/services/deleteProject.server.ts
(1 hunks)apps/webapp/app/v3/runEngine.server.ts
(4 hunks)apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
(4 hunks)apps/webapp/app/v3/tracer.server.ts
(8 hunks)apps/webapp/package.json
(1 hunks)apps/webapp/test/engine/triggerTask.test.ts
(12 hunks)internal-packages/database/prisma/schema.prisma
(1 hunks)internal-packages/run-engine/src/engine/index.ts
(13 hunks)internal-packages/run-engine/src/engine/locking.ts
(3 hunks)internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts
(6 hunks)internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
(3 hunks)internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/systems/systems.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
(19 hunks)internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts
(4 hunks)internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts
(13 hunks)internal-packages/run-engine/src/engine/tests/cancelling.test.ts
(8 hunks)internal-packages/run-engine/src/engine/tests/checkpoints.test.ts
(26 hunks)internal-packages/run-engine/src/engine/tests/delays.test.ts
(5 hunks)internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
(6 hunks)internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
(15 hunks)internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts
(6 hunks)internal-packages/run-engine/src/engine/tests/priority.test.ts
(5 hunks)internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts
(13 hunks)internal-packages/run-engine/src/engine/tests/trigger.test.ts
(6 hunks)internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts
(9 hunks)internal-packages/run-engine/src/engine/tests/ttl.test.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/utils/engineTest.ts
(1 hunks)internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/waitpoints.test.ts
(23 hunks)internal-packages/run-engine/src/engine/types.ts
(5 hunks)internal-packages/run-engine/src/run-queue/index.test.ts
(24 hunks)internal-packages/run-engine/src/run-queue/index.ts
(40 hunks)internal-packages/run-engine/src/run-queue/keyProducer.ts
(2 hunks)internal-packages/run-engine/src/run-queue/tests/ack.test.ts
(4 hunks)internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts
(3 hunks)internal-packages/run-engine/src/run-queue/tests/migrateLegacyMasterQueue.test.ts
(1 hunks)internal-packages/run-engine/src/run-queue/tests/nack.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts
(4 hunks)internal-packages/run-engine/src/run-queue/types.ts
(2 hunks)internal-packages/run-engine/tsconfig.build.json
(1 hunks)internal-packages/run-engine/tsconfig.src.json
(1 hunks)internal-packages/run-engine/tsconfig.test.json
(1 hunks)internal-packages/testcontainers/src/index.ts
(7 hunks)internal-packages/tracing/src/index.ts
(1 hunks)packages/core/src/logger.ts
(2 hunks)packages/core/src/v3/serverOnly/index.ts
(1 hunks)packages/core/src/v3/serverOnly/jumpHash.ts
(1 hunks)packages/core/test/jumpHash.test.ts
(1 hunks)packages/redis-worker/package.json
(1 hunks)packages/redis-worker/src/queue.test.ts
(1 hunks)packages/redis-worker/src/queue.ts
(3 hunks)packages/redis-worker/src/worker.ts
(7 hunks)
✅ Files skipped from review due to trivial changes (3)
- .vscode/launch.json
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
- apps/webapp/package.json
🚧 Files skipped from review as they are similar to previous changes (56)
- apps/kubernetes-provider/tsconfig.json
- apps/coordinator/tsconfig.json
- apps/docker-provider/tsconfig.json
- packages/redis-worker/package.json
- packages/core/src/v3/serverOnly/index.ts
- internal-packages/run-engine/tsconfig.build.json
- apps/webapp/app/routes/engine.v1.worker-actions.deployments.$deploymentFriendlyId.dequeue.ts
- internal-packages/run-engine/tsconfig.src.json
- apps/webapp/app/runEngine/services/triggerTask.server.ts
- internal-packages/run-engine/src/engine/tests/ttl.test.ts
- internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
- internal-packages/run-engine/src/run-queue/tests/ack.test.ts
- internal-packages/run-engine/src/engine/tests/cancelling.test.ts
- apps/webapp/app/runEngine/types.ts
- internal-packages/run-engine/src/engine/systems/systems.ts
- apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
- packages/core/test/jumpHash.test.ts
- internal-packages/run-engine/src/engine/tests/delays.test.ts
- apps/webapp/app/presenters/v3/SpanPresenter.server.ts
- apps/webapp/app/runEngine/concerns/queues.server.ts
- internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts
- internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts
- apps/webapp/app/v3/runEngine.server.ts
- apps/webapp/app/services/deleteProject.server.ts
- internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
- internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
- internal-packages/run-engine/src/engine/tests/waitpoints.test.ts
- internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts
- internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts
- internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts
- internal-packages/database/prisma/schema.prisma
- internal-packages/run-engine/src/run-queue/tests/nack.test.ts
- internal-packages/run-engine/src/engine/tests/trigger.test.ts
- packages/redis-worker/src/queue.test.ts
- internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts
- internal-packages/tracing/src/index.ts
- internal-packages/run-engine/src/run-queue/tests/migrateLegacyMasterQueue.test.ts
- apps/webapp/app/routes/admin.api.v1.migrate-legacy-master-queues.ts
- apps/webapp/test/engine/triggerTask.test.ts
- internal-packages/run-engine/src/engine/tests/checkpoints.test.ts
- packages/core/src/v3/serverOnly/jumpHash.ts
- internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts
- internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
- packages/core/src/logger.ts
- internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts
- internal-packages/run-engine/src/engine/types.ts
- apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
- internal-packages/run-engine/src/run-queue/keyProducer.ts
- packages/redis-worker/src/queue.ts
- apps/webapp/app/env.server.ts
- internal-packages/run-engine/src/run-queue/types.ts
- internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts
- internal-packages/run-engine/src/engine/tests/priority.test.ts
- internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts
- internal-packages/testcontainers/src/index.ts
- internal-packages/run-engine/tsconfig.test.json
🧰 Additional context used
🧬 Code Graph Analysis (3)
internal-packages/run-engine/src/engine/locking.ts (2)
internal-packages/tracing/src/index.ts (7)
Tracer
(14-14)Meter
(16-16)Histogram
(24-24)getMeter
(45-47)ValueType
(38-38)ObservableResult
(25-25)Attributes
(15-15)internal-packages/redis/src/index.ts (1)
Redis
(4-4)
internal-packages/run-engine/src/run-queue/index.test.ts (2)
internal-packages/testcontainers/src/index.ts (3)
assertNonNullable
(19-19)redisTest
(167-167)redisContainer
(116-130)internal-packages/run-engine/src/run-queue/index.ts (8)
workerQueue
(1348-1431)RunQueue
(109-1973)message
(1212-1254)message
(1433-1466)message
(1468-1510)message
(1512-1536)message
(1551-1564)redis
(1611-1972)
internal-packages/run-engine/src/run-queue/index.ts (9)
packages/core/src/logger.ts (3)
LogLevel
(15-15)Logger
(19-130)error
(66-70)packages/redis-worker/src/worker.ts (2)
WorkerConcurrencyOptions
(45-49)Worker
(638-638)internal-packages/tracing/src/index.ts (8)
Meter
(16-16)getMeter
(45-47)ValueType
(38-38)ObservableResult
(25-25)Attributes
(15-15)propagation
(32-32)context
(31-31)SpanKind
(34-34)internal-packages/run-engine/src/engine/workerCatalog.ts (1)
workerCatalog
(3-56)internal-packages/redis/src/index.ts (3)
Redis
(4-4)Callback
(4-4)Result
(4-4)internal-packages/run-engine/src/run-queue/types.ts (8)
RunQueueKeyProducer
(49-99)RunQueueSelectionStrategy
(106-111)InputPayload
(5-16)InputPayload
(17-17)OutputPayloadV2
(25-28)OutputPayloadV2
(29-29)OutputPayload
(31-31)OutputPayload
(33-33)internal-packages/run-engine/src/shared/index.ts (1)
MinimalAuthenticatedEnvironment
(8-18)internal-packages/run-engine/src/run-queue/keyProducer.ts (8)
queueKey
(76-102)legacyMasterQueueKey
(22-24)workerQueueKey
(40-42)messageKey
(184-188)envCurrentConcurrencyKey
(159-177)envQueueKey
(104-106)envConcurrencyLimitKey
(50-66)deadLetterQueueKey
(204-220)packages/core/src/v3/index.ts (1)
calculateNextRetryDelay
(50-50)
🪛 Biome (1.9.4)
internal-packages/run-engine/src/engine/tests/utils/engineTest.ts
[error] 48-48: Unexpected empty object pattern.
(lint/correctness/noEmptyPattern)
⏰ Context from checks skipped due to timeout of 90000ms (25)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (20)
internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (2)
78-78
: API migration looks correct.The transition from
masterQueues: "main"
toworkerQueue: authenticatedEnvProd.id
aligns with the architectural shift to worker queue-based processing.Also applies to: 145-145, 209-209, 273-273, 299-299
83-87
: Excellent defensive programming with null checks.The transition to
dequeueMessageFromWorkerQueue
with proper null assertion usingassertNonNullable
ensures test reliability.Also applies to: 150-154, 214-218, 278-282
apps/webapp/app/routes/engine.v1.dev.dequeue.ts (2)
8-8
: Good backwards compatibility approach.Maintaining the request body parameter while transitioning to the simplified API ensures existing clients won't break immediately.
13-16
: Excellent API simplification.The migration from complex resource management to a single
dequeueFromEnvironmentWorkerQueue
call significantly reduces complexity while maintaining functionality. This aligns well with the performance improvement objectives.internal-packages/run-engine/src/engine/tests/utils/engineTest.ts (4)
48-48
: Static analysis false positive - pattern is valid.The empty object destructuring
{}
for unused TaskContext properties is legitimate TypeScript syntax. The linter warning can be safely ignored in this context.🧰 Tools
🪛 Biome (1.9.4)
[error] 48-48: Unexpected empty object pattern.
(lint/correctness/noEmptyPattern)
22-46
: Well-designed configuration structure.The EngineOptions type provides comprehensive configuration with sensible defaults for worker concurrency, queue processing, machine specifications, and concurrency release controls. This supports flexible testing scenarios.
96-127
: Excellent engine configuration mapping.The RunEngine constructor properly maps all the configuration options with appropriate fallbacks. The integration of tracing, Redis, and Prisma dependencies is well-structured.
131-135
: Proper cleanup implementation.The try-finally block ensures engine cleanup even if tests fail, preventing resource leaks in test environments.
internal-packages/run-engine/src/engine/locking.ts (5)
19-23
: Well-defined semantic attributes.The semantic attributes follow OpenTelemetry naming conventions with proper namespacing using
run_engine
prefix.
37-37
: Excellent metrics instrumentation design.The combination of observable gauge for active lock counts and histogram for lock durations provides comprehensive observability. The metric names and descriptions are clear and follow best practices.
Also applies to: 53-67
69-84
: Efficient active locks aggregation.The callback-based approach for observable gauge updates is performant and provides real-time visibility into lock contention by type.
109-111
: Comprehensive lock duration tracking.The implementation correctly tracks lock duration from acquisition to release, recording both successful and failed operations with appropriate attributes. The cleanup in the finally block ensures metrics are always recorded.
Also applies to: 136-144, 149-155
121-124
: Active lock tracking implementation.The lock tracking correctly associates lock IDs with lock types and resources, enabling accurate metrics reporting.
packages/redis-worker/src/worker.ts (2)
1-13
: LGTM! Clean migration to OpenTelemetry metrics.The migration from Prometheus to OpenTelemetry metrics is well implemented:
- Import statements properly updated
- WorkerOptions correctly uses
Meter
instead of PrometheusRegistry
- Observable gauges with appropriate callbacks for queue metrics
- Proper meter initialization with fallback to global meter
Also applies to: 64-65, 81-81, 106-106, 129-200
285-343
: Well-implemented idempotent enqueue method.The
enqueueOnce
method properly mirrors the existingenqueue
method's structure while correctly requiring an ID parameter for idempotent operations. Good consistency in tracing and metrics recording.internal-packages/run-engine/src/run-queue/index.test.ts (3)
105-106
: Good adaptation to the new worker queue architecture.The test properly reflects the API changes:
- Using
workerQueue
property instead ofmasterQueues
array- Handling single message returns from
dequeueMessageFromWorkerQueue
- Proper null safety with
assertNonNullable
- Correct message version "2" assertions
Also applies to: 133-147
275-376
: Good test coverage for disabled master queue consumers.This test properly verifies that messages can still be enqueued and dequeued when master queue consumers are disabled, which is an important edge case for the new architecture.
485-579
: Excellent test for sharded queue functionality.The test properly verifies that the queue system works correctly with 8 shards, ensuring the sharding implementation is functioning as expected.
apps/webapp/app/v3/tracer.server.ts (2)
14-16
: Excellent integration of OpenTelemetry metrics.The metrics implementation is well-structured:
- Clean integration alongside existing tracing and logging
- Proper service instance ID for distinguishing between process instances
- Function rename to
setupTelemetry
better reflects its expanded responsibilities- Consistent meter initialization with proper fallbacks
Also applies to: 24-30, 42-44, 52-53, 57-58, 101-106, 171-171, 179-179, 191-191, 286-289
291-319
: Well-implemented metrics setup with proper configuration.The
setupMetrics
function:
- Respects environment configuration for enabling/disabling metrics
- Properly configures the MeterProvider with resource attributes
- Sets up periodic metric export with configurable intervals
- Correctly integrates Prisma metrics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (4)
internal-packages/run-engine/src/engine/index.ts (2)
564-596
: Verify method return type consistency across codebaseThe
dequeueFromWorkerQueue
method now returnsDequeuedMessage[]
consistently, wrapping single results in an array. This is a breaking change that requires all callers to handle arrays.
579-579
: Clarify the comment about "prod" vs dev worker queuesThe comment about only observing "prod" worker queues is ambiguous. Please specify the exact criteria that distinguish a "prod" worker queue from a dev queue (e.g., naming convention, environment variable, configuration flag).
internal-packages/run-engine/src/run-queue/index.ts (2)
1371-1401
: Address TODO: Make blocking dequeue timeout configurable
1637-1637
: Fragile messageId extraction from Redis key
🧹 Nitpick comments (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
1018-1018
: Consider using more robust error handlingThe
tryCatch
usage is good, but consider whether the consumer should continue processing other shards if one fails, or if there should be circuit breaker logic to prevent cascading failures.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
apps/webapp/app/env.server.ts
(3 hunks)apps/webapp/app/v3/runEngine.server.ts
(4 hunks)internal-packages/run-engine/src/engine/index.ts
(13 hunks)internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
(2 hunks)internal-packages/run-engine/src/engine/systems/ttlSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/types.ts
(5 hunks)internal-packages/run-engine/src/run-queue/index.test.ts
(24 hunks)internal-packages/run-engine/src/run-queue/index.ts
(38 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- apps/webapp/app/v3/runEngine.server.ts
- internal-packages/run-engine/src/engine/types.ts
- apps/webapp/app/env.server.ts
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal-packages/run-engine/src/engine/index.ts (4)
internal-packages/run-engine/src/engine/locking.ts (1)
RunLocker
(31-179)packages/core/src/logger.ts (1)
Logger
(19-130)internal-packages/tracing/src/index.ts (2)
Meter
(16-16)getMeter
(45-47)internal-packages/run-engine/src/run-queue/index.ts (1)
workerQueue
(1371-1454)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
- GitHub Check: typecheck / typecheck
🔇 Additional comments (42)
internal-packages/run-engine/src/engine/systems/ttlSystem.ts (1)
109-115
: LGTM: Correctly implements new worker queue acknowledgment semantics.The addition of
{ removeFromWorkerQueue: true }
aligns with the architectural shift from master queues to worker queues. This ensures proper cleanup when acknowledging expired runs.internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (2)
1053-1055
: LGTM: Properly removes cancelled runs from worker queue.The addition of
{ removeFromWorkerQueue: true }
ensures that cancelled runs are completely removed from the worker queue during acknowledgment, preventing them from being processed again.
1238-1240
: LGTM: Correctly handles worker queue cleanup for permanently failed runs.The explicit
removeFromWorkerQueue: true
option ensures that permanently failed runs are properly removed from the worker queue, maintaining queue integrity in the new architecture.internal-packages/run-engine/src/run-queue/index.test.ts (6)
18-18
: Appropriate logger level change for development.Changing from "warn" to "debug" level provides better visibility during testing and development of the new worker queue architecture.
105-105
: Correct use of workerQueue field in new architecture.The addition of
workerQueue
field to the enqueue operation aligns with the shift from master queues to worker queues. Using the environment ID as the worker queue identifier is appropriate for development environments.
131-146
: Well-implemented transition to worker queue dequeue pattern.The changes correctly implement the new worker queue architecture:
- Added
setTimeout(1000)
to allow asynchronous queue processing to stabilize- Changed from
dequeueMessageFromMasterQueue
todequeueMessageFromWorkerQueue
- Updated assertions to handle single message objects instead of arrays
- Proper use of
assertNonNullable
for null safetyThe timeout suggests the new architecture processes queues asynchronously, which is a reasonable architectural choice.
275-295
: Good test coverage for disabled master queue consumers.The addition of
masterQueueConsumersDisabled: true
andprocessWorkerQueueDebounceMs: 50
options provides important test coverage for the new worker queue processing model. This ensures the system works correctly when master queue processing is disabled.
485-502
: Excellent test coverage for sharded queue architecture.The addition of
shardCount: 8
test validates that the new sharded master queue architecture works correctly with multiple shards, which is crucial for the performance improvements mentioned in the PR objectives.
799-801
: Proper testing of new removeFromWorkerQueue option.The test correctly validates the new
removeFromWorkerQueue: true
option behavior, ensuring that acknowledged messages are properly removed from the worker queue when specified.internal-packages/run-engine/src/engine/index.ts (9)
2-2
: LGTM: Enhanced tracing imports for metrics supportThe addition of
getMeter
andMeter
imports aligns with the PR objective to add observability metrics to the queue system.
80-80
: Good: Configurable logging level implementationThe logger initialization now uses configurable log levels from options or defaults to "info", which is more flexible than hardcoded debug levels.
100-100
: LGTM: Meter integration with RunLockerPassing the meter to RunLocker enables distributed lock metrics collection, supporting the observability improvements mentioned in the PR objectives.
118-130
: Good: Comprehensive worker queue configuration optionsThe new configuration options provide fine-grained control over the worker queue behavior including sharding, consumer management, and debouncing.
134-134
: Good: More descriptive worker instance nameChanging from generic "worker" to "run-engine-worker" improves observability and debugging by making the worker's purpose clearer in logs and metrics.
194-194
: LGTM: Graceful fallback for meter initializationThe meter initialization provides a sensible fallback using
getMeter("run-engine")
when no meter is provided in options.
339-339
: Consistent parameter name change aligns with architecture migrationThe rename from
masterQueue
toworkerQueue
in the trigger method signature is consistent with the overall migration from master queue to worker queue architecture.
750-764
: Parameter rename improves clarityChanging from
masterQueue
toruntimeEnvironmentId
inremoveEnvironmentQueuesFromMasterQueue
makes the parameter's purpose clearer and aligns with the new architecture.
1100-1131
: Well-structured migration method with comprehensive loggingThe
migrateLegacyMasterQueues
method provides good logging before and after each migration operation, which will help with monitoring the migration process in production.internal-packages/run-engine/src/run-queue/index.ts (24)
11-16
: LGTM: Enhanced observability importsThe addition of meter and observability types supports the comprehensive metrics collection being implemented in this refactor.
47-54
: Good: Well-defined semantic attributes for observabilityThe new semantic attributes for worker queues, master queue shards, and consumer IDs will provide valuable insights for monitoring and debugging the sharded queue system.
66-81
: Comprehensive configuration options for worker queue architectureThe new options provide excellent control over sharding, worker behavior, and timeouts. The optional nature of most options with sensible defaults is well-designed.
97-106
: Simple and focused worker catalog definitionThe worker catalog is well-structured with appropriate schema validation and visibility timeout for the queue processing job.
125-125
: Good default shard countSetting the default shard count to 2 provides a reasonable starting point for load distribution without over-complicating the initial setup.
138-157
: Well-implemented observable metrics setupThe observable gauges for worker queue length and master queue shard length provide essential monitoring capabilities for the new architecture.
167-194
: Solid worker integration with proper lifecycle managementThe Worker setup includes appropriate job handlers, configuration, and conditional startup based on options. The async job processing for queue management is well-designed.
208-210
: Observable worker queue registration methodThe
registerObservableWorkerQueue
method provides a clean way to track which worker queues should be monitored for metrics.
468-491
: Efficient debounced worker job enqueueingThe worker job enqueueing with debouncing (500ms default) is a smart approach to avoid excessive queue processing while ensuring timely message movement to worker queues.
573-587
: Consistent worker job enqueueing in acknowledge flowAdding the same debounced worker job enqueueing logic in the acknowledge message flow ensures consistency and helps maintain queue processing efficiency.
640-640
: Good: Worker queue extraction from messageThe use of
#getWorkerQueueFromMessage
helper method provides consistent worker queue identification across different message processing flows.
794-797
: Consistent shard calculation for environment removalUsing the same sharding logic (
masterQueueKeyForEnvironment
) ensures consistent placement and removal of environment queues across shards.
828-839
: Comprehensive cleanup in quit methodThe quit method properly handles cleanup of all components including the abort controller, subscribers, worker, and Redis clients with appropriate error handling.
844-847
: Useful debugging method for worker queuesThe
peekAllOnWorkerQueue
method provides valuable debugging capabilities for inspecting worker queue contents without modifying them.
966-978
: Excellent debug infrastructure for Lua scriptsThe Lua debug subscriber channel provides valuable debugging capabilities for troubleshooting Redis Lua script execution, which will be helpful for this complex queue system.
980-996
: Well-designed master queue consumer startupThe configurable consumer startup with per-shard consumers and proper error handling provides good scalability and reliability for the sharded architecture.
1060-1110
: Comprehensive legacy migration implementationThe migration logic properly groups queues by their target shards and uses Redis pipelines for efficient bulk operations. The extensive logging will help monitor migration progress.
1590-1603
: Robust worker queue extraction logicThe
#getWorkerQueueFromMessage
method handles both v1 and v2 message formats gracefully, with special handling for development environments. This provides good backward compatibility during the migration.
1605-1648
: Well-implemented blocking dequeue clientThe blocking dequeue client with proper Lua script implementation provides efficient message retrieval from worker queues. The abort controller integration ensures proper cleanup.
1651-1672
: Solid legacy migration Redis commandThe
migrateLegacyMasterQueues
Lua script properly rebalances queues into the new sharded system with appropriate queue rebalancing logic.
1674-1711
: Enhanced enqueueMessage command with master queue shardingThe updated Lua script now includes master queue key and properly handles queue rebalancing in the sharded architecture.
1713-1805
: Efficient batch dequeue implementationThe
dequeueMessagesFromQueue
command now supports batch processing with proper concurrency limit checking and optimal Redis operations. The rebalancing logic maintains queue consistency.
1808-1850
: Enhanced acknowledge command with worker queue supportThe updated acknowledge command includes worker queue removal logic and maintains consistency across all queue types.
2022-2144
: Comprehensive Redis command interface updatesThe updated Redis command interfaces properly reflect all the architectural changes including sharding, worker queues, and batch operations. The type definitions are complete and accurate.
…r queue migration stuff
…gnature to take runtimeEnvironmentId instead of masterQueue parameter • Added automatic master queue shard calculation using this.keys.masterQueueKeyForEnvironment(runtimeEnvironmentId, this.shardCount) • Updated RunEngine wrapper method to use new runtimeEnvironmentId parameter • Updated DeleteProjectService to call the method once per environment instead of once per master queue • Simplified API by encapsulating master queue sharding logic within RunQueue class
…rics for run engine and redis-worker
…d make the consumer interval configurable via an env var
26ec667
to
79e7a15
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (3)
packages/redis-worker/src/worker.ts (1)
559-564
: Fix timing unit inconsistency.The histogram records duration in seconds (line 563) but the metrics are defined with unit "ms". Also, consider using
performance.now()
for better timing precision.- const start = Date.now(); + const start = performance.now(); try { return await promise; } finally { - const duration = (Date.now() - start) / 1000; // Convert to seconds - histogram.record(duration, { worker_name: this.options.name, ...labels }); + const duration = performance.now() - start; // Keep in milliseconds + histogram.record(duration, { worker_name: this.options.name, ...labels }); }apps/webapp/app/v3/tracer.server.ts (1)
321-356
: Metrics optimization correctly implementedThe current implementation using
addBatchObservableCallback
properly addresses the performance concern from previous reviews. This approach ensures Prisma metrics are queried only once per scrape cycle, with all three gauges observing from the same dataset.internal-packages/run-engine/src/run-queue/index.ts (1)
1654-1654
: Fragile messageId extraction from Redis keyThis is the same issue identified in previous reviews. The regex
messageKeyValue:match("([^:]+)$")
could break if messageIds contain colons.As suggested previously, use a more robust pattern:
local messageId = messageKeyValue:match(":message:(.+)$")
🧹 Nitpick comments (5)
internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts (1)
84-88
: Consider explaining the purpose of the 500ms delays.The consistent addition of
setTimeout(500)
before each dequeue operation suggests there are timing dependencies in the new worker queue architecture. Consider adding comments to explain why these delays are necessary for test reliability.+ // Allow time for worker queue processing await setTimeout(500); const dequeued = await engine.dequeueFromWorkerQueue({
Also applies to: 197-201, 262-266, 438-442, 496-500
internal-packages/run-engine/src/engine/tests/utils/engineTest.ts (1)
48-48
: Address the empty object pattern.The static analysis tool flagged the empty destructuring pattern. Since
TaskContext
is not used, consider using an underscore parameter instead.-const engineOptions = async ({}: TaskContext, use: Use<EngineOptions>) => { +const engineOptions = async (_: TaskContext, use: Use<EngineOptions>) => {🧰 Tools
🪛 Biome (1.9.4)
[error] 48-48: Unexpected empty object pattern.
(lint/correctness/noEmptyPattern)
internal-packages/run-engine/src/engine/index.ts (1)
1100-1131
: Migration utility for legacy queuesThe new
migrateLegacyMasterQueues
method provides a systematic approach to migrating from the old master queue system to the new worker queue architecture. The implementation includes proper logging and iterates through managed worker instance groups.Consider adding error handling for individual migration failures to prevent one failed migration from stopping the entire process.
for (const workerGroup of workerGroups) { this.logger.info("Migrating legacy master queue", { workerGroupId: workerGroup.id, workerGroupName: workerGroup.name, workerGroupMasterQueue: workerGroup.masterQueue, }); + try { await this.runQueue.migrateLegacyMasterQueue(workerGroup.masterQueue); this.logger.info("Migrated legacy master queue", { workerGroupId: workerGroup.id, workerGroupName: workerGroup.name, workerGroupMasterQueue: workerGroup.masterQueue, }); + } catch (error) { + this.logger.error("Failed to migrate legacy master queue", { + workerGroupId: workerGroup.id, + workerGroupName: workerGroup.name, + workerGroupMasterQueue: workerGroup.masterQueue, + error, + }); + // Continue with other migrations instead of failing completely + } }internal-packages/run-engine/src/run-queue/index.ts (2)
68-81
: Consider adding validation for configuration options.The new configuration options provide good flexibility, but some combinations might be invalid or suboptimal. Consider adding validation logic to ensure:
shardCount
is a positive integer- Worker concurrency options are reasonable
- Timeout values are within acceptable ranges
1622-1665
: Consider connection reuse for blocking dequeue operations.Creating a new Redis client for each blocking dequeue operation introduces connection overhead. For high-throughput scenarios, consider:
- Maintaining a pool of blocking clients
- Reusing a single blocking client per worker thread
- Implementing proper connection lifecycle management
This could significantly improve performance under load.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (71)
.vscode/launch.json
(1 hunks)apps/coordinator/tsconfig.json
(1 hunks)apps/docker-provider/tsconfig.json
(1 hunks)apps/kubernetes-provider/tsconfig.json
(1 hunks)apps/webapp/app/env.server.ts
(3 hunks)apps/webapp/app/presenters/v3/SpanPresenter.server.ts
(2 hunks)apps/webapp/app/routes/admin.api.v1.migrate-legacy-master-queues.ts
(1 hunks)apps/webapp/app/routes/engine.v1.dev.dequeue.ts
(1 hunks)apps/webapp/app/routes/engine.v1.worker-actions.deployments.$deploymentFriendlyId.dequeue.ts
(2 hunks)apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
(1 hunks)apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
(1 hunks)apps/webapp/app/runEngine/concerns/queues.server.ts
(1 hunks)apps/webapp/app/runEngine/services/triggerTask.server.ts
(2 hunks)apps/webapp/app/runEngine/types.ts
(1 hunks)apps/webapp/app/services/deleteProject.server.ts
(1 hunks)apps/webapp/app/v3/runEngine.server.ts
(4 hunks)apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
(4 hunks)apps/webapp/app/v3/tracer.server.ts
(8 hunks)apps/webapp/package.json
(1 hunks)apps/webapp/test/engine/triggerTask.test.ts
(12 hunks)internal-packages/database/prisma/schema.prisma
(1 hunks)internal-packages/run-engine/src/engine/index.ts
(13 hunks)internal-packages/run-engine/src/engine/locking.ts
(3 hunks)internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts
(6 hunks)internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
(3 hunks)internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
(2 hunks)internal-packages/run-engine/src/engine/systems/systems.ts
(2 hunks)internal-packages/run-engine/src/engine/systems/ttlSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
(19 hunks)internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts
(4 hunks)internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts
(13 hunks)internal-packages/run-engine/src/engine/tests/cancelling.test.ts
(8 hunks)internal-packages/run-engine/src/engine/tests/checkpoints.test.ts
(26 hunks)internal-packages/run-engine/src/engine/tests/delays.test.ts
(5 hunks)internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
(6 hunks)internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
(15 hunks)internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts
(7 hunks)internal-packages/run-engine/src/engine/tests/priority.test.ts
(5 hunks)internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts
(13 hunks)internal-packages/run-engine/src/engine/tests/trigger.test.ts
(6 hunks)internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts
(9 hunks)internal-packages/run-engine/src/engine/tests/ttl.test.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/utils/engineTest.ts
(1 hunks)internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/waitpoints.test.ts
(23 hunks)internal-packages/run-engine/src/engine/types.ts
(5 hunks)internal-packages/run-engine/src/run-queue/index.test.ts
(24 hunks)internal-packages/run-engine/src/run-queue/index.ts
(40 hunks)internal-packages/run-engine/src/run-queue/keyProducer.ts
(2 hunks)internal-packages/run-engine/src/run-queue/tests/ack.test.ts
(4 hunks)internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts
(3 hunks)internal-packages/run-engine/src/run-queue/tests/migrateLegacyMasterQueue.test.ts
(1 hunks)internal-packages/run-engine/src/run-queue/tests/nack.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts
(6 hunks)internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts
(4 hunks)internal-packages/run-engine/src/run-queue/types.ts
(2 hunks)internal-packages/run-engine/tsconfig.build.json
(1 hunks)internal-packages/run-engine/tsconfig.src.json
(1 hunks)internal-packages/run-engine/tsconfig.test.json
(1 hunks)internal-packages/testcontainers/src/index.ts
(7 hunks)internal-packages/tracing/src/index.ts
(1 hunks)packages/core/src/logger.ts
(2 hunks)packages/core/src/v3/serverOnly/index.ts
(1 hunks)packages/core/src/v3/serverOnly/jumpHash.ts
(1 hunks)packages/core/test/jumpHash.test.ts
(1 hunks)packages/redis-worker/package.json
(1 hunks)packages/redis-worker/src/queue.test.ts
(1 hunks)packages/redis-worker/src/queue.ts
(3 hunks)packages/redis-worker/src/worker.ts
(7 hunks)
✅ Files skipped from review due to trivial changes (3)
- apps/kubernetes-provider/tsconfig.json
- .vscode/launch.json
- packages/redis-worker/package.json
🚧 Files skipped from review as they are similar to previous changes (61)
- apps/coordinator/tsconfig.json
- apps/docker-provider/tsconfig.json
- apps/webapp/app/services/deleteProject.server.ts
- internal-packages/run-engine/tsconfig.src.json
- internal-packages/run-engine/tsconfig.test.json
- apps/webapp/app/runEngine/services/triggerTask.server.ts
- internal-packages/run-engine/src/engine/tests/ttl.test.ts
- apps/webapp/app/runEngine/types.ts
- packages/core/src/v3/serverOnly/index.ts
- internal-packages/run-engine/tsconfig.build.json
- internal-packages/run-engine/src/engine/tests/cancelling.test.ts
- apps/webapp/app/presenters/v3/SpanPresenter.server.ts
- internal-packages/run-engine/src/run-queue/tests/migrateLegacyMasterQueue.test.ts
- apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
- internal-packages/run-engine/src/engine/tests/delays.test.ts
- internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
- apps/webapp/app/runEngine/concerns/queues.server.ts
- internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
- apps/webapp/package.json
- internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts
- internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts
- internal-packages/run-engine/src/engine/systems/ttlSystem.ts
- apps/webapp/app/v3/runEngine.server.ts
- internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
- packages/core/test/jumpHash.test.ts
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
- internal-packages/run-engine/src/run-queue/tests/ack.test.ts
- internal-packages/run-engine/src/run-queue/tests/nack.test.ts
- internal-packages/run-engine/src/engine/tests/waitpointRace.test.ts
- internal-packages/run-engine/src/engine/tests/checkpoints.test.ts
- internal-packages/run-engine/src/engine/tests/waitpoints.test.ts
- internal-packages/run-engine/src/engine/tests/trigger.test.ts
- internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts
- apps/webapp/app/routes/engine.v1.dev.dequeue.ts
- internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts
- internal-packages/run-engine/src/engine/systems/systems.ts
- packages/redis-worker/src/queue.test.ts
- apps/webapp/app/routes/engine.v1.worker-actions.deployments.$deploymentFriendlyId.dequeue.ts
- internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts
- internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts
- internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts
- internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
- apps/webapp/app/routes/admin.api.v1.migrate-legacy-master-queues.ts
- internal-packages/tracing/src/index.ts
- internal-packages/run-engine/src/engine/tests/priority.test.ts
- apps/webapp/test/engine/triggerTask.test.ts
- packages/core/src/v3/serverOnly/jumpHash.ts
- internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts
- internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts
- packages/core/src/logger.ts
- internal-packages/run-engine/src/engine/types.ts
- internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts
- internal-packages/run-engine/src/run-queue/keyProducer.ts
- internal-packages/run-engine/src/run-queue/index.test.ts
- apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
- apps/webapp/app/env.server.ts
- internal-packages/testcontainers/src/index.ts
- packages/redis-worker/src/queue.ts
- internal-packages/run-engine/src/run-queue/types.ts
- internal-packages/database/prisma/schema.prisma
- internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
🧰 Additional context used
🧬 Code Graph Analysis (2)
internal-packages/run-engine/src/engine/locking.ts (2)
internal-packages/tracing/src/index.ts (7)
Tracer
(14-14)Meter
(16-16)Histogram
(24-24)getMeter
(45-47)ValueType
(38-38)ObservableResult
(25-25)Attributes
(15-15)internal-packages/redis/src/index.ts (1)
Redis
(4-4)
apps/webapp/app/v3/tracer.server.ts (4)
packages/core/src/v3/serverOnly/singleton.ts (1)
singleton
(1-8)apps/webapp/app/env.server.ts (1)
env
(793-793)internal-packages/tracing/src/index.ts (3)
trace
(30-30)metrics
(37-37)Meter
(16-16)internal-packages/run-engine/src/run-queue/index.ts (1)
meter
(204-206)
🪛 Biome (1.9.4)
internal-packages/run-engine/src/engine/tests/utils/engineTest.ts
[error] 48-48: Unexpected empty object pattern.
(lint/correctness/noEmptyPattern)
⏰ Context from checks skipped due to timeout of 90000ms (15)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: typecheck / typecheck
🔇 Additional comments (32)
internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts (2)
26-27
: Good: Test configuration aligns with new worker queue architecture.The added configuration options properly disable master queue consumers and set appropriate debounce timing for the test environment.
75-75
: Verify consistency of worker queue naming.All instances correctly use
workerQueue: "main"
which is consistent with the new architecture. This unified naming approach improves maintainability.Also applies to: 87-87, 123-123, 150-150, 429-429, 441-441, 479-479, 499-499, 557-557
internal-packages/run-engine/src/engine/tests/utils/engineTest.ts (2)
22-46
: Excellent: Well-structured test utility with sensible defaults.The
EngineOptions
type and default configuration provide a clean abstraction for test setup. The defaults are appropriate for test environments with proper debounce timing and disabled consumers.Also applies to: 49-81
96-127
: Good: Proper resource management and cleanup.The engine fixture correctly creates the RunEngine instance with all necessary configuration and ensures proper cleanup with
engine.quit()
in the finally block.Also applies to: 134-134
internal-packages/run-engine/src/engine/locking.ts (4)
8-17
: Good: Comprehensive telemetry imports and semantic attributes.The imports include all necessary OpenTelemetry types and the semantic attributes follow a consistent naming pattern for the run engine domain.
Also applies to: 19-23
36-38
: Excellent: Optional meter parameter with sensible default.The constructor design allows for dependency injection while providing a reasonable fallback using
getMeter("run-engine")
.Also applies to: 40-40, 51-51
53-67
: Good: Proper metrics instrumentation setup.The observable gauge and histogram are correctly configured with appropriate descriptions, units, and value types. The callback method efficiently aggregates active locks by type.
Also applies to: 69-84
109-155
: Excellent: Comprehensive lock operation tracking.The enhanced lock method properly tracks timing, manages the active locks map, and records detailed metrics with success/failure attributes. The use of
performance.now()
provides high-precision timing measurements.packages/redis-worker/src/worker.ts (3)
1-13
: Good: Comprehensive migration from Prometheus to OpenTelemetry.The import changes and WorkerOptions update properly replace the Prometheus metrics registry with the OpenTelemetry Meter API.
Also applies to: 64-64, 81-81, 106-106
129-173
: Excellent: Well-structured observable gauge metrics.The observable gauges provide valuable insights into queue size, dead letter queue size, and concurrency metrics. The callback methods are efficiently implemented with proper attributes.
Also applies to: 175-200
285-343
: Good: Useful addition of idempotent enqueue functionality.The
enqueueOnce
method provides important deduplication capabilities with a requiredid
parameter. The implementation follows the same pattern as the regularenqueue
method with proper tracing and metrics.apps/webapp/app/v3/tracer.server.ts (6)
14-15
: LGTM: Clean metrics API importsThe imports correctly include the necessary OpenTelemetry metrics components. The addition of
randomUUID
andprisma
imports supports the new metrics functionality.Also applies to: 24-29, 52-53
57-57
: Good practice: Unique service instance identificationUsing a random UUID for service instance identification is the correct approach for distinguishing between multiple instances of the same service in metrics and tracing data.
171-179
: Function rename reflects expanded scopeThe rename from
getTracer
tosetupTelemetry
better reflects the function's expanded responsibility of setting up tracing, logging, and metrics. The consistent inclusion ofmeter
in both disabled and enabled paths ensures API consistency.Also applies to: 286-286
291-319
: Well-structured metrics setup with proper configurationThe metrics setup follows OpenTelemetry best practices:
- Proper resource attribution with service name and instance ID
- Environment-controlled enablement
- Configurable export intervals
- Clean separation of concerns
The fallback to a no-op meter when disabled ensures consistent API behavior.
394-422
: Robust metrics exporter configurationThe metrics exporter setup provides good flexibility:
- OTLP exporter with configurable URL and headers
- Console exporter fallback
- Proper timeout configuration
- Informative logging for debugging
The 30-second timeout is appropriate for metrics export operations.
101-106
:❌ Incorrect review comment
Breaking change: Function signature updated
The singleton export has been updated to include the
meter
object, which is a breaking change for consumers. Ensure all consumers are updated to handle the new structure.Let me verify if all consumers have been updated:
🏁 Script executed:
#!/bin/bash # Check for consumers of the tracer singleton to ensure they handle the new meter property rg -A 3 "from.*tracer\.server" --type tsLength of output: 6858
Ignore the breaking-change warning for the new
meter
exportAdding
meter
as an additional named export alongsidetracer
,otelLogger
(formerlylogger
), andprovider
is backward-compatible. Existing consumers that import onlytracer
,logger
(orotelLogger
), orprovider
will continue working without any code changes—no further updates are needed.Likely an incorrect or invalid review comment.
internal-packages/run-engine/src/engine/index.ts (7)
2-2
: LGTM: Consistent meter integrationThe meter is properly imported, declared as a property, initialized with fallback, and passed to system resources. This follows the telemetry enhancement pattern consistently.
Also applies to: 60-60, 194-194, 214-214
58-58
: Improved logger configurabilityThe logger initialization now supports configurable log levels instead of hardcoded "debug". This provides better production flexibility and follows logging best practices.
Also applies to: 80-80, 115-115
118-130
: Enhanced queue configuration optionsThe new worker and queue configuration options provide granular control over queue processing behavior. The addition of parameters like
masterQueueConsumersDisabled
,processWorkerQueueDebounceMs
, anddequeueBlockingTimeoutSeconds
suggests sophisticated queue management capabilities.
134-134
: Worker name change for clarityChanging the worker name from "worker" to "run-engine-worker" provides better identification in logs and monitoring systems.
339-339
: Consistent queue terminology migrationThe parameter and field names have been consistently updated from
masterQueue
/secondaryMasterQueue
toworkerQueue
, reflecting the architectural changes mentioned in the PR objectives.Also applies to: 411-411
598-621
: Environment-specific dequeue delegationThe
dequeueFromEnvironmentWorkerQueue
method provides a clean abstraction for environment-specific dequeuing by using the environment ID as the worker queue identifier.
750-764
: Parameter name updated for consistencyThe
removeEnvironmentQueuesFromMasterQueue
method parameter was updated frommasterQueue
toruntimeEnvironmentId
, which better reflects its actual usage and improves clarity.internal-packages/run-engine/src/run-queue/index.ts (8)
138-158
: Well-structured metrics implementation.The observable gauges provide good visibility into queue state and follow proper OpenTelemetry patterns.
167-194
: Good worker initialization with proper conditional logic.The worker setup correctly respects the disabled flag and passes through appropriate configuration options.
508-542
: Clean method signature aligned with new architecture.The transition from complex queue selection to simple worker queue dequeue simplifies the interface and aligns with the performance improvements described in the PR.
997-1075
: Robust master queue consumer implementation.The error handling with
tryCatch
protects against consumer failures as requested in previous reviews. The abort controller pattern ensures clean shutdown of background processes.
1417-1417
: Blocking timeout is now properly configurable.Good implementation of the configurable timeout as requested in previous reviews. This allows tuning based on deployment requirements.
1767-1777
: Efficient batch processing with proper concurrency control.The script correctly calculates available capacity and respects both environment and queue-level concurrency limits. The batch processing approach should improve performance compared to single-message dequeue.
844-856
: Comprehensive cleanup logic for new architecture.The quit method properly handles cleanup of all the new components (worker, additional Redis connections, abort controller) introduced in this refactor.
1219-1250
: Core worker queue processing logic is well-implemented.The batch enqueueing to worker queues with pipeline operations and proper tracing provides the foundation for the performance improvements described in the PR objectives.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal-packages/run-engine/src/engine/tests/priority.test.ts (1)
92-99
: Consider removing debug console logs before merging.The console logs are helpful for debugging but should typically be removed or converted to proper test logging before merging to production.
- console.log( - "runs", - runs.map((r) => r.friendlyId) - ); - console.log( - "dequeued run IDs", - dequeue.map((d) => d.run.friendlyId) - );
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal-packages/run-engine/src/engine/tests/priority.test.ts
(4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (24)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: typecheck / typecheck
🔇 Additional comments (7)
internal-packages/run-engine/src/engine/tests/priority.test.ts (7)
30-31
: Queue configuration updated for new worker queue architecture.The addition of
processWorkerQueueDebounceMs: 50
andmasterQueueConsumersDisabled: true
aligns with the new worker queue architecture where manual queue processing is required in tests.
79-79
: Manual queue processing added for worker queue architecture.The explicit call to
processMasterQueueForEnvironment
is necessary because master queue consumers are disabled in the test configuration, requiring manual processing to move runs to worker queues.
84-87
: Dequeue method updated to use worker queue.The migration from
dequeueFromMasterQueue
todequeueFromWorkerQueue
and parameter change frommasterQueue
toworkerQueue
is consistent with the new architecture.
109-113
: Consistent pattern for manual queue processing and dequeue.The pattern of calling
processMasterQueueForEnvironment
followed bydequeueFromWorkerQueue
is consistent with the earlier usage and necessary for the new architecture.
196-198
: Dequeue method consistently updated.The change to
dequeueFromWorkerQueue
withworkerQueue: "main"
is consistent with the architectural migration.
247-247
: Trigger function updated to use worker queue.The change from
masterQueue
toworkerQueue
in the trigger function aligns with the new queue architecture and ensures runs are properly routed to worker queues.
190-190
:✅ Verification successful
Verify timeout duration is appropriate.
The 500ms timeout replaces the previous explicit queue length checks. Ensure this duration is sufficient for queue processing in all test environments.
🏁 Script executed:
#!/bin/bash # Description: Check if other test files use similar timeout patterns to validate consistency # Expected: Similar timeout patterns in other test files rg -A 2 -B 2 "setTimeout.*[0-9]+.*\)" --type ts internal-packages/run-engine/src/engine/tests/Length of output: 75680
Timeout duration is consistent with the suite
I checked over 50 tests ininternal-packages/run-engine/src/engine/tests/
(e.g.waitpoints.test.ts
,triggerAndWait.test.ts
,releaseConcurrency.test.ts
) and 500 ms is the de facto standard delay for queue polling. This change aligns with existing patterns—no adjustment is needed unless you observe flakes on slower CI nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal-packages/run-engine/src/engine/tests/dequeuing.test.ts (1)
65-65
: Consider adding a delay after master queue processing.The explicit call to
processMasterQueueForEnvironment
is correct for the new architecture. However, consider adding a small delay after this call to ensure runs are fully processed into worker queues before dequeuing.await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id, 5); +// Allow time for runs to be processed into worker queues +await new Promise(resolve => setTimeout(resolve, 100));
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
(4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (25)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (4)
internal-packages/run-engine/src/engine/tests/dequeuing.test.ts (4)
3-3
: LGTM: Import updated for new architecture.The import of
DequeuedMessage
is appropriate for the new worker queue dequeuing functionality.
27-28
: LGTM: Configuration correctly implements new architecture.The configuration changes properly disable master queue consumers and set worker queue debounce interval, aligning with the architectural shift to worker queues.
70-73
: LGTM: Dequeue method updated correctly for worker queue architecture.The change from
dequeueFromMasterQueue
todequeueFromWorkerQueue
and the parameter name change frommasterQueue
toworkerQueue
properly implements the new architecture.
111-111
: LGTM: Trigger parameter updated consistently.The parameter change from
masterQueue
toworkerQueue
maintains consistency with the new architecture throughout the test flow.
This change restructures how dequeuing works in v4, increasing efficiency by an order of magnitude or more, by splitting a worker dequeuing a run and the concurrency calculations.
Previously, every single "dequeue" call from a worker (in dev, the worker is the dev CLI command, in deployed tasks, the worker is the QueueRaider), would have to search through queues until it found one with concurrency available, and then it would dequeue a single run from that queue. This mean that there was a LOT of wasted searching for eligible queues.
Now we've introduced a new data structure called a worker queue, that is a Redis list. Workers when dequeuing just do a BLPOP on their list and don't have to worry about doing any concurrency calculations.
Separately, we have internal consumers that are constantly searching for queues that have capacity and when they do, bulk dequeue messages and add them to their worker queue lists. The concurrency consumers now don't have to stop as soon as they get a single message from a single queue, they can just continuously search for runs to add to worker queues.
In addition to the consumers, we now can eagerly move runs to worker queues when runs are triggered or completed (e.g. enqueued or acked), which means even LESS searching than before since it's more targeted.
Having the worker lists also gives us the ability to know almost exactly how many runs are ready to be executed which haven't executed yet (the size of the worker queue list), giving us handy metrics and something we can eventually use for auto-scaling.