Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
"type": "node-terminal",
"request": "launch",
"name": "Debug RunEngine tests",
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyQueue.test.ts -t 'Should manage token bucket and queue correctly'",
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts -t 'Should provide metrics about queues via getQueueMetrics'",
"cwd": "${workspaceFolder}/internal-packages/run-engine",
"sourceMaps": true
},
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ export class RunEngine {
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
batchSize: options.releaseConcurrency?.batchSize ?? 10,
executor: async (descriptor, snapshotId) => {
await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
return await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
snapshotId
);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ export type ReleaseConcurrencyQueueRetryOptions = {

export type ReleaseConcurrencyQueueOptions<T> = {
redis: RedisOptions;
executor: (releaseQueue: T, releaserId: string) => Promise<void>;
/**
* @returns true if the run was successful, false if the token should be returned to the bucket
*/
executor: (releaseQueue: T, releaserId: string) => Promise<boolean>;
keys: {
fromDescriptor: (releaseQueue: T) => string;
toDescriptor: (releaseQueue: string) => T;
Expand Down Expand Up @@ -119,7 +122,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
String(Date.now())
);

this.logger.debug("Consumed token in attemptToRelease", {
this.logger.info("Consumed token in attemptToRelease", {
releaseQueueDescriptor,
releaserId,
maxTokens,
Expand Down Expand Up @@ -270,7 +273,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
return false;
}

await Promise.all(
await Promise.allSettled(
result.map(([queue, releaserId, metadata]) => {
const itemMetadata = QueueItemMetadata.parse(JSON.parse(metadata));
const releaseQueueDescriptor = this.keys.toDescriptor(queue);
Expand All @@ -283,9 +286,29 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {

async #callExecutor(releaseQueueDescriptor: T, releaserId: string, metadata: QueueItemMetadata) {
try {
this.logger.info("Executing run:", { releaseQueueDescriptor, releaserId });
this.logger.info("Calling executor for release", { releaseQueueDescriptor, releaserId });

const released = await this.options.executor(releaseQueueDescriptor, releaserId);

if (released) {
this.logger.info("Executor released concurrency", { releaseQueueDescriptor, releaserId });
} else {
this.logger.info("Executor did not release concurrency", {
releaseQueueDescriptor,
releaserId,
});

await this.options.executor(releaseQueueDescriptor, releaserId);
// Return the token but don't requeue
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
await this.redis.returnTokenOnly(
this.masterQueuesKey,
this.#bucketKey(releaseQueue),
this.#queueKey(releaseQueue),
this.#metadataKey(releaseQueue),
releaseQueue,
releaserId
);
}
} catch (error) {
this.logger.error("Error executing run:", { error });

Expand Down Expand Up @@ -401,7 +424,9 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)

-- If we have enough tokens, then consume them
if currentTokens >= 1 then
redis.call("SET", bucketKey, currentTokens - 1)
newCurrentTokens = currentTokens - 1

redis.call("SET", bucketKey, newCurrentTokens)
redis.call("ZREM", queueKey, releaserId)

-- Clean up metadata when successfully consuming
Expand All @@ -411,8 +436,8 @@ if currentTokens >= 1 then
local queueLength = redis.call("ZCARD", queueKey)

-- If we still have tokens and items in queue, update available queues
if currentTokens > 0 and queueLength > 0 then
redis.call("ZADD", masterQueuesKey, currentTokens, releaseQueue)
if newCurrentTokens > 0 and queueLength > 0 then
redis.call("ZADD", masterQueuesKey, newCurrentTokens, releaseQueue)
else
redis.call("ZREM", masterQueuesKey, releaseQueue)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ export class ReleaseConcurrencySystem {
);
}

public async executeReleaseConcurrencyForSnapshot(snapshotId: string) {
public async executeReleaseConcurrencyForSnapshot(snapshotId: string): Promise<boolean> {
if (!this.releaseConcurrencyQueue) {
return;
return false;
}

this.$.logger.debug("Executing released concurrency", {
Expand Down Expand Up @@ -136,14 +136,14 @@ export class ReleaseConcurrencySystem {
snapshotId,
});

return;
return false;
}

// - Runlock the run
// - Get latest snapshot
// - If the run is non suspended or going to be, then bail
// - If the run is suspended or going to be, then release the concurrency
await this.$.runLock.lock([snapshot.runId], 5_000, async () => {
return await this.$.runLock.lock([snapshot.runId], 5_000, async () => {
const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId);

const isValidSnapshot =
Expand All @@ -159,7 +159,7 @@ export class ReleaseConcurrencySystem {
snapshot,
});

return;
return false;
}

if (!canReleaseConcurrency(latestSnapshot.executionStatus)) {
Expand All @@ -168,20 +168,21 @@ export class ReleaseConcurrencySystem {
snapshot: latestSnapshot,
});

return;
return false;
}

const metadata = this.#parseMetadata(snapshot.metadata);

if (typeof metadata.releaseConcurrency === "boolean") {
if (metadata.releaseConcurrency) {
return await this.$.runQueue.releaseAllConcurrency(
snapshot.organizationId,
snapshot.runId
);
await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);

return true;
}

return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);

return true;
}

// Get the locked queue
Expand All @@ -198,10 +199,14 @@ export class ReleaseConcurrencySystem {
(typeof taskQueue.concurrencyLimit === "undefined" ||
taskQueue.releaseConcurrencyOnWaitpoint)
) {
return await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);
await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);

return true;
}

return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);

return true;
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { redisTest, StartedRedisContainer } from "@internal/testcontainers";
import { ReleaseConcurrencyTokenBucketQueue } from "../releaseConcurrencyTokenBucketQueue.js";
import { setTimeout } from "node:timers/promises";
import { createRedisClient, Redis } from "@internal/redis";

type TestQueueDescriptor = {
name: string;
Expand All @@ -20,6 +21,7 @@ function createReleaseConcurrencyQueue(
},
executor: async (releaseQueue, runId) => {
executedRuns.push({ releaseQueue: releaseQueue.name, runId });
return true;
},
maxTokens: async (_) => maxTokens,
keys: {
Expand Down Expand Up @@ -221,6 +223,7 @@ describe("ReleaseConcurrencyQueue", () => {
throw new Error("Executor failed");
}
executedRuns.push({ releaseQueue, runId });
return true;
},
maxTokens: async (_) => 2,
keys: {
Expand Down Expand Up @@ -299,6 +302,7 @@ describe("ReleaseConcurrencyQueue", () => {
// Add small delay to simulate work
await setTimeout(10);
executedRuns.push({ releaseQueue, runId });
return true;
},
keys: {
fromDescriptor: (descriptor) => descriptor,
Expand Down Expand Up @@ -419,6 +423,7 @@ describe("ReleaseConcurrencyQueue", () => {
},
executor: async (releaseQueue, runId) => {
secondRunAttempted = true;
return true;
},
keys: {
fromDescriptor: (descriptor) => descriptor,
Expand Down Expand Up @@ -516,6 +521,63 @@ describe("ReleaseConcurrencyQueue", () => {
}
});

redisTest(
"Should return token but not requeue when executor returns false",
async ({ redisContainer }) => {
const executedRuns: { releaseQueue: string; runId: string }[] = [];
const runResults: Record<string, boolean> = {
run1: true, // This will succeed
run2: false, // This will return false, returning the token without requeuing
run3: true, // This should execute immediately when run2's token is returned
};

const queue = new ReleaseConcurrencyTokenBucketQueue<string>({
redis: {
keyPrefix: "release-queue:test:",
host: redisContainer.getHost(),
port: redisContainer.getPort(),
},
executor: async (releaseQueue, runId) => {
const success = runResults[runId];

executedRuns.push({ releaseQueue, runId });

return success;
},
keys: {
fromDescriptor: (descriptor) => descriptor,
toDescriptor: (name) => name,
},
maxTokens: async (_) => 2, // Only 2 tokens available at a time
pollInterval: 100,
});

try {
// First run should execute and succeed
await queue.attemptToRelease("test-queue", "run1");
expect(executedRuns).toHaveLength(1);
expect(executedRuns[0]).toEqual({ releaseQueue: "test-queue", runId: "run1" });

// Second run should execute but return false, returning the token
await queue.attemptToRelease("test-queue", "run2");
expect(executedRuns).toHaveLength(2);
expect(executedRuns[1]).toEqual({ releaseQueue: "test-queue", runId: "run2" });

// Third run should be able to execute immediately since run2 returned its token
await queue.attemptToRelease("test-queue", "run3");

expect(executedRuns).toHaveLength(3);
expect(executedRuns[2]).toEqual({ releaseQueue: "test-queue", runId: "run3" });

// Verify that run2 was not retried (it should have been skipped)
const run2Attempts = executedRuns.filter((r) => r.runId === "run2");
expect(run2Attempts).toHaveLength(1); // Only executed once, not retried
} finally {
await queue.quit();
}
}
);

redisTest("Should implement exponential backoff between retries", async ({ redisContainer }) => {
const executionTimes: number[] = [];
let startTime: number;
Expand Down
Loading