From 9e0ecf71188a3b04b63d801e84222e590c628eac Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 20 May 2025 12:30:41 +0100 Subject: [PATCH 1/4] Refill release concurrency tokens when a run is cancelled --- apps/webapp/app/components/admin/debugRun.tsx | 62 ++++++- .../resources.taskruns.$runParam.debug.ts | 157 ++++++++++++++---- .../run-engine/src/engine/index.ts | 1 + .../src/engine/systems/runAttemptSystem.ts | 6 + 4 files changed, 196 insertions(+), 30 deletions(-) diff --git a/apps/webapp/app/components/admin/debugRun.tsx b/apps/webapp/app/components/admin/debugRun.tsx index 894a2a3bc9..5d5386a58a 100644 --- a/apps/webapp/app/components/admin/debugRun.tsx +++ b/apps/webapp/app/components/admin/debugRun.tsx @@ -66,7 +66,15 @@ function DebugRunContent({ friendlyId }: { friendlyId: string }) { ); } -function DebugRunData({ +function DebugRunData(props: UseDataFunctionReturn) { + if (props.engine === "V1") { + return ; + } + + return ; +} + +function DebugRunDataEngineV1({ run, queueConcurrencyLimit, queueCurrentConcurrency, @@ -338,3 +346,55 @@ function DebugRunData({ ); } + +function DebugRunDataEngineV2({ + run, + queueConcurrencyLimit, + queueCurrentConcurrency, + envConcurrencyLimit, + envCurrentConcurrency, + keys, +}: UseDataFunctionReturn) { + return ( + + + ID + + + + + + Queue current concurrency + + {queueCurrentConcurrency ?? "0"} + + + + Queue concurrency limit + + {queueConcurrencyLimit ?? "Not set"} + + + + Env current concurrency + + {envCurrentConcurrency ?? "0"} + + + + Env concurrency limit + + {envConcurrencyLimit ?? "Not set"} + + + {keys.map((key) => ( + + {key.label} + + + + + ))} + + ); +} diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts index 3d8681b1be..bd98255b61 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts @@ -4,6 +4,7 @@ import { z } from "zod"; import { $replica } from "~/db.server"; import { requireUserId } from "~/services/session.server"; import { marqs } from "~/v3/marqs/index.server"; +import { engine } from "~/v3/runEngine.server"; const ParamSchema = z.object({ runParam: z.string(), @@ -17,6 +18,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { where: { friendlyId: runParam, project: { organization: { members: { some: { userId } } } } }, select: { id: true, + engine: true, friendlyId: true, queue: true, concurrencyKey: true, @@ -27,6 +29,8 @@ export async function loader({ request, params }: LoaderFunctionArgs) { type: true, slug: true, organizationId: true, + project: true, + maximumConcurrencyLimit: true, organization: { select: { id: true, @@ -41,33 +45,128 @@ export async function loader({ request, params }: LoaderFunctionArgs) { throw new Response("Not Found", { status: 404 }); } - const queueConcurrencyLimit = await marqs.getQueueConcurrencyLimit( - run.runtimeEnvironment, - run.queue - ); - const envConcurrencyLimit = await marqs.getEnvConcurrencyLimit(run.runtimeEnvironment); - const queueCurrentConcurrency = await marqs.currentConcurrencyOfQueue( - run.runtimeEnvironment, - run.queue, - run.concurrencyKey ?? undefined - ); - const envCurrentConcurrency = await marqs.currentConcurrencyOfEnvironment(run.runtimeEnvironment); - - const queueReserveConcurrency = await marqs.reserveConcurrencyOfQueue( - run.runtimeEnvironment, - run.queue, - run.concurrencyKey ?? undefined - ); - - const envReserveConcurrency = await marqs.reserveConcurrencyOfEnvironment(run.runtimeEnvironment); - - return typedjson({ - run, - queueConcurrencyLimit, - envConcurrencyLimit, - queueCurrentConcurrency, - envCurrentConcurrency, - queueReserveConcurrency, - envReserveConcurrency, - }); + if (run.engine === "V1") { + const queueConcurrencyLimit = await marqs.getQueueConcurrencyLimit( + run.runtimeEnvironment, + run.queue + ); + const envConcurrencyLimit = await marqs.getEnvConcurrencyLimit(run.runtimeEnvironment); + const queueCurrentConcurrency = await marqs.currentConcurrencyOfQueue( + run.runtimeEnvironment, + run.queue, + run.concurrencyKey ?? undefined + ); + const envCurrentConcurrency = await marqs.currentConcurrencyOfEnvironment( + run.runtimeEnvironment + ); + + const queueReserveConcurrency = await marqs.reserveConcurrencyOfQueue( + run.runtimeEnvironment, + run.queue, + run.concurrencyKey ?? undefined + ); + + const envReserveConcurrency = await marqs.reserveConcurrencyOfEnvironment( + run.runtimeEnvironment + ); + + return typedjson({ + engine: "V1", + run, + queueConcurrencyLimit, + envConcurrencyLimit, + queueCurrentConcurrency, + envCurrentConcurrency, + queueReserveConcurrency, + envReserveConcurrency, + keys: [], + }); + } else { + const queueConcurrencyLimit = await engine.runQueue.getQueueConcurrencyLimit( + run.runtimeEnvironment, + run.queue + ); + + const envConcurrencyLimit = await engine.runQueue.getEnvConcurrencyLimit( + run.runtimeEnvironment + ); + + const queueCurrentConcurrency = await engine.runQueue.currentConcurrencyOfQueue( + run.runtimeEnvironment, + run.queue, + run.concurrencyKey ?? undefined + ); + + const envCurrentConcurrency = await engine.runQueue.currentConcurrencyOfEnvironment( + run.runtimeEnvironment + ); + + const queueCurrentConcurrencyKey = engine.runQueue.keys.currentConcurrencyKey( + run.runtimeEnvironment, + run.queue, + run.concurrencyKey ?? undefined + ); + + const envCurrentConcurrencyKey = engine.runQueue.keys.envCurrentConcurrencyKey( + run.runtimeEnvironment + ); + + const queueConcurrencyLimitKey = engine.runQueue.keys.queueConcurrencyLimitKey( + run.runtimeEnvironment, + run.queue + ); + + const envConcurrencyLimitKey = engine.runQueue.keys.envConcurrencyLimitKey( + run.runtimeEnvironment + ); + + const releaseConcurrencyBucketKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:bucket`; + const releaseConcurrencyQueueKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:queue`; + const releaseConcurrencyMetadataKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:metadata`; + + const withPrefix = (key: string) => `engine:runqueue:${key}`; + + const keys = [ + { + label: "Queue current concurrency set", + key: withPrefix(queueCurrentConcurrencyKey), + }, + { + label: "Env current concurrency set", + key: withPrefix(envCurrentConcurrencyKey), + }, + { + label: "Queue concurrency limit", + key: withPrefix(queueConcurrencyLimitKey), + }, + { + label: "Env concurrency limit", + key: withPrefix(envConcurrencyLimitKey), + }, + { + label: "Release concurrency bucket", + key: releaseConcurrencyBucketKey, + }, + { + label: "Release concurrency queue", + key: releaseConcurrencyQueueKey, + }, + { + label: "Release concurrency metadata", + key: releaseConcurrencyMetadataKey, + }, + ]; + + return typedjson({ + engine: "V2", + run, + queueConcurrencyLimit, + envConcurrencyLimit, + queueCurrentConcurrency, + envCurrentConcurrency, + queueReserveConcurrency: undefined, + envReserveConcurrency: undefined, + keys, + }); + } } diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e8eebe50fb..68716b8c89 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -306,6 +306,7 @@ export class RunEngine { delayedRunSystem: this.delayedRunSystem, machines: this.options.machines, retryWarmStartThresholdMs: this.options.retryWarmStartThresholdMs, + releaseConcurrencySystem: this.releaseConcurrencySystem, }); this.dequeueSystem = new DequeueSystem({ diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index bb3dce4fea..a63110c970 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -33,6 +33,7 @@ import { import { SystemResources } from "./systems.js"; import { WaitpointSystem } from "./waitpointSystem.js"; import { DelayedRunSystem } from "./delayedRunSystem.js"; +import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js"; export type RunAttemptSystemOptions = { resources: SystemResources; @@ -40,6 +41,7 @@ export type RunAttemptSystemOptions = { batchSystem: BatchSystem; waitpointSystem: WaitpointSystem; delayedRunSystem: DelayedRunSystem; + releaseConcurrencySystem: ReleaseConcurrencySystem; retryWarmStartThresholdMs?: number; machines: RunEngineOptions["machines"]; }; @@ -50,6 +52,7 @@ export class RunAttemptSystem { private readonly batchSystem: BatchSystem; private readonly waitpointSystem: WaitpointSystem; private readonly delayedRunSystem: DelayedRunSystem; + private readonly releaseConcurrencySystem: ReleaseConcurrencySystem; constructor(private readonly options: RunAttemptSystemOptions) { this.$ = options.resources; @@ -57,6 +60,7 @@ export class RunAttemptSystem { this.batchSystem = options.batchSystem; this.waitpointSystem = options.waitpointSystem; this.delayedRunSystem = options.delayedRunSystem; + this.releaseConcurrencySystem = options.releaseConcurrencySystem; } public async startRunAttempt({ @@ -1037,6 +1041,8 @@ export class RunAttemptSystem { //remove it from the queue and release concurrency await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId); + await this.releaseConcurrencySystem.refillTokensForSnapshot(latestSnapshot); + //if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status if (isExecuting(latestSnapshot.executionStatus)) { const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { From 2bd533d75a8d250cd5375cc5a95519c89fb5e694 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 20 May 2025 22:12:57 +0100 Subject: [PATCH 2/4] Improved release concurrency accounting system + a sweeper to auto-refill tokens for snapshots that are no longer the latest snapshot on a run (e.g. the run has moved to a new snapshot state) --- apps/webapp/app/env.server.ts | 5 + .../resources.taskruns.$runParam.debug.ts | 4 + apps/webapp/app/v3/runEngine.server.ts | 2 + .../run-engine/src/engine/index.ts | 30 +-- .../releaseConcurrencyTokenBucketQueue.ts | 152 +++++++++++++-- .../src/engine/systems/checkpointSystem.ts | 14 +- .../systems/releaseConcurrencySystem.ts | 115 ++++++++--- .../engine/tests/releaseConcurrency.test.ts | 181 ++++++++++++++++++ ...releaseConcurrencyTokenBucketQueue.test.ts | 39 ++-- .../run-engine/src/engine/types.ts | 2 + 10 files changed, 452 insertions(+), 92 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index e2325e4694..ab89f6edbb 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -605,6 +605,11 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"), RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"), RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1), + RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE: z.coerce + .number() + .int() + .default(60_000 * 30), + RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL: z.coerce.number().int().default(60_000), RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3), RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1), RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL: z.coerce.number().int().default(500), diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts index bd98255b61..c900ca568e 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts @@ -155,6 +155,10 @@ export async function loader({ request, params }: LoaderFunctionArgs) { label: "Release concurrency metadata", key: releaseConcurrencyMetadataKey, }, + { + label: "Release concurrency releasings", + key: "engine:release-concurrency:releasings", + }, ]; return typedjson({ diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 01a9d3e5d4..28b447fb12 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -81,6 +81,8 @@ function createRunEngine() { disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0", disableConsumers: env.RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS === "1", maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO, + releasingsMaxAge: env.RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE, + releasingsPollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL, maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES, consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT, pollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 68716b8c89..1e7185f15e 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -201,6 +201,9 @@ export class RunEngine { this.releaseConcurrencySystem = new ReleaseConcurrencySystem({ resources, + maxTokensRatio: options.releaseConcurrency?.maxTokensRatio, + releasingsMaxAge: options.releaseConcurrency?.releasingsMaxAge, + releasingsPollInterval: options.releaseConcurrency?.releasingsPollInterval, queueOptions: typeof options.releaseConcurrency?.disabled === "boolean" && options.releaseConcurrency.disabled @@ -223,33 +226,6 @@ export class RunEngine { consumersCount: options.releaseConcurrency?.consumersCount ?? 1, pollInterval: options.releaseConcurrency?.pollInterval ?? 1000, batchSize: options.releaseConcurrency?.batchSize ?? 10, - executor: async (descriptor, snapshotId) => { - return await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot( - snapshotId - ); - }, - maxTokens: async (descriptor) => { - const environment = await this.prisma.runtimeEnvironment.findFirstOrThrow({ - where: { id: descriptor.envId }, - select: { - maximumConcurrencyLimit: true, - }, - }); - - return ( - environment.maximumConcurrencyLimit * - (options.releaseConcurrency?.maxTokensRatio ?? 1.0) - ); - }, - keys: { - fromDescriptor: (descriptor) => - `org:${descriptor.orgId}:proj:${descriptor.projectId}:env:${descriptor.envId}`, - toDescriptor: (name) => ({ - orgId: name.split(":")[1], - projectId: name.split(":")[3], - envId: name.split(":")[5], - }), - }, tracer: this.tracer, }, }); diff --git a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts index 48ed249326..69f0051cfd 100644 --- a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts +++ b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts @@ -14,12 +14,23 @@ export type ReleaseConcurrencyQueueRetryOptions = { }; }; +export type ReleaseConcurrencyValidatorResult = { + releaseQueue: T; + releaserId: string; + shouldRefill: boolean; +}; + export type ReleaseConcurrencyQueueOptions = { redis: RedisOptions; /** * @returns true if the run was successful, false if the token should be returned to the bucket */ executor: (releaseQueue: T, releaserId: string) => Promise; + validateReleaserId?: ( + releaserId: string + ) => Promise | undefined>; + releasingsMaxAge?: number; + releasingsPollInterval?: number; keys: { fromDescriptor: (releaseQueue: T) => string; toDescriptor: (releaseQueue: string) => T; @@ -47,6 +58,7 @@ export class ReleaseConcurrencyTokenBucketQueue { private logger: Logger; private abortController: AbortController; private consumers: ReleaseConcurrencyQueueConsumer[]; + private sweeper?: ReleaseConcurrencyReleasingsSweeper; private keyPrefix: string; private masterQueuesKey: string; @@ -61,7 +73,7 @@ export class ReleaseConcurrencyTokenBucketQueue { constructor(private readonly options: ReleaseConcurrencyQueueOptions) { this.redis = createRedisClient(options.redis); this.keyPrefix = options.redis.keyPrefix ?? "re2:release-concurrency-queue:"; - this.logger = options.logger ?? new Logger("ReleaseConcurrencyQueue"); + this.logger = options.logger ?? new Logger("ReleaseConcurrencyQueue", "debug"); this.abortController = new AbortController(); this.consumers = []; @@ -83,6 +95,7 @@ export class ReleaseConcurrencyTokenBucketQueue { if (!options.disableConsumers) { this.#startConsumers(); this.#startMetricsProducer(); + this.#startReleasingsSweeper(); } } @@ -118,6 +131,7 @@ export class ReleaseConcurrencyTokenBucketQueue { this.#bucketKey(releaseQueue), this.#queueKey(releaseQueue), this.#metadataKey(releaseQueue), + this.#releasingsKey(), releaseQueue, releaserId, String(maxTokens), @@ -172,6 +186,7 @@ export class ReleaseConcurrencyTokenBucketQueue { this.#bucketKey(releaseQueue), this.#queueKey(releaseQueue), this.#metadataKey(releaseQueue), + this.#releasingsKey(), releaseQueue, releaserId, String(maxTokens), @@ -204,6 +219,7 @@ export class ReleaseConcurrencyTokenBucketQueue { this.#bucketKey(releaseQueue), this.#queueKey(releaseQueue), this.#metadataKey(releaseQueue), + this.#releasingsKey(), releaseQueue, releaserId ); @@ -270,10 +286,10 @@ export class ReleaseConcurrencyTokenBucketQueue { } /** - * Refill a token only if the releaserId is not in the release queue. - * Returns true if the token was refilled, false if the releaserId was found in the queue. + * Refill a token only if the releaserId is in the releasings set. + * Returns true if the token was refilled, false if the releaserId was not found in the releasings set. */ - public async refillTokenIfNotInQueue( + public async refillTokenIfInReleasings( releaseQueueDescriptor: T, releaserId: string ): Promise { @@ -291,17 +307,18 @@ export class ReleaseConcurrencyTokenBucketQueue { return false; } - const result = await this.redis.refillTokenIfNotInQueue( + const result = await this.redis.refillTokenIfInReleasings( this.masterQueuesKey, this.#bucketKey(releaseQueue), this.#queueKey(releaseQueue), this.#metadataKey(releaseQueue), + this.#releasingsKey(), releaseQueue, releaserId, String(maxTokens) ); - this.logger.debug("Attempted to refill token if not in queue", { + this.logger.debug("Attempted to refill token if in releasings", { releaseQueueDescriptor, releaserId, maxTokens, @@ -360,6 +377,7 @@ export class ReleaseConcurrencyTokenBucketQueue { this.#bucketKey(releaseQueue), this.#queueKey(releaseQueue), this.#metadataKey(releaseQueue), + this.#releasingsKey(), releaseQueue, releaserId ); @@ -381,6 +399,7 @@ export class ReleaseConcurrencyTokenBucketQueue { this.#bucketKey(releaseQueue), this.#queueKey(releaseQueue), this.#metadataKey(releaseQueue), + this.#releasingsKey(), releaseQueue, releaserId ); @@ -434,6 +453,10 @@ export class ReleaseConcurrencyTokenBucketQueue { return `${releaseQueue}:metadata`; } + #releasingsKey() { + return "releasings"; + } + #startConsumers() { const consumerCount = this.consumersCount; @@ -452,6 +475,20 @@ export class ReleaseConcurrencyTokenBucketQueue { } } + #startReleasingsSweeper() { + if (this.options.validateReleaserId) { + this.sweeper = new ReleaseConcurrencyReleasingsSweeper( + this, + this.options.validateReleaserId, + this.options.releasingsMaxAge ?? 60_000 * 30, + this.options.releasingsPollInterval ?? 60_000, + this.abortController.signal, + this.logger + ); + this.sweeper.start(); + } + } + async #startMetricsProducer() { try { // Produce metrics every 60 seconds, using a tracer span @@ -615,14 +652,28 @@ export class ReleaseConcurrencyTokenBucketQueue { return promise; } + async getReleasings(maxAge: number) { + const releasings = await this.redis.zrangebyscore( + this.#releasingsKey(), + 0, + Date.now() - maxAge + ); + return releasings; + } + + async removeReleaserIdFromReleasings(releaserId: string) { + await this.redis.zrem(this.#releasingsKey(), releaserId); + } + #registerCommands() { this.redis.defineCommand("consumeToken", { - numberOfKeys: 4, + numberOfKeys: 5, lua: ` local masterQueuesKey = KEYS[1] local bucketKey = KEYS[2] local queueKey = KEYS[3] local metadataKey = KEYS[4] +local releasingsKey = KEYS[5] local releaseQueue = ARGV[1] local releaserId = ARGV[2] @@ -638,6 +689,7 @@ if currentTokens >= 1 then redis.call("SET", bucketKey, newCurrentTokens) redis.call("ZREM", queueKey, releaserId) + redis.call("ZADD", releasingsKey, score, releaserId) -- Clean up metadata when successfully consuming redis.call("HDEL", metadataKey, releaserId) @@ -802,21 +854,28 @@ else redis.call("ZREM", masterQueuesKey, releaseQueue) end -return true +return redis.status_reply("true") `, }); this.redis.defineCommand("returnTokenOnly", { - numberOfKeys: 4, + numberOfKeys: 5, lua: ` local masterQueuesKey = KEYS[1] local bucketKey = KEYS[2] local queueKey = KEYS[3] local metadataKey = KEYS[4] +local releasingsKey = KEYS[5] local releaseQueue = ARGV[1] local releaserId = ARGV[2] +local removedFromReleasings = redis.call("ZREM", releasingsKey, releaserId) + +if removedFromReleasings == 0 then + return redis.status_reply("false") +end + -- Return the token to the bucket local currentTokens = tonumber(redis.call("GET", bucketKey)) local remainingTokens = currentTokens + 1 @@ -833,26 +892,26 @@ else redis.call("ZREM", masterQueuesKey, releaseQueue) end -return true +return redis.status_reply("true") `, }); - this.redis.defineCommand("refillTokenIfNotInQueue", { - numberOfKeys: 4, + this.redis.defineCommand("refillTokenIfInReleasings", { + numberOfKeys: 5, lua: ` local masterQueuesKey = KEYS[1] local bucketKey = KEYS[2] local queueKey = KEYS[3] local metadataKey = KEYS[4] +local releasingsKey = KEYS[5] local releaseQueue = ARGV[1] local releaserId = ARGV[2] local maxTokens = tonumber(ARGV[3]) --- Check if the releaserId is in the queue -local score = redis.call("ZSCORE", queueKey, releaserId) -if score then - -- Item is in queue, don't refill token +local removedFromReleasings = redis.call("ZREM", releasingsKey, releaserId) + +if removedFromReleasings == 0 then return redis.status_reply("false") end @@ -891,6 +950,7 @@ declare module "@internal/redis" { bucketKey: string, queueKey: string, metadataKey: string, + releasingsKey: string, releaseQueue: string, releaserId: string, maxTokens: string, @@ -933,16 +993,18 @@ declare module "@internal/redis" { bucketKey: string, queueKey: string, metadataKey: string, + releasingsKey: string, releaseQueue: string, releaserId: string, callback?: Callback ): Result; - refillTokenIfNotInQueue( + refillTokenIfInReleasings( masterQueuesKey: string, bucketKey: string, queueKey: string, metadataKey: string, + releasingsKey: string, releaseQueue: string, releaserId: string, maxTokens: string, @@ -982,3 +1044,59 @@ class ReleaseConcurrencyQueueConsumer { } } } + +class ReleaseConcurrencyReleasingsSweeper { + private readonly logger: Logger; + + constructor( + private readonly queue: ReleaseConcurrencyTokenBucketQueue, + private readonly validateReleaserId: ( + releaserId: string + ) => Promise | undefined>, + private readonly pollInterval: number, + private readonly maxAge: number, + private readonly signal: AbortSignal, + logger?: Logger + ) { + this.queue = queue; + this.logger = logger ?? new Logger("ReleaseConcurrencyReleasingsSweeper"); + } + + async start() { + try { + for await (const _ of setInterval(this.pollInterval, null, { signal: this.signal })) { + try { + await this.sweep(); + } catch (error) { + this.logger.error("Error sweeping releasings:", { error }); + } + } + } catch (error) { + if (error instanceof Error && error.name !== "AbortError") { + this.logger.error("Sweeper loop error:", { error }); + } + } + } + + private async sweep() { + const releasings = await this.queue.getReleasings(this.maxAge); + + this.logger.debug("Sweeping releasings:", { releasings }); + + for (const releaserId of releasings) { + const result = await this.validateReleaserId(releaserId); + + this.logger.debug("Validated releaserId:", { releaserId, result }); + + if (!result) { + // We need to remove the releaserId from the releasings set + await this.queue.removeReleaserIdFromReleasings(releaserId); + continue; + } + + if (result.shouldRefill) { + await this.queue.refillTokenIfInReleasings(result.releaseQueue, result.releaserId); + } + } + } +} diff --git a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts index 8677d55ff4..3fe96ea238 100644 --- a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts @@ -195,8 +195,14 @@ export class CheckpointSystem { checkpointId: taskRunCheckpoint.id, }); + this.$.logger.debug("Refilling token bucket for release concurrency queue", { + snapshot, + }); + // Refill the token bucket for the release concurrency queue - await this.releaseConcurrencySystem.checkpointCreatedOnEnvironment(run.runtimeEnvironment); + await this.releaseConcurrencySystem.refillTokensForSnapshot( + snapshot.previousSnapshotId ?? snapshot.id + ); return { ok: true as const, @@ -227,8 +233,12 @@ export class CheckpointSystem { runnerId, }); + this.$.logger.debug("Refilling token bucket for release concurrency queue", { + snapshot, + }); + // Refill the token bucket for the release concurrency queue - await this.releaseConcurrencySystem.checkpointCreatedOnEnvironment(run.runtimeEnvironment); + await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot.id); return { ok: true as const, diff --git a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts index 9cd721d1a1..0f9583df4f 100644 --- a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts +++ b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts @@ -1,12 +1,12 @@ -import { RuntimeEnvironment, TaskRunExecutionSnapshot } from "@trigger.dev/database"; -import { SystemResources } from "./systems.js"; -import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js"; -import { canReleaseConcurrency } from "../statuses.js"; +import { TaskRunExecutionSnapshot } from "@trigger.dev/database"; import { z } from "zod"; import { ReleaseConcurrencyQueueOptions, ReleaseConcurrencyTokenBucketQueue, } from "../releaseConcurrencyTokenBucketQueue.js"; +import { canReleaseConcurrency } from "../statuses.js"; +import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js"; +import { SystemResources } from "./systems.js"; const ReleaseConcurrencyMetadata = z.object({ releaseConcurrency: z.boolean().optional(), @@ -16,11 +16,17 @@ type ReleaseConcurrencyMetadata = z.infer; export type ReleaseConcurrencySystemOptions = { resources: SystemResources; - queueOptions?: ReleaseConcurrencyQueueOptions<{ - orgId: string; - projectId: string; - envId: string; - }>; + maxTokensRatio?: number; + releasingsMaxAge?: number; + releasingsPollInterval?: number; + queueOptions?: Omit< + ReleaseConcurrencyQueueOptions<{ + orgId: string; + projectId: string; + envId: string; + }>, + "executor" | "validateReleaserId" | "keys" | "maxTokens" + >; }; export class ReleaseConcurrencySystem { @@ -35,10 +41,79 @@ export class ReleaseConcurrencySystem { this.$ = options.resources; if (options.queueOptions) { - this.releaseConcurrencyQueue = new ReleaseConcurrencyTokenBucketQueue(options.queueOptions); + this.releaseConcurrencyQueue = new ReleaseConcurrencyTokenBucketQueue({ + ...options.queueOptions, + releasingsMaxAge: this.options.releasingsMaxAge, + releasingsPollInterval: this.options.releasingsPollInterval, + executor: async (descriptor, snapshotId) => { + return await this.executeReleaseConcurrencyForSnapshot(snapshotId); + }, + keys: { + fromDescriptor: (descriptor) => + `org:${descriptor.orgId}:proj:${descriptor.projectId}:env:${descriptor.envId}`, + toDescriptor: (name) => ({ + orgId: name.split(":")[1], + projectId: name.split(":")[3], + envId: name.split(":")[5], + }), + }, + maxTokens: async (descriptor) => { + const environment = await this.$.prisma.runtimeEnvironment.findFirstOrThrow({ + where: { id: descriptor.envId }, + select: { + maximumConcurrencyLimit: true, + }, + }); + + return environment.maximumConcurrencyLimit * (this.options.maxTokensRatio ?? 1.0); + }, + validateReleaserId: async (releaserId) => { + return this.validateSnapshotShouldRefillToken(releaserId); + }, + }); } } + async validateSnapshotShouldRefillToken(releaserId: string) { + const snapshot = await this.$.prisma.taskRunExecutionSnapshot.findFirst({ + where: { id: releaserId }, + select: { + id: true, + run: { + select: { + id: true, + status: true, + }, + }, + organizationId: true, + projectId: true, + environmentId: true, + executionStatus: true, + }, + }); + + if (!snapshot) { + return; + } + + const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.run.id); + + this.$.logger.debug("Checking if snapshot should refill", { + snapshot, + latestSnapshot, + }); + + return { + releaseQueue: { + orgId: snapshot.organizationId, + projectId: snapshot.projectId, + envId: snapshot.environmentId, + }, + releaserId: snapshot.id, + shouldRefill: latestSnapshot.id !== snapshot.id, + }; + } + public async consumeToken( descriptor: { orgId: string; projectId: string; envId: string }, releaserId: string @@ -50,6 +125,9 @@ export class ReleaseConcurrencySystem { await this.releaseConcurrencyQueue.consumeToken(descriptor, releaserId); } + /** + * This is used in tests only + */ public async returnToken( descriptor: { orgId: string; projectId: string; envId: string }, releaserId: string @@ -105,7 +183,7 @@ export class ReleaseConcurrencySystem { return; } - await this.releaseConcurrencyQueue.refillTokenIfNotInQueue( + await this.releaseConcurrencyQueue.refillTokenIfInReleasings( { orgId: snapshot.organizationId, projectId: snapshot.projectId, @@ -115,21 +193,6 @@ export class ReleaseConcurrencySystem { ); } - public async checkpointCreatedOnEnvironment(environment: RuntimeEnvironment) { - if (!this.releaseConcurrencyQueue) { - return; - } - - await this.releaseConcurrencyQueue.refillTokens( - { - orgId: environment.organizationId, - projectId: environment.projectId, - envId: environment.id, - }, - 1 - ); - } - public async releaseConcurrencyForSnapshot(snapshot: TaskRunExecutionSnapshot) { if (!this.releaseConcurrencyQueue) { this.$.logger.debug("Release concurrency queue not enabled, skipping release", { diff --git a/internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts b/internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts index 3602810e6a..41a4fb7abc 100644 --- a/internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts +++ b/internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts @@ -1504,4 +1504,185 @@ describe("RunEngine Releasing Concurrency", () => { expect(queueMetricsAfterSecondFinished?.currentTokens).toBe(10); } ); + + containerTest( + "refills token bucket after the run has a new snapshot created by the release concurrency sweeper system", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + releaseConcurrency: { + maxTokensRatio: 1, + maxRetries: 3, + consumersCount: 1, + pollInterval: 500, + releasingsPollInterval: 500, + batchSize: 1, + releasingsMaxAge: 2_000, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + const taskIdentifier = "test-task"; + + await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier, + undefined, + undefined, + { + concurrencyLimit: 1, + } + ); + + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queue: `task/${taskIdentifier}`, + isTest: false, + tags: [], + }, + prisma + ); + + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + const queueConcurrency = await engine.runQueue.currentConcurrencyOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + + expect(queueConcurrency).toBe(1); + + const envConcurrency = await engine.runQueue.currentConcurrencyOfEnvironment( + authenticatedEnvironment + ); + + expect(envConcurrency).toBe(1); + + // create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + expect(attemptResult.snapshot.executionStatus).toBe("EXECUTING"); + + // create a manual waitpoint + const result = await engine.createManualWaitpoint({ + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + }); + + // Block the run, specifying the release concurrency option as true + const executingWithWaitpointSnapshot = await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: result.waitpoint.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + releaseConcurrency: true, + }); + + expect(executingWithWaitpointSnapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + // Now confirm the environment concurrency has been released + const envConcurrencyAfter = await engine.runQueue.currentConcurrencyOfEnvironment( + authenticatedEnvironment + ); + + expect(envConcurrencyAfter).toBe(0); + + const queueConcurrencyAfter = await engine.runQueue.currentConcurrencyOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + + expect(queueConcurrencyAfter).toBe(0); + + // And confirm the release concurrency system has consumed the token + const queueMetrics = + await engine.releaseConcurrencySystem.releaseConcurrencyQueue?.getReleaseQueueMetrics({ + orgId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + envId: authenticatedEnvironment.id, + }); + + expect(queueMetrics?.currentTokens).toBe(9); + + await setTimeout(3_000); + + const queueMetricsAfter = + await engine.releaseConcurrencySystem.releaseConcurrencyQueue?.getReleaseQueueMetrics({ + orgId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + envId: authenticatedEnvironment.id, + }); + + expect(queueMetricsAfter?.currentTokens).toBe(9); + + // Now we create a new snapshot for the run, which will cause the sweeper system to refill the token bucket + await engine.executionSnapshotSystem.createExecutionSnapshot(prisma, { + run, + snapshot: { + executionStatus: "PENDING_CANCEL", + description: "Pending cancel", + }, + environmentId: authenticatedEnvironment.id, + environmentType: "PRODUCTION", + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + + await setTimeout(3_000); + + const queueMetricsAfterRefill = + await engine.releaseConcurrencySystem.releaseConcurrencyQueue?.getReleaseQueueMetrics({ + orgId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + envId: authenticatedEnvironment.id, + }); + + expect(queueMetricsAfterRefill?.currentTokens).toBe(10); + } + ); }); diff --git a/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts b/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts index 1de58a2ccd..49541a9a68 100644 --- a/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts +++ b/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts @@ -388,7 +388,7 @@ describe("ReleaseConcurrencyQueue", () => { maxRetries: 2, // Set max retries to 2 (will attempt 3 times total: initial + 2 retries) backoff: { minDelay: 100, - maxDelay: 1000, + maxDelay: 200, factor: 1, }, }, @@ -715,26 +715,22 @@ describe("ReleaseConcurrencyQueue", () => { ); redisTest( - "refillTokenIfNotInQueue should refill token when releaserId is not in queue", + "refillTokenIfInReleasings should refill token when releaserId is in the releasings set", async ({ redisContainer }) => { const { queue, executedRuns } = createReleaseConcurrencyQueue(redisContainer, 2); try { // Use up all tokens await queue.attemptToRelease({ name: "test-queue" }, "run1"); - await queue.attemptToRelease({ name: "test-queue" }, "run2"); - - // Verify tokens were used - expect(executedRuns).toHaveLength(2); // Try to refill token for a releaserId that's not in queue - const wasRefilled = await queue.refillTokenIfNotInQueue({ name: "test-queue" }, "run3"); + const wasRefilled = await queue.refillTokenIfInReleasings({ name: "test-queue" }, "run1"); expect(wasRefilled).toBe(true); // Verify we can now execute a new run - await queue.attemptToRelease({ name: "test-queue" }, "run3"); + await queue.attemptToRelease({ name: "test-queue" }, "run2"); await setTimeout(100); - expect(executedRuns).toHaveLength(3); + expect(executedRuns).toHaveLength(2); } finally { await queue.quit(); } @@ -742,7 +738,7 @@ describe("ReleaseConcurrencyQueue", () => { ); redisTest( - "refillTokenIfNotInQueue should not refill token when releaserId is in queue", + "refillTokenIfInReleasings should not refill token when releaserId is not in the releasings set", async ({ redisContainer }) => { const { queue, executedRuns } = createReleaseConcurrencyQueue(redisContainer, 1); @@ -756,11 +752,11 @@ describe("ReleaseConcurrencyQueue", () => { expect(executedRuns).toHaveLength(1); // run2 is queued // Try to refill token for run2 which is in queue - const wasRefilled = await queue.refillTokenIfNotInQueue({ name: "test-queue" }, "run2"); + const wasRefilled = await queue.refillTokenIfInReleasings({ name: "test-queue" }, "run2"); expect(wasRefilled).toBe(false); // Verify run2 is still queued by refilling a token normally - await queue.refillTokens({ name: "test-queue" }, 1); + await queue.refillTokenIfInReleasings({ name: "test-queue" }, "run1"); await setTimeout(100); expect(executedRuns).toHaveLength(2); expect(executedRuns[1]).toEqual({ releaseQueue: "test-queue", runId: "run2" }); @@ -771,7 +767,7 @@ describe("ReleaseConcurrencyQueue", () => { ); redisTest( - "refillTokenIfNotInQueue should handle multiple queues independently", + "refillTokenIfInReleasings should handle multiple queues independently", async ({ redisContainer }) => { const { queue, executedRuns } = createReleaseConcurrencyQueue(redisContainer, 1); @@ -787,10 +783,10 @@ describe("ReleaseConcurrencyQueue", () => { expect(executedRuns).toHaveLength(2); // run3 and run4 are queued // Try to refill tokens for different releaserIds - const wasRefilled1 = await queue.refillTokenIfNotInQueue({ name: "queue1" }, "run5"); - const wasRefilled2 = await queue.refillTokenIfNotInQueue({ name: "queue2" }, "run4"); + const wasRefilled1 = await queue.refillTokenIfInReleasings({ name: "queue1" }, "run1"); + const wasRefilled2 = await queue.refillTokenIfInReleasings({ name: "queue2" }, "run4"); - expect(wasRefilled1).toBe(true); // run5 not in queue1 + expect(wasRefilled1).toBe(true); // run1 not in queue1 expect(wasRefilled2).toBe(false); // run4 is in queue2 // Verify queue1 can execute a new run with the refilled token @@ -804,17 +800,20 @@ describe("ReleaseConcurrencyQueue", () => { } ); - redisTest("refillTokenIfNotInQueue should not exceed maxTokens", async ({ redisContainer }) => { + redisTest("refillTokenIfInReleasings should not exceed maxTokens", async ({ redisContainer }) => { const { queue } = createReleaseConcurrencyQueue(redisContainer, 1); try { + // First consume a token + await queue.attemptToRelease({ name: "test-queue" }, "run1"); + // First refill should work - const firstRefill = await queue.refillTokenIfNotInQueue({ name: "test-queue" }, "run1"); + const firstRefill = await queue.refillTokenIfInReleasings({ name: "test-queue" }, "run1"); expect(firstRefill).toBe(true); // Second refill should work but not exceed maxTokens - const secondRefill = await queue.refillTokenIfNotInQueue({ name: "test-queue" }, "run2"); - expect(secondRefill).toBe(true); + const secondRefill = await queue.refillTokenIfInReleasings({ name: "test-queue" }, "run2"); + expect(secondRefill).toBe(false); // Get metrics to verify token count const metrics = await queue.getReleaseQueueMetrics({ name: "test-queue" }); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 14b38dc0e9..9e31f9c926 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -47,6 +47,8 @@ export type RunEngineOptions = { releaseConcurrency?: { disabled?: boolean; maxTokensRatio?: number; + releasingsMaxAge?: number; + releasingsPollInterval?: number; redis?: Partial; maxRetries?: number; consumersCount?: number; From 7602efacfd2dea7aac318ce8f40621ef7de21e1b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 21 May 2025 09:18:27 +0100 Subject: [PATCH 3/4] Fix order of arguments to the releasings sweeper --- .../run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts index 69f0051cfd..6c146d4baa 100644 --- a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts +++ b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts @@ -480,8 +480,8 @@ export class ReleaseConcurrencyTokenBucketQueue { this.sweeper = new ReleaseConcurrencyReleasingsSweeper( this, this.options.validateReleaserId, - this.options.releasingsMaxAge ?? 60_000 * 30, this.options.releasingsPollInterval ?? 60_000, + this.options.releasingsMaxAge ?? 60_000 * 30, this.abortController.signal, this.logger ); From 9754bed12c1d7f93d4ae50801122c3500c8bcaab Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 21 May 2025 11:32:37 +0100 Subject: [PATCH 4/4] Add a heartbeat for SUSPENDED snapshots, where when stalled will attempt to continue the run if unblocked --- apps/webapp/app/env.server.ts | 4 + apps/webapp/app/v3/runEngine.server.ts | 1 + .../run-engine/src/engine/index.ts | 27 +++- .../engine/systems/executionSnapshotSystem.ts | 37 ++++- .../src/engine/systems/waitpointSystem.ts | 12 +- .../src/engine/tests/heartbeats.test.ts | 143 ++++++++++++++++++ .../run-engine/src/engine/types.ts | 1 + 7 files changed, 218 insertions(+), 7 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index ab89f6edbb..f4134672f1 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -452,6 +452,10 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_TIMEOUT_PENDING_CANCEL: z.coerce.number().int().default(60_000), RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000), RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000), + RUN_ENGINE_TIMEOUT_SUSPENDED: z.coerce + .number() + .int() + .default(60_000 * 10), RUN_ENGINE_DEBUG_WORKER_NOTIFICATIONS: z.coerce.boolean().default(false), RUN_ENGINE_PARENT_QUEUE_LIMIT: z.coerce.number().int().default(1000), RUN_ENGINE_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 28b447fb12..bd619391e1 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -76,6 +76,7 @@ function createRunEngine() { PENDING_CANCEL: env.RUN_ENGINE_TIMEOUT_PENDING_CANCEL, EXECUTING: env.RUN_ENGINE_TIMEOUT_EXECUTING, EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS, + SUSPENDED: env.RUN_ENGINE_TIMEOUT_SUSPENDED, }, releaseConcurrency: { disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0", diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 1e7185f15e..5aad070707 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -182,6 +182,7 @@ export class RunEngine { PENDING_CANCEL: 60_000, EXECUTING: 60_000, EXECUTING_WITH_WAITPOINTS: 60_000, + SUSPENDED: 60_000 * 10, }; this.heartbeatTimeouts = { ...defaultHeartbeatTimeouts, @@ -1274,9 +1275,29 @@ export class RunEngine { break; } case "SUSPENDED": { - //todo should we do a periodic check here for whether waitpoints are actually still blocking? - //we could at least log some things out if a run has been in this state for a long time - throw new NotImplementedError("Not implemented SUSPENDED"); + const result = await this.waitpointSystem.continueRunIfUnblocked({ runId }); + + this.logger.info("handleStalledSnapshot SUSPENDED continueRunIfUnblocked", { + runId, + result, + snapshotId: latestSnapshot.id, + }); + + switch (result) { + case "blocked": { + // Reschedule the heartbeat + await this.executionSnapshotSystem.restartHeartbeatForRun({ + runId, + }); + break; + } + case "unblocked": + case "skipped": { + break; + } + } + + break; } case "PENDING_CANCEL": { //if the run is waiting to cancel but the worker hasn't confirmed that, diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index 314fb23f8e..8ab2de2da0 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -364,7 +364,7 @@ export class ExecutionSnapshotSystem { return executionResultFromSnapshot(latestSnapshot); } - if (latestSnapshot.workerId !== workerId) { + if (latestSnapshot.workerId && latestSnapshot.workerId !== workerId) { this.$.logger.debug("heartbeatRun: worker ID does not match the latest snapshot", { runId, snapshotId, @@ -394,6 +394,38 @@ export class ExecutionSnapshotSystem { return executionResultFromSnapshot(latestSnapshot); } + public async restartHeartbeatForRun({ + runId, + tx, + }: { + runId: string; + tx?: PrismaClientOrTransaction; + }): Promise { + const prisma = tx ?? this.$.prisma; + + const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); + + //extending the heartbeat + const intervalMs = this.#getHeartbeatIntervalMs(latestSnapshot.executionStatus); + + if (intervalMs !== null) { + this.$.logger.debug("restartHeartbeatForRun: enqueuing heartbeat", { + runId, + snapshotId: latestSnapshot.id, + intervalMs, + }); + + await this.$.worker.enqueue({ + id: `heartbeatSnapshot.${runId}`, + job: "heartbeatSnapshot", + payload: { snapshotId: latestSnapshot.id, runId }, + availableAt: new Date(Date.now() + intervalMs), + }); + } + + return executionResultFromSnapshot(latestSnapshot); + } + #getHeartbeatIntervalMs(status: TaskRunExecutionStatus): number | null { switch (status) { case "PENDING_EXECUTING": { @@ -408,6 +440,9 @@ export class ExecutionSnapshotSystem { case "EXECUTING_WITH_WAITPOINTS": { return this.heartbeatTimeouts.EXECUTING_WITH_WAITPOINTS; } + case "SUSPENDED": { + return this.heartbeatTimeouts.SUSPENDED; + } default: { return null; } diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index cfbc2caaa4..92f0885c07 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -490,7 +490,11 @@ export class WaitpointSystem { }); } - public async continueRunIfUnblocked({ runId }: { runId: string }) { + public async continueRunIfUnblocked({ + runId, + }: { + runId: string; + }): Promise<"blocked" | "unblocked" | "skipped"> { this.$.logger.debug(`continueRunIfUnblocked: start`, { runId, }); @@ -516,7 +520,7 @@ export class WaitpointSystem { runId, blockingWaitpoints, }); - return; + return "blocked"; } // 3. Get the run with environment @@ -553,7 +557,7 @@ export class WaitpointSystem { runId, snapshot, }); - return; + return "skipped"; } //run is still executing, send a message to the worker @@ -677,6 +681,8 @@ export class WaitpointSystem { runId, }); } + + return "unblocked"; } public async createRunAssociatedWaitpoint( diff --git a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts index ba6010da2c..5f2ef0325c 100644 --- a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts +++ b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts @@ -479,6 +479,149 @@ describe("RunEngine heartbeats", () => { } }); + containerTest("Suspended", async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const heartbeatTimeout = 1000; + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + heartbeatTimeoutsMs: { + SUSPENDED: heartbeatTimeout, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + const backgroundWorker = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier + ); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create an attempt + await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //cancel run + //create a manual waitpoint + const waitpointResult = await engine.createManualWaitpoint({ + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + }); + expect(waitpointResult.waitpoint.status).toBe("PENDING"); + + //block the run + const blockedResult = await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: waitpointResult.waitpoint.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + + const blockedExecutionData = await engine.getRunExecutionData({ runId: run.id }); + expect(blockedExecutionData?.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + // Create a checkpoint + const checkpointResult = await engine.createCheckpoint({ + runId: run.id, + snapshotId: blockedResult.id, + checkpoint: { + type: "DOCKER", + reason: "TEST_CHECKPOINT", + location: "test-location", + imageRef: "test-image-ref", + }, + }); + + expect(checkpointResult.ok).toBe(true); + + const snapshot = checkpointResult.ok ? checkpointResult.snapshot : null; + + assertNonNullable(snapshot); + + // Verify checkpoint creation + expect(snapshot.executionStatus).toBe("SUSPENDED"); + + // Now wait for the heartbeat to timeout, but it should retry later + await setTimeout(heartbeatTimeout * 1.5); + + // Simulate a suspended run without any blocking waitpoints by deleting any blocking task run waitpoints + await prisma.taskRunWaitpoint.deleteMany({ + where: { + taskRunId: run.id, + }, + }); + + // Now wait for the heartbeat to timeout again + await setTimeout(heartbeatTimeout * 2); + + // Expect the run to be queued + const executionData2 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData2); + expect(executionData2.snapshot.executionStatus).toBe("QUEUED"); + } finally { + await engine.quit(); + } + }); + containerTest("Heartbeat keeps run alive", async ({ prisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 9e31f9c926..84281bc505 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -68,6 +68,7 @@ export type HeartbeatTimeouts = { PENDING_CANCEL: number; EXECUTING: number; EXECUTING_WITH_WAITPOINTS: number; + SUSPENDED: number; }; export type TriggerParams = {