Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),

RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"),
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3),
RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1),
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ function createRunEngine() {
},
releaseConcurrency: {
disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0",
disableConsumers: env.RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS === "1",
maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO,
maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES,
consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export type ReleaseConcurrencyQueueOptions<T> = {
pollInterval?: number;
batchSize?: number;
retry?: ReleaseConcurrencyQueueRetryOptions;
disableConsumers?: boolean;
};

const QueueItemMetadata = z.object({
Expand Down Expand Up @@ -74,7 +75,10 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
};

this.#registerCommands();
this.#startConsumers();

if (!options.disableConsumers) {
this.#startConsumers();
}
}

public async quit() {
Expand All @@ -93,6 +97,12 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor);

if (maxTokens === 0) {
this.logger.debug("No tokens available, skipping release", {
releaseQueueDescriptor,
releaserId,
maxTokens,
});

return;
}

Expand All @@ -109,6 +119,14 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
String(Date.now())
);

this.logger.debug("Consumed token in attemptToRelease", {
releaseQueueDescriptor,
releaserId,
maxTokens,
result,
releaseQueue,
});

if (!!result) {
await this.#callExecutor(releaseQueueDescriptor, releaserId, {
retryCount: 0,
Expand All @@ -119,6 +137,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
releaseQueueDescriptor,
releaserId,
maxTokens,
releaseQueue,
});
}
}
Expand All @@ -130,13 +149,19 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
*/
public async consumeToken(releaseQueueDescriptor: T, releaserId: string) {
const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor);
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);

if (maxTokens === 0) {
this.logger.debug("No tokens available, skipping consume", {
releaseQueueDescriptor,
releaserId,
maxTokens,
releaseQueue,
});

return;
}

const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);

await this.redis.consumeToken(
this.masterQueuesKey,
this.#bucketKey(releaseQueue),
Expand All @@ -147,6 +172,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
String(maxTokens),
String(Date.now())
);

this.logger.debug("Consumed token in consumeToken", {
releaseQueueDescriptor,
releaserId,
maxTokens,
releaseQueue,
});
}

/**
Expand All @@ -157,6 +189,11 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
public async returnToken(releaseQueueDescriptor: T, releaserId: string) {
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);

this.logger.debug("Returning token in returnToken", {
releaseQueueDescriptor,
releaserId,
});

await this.redis.returnTokenOnly(
this.masterQueuesKey,
this.#bucketKey(releaseQueue),
Expand All @@ -165,6 +202,12 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
releaseQueue,
releaserId
);

this.logger.debug("Returned token in returnToken", {
releaseQueueDescriptor,
releaserId,
releaseQueue,
});
}

/**
Expand All @@ -177,10 +220,20 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);

if (amount < 0) {
this.logger.debug("Cannot refill with negative tokens", {
releaseQueueDescriptor,
amount,
});

throw new Error("Cannot refill with negative tokens");
}

if (amount === 0) {
this.logger.debug("Cannot refill with 0 tokens", {
releaseQueueDescriptor,
amount,
});

return [];
}

Expand All @@ -192,6 +245,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
String(amount),
String(maxTokens)
);

this.logger.debug("Refilled tokens in refillTokens", {
releaseQueueDescriptor,
releaseQueue,
amount,
maxTokens,
});
}

/**
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export type RunEngineOptions = {
maxDelay?: number; // Defaults to 60000
factor?: number; // Defaults to 2
};
disableConsumers?: boolean;
};
};

Expand Down
Loading