diff --git a/.vscode/launch.json b/.vscode/launch.json index e8bb90b562..e2659d148a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -138,7 +138,7 @@ "type": "node-terminal", "request": "launch", "name": "Debug RunEngine tests", - "command": "pnpm run test ./src/engine/tests/releaseConcurrencyQueue.test.ts -t 'Should manage token bucket and queue correctly'", + "command": "pnpm run test ./src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts -t 'Should retrieve metrics for all queues via getQueueMetrics'", "cwd": "${workspaceFolder}/internal-packages/run-engine", "sourceMaps": true }, diff --git a/internal-packages/redis/src/index.ts b/internal-packages/redis/src/index.ts index fd0d4d25d6..bdd69315cf 100644 --- a/internal-packages/redis/src/index.ts +++ b/internal-packages/redis/src/index.ts @@ -8,7 +8,7 @@ const defaultOptions: Partial = { const delay = Math.min(times * 50, 1000); return delay; }, - maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 1 : 20, + maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 5 : 20, }; const logger = new Logger("Redis", "debug"); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 55ac2f258c..21993d809e 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -219,7 +219,7 @@ export class RunEngine { pollInterval: options.releaseConcurrency?.pollInterval ?? 1000, batchSize: options.releaseConcurrency?.batchSize ?? 10, executor: async (descriptor, snapshotId) => { - await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot( + return await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot( snapshotId ); }, diff --git a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts index 44849445b1..d932803694 100644 --- a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts +++ b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts @@ -1,8 +1,9 @@ import { Callback, createRedisClient, Redis, Result, type RedisOptions } from "@internal/redis"; -import { Tracer } from "@internal/tracing"; +import { startSpan, Tracer } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; -import { setInterval } from "node:timers/promises"; import { z } from "zod"; +import { setInterval } from "node:timers/promises"; +import { flattenAttributes } from "@trigger.dev/core/v3"; export type ReleaseConcurrencyQueueRetryOptions = { maxRetries?: number; @@ -15,7 +16,10 @@ export type ReleaseConcurrencyQueueRetryOptions = { export type ReleaseConcurrencyQueueOptions = { redis: RedisOptions; - executor: (releaseQueue: T, releaserId: string) => Promise; + /** + * @returns true if the run was successful, false if the token should be returned to the bucket + */ + executor: (releaseQueue: T, releaserId: string) => Promise; keys: { fromDescriptor: (releaseQueue: T) => string; toDescriptor: (releaseQueue: string) => T; @@ -78,6 +82,7 @@ export class ReleaseConcurrencyTokenBucketQueue { if (!options.disableConsumers) { this.#startConsumers(); + this.#startMetricsProducer(); } } @@ -119,7 +124,7 @@ export class ReleaseConcurrencyTokenBucketQueue { String(Date.now()) ); - this.logger.debug("Consumed token in attemptToRelease", { + this.logger.info("Consumed token in attemptToRelease", { releaseQueueDescriptor, releaserId, maxTokens, @@ -270,7 +275,7 @@ export class ReleaseConcurrencyTokenBucketQueue { return false; } - await Promise.all( + await Promise.allSettled( result.map(([queue, releaserId, metadata]) => { const itemMetadata = QueueItemMetadata.parse(JSON.parse(metadata)); const releaseQueueDescriptor = this.keys.toDescriptor(queue); @@ -283,9 +288,29 @@ export class ReleaseConcurrencyTokenBucketQueue { async #callExecutor(releaseQueueDescriptor: T, releaserId: string, metadata: QueueItemMetadata) { try { - this.logger.info("Executing run:", { releaseQueueDescriptor, releaserId }); + this.logger.info("Calling executor for release", { releaseQueueDescriptor, releaserId }); + + const released = await this.options.executor(releaseQueueDescriptor, releaserId); - await this.options.executor(releaseQueueDescriptor, releaserId); + if (released) { + this.logger.info("Executor released concurrency", { releaseQueueDescriptor, releaserId }); + } else { + this.logger.info("Executor did not release concurrency", { + releaseQueueDescriptor, + releaserId, + }); + + // Return the token but don't requeue + const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor); + await this.redis.returnTokenOnly( + this.masterQueuesKey, + this.#bucketKey(releaseQueue), + this.#queueKey(releaseQueue), + this.#metadataKey(releaseQueue), + releaseQueue, + releaserId + ); + } } catch (error) { this.logger.error("Error executing run:", { error }); @@ -374,6 +399,30 @@ export class ReleaseConcurrencyTokenBucketQueue { } } + async #startMetricsProducer() { + try { + // Produce metrics every 60 seconds, using a tracer span + for await (const _ of setInterval(60_000)) { + const metrics = await this.getQueueMetrics(); + this.logger.info("Queue metrics:", { metrics }); + + await startSpan( + this.options.tracer, + "ReleaseConcurrencyTokenBucketQueue.metrics", + async (span) => {}, + { + attributes: { + ...flattenAttributes(metrics, "queues"), + forceRecording: true, + }, + } + ); + } + } catch (error) { + this.logger.error("Error starting metrics producer:", { error }); + } + } + #calculateBackoffScore(item: QueueItemMetadata): string { const delay = Math.min( this.backoff.maxDelay, @@ -382,6 +431,137 @@ export class ReleaseConcurrencyTokenBucketQueue { return String(Date.now() + delay); } + async getQueueMetrics(): Promise< + Array<{ releaseQueue: string; currentTokens: number; queueLength: number }> + > { + const streamRedis = this.redis.duplicate(); + const queuePattern = `${this.keyPrefix}*:queue`; + const stream = streamRedis.scanStream({ + match: queuePattern, + type: "zset", + count: 100, + }); + + let resolvePromise: ( + value: Array<{ releaseQueue: string; currentTokens: number; queueLength: number }> + ) => void; + let rejectPromise: (reason?: any) => void; + + const promise = new Promise< + Array<{ releaseQueue: string; currentTokens: number; queueLength: number }> + >((resolve, reject) => { + resolvePromise = resolve; + rejectPromise = reject; + }); + + const metrics: Map< + string, + { releaseQueue: string; currentTokens: number; queueLength: number } + > = new Map(); + + async function getMetricsForKeys(queueKeys: string[]) { + if (queueKeys.length === 0) { + return []; + } + + const pipeline = streamRedis.pipeline(); + + queueKeys.forEach((queueKey) => { + const releaseQueue = queueKey + .replace(":queue", "") + .replace(streamRedis.options.keyPrefix ?? "", ""); + const bucketKey = `${releaseQueue}:bucket`; + + pipeline.get(bucketKey); + pipeline.zcard(`${releaseQueue}:queue`); + }); + + const result = await pipeline.exec(); + + if (!result) { + return []; + } + + const results = result.map(([resultError, queueLengthOrCurrentTokens]) => { + if (resultError) { + return null; + } + + return queueLengthOrCurrentTokens ? Number(queueLengthOrCurrentTokens) : 0; + }); + + // Now zip the results with the queue keys + const zippedResults = queueKeys.map((queueKey, index) => { + const releaseQueue = queueKey + .replace(":queue", "") + .replace(streamRedis.options.keyPrefix ?? "", ""); + + // Current tokens are at indexes 0, 2, 4, 6, etc. + // Queue length are at indexes 1, 3, 5, 7, etc. + + const currentTokens = results[index * 2]; + const queueLength = results[index * 2 + 1]; + + if (typeof currentTokens !== "number" || typeof queueLength !== "number") { + return null; + } + + return { + releaseQueue, + currentTokens: currentTokens, + queueLength: queueLength, + }; + }); + + return zippedResults.filter((result) => result !== null); + } + + stream.on("end", () => { + streamRedis.quit(); + resolvePromise(Array.from(metrics.values())); + }); + + stream.on("error", (error) => { + this.logger.error("Error getting queue metrics:", { error }); + + stream.pause(); + streamRedis.quit(); + rejectPromise(error); + }); + + stream.on("data", async (keys) => { + stream.pause(); + + const uniqueKeys = Array.from(new Set(keys)); + + if (uniqueKeys.length === 0) { + stream.resume(); + return; + } + + const unresolvedKeys = uniqueKeys.filter((key) => !metrics.has(key)); + + if (unresolvedKeys.length === 0) { + stream.resume(); + return; + } + + this.logger.debug("Fetching queue metrics for keys", { keys: uniqueKeys }); + + await getMetricsForKeys(unresolvedKeys).then((results) => { + results.forEach((result) => { + if (result) { + metrics.set(result.releaseQueue, result); + } + }); + + stream.resume(); + }); + }); + + return promise; + } + #registerCommands() { this.redis.defineCommand("consumeToken", { numberOfKeys: 4, @@ -401,7 +581,9 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens) -- If we have enough tokens, then consume them if currentTokens >= 1 then - redis.call("SET", bucketKey, currentTokens - 1) + local newCurrentTokens = currentTokens - 1 + + redis.call("SET", bucketKey, newCurrentTokens) redis.call("ZREM", queueKey, releaserId) -- Clean up metadata when successfully consuming @@ -411,8 +593,8 @@ if currentTokens >= 1 then local queueLength = redis.call("ZCARD", queueKey) -- If we still have tokens and items in queue, update available queues - if currentTokens > 0 and queueLength > 0 then - redis.call("ZADD", masterQueuesKey, currentTokens, releaseQueue) + if newCurrentTokens > 0 and queueLength > 0 then + redis.call("ZADD", masterQueuesKey, newCurrentTokens, releaseQueue) else redis.call("ZREM", masterQueuesKey, releaseQueue) end diff --git a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts index bac9be1412..6798780b4a 100644 --- a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts +++ b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts @@ -104,9 +104,9 @@ export class ReleaseConcurrencySystem { ); } - public async executeReleaseConcurrencyForSnapshot(snapshotId: string) { + public async executeReleaseConcurrencyForSnapshot(snapshotId: string): Promise { if (!this.releaseConcurrencyQueue) { - return; + return false; } this.$.logger.debug("Executing released concurrency", { @@ -136,14 +136,14 @@ export class ReleaseConcurrencySystem { snapshotId, }); - return; + return false; } // - Runlock the run // - Get latest snapshot // - If the run is non suspended or going to be, then bail // - If the run is suspended or going to be, then release the concurrency - await this.$.runLock.lock([snapshot.runId], 5_000, async () => { + return await this.$.runLock.lock([snapshot.runId], 5_000, async () => { const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId); const isValidSnapshot = @@ -159,7 +159,7 @@ export class ReleaseConcurrencySystem { snapshot, }); - return; + return false; } if (!canReleaseConcurrency(latestSnapshot.executionStatus)) { @@ -168,20 +168,21 @@ export class ReleaseConcurrencySystem { snapshot: latestSnapshot, }); - return; + return false; } const metadata = this.#parseMetadata(snapshot.metadata); if (typeof metadata.releaseConcurrency === "boolean") { if (metadata.releaseConcurrency) { - return await this.$.runQueue.releaseAllConcurrency( - snapshot.organizationId, - snapshot.runId - ); + await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId); + + return true; } - return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId); + await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId); + + return true; } // Get the locked queue @@ -198,10 +199,14 @@ export class ReleaseConcurrencySystem { (typeof taskQueue.concurrencyLimit === "undefined" || taskQueue.releaseConcurrencyOnWaitpoint) ) { - return await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId); + await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId); + + return true; } - return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId); + await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId); + + return true; }); } diff --git a/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts b/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts index 0c416457d9..caed8ccff9 100644 --- a/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts +++ b/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts @@ -20,6 +20,7 @@ function createReleaseConcurrencyQueue( }, executor: async (releaseQueue, runId) => { executedRuns.push({ releaseQueue: releaseQueue.name, runId }); + return true; }, maxTokens: async (_) => maxTokens, keys: { @@ -221,6 +222,7 @@ describe("ReleaseConcurrencyQueue", () => { throw new Error("Executor failed"); } executedRuns.push({ releaseQueue, runId }); + return true; }, maxTokens: async (_) => 2, keys: { @@ -299,6 +301,7 @@ describe("ReleaseConcurrencyQueue", () => { // Add small delay to simulate work await setTimeout(10); executedRuns.push({ releaseQueue, runId }); + return true; }, keys: { fromDescriptor: (descriptor) => descriptor, @@ -419,6 +422,7 @@ describe("ReleaseConcurrencyQueue", () => { }, executor: async (releaseQueue, runId) => { secondRunAttempted = true; + return true; }, keys: { fromDescriptor: (descriptor) => descriptor, @@ -516,6 +520,63 @@ describe("ReleaseConcurrencyQueue", () => { } }); + redisTest( + "Should return token but not requeue when executor returns false", + async ({ redisContainer }) => { + const executedRuns: { releaseQueue: string; runId: string }[] = []; + const runResults: Record = { + run1: true, // This will succeed + run2: false, // This will return false, returning the token without requeuing + run3: true, // This should execute immediately when run2's token is returned + }; + + const queue = new ReleaseConcurrencyTokenBucketQueue({ + redis: { + keyPrefix: "release-queue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + executor: async (releaseQueue, runId) => { + const success = runResults[runId]; + + executedRuns.push({ releaseQueue, runId }); + + return success; + }, + keys: { + fromDescriptor: (descriptor) => descriptor, + toDescriptor: (name) => name, + }, + maxTokens: async (_) => 2, // Only 2 tokens available at a time + pollInterval: 100, + }); + + try { + // First run should execute and succeed + await queue.attemptToRelease("test-queue", "run1"); + expect(executedRuns).toHaveLength(1); + expect(executedRuns[0]).toEqual({ releaseQueue: "test-queue", runId: "run1" }); + + // Second run should execute but return false, returning the token + await queue.attemptToRelease("test-queue", "run2"); + expect(executedRuns).toHaveLength(2); + expect(executedRuns[1]).toEqual({ releaseQueue: "test-queue", runId: "run2" }); + + // Third run should be able to execute immediately since run2 returned its token + await queue.attemptToRelease("test-queue", "run3"); + + expect(executedRuns).toHaveLength(3); + expect(executedRuns[2]).toEqual({ releaseQueue: "test-queue", runId: "run3" }); + + // Verify that run2 was not retried (it should have been skipped) + const run2Attempts = executedRuns.filter((r) => r.runId === "run2"); + expect(run2Attempts).toHaveLength(1); // Only executed once, not retried + } finally { + await queue.quit(); + } + } + ); + redisTest("Should implement exponential backoff between retries", async ({ redisContainer }) => { const executionTimes: number[] = []; let startTime: number; @@ -618,4 +679,38 @@ describe("ReleaseConcurrencyQueue", () => { await queue.quit(); } }); + + redisTest( + "Should retrieve metrics for all queues via getQueueMetrics", + async ({ redisContainer }) => { + const { queue } = createReleaseConcurrencyQueue(redisContainer, 1); + + // Set up multiple queues with different states + await queue.attemptToRelease({ name: "metrics-queue1" }, "run1"); // Consume 1 token from queue1 + + // Add more items to queue1 that will be queued due to no tokens + await queue.attemptToRelease({ name: "metrics-queue1" }, "run2"); // This will be queued + await queue.attemptToRelease({ name: "metrics-queue1" }, "run3"); // This will be queued + await queue.attemptToRelease({ name: "metrics-queue1" }, "run4"); // This will be queued + + const metrics = await queue.getQueueMetrics(); + + expect(metrics).toHaveLength(1); + expect(metrics[0].releaseQueue).toBe("metrics-queue1"); + expect(metrics[0].currentTokens).toBe(0); + expect(metrics[0].queueLength).toBe(3); + + // Now add 10 items to 100 different queues + for (let i = 0; i < 100; i++) { + for (let j = 0; j < 10; j++) { + await queue.attemptToRelease({ name: `metrics-queue2-${i}` }, `run${i}-${j}`); + } + } + + const metrics2 = await queue.getQueueMetrics(); + expect(metrics2.length).toBeGreaterThan(90); + + await queue.quit(); + } + ); });