Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal-packages/run-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@
"build": "pnpm run clean && tsc -p tsconfig.build.json",
"dev": "tsc --watch -p tsconfig.build.json"
}
}
}
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ export class RunEngine {
executionSnapshotSystem: this.executionSnapshotSystem,
runAttemptSystem: this.runAttemptSystem,
machines: this.options.machines,
releaseConcurrencySystem: this.releaseConcurrencySystem,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,59 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
});
}

public async getReleaseQueueMetrics(releaseQueueDescriptor: T) {
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
const currentTokensRaw = await this.redis.get(this.#bucketKey(releaseQueue));
const queueLength = await this.redis.zcard(this.#queueKey(releaseQueue));

const currentTokens = currentTokensRaw ? Number(currentTokensRaw) : undefined;

return { currentTokens, queueLength };
}

/**
* Refill a token only if the releaserId is not in the release queue.
* Returns true if the token was refilled, false if the releaserId was found in the queue.
*/
public async refillTokenIfNotInQueue(
releaseQueueDescriptor: T,
releaserId: string
): Promise<boolean> {
const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor);
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);

if (maxTokens === 0) {
this.logger.debug("No tokens available, skipping refill", {
releaseQueueDescriptor,
releaserId,
maxTokens,
releaseQueue,
});

return false;
}

const result = await this.redis.refillTokenIfNotInQueue(
this.masterQueuesKey,
this.#bucketKey(releaseQueue),
this.#queueKey(releaseQueue),
this.#metadataKey(releaseQueue),
releaseQueue,
releaserId,
String(maxTokens)
);

this.logger.debug("Attempted to refill token if not in queue", {
releaseQueueDescriptor,
releaserId,
maxTokens,
releaseQueue,
result,
});

return result === "true";
}

/**
* Get the next queue that has available capacity and process one item from it
* Returns true if an item was processed, false if no items were available
Expand Down Expand Up @@ -783,6 +836,51 @@ end
return true
`,
});

this.redis.defineCommand("refillTokenIfNotInQueue", {
numberOfKeys: 4,
lua: `
local masterQueuesKey = KEYS[1]
local bucketKey = KEYS[2]
local queueKey = KEYS[3]
local metadataKey = KEYS[4]

local releaseQueue = ARGV[1]
local releaserId = ARGV[2]
local maxTokens = tonumber(ARGV[3])

-- Check if the releaserId is in the queue
local score = redis.call("ZSCORE", queueKey, releaserId)
if score then
-- Item is in queue, don't refill token
return redis.status_reply("false")
end

-- Return the token to the bucket
local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
local remainingTokens = currentTokens + 1

-- Don't exceed maxTokens
if remainingTokens > maxTokens then
remainingTokens = maxTokens
end

redis.call("SET", bucketKey, remainingTokens)

-- Clean up any metadata just in case
redis.call("HDEL", metadataKey, releaserId)

-- Update the master queue based on remaining queue length
local queueLength = redis.call("ZCARD", queueKey)
if queueLength > 0 then
redis.call("ZADD", masterQueuesKey, remainingTokens, releaseQueue)
else
redis.call("ZREM", masterQueuesKey, releaseQueue)
end

return redis.status_reply("true")
`,
});
}
}

Expand Down Expand Up @@ -839,6 +937,17 @@ declare module "@internal/redis" {
releaserId: string,
callback?: Callback<void>
): Result<void, Context>;

refillTokenIfNotInQueue(
masterQueuesKey: string,
bucketKey: string,
queueKey: string,
metadataKey: string,
releaseQueue: string,
releaserId: string,
maxTokens: string,
callback?: Callback<string>
): Result<string, Context>;
}
}

Expand Down
10 changes: 10 additions & 0 deletions internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,27 @@ import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./execution
import { RunAttemptSystem } from "./runAttemptSystem.js";
import { SystemResources } from "./systems.js";
import { sendNotificationToWorker } from "../eventBus.js";
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";

export type DequeueSystemOptions = {
resources: SystemResources;
machines: RunEngineOptions["machines"];
executionSnapshotSystem: ExecutionSnapshotSystem;
runAttemptSystem: RunAttemptSystem;
releaseConcurrencySystem: ReleaseConcurrencySystem;
};

export class DequeueSystem {
private readonly $: SystemResources;
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
private readonly runAttemptSystem: RunAttemptSystem;
private readonly releaseConcurrencySystem: ReleaseConcurrencySystem;

constructor(private readonly options: DequeueSystemOptions) {
this.$ = options.resources;
this.executionSnapshotSystem = options.executionSnapshotSystem;
this.runAttemptSystem = options.runAttemptSystem;
this.releaseConcurrencySystem = options.releaseConcurrencySystem;
}

/**
Expand Down Expand Up @@ -158,6 +162,12 @@ export class DequeueSystem {
}
);

if (snapshot.previousSnapshotId) {
await this.releaseConcurrencySystem.refillTokensForSnapshot(
snapshot.previousSnapshotId
);
}

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,52 @@ export class ReleaseConcurrencySystem {
await this.releaseConcurrencyQueue.quit();
}

public async refillTokensForSnapshot(snapshotId: string | undefined): Promise<void>;
public async refillTokensForSnapshot(snapshot: TaskRunExecutionSnapshot): Promise<void>;
public async refillTokensForSnapshot(
snapshotOrId: TaskRunExecutionSnapshot | string | undefined
) {
if (!this.releaseConcurrencyQueue) {
return;
}

if (typeof snapshotOrId === "undefined") {
return;
}

const snapshot =
typeof snapshotOrId === "string"
? await this.$.prisma.taskRunExecutionSnapshot.findFirst({
where: { id: snapshotOrId },
})
: snapshotOrId;

if (!snapshot) {
this.$.logger.error("Snapshot not found", {
snapshotId: snapshotOrId,
});

return;
}

if (snapshot.executionStatus !== "EXECUTING_WITH_WAITPOINTS") {
this.$.logger.debug("Snapshot is not in a valid state to refill tokens", {
snapshot,
});

return;
}

await this.releaseConcurrencyQueue.refillTokenIfNotInQueue(
{
orgId: snapshot.organizationId,
projectId: snapshot.projectId,
envId: snapshot.environmentId,
},
snapshot.id
);
}

public async checkpointCreatedOnEnvironment(environment: RuntimeEnvironment) {
if (!this.releaseConcurrencyQueue) {
return;
Expand All @@ -86,11 +132,19 @@ export class ReleaseConcurrencySystem {

public async releaseConcurrencyForSnapshot(snapshot: TaskRunExecutionSnapshot) {
if (!this.releaseConcurrencyQueue) {
this.$.logger.debug("Release concurrency queue not enabled, skipping release", {
snapshotId: snapshot.id,
});

return;
}

// Go ahead and release concurrency immediately if the run is in a development environment
if (snapshot.environmentType === "DEVELOPMENT") {
this.$.logger.debug("Immediate release of concurrency for development environment", {
snapshotId: snapshot.id,
});

return await this.executeReleaseConcurrencyForSnapshot(snapshot.id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,11 @@ export class WaitpointSystem {
await this.$.runLock.lock([runId], 5000, async () => {
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);

this.$.logger.debug("continueRunIfUnblocked", {
runId,
snapshot,
});

//run is still executing, send a message to the worker
if (isExecuting(snapshot.executionStatus)) {
const result = await this.$.runQueue.reacquireConcurrency(
Expand Down Expand Up @@ -543,6 +548,8 @@ export class WaitpointSystem {
}
);

await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
Expand Down
Loading
Loading